diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index 7c4f21651..7e7681a73 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -1,12 +1,32 @@ import { Queue } from 'bullmq' import { redis } from './redis' -const QUEUE_NAME = 'omnivore-content-fetch' +const QUEUE_NAME = 'omnivore-backend-queue' +const JOB_NAME = 'save-page' interface savePageJob { url: string userId: string data: unknown + isRss: boolean + isImport: boolean +} + +const getPriority = (job: savePageJob): number => { + // we want to prioritized jobs by the expected time to complete + // lower number means higher priority + // priority 1: jobs that are expected to finish immediately + // priority 5: jobs that are expected to finish in less than 10 second + // priority 10: jobs that are expected to finish in less than 10 minutes + // priority 100: jobs that are expected to finish in less than 1 hour + if (job.isRss) { + return 10 + } + if (job.isImport) { + return 100 + } + + return 5 } const createQueue = (): Queue | undefined => { @@ -22,12 +42,13 @@ export const queueSavePageJob = async (savePageJobs: savePageJob[]) => { } const jobs = savePageJobs.map((job) => ({ - name: 'save-page', + name: JOB_NAME, data: job.data, opts: { jobId: `${job.userId}-${job.url}`, removeOnComplete: true, removeOnFail: true, + priority: getPriority(job), }, })) diff --git a/packages/content-fetch/src/redis.ts b/packages/content-fetch/src/redis.ts index 900bf958c..433826498 100644 --- a/packages/content-fetch/src/redis.ts +++ b/packages/content-fetch/src/redis.ts @@ -11,35 +11,14 @@ export const redis = new Redis(url || 'redis://localhost:6379', { rejectUnauthorized: false, // for self-signed certs } : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, + maxRetriesPerRequest: null, + offlineQueue: false, }) -// graceful shutdown -process.on('SIGINT', () => { - ;(async () => { - console.log('SIGINT signal received: closing HTTP server') - await redis.quit() - console.log('redis connection closed') - process.exit() - })() +redis.on('connect', () => { + console.log('Redis connected') +}) + +redis.on('error', (err) => { + console.error('Redis error', err) }) diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 9959f639b..748d4366b 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -52,16 +52,18 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const body = req.body - let users = body.users || [] // users is used when saving article for multiple users + // users is used when saving article for multiple users + let users = body.users || [] const userId = body.userId const folder = body.folder + // userId is used when saving article for a single user if (userId) { users = [ { id: userId, folder: body.folder, }, - ] // userId is used when saving article for a single user + ] } const articleSavingRequestId = body.saveRequestId const state = body.state @@ -125,6 +127,8 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { savedAt, publishedAt, }, + isRss: !!rssFeedUrl, + isImport: !!taskId, })) const result = await queueSavePageJob(savePageJobs)