From 5bd272dde0f92955e7a0b051dc56909211bbae35 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 12:08:23 +0800 Subject: [PATCH] use one queue with different priority for fetching content of rss feed item or saved url --- packages/api/src/jobs/rss/refreshFeed.ts | 10 ++- packages/api/src/queue-processor.ts | 2 - .../src/services/create_page_save_request.ts | 4 +- packages/api/src/utils/createTask.ts | 46 ++++------ packages/content-fetch/src/app.ts | 86 ++++++++----------- packages/content-fetch/src/job.ts | 15 ++-- packages/content-fetch/src/worker.ts | 39 ++------- 7 files changed, 78 insertions(+), 124 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 7882522da..43cacb011 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -35,6 +35,7 @@ interface RefreshFeedRequest { fetchContentTypes: FetchContentType[] folders: FolderType[] refreshContext?: RSSRefreshContext + priority?: 'low' | 'high' } export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => { @@ -332,7 +333,8 @@ const createTask = async ( const fetchContentAndCreateItem = async ( users: UserConfig[], feedUrl: string, - item: RssFeedItem + item: RssFeedItem, + priority = 'low' as 'low' | 'high' ) => { const data: FetchContentJobData = { users, @@ -342,7 +344,7 @@ const fetchContentAndCreateItem = async ( rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, - priority: 'low', + priority, } try { @@ -703,6 +705,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { fetchContentTypes, folders, refreshContext, + priority, } = request logger.info('Processing feed', feedUrl, { refreshContext: refreshContext }) @@ -771,7 +774,8 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { await fetchContentAndCreateItem( Array.from(task.users.values()), feedUrl, - task.item + task.item, + priority ) } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 9468321a7..aa5f2e7ed 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -84,8 +84,6 @@ import { logger } from './utils/logger' export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue' export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' -export const CONTENT_FETCH_SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' -export const CONTENT_FETCH_RSS_QUEUE = 'omnivore-content-fetch-rss-queue' export const JOB_VERSION = 'v001' diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 1c2723a99..cde6140c3 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -57,7 +57,7 @@ const addRecentSavedItem = async (userId: string) => { // default: use normal queue const getPriorityByRateLimit = async ( userId: string -): Promise<'low' | 'high' | undefined> => { +): Promise<'low' | 'high'> => { const redisClient = redisDataSource.redisClient if (redisClient) { const oneMinuteAgo = Date.now() - 60 * 1000 @@ -75,6 +75,8 @@ const getPriorityByRateLimit = async ( logger.error('Failed to get priority by rate limit', { userId, error }) } } + + return 'high' } export const validateUrl = (url: string): URL => { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index c35b39da7..4e6a4f862 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -65,13 +65,7 @@ import { UploadContentJobData, UPLOAD_CONTENT_JOB, } from '../jobs/upload_content' -import { - CONTENT_FETCH_QUEUE, - CONTENT_FETCH_RSS_QUEUE, - CONTENT_FETCH_SLOW_QUEUE, - getQueue, - JOB_VERSION, -} from '../queue-processor' +import { CONTENT_FETCH_QUEUE, getQueue, JOB_VERSION } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { writeDigest } from '../services/digest' import { signFeatureToken } from '../services/features' @@ -102,18 +96,21 @@ export const getJobPriority = (jobName: string): number => { case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: case UPDATE_HOME_JOB: - case FETCH_CONTENT_JOB: + case `${FETCH_CONTENT_JOB}_high`: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case AI_SUMMARIZE_JOB_NAME: case PROCESS_YOUTUBE_VIDEO_JOB_NAME: + case `${FETCH_CONTENT_JOB}_low`: + case `${FETCH_CONTENT_JOB}_rss_high`: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: case UPLOAD_CONTENT_JOB: case SCORE_LIBRARY_ITEM_JOB: + case `${FETCH_CONTENT_JOB}_rss_low`: return 10 case `${REFRESH_FEED_JOB_NAME}_low`: case EXPORT_ITEM_JOB_NAME: @@ -336,7 +333,7 @@ export interface FetchContentJobData { folder?: string libraryItemId: string }> - priority?: 'low' | 'high' + priority: 'low' | 'high' state?: ArticleSavingRequestStatus labels?: Array locale?: string @@ -360,39 +357,32 @@ export interface FetchContentJobData { export const enqueueFetchContentJob = async ( data: FetchContentJobData ): Promise => { - const getQueueName = (data: FetchContentJobData) => { - if (data.rssFeedUrl) { - return CONTENT_FETCH_RSS_QUEUE - } - - if (data.priority === 'low') { - return CONTENT_FETCH_SLOW_QUEUE - } - - return CONTENT_FETCH_QUEUE - } - - const queueName = getQueueName(data) - const queue = await getQueue(queueName) + const queue = await getQueue(CONTENT_FETCH_QUEUE) if (!queue) { throw new Error('No queue found') } + const jobName = `${FETCH_CONTENT_JOB}${data.rssFeedUrl ? '_rss' : ''}_${ + data.priority + }` + // sort the data to make sure the hash is consistent const sortedData = JSON.stringify(data, Object.keys(data).sort()) const jobId = `${FETCH_CONTENT_JOB}_${stringToHash( sortedData )}_${JOB_VERSION}` + const priority = getJobPriority(jobName) + const job = await queue.add(FETCH_CONTENT_JOB, data, { - priority: getJobPriority(FETCH_CONTENT_JOB), - attempts: data.priority === 'low' ? 2 : 3, + jobId, + removeOnComplete: true, + removeOnFail: true, + priority, + attempts: 2, backoff: { type: 'exponential', delay: 2000, }, - jobId, - removeOnComplete: true, - removeOnFail: true, }) if (!job || !job.id) { diff --git a/packages/content-fetch/src/app.ts b/packages/content-fetch/src/app.ts index 3cf0e4982..8ffb1ef6b 100644 --- a/packages/content-fetch/src/app.ts +++ b/packages/content-fetch/src/app.ts @@ -2,10 +2,10 @@ import { RedisDataSource } from '@omnivore/utils' import { JobType } from 'bullmq' import express, { Express } from 'express' import asyncHandler from 'express-async-handler' -import { createWorkers, getQueue } from './worker' +import { createWorker, getQueue, QUEUE } from './worker' const main = () => { - console.log('[worker]: starting workers') + console.log('Starting worker...') const app: Express = express() const port = process.env.PORT || 3002 @@ -22,16 +22,7 @@ const main = () => { }, }) - const workers = createWorkers(redisDataSource) - - const closeWorkers = async () => { - await Promise.all( - workers.map(async (worker) => { - await worker.close() - console.log('worker closed:', worker.name) - }) - ) - } + const worker = createWorker(redisDataSource) // respond healthy to auto-scaler. app.get('/_ah/health', (req, res) => res.sendStatus(200)) @@ -39,9 +30,10 @@ const main = () => { app.get( '/lifecycle/prestop', asyncHandler(async (_req, res) => { - console.log('prestop lifecycle hook called.') - await closeWorkers() - console.log('workers closed') + console.log('Prestop lifecycle hook called.') + + await worker.close() + console.log('Worker closed') res.sendStatus(200) }) @@ -52,37 +44,31 @@ const main = () => { asyncHandler(async (_, res) => { let output = '' - for (const worker of workers) { - const queueName = worker.name - const queue = await getQueue( - redisDataSource.queueRedisClient, - queueName - ) + const queue = await getQueue(redisDataSource.queueRedisClient) - const jobsTypes: Array = [ - 'active', - 'failed', - 'completed', - 'prioritized', - ] - const counts = await queue.getJobCounts(...jobsTypes) + const jobsTypes: Array = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) - jobsTypes.forEach((metric) => { - output += `# TYPE omnivore_queue_messages_${metric} gauge\n` - output += `omnivore_queue_messages_${metric}{queue="${queueName}"} ${counts[metric]}\n` - }) + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${QUEUE}"} ${counts[metric]}\n` + }) - // Export the age of the oldest prioritized job in the queue - const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) - if (oldestJobs.length > 0) { - const currentTime = Date.now() - const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${ageInSeconds}\n` - } else { - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${0}\n` - } + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${0}\n` } res.status(200).setHeader('Content-Type', 'text/plain').send(output) @@ -90,27 +76,27 @@ const main = () => { ) const server = app.listen(port, () => { - console.log(`[worker]: Workers started`) + console.log('Worker started') }) const gracefulShutdown = async (signal: string) => { - console.log(`[worker]: Received ${signal}, closing server...`) + console.log(`Received ${signal}, closing server...`) await new Promise((resolve) => { server.close((err) => { - console.log('[worker]: Express server closed') + console.log('Express server closed') if (err) { - console.log('[worker]: error stopping server', { err }) + console.log('Error stopping server', { err }) } resolve() }) }) - await closeWorkers() - console.log('[worker]: Workers closed') + await worker.close() + console.log('Worker closed') await redisDataSource.shutdown() - console.log('[worker]: Redis connection closed') + console.log('Redis connection closed') process.exit(0) } diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index ae282bfb7..e3465b922 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -38,23 +38,24 @@ const getPriority = (job: SavePageJob): number => { // priority 5: jobs that are expected to finish in less than 10 second // priority 10: jobs that are expected to finish in less than 10 minutes // priority 100: jobs that are expected to finish in less than 1 hour - if (job.isRss) { - return 10 - } if (job.isImport) { return 100 } - return job.priority === 'low' ? 10 : 1 + if (job.isRss) { + return job.priority === 'low' ? 10 : 5 + } + + return job.priority === 'low' ? 5 : 1 } const getAttempts = (job: SavePageJob): number => { - if (job.isRss || job.isImport) { - // we don't want to retry rss or import jobs + if (job.isImport) { + // we don't want to retry import jobs return 1 } - return 3 + return job.isRss ? 2 : 3 } const getOpts = (job: SavePageJob): BulkJobOptions => { diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index ffa12962e..731178c94 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -2,14 +2,11 @@ import { RedisDataSource } from '@omnivore/utils' import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' import { JobData, processFetchContentJob } from './request_handler' -const FAST_QUEUE = 'omnivore-content-fetch-queue' -const SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' -const RSS_QUEUE = 'omnivore-content-fetch-rss-queue' -const QUEUE_NAMES = [FAST_QUEUE, SLOW_QUEUE, RSS_QUEUE] as const +export const QUEUE = 'omnivore-content-fetch-queue' export const getQueue = async ( connection: RedisClient, - queueName: string + queueName = QUEUE ): Promise => { const queue = new Queue(queueName, { connection, @@ -30,27 +27,10 @@ export const getQueue = async ( return queue } -const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { - const getLimiter = (queueName: string) => { - switch (queueName) { - case SLOW_QUEUE: - return { - max: 5, - duration: 1000, // 1 second - } - case RSS_QUEUE: - return { - max: 5, - duration: 1000, // 1 second - } - default: - return { - max: 100, - duration: 1000, // 1 second - } - } - } - +export const createWorker = ( + redisDataSource: RedisDataSource, + queueName = QUEUE +) => { const worker = new Worker( queueName, async (job: Job) => { @@ -60,7 +40,6 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately - limiter: getLimiter(queueName), } ) @@ -90,9 +69,3 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { return worker } - -export const createWorkers = (redisDataSource: RedisDataSource) => { - return QUEUE_NAMES.map((queueName) => - createWorker(redisDataSource, queueName) - ) -}