only disconnect redis message queue client if it is different from cache client

This commit is contained in:
Hongbo Wu
2024-07-04 16:09:32 +08:00
parent 0a1b7345e9
commit 649463b785
4 changed files with 86 additions and 60 deletions

View File

@@ -1,3 +1,4 @@
import { RedisDataSource } from '@omnivore/utils'
import * as chai from 'chai'
import { expect } from 'chai'
import chaiString from 'chai-string'
@@ -11,13 +12,25 @@ chai.use(chaiString)
describe('Test csv importer', () => {
let stub: ImportContext
let redisDataSource: RedisDataSource
beforeEach(() => {
stub = stubImportCtx()
redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
stub = stubImportCtx(redisDataSource.cacheClient)
})
afterEach(async () => {
await stub.redisClient.quit()
await redisDataSource.shutdown()
})
describe('Load a simple CSV file', () => {

View File

@@ -1,4 +1,5 @@
import { Readability } from '@omnivore/readability'
import { RedisDataSource } from '@omnivore/utils'
import * as chai from 'chai'
import { expect } from 'chai'
import chaiString from 'chai-string'
@@ -13,50 +14,70 @@ import { stubImportCtx } from '../util'
chai.use(chaiString)
describe('Load a simple _matter_history file', () => {
it('should find the URL of each row', async () => {
const urls: URL[] = []
const stream = fs.createReadStream('./test/matter/data/_matter_history.csv')
const stub = stubImportCtx()
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
urls.push(url)
return Promise.resolve()
}
describe('matter importer', () => {
let stub: ImportContext
let redisDataSource: RedisDataSource
await importMatterHistoryCsv(stub, stream)
expect(stub.countFailed).to.equal(0)
expect(stub.countImported).to.equal(1)
expect(urls).to.eql([
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
])
beforeEach(() => {
redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
await stub.redisClient.quit()
})
})
describe('Load archive file', () => {
it('should find the URL of each row', async () => {
const urls: URL[] = []
const stream = fs.createReadStream('./test/matter/data/Archive.zip')
const stub = stubImportCtx()
stub.contentHandler = (
ctx: ImportContext,
url: URL,
title: string,
originalContent: string,
parseResult: Readability.ParseResult
): Promise<void> => {
urls.push(url)
return Promise.resolve()
}
await importMatterArchive(stub, stream)
expect(stub.countFailed).to.equal(0)
expect(stub.countImported).to.equal(1)
expect(urls).to.eql([
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
])
await stub.redisClient.quit()
stub = stubImportCtx(redisDataSource.cacheClient)
})
afterEach(async () => {
await redisDataSource.shutdown()
})
describe('Load a simple _matter_history file', () => {
it('should find the URL of each row', async () => {
const urls: URL[] = []
const stream = fs.createReadStream(
'./test/matter/data/_matter_history.csv'
)
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
urls.push(url)
return Promise.resolve()
}
await importMatterHistoryCsv(stub, stream)
expect(stub.countFailed).to.equal(0)
expect(stub.countImported).to.equal(1)
expect(urls).to.eql([
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
])
})
})
describe('Load archive file', () => {
it('should find the URL of each row', async () => {
const urls: URL[] = []
const stream = fs.createReadStream('./test/matter/data/Archive.zip')
stub.contentHandler = (
ctx: ImportContext,
url: URL,
title: string,
originalContent: string,
parseResult: Readability.ParseResult
): Promise<void> => {
urls.push(url)
return Promise.resolve()
}
await importMatterArchive(stub, stream)
expect(stub.countFailed).to.equal(0)
expect(stub.countImported).to.equal(1)
expect(urls).to.eql([
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
])
})
})
})

View File

@@ -1,19 +1,8 @@
import { Readability } from '@omnivore/readability'
import { RedisDataSource } from '@omnivore/utils'
import Redis from 'ioredis'
import { ArticleSavingRequestStatus, ImportContext } from '../src'
export const stubImportCtx = (): ImportContext => {
const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
export const stubImportCtx = (redisClient: Redis): ImportContext => {
return {
userId: '',
countImported: 0,
@@ -35,7 +24,7 @@ export const stubImportCtx = (): ImportContext => {
): Promise<void> => {
return Promise.resolve()
},
redisClient: redisDataSource.cacheClient,
redisClient,
taskId: '',
source: 'csv-importer',
}

View File

@@ -28,9 +28,12 @@ export class RedisDataSource {
async shutdown(): Promise<void> {
try {
await this.queueRedisClient?.quit()
await this.cacheClient?.quit()
if (this.queueRedisClient !== this.cacheClient) {
await this.queueRedisClient.quit()
}
console.log('redis shutdown complete')
} catch (err) {
console.error('error while shutting down redis', err)