From 87b4ec503e44776069e94bb083ff345272f4d3e8 Mon Sep 17 00:00:00 2001
From: Hongbo Wu
Date: Mon, 15 Jul 2024 21:47:27 +0800
Subject: [PATCH] enqueue content-fetch task to the queue

---
 packages/api/src/jobs/bulk_action.ts          |   4 +-
 packages/api/src/jobs/rss/refreshAllFeeds.ts  |   6 +-
 packages/api/src/jobs/rss/refreshFeed.ts      |  11 +-
 packages/api/src/queue-processor.ts           |  27 +--
 .../src/services/create_page_save_request.ts  |  18 +-
 packages/api/src/utils/createTask.ts          | 163 +++++++-----------
 packages/api/test/util.ts                     |   4 +-
 packages/content-fetch/src/worker.ts          |   5 +-
 8 files changed, 103 insertions(+), 135 deletions(-)

diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts
index 28ee8461c..a24f9b597 100644
--- a/packages/api/src/jobs/bulk_action.ts
+++ b/packages/api/src/jobs/bulk_action.ts
@@ -1,5 +1,5 @@
 import { BulkActionType } from '../generated/graphql'
-import { getBackendQueue } from '../queue-processor'
+import { getQueue } from '../queue-processor'
 import { batchUpdateLibraryItems } from '../services/library_item'
 import { logger } from '../utils/logger'
 
@@ -18,7 +18,7 @@ export const BULK_ACTION_JOB_NAME = 'bulk-action'
 export const bulkAction = async (data: BulkActionData) => {
   const { userId, action, query, labelIds, args, batchSize, count } = data
 
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     throw new Error('Queue not initialized')
   }
diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts
index a75385187..29ef6f894 100644
--- a/packages/api/src/jobs/rss/refreshAllFeeds.ts
+++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts
@@ -1,7 +1,7 @@
 import { Job } from 'bullmq'
 import { DataSource } from 'typeorm'
 import { v4 as uuid } from 'uuid'
-import { getBackendQueue, JOB_VERSION } from '../../queue-processor'
+import { getQueue, JOB_VERSION } from '../../queue-processor'
 import { validateUrl } from '../../services/create_page_save_request'
 import { getJobPriority, RssSubscriptionGroup } from '../../utils/createTask'
 import { stringToHash } from '../../utils/helpers'
@@ -124,7 +124,7 @@ const updateSubscriptionGroup = async (
 }
 
 export const queueRSSRefreshAllFeedsJob = async () => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return false
   }
@@ -144,7 +144,7 @@ export const queueRSSRefreshFeedJob = async (
   payload: any,
   options = { priority: 'low' as QueuePriority }
 ): Promise<Job | undefined> => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts
index 78069a118..e0418a91c 100644
--- a/packages/api/src/jobs/rss/refreshFeed.ts
+++ b/packages/api/src/jobs/rss/refreshFeed.ts
@@ -4,7 +4,6 @@ import { parseHTML } from 'linkedom'
 import Parser, { Item } from 'rss-parser'
 import { v4 as uuid } from 'uuid'
 import { FetchContentType } from '../../entity/subscription'
-import { env } from '../../env'
 import { ArticleSavingRequestStatus } from '../../generated/graphql'
 import { redisDataSource } from '../../redis_data_source'
 import { validateUrl } from '../../services/create_page_save_request'
@@ -14,7 +13,7 @@ import {
   updateSubscriptions,
 } from '../../services/update_subscription'
 import { findActiveUser } from '../../services/user'
-import createHttpTaskWithToken from '../../utils/createTask'
+import { enqueueFetchContentJob } from '../../utils/createTask'
 import { cleanUrl } from '../../utils/helpers'
 import { createThumbnailProxyUrl } from '../../utils/imageproxy'
 import { logger } from '../../utils/logger'
@@ -332,7 +331,7 @@ const fetchContentAndCreateItem = async (
   feedUrl: string,
   item: RssFeedItem
 ) => {
-  const payload = {
+  const data = {
     users,
     source: 'rss-feeder',
     url: item.link.trim(),
@@ -343,11 +342,7 @@
   }
 
   try {
-    const task = await createHttpTaskWithToken({
-      queue: 'omnivore-rss-feed-queue',
-      taskHandlerUrl: env.queue.contentFetchGCFUrl,
-      payload,
-    })
+    const task = await enqueueFetchContentJob(data)
     return !!task
   } catch (error) {
     logger.error('Error while creating task', error)
diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts
index d9b762937..9b7ea8ff3 100644
--- a/packages/api/src/queue-processor.ts
+++ b/packages/api/src/queue-processor.ts
@@ -82,7 +82,8 @@ import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_positi
 import { getJobPriority } from './utils/createTask'
 import { logger } from './utils/logger'
 
-export const QUEUE_NAME = 'omnivore-backend-queue'
+export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue'
+export const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue'
 export const JOB_VERSION = 'v001'
 
 const jobLatency = new client.Histogram({
@@ -94,8 +95,8 @@
 registerMetric(jobLatency)
 
-export const getBackendQueue = async (
-  name = QUEUE_NAME
+export const getQueue = async (
+  name = BACKEND_QUEUE_NAME
 ): Promise<Queue | undefined> => {
   if (!redisDataSource.workerRedisClient) {
     throw new Error('Can not create queues, redis is not initialized')
   }
@@ -124,7 +125,7 @@ export const createJobId = (jobName: string, userId: string) =>
   `${jobName}_${userId}_${JOB_VERSION}`
 
 export const getJob = async (jobId: string, queueName?: string) => {
-  const queue = await getBackendQueue(queueName)
+  const queue = await getQueue(queueName)
   if (!queue) {
     return
   }
@@ -152,12 +153,12 @@ export const jobStateToTaskState = (
 
 export const createWorker = (connection: ConnectionOptions) =>
   new Worker(
-    QUEUE_NAME,
+    BACKEND_QUEUE_NAME,
     async (job: Job) => {
       const executeJob = async (job: Job) => {
         switch (job.name) {
           case 'refresh-all-feeds': {
-            const queue = await getBackendQueue()
+            const queue = await getQueue()
             const counts = await queue?.getJobCounts('prioritized')
             if (counts && counts.wait > 1000) {
               return
@@ -239,7 +240,7 @@
   )
 
 const setupCronJobs = async () => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     logger.error('Unable to setup cron jobs. Queue is not available.')
     return
@@ -278,7 +279,7 @@ const main = async () => {
   })
 
   app.get('/metrics', async (_, res) => {
-    const queue = await getBackendQueue()
+    const queue = await getQueue()
    if (!queue) {
      res.sendStatus(400)
      return
@@ -295,7 +296,7 @@ const main = async () => {
 
    jobsTypes.forEach((metric, idx) => {
      output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
-      output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n`
+      output += `omnivore_queue_messages_${metric}{queue="${BACKEND_QUEUE_NAME}"} ${counts[metric]}\n`
    })
 
    if (redisDataSource.redisClient) {
@@ -311,7 +312,7 @@ const main = async () => {
      )
      if (cursor != '0') {
        output += `# TYPE omnivore_read_position_messages gauge\n`
-        output += `omnivore_read_position_messages{queue="${QUEUE_NAME}"} ${10_001}\n`
+        output += `omnivore_read_position_messages{queue="${BACKEND_QUEUE_NAME}"} ${10_001}\n`
      } else if (batch) {
        output += `# TYPE omnivore_read_position_messages gauge\n`
        output += `omnivore_read_position_messages{} ${batch.length}\n`
@@ -324,10 +325,10 @@ const main = async () => {
      const currentTime = Date.now()
      const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
      output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
-      output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n`
+      output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${ageInSeconds}\n`
    } else {
      output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
-      output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n`
+      output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${0}\n`
    }
 
    const metrics = await getMetrics()
@@ -357,7 +358,7 @@ const main = async () => {
 
   await setupCronJobs()
 
-  const queueEvents = new QueueEvents(QUEUE_NAME, {
+  const queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, {
     connection: workerRedisClient,
   })
 
diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts
index 86d45765d..3d9a8ae6d 100644
--- a/packages/api/src/services/create_page_save_request.ts
+++ b/packages/api/src/services/create_page_save_request.ts
@@ -9,7 +9,7 @@
 } from '../generated/graphql'
 import { createPubSubClient, PubsubClient } from '../pubsub'
 import { redisDataSource } from '../redis_data_source'
-import { enqueueParseRequest } from '../utils/createTask'
+import { enqueueFetchContentJob } from '../utils/createTask'
 import { cleanUrl, generateSlug } from '../utils/helpers'
 import { logger } from '../utils/logger'
 import { createOrUpdateLibraryItem } from './library_item'
@@ -160,18 +160,22 @@ export const createPageSaveRequest = async ({
   logger.debug('priority', { priority })
 
   // enqueue task to parse item
-  await enqueueParseRequest({
+  await enqueueFetchContentJob({
     url,
-    userId,
-    saveRequestId: libraryItem.id,
+    users: [
+      {
+        folder,
+        id: userId,
+        libraryItemId: libraryItem.id,
+      },
+    ],
     priority,
     state,
     labels,
     locale,
     timezone,
-    savedAt,
-    publishedAt,
-    folder,
+    savedAt: savedAt?.toISOString(),
+    publishedAt: publishedAt?.toISOString(),
     rssFeedUrl: subscription,
   })
 
diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts
index e95de4fd5..fcaeb68f5 100644
--- a/packages/api/src/utils/createTask.ts
+++ b/packages/api/src/utils/createTask.ts
@@ -65,7 +65,11 @@ import {
   UploadContentJobData,
   UPLOAD_CONTENT_JOB,
 } from '../jobs/upload_content'
-import { getBackendQueue, JOB_VERSION } from '../queue-processor'
+import {
+  CONTENT_FETCH_QUEUE_NAME,
+  getQueue,
+  JOB_VERSION,
+} from '../queue-processor'
 import { redisDataSource } from '../redis_data_source'
 import { writeDigest } from '../services/digest'
 import { signFeatureToken } from '../services/features'
@@ -78,6 +82,8 @@ import View = google.cloud.tasks.v2.Task.View
 // Instantiates a client.
 const client = new CloudTasksClient()
 
+const FETCH_CONTENT_JOB = 'fetch-content'
+
 /**
  * we want to prioritized jobs by the expected time to complete
  * lower number means higher priority
@@ -94,11 +100,13 @@ export const getJobPriority = (jobName: string): number => {
     case SYNC_READ_POSITIONS_JOB_NAME:
     case SEND_EMAIL_JOB:
     case UPDATE_HOME_JOB:
+    case `${FETCH_CONTENT_JOB}_high`:
       return 1
     case TRIGGER_RULE_JOB_NAME:
     case CALL_WEBHOOK_JOB_NAME:
     case AI_SUMMARIZE_JOB_NAME:
     case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
+    case `${FETCH_CONTENT_JOB}_low`:
       return 5
     case BULK_ACTION_JOB_NAME:
     case `${REFRESH_FEED_JOB_NAME}_high`:
@@ -320,6 +328,24 @@ export const deleteTask = async (
   }
 }
 
+export interface fetchContentJobData {
+  url: string
+  users: Array<{
+    id: string
+    folder?: string
+    libraryItemId: string
+  }>
+  priority?: 'low' | 'high'
+  state?: ArticleSavingRequestStatus
+  labels?: Array<CreateLabelInput>
+  locale?: string
+  timezone?: string
+  savedAt?: string
+  publishedAt?: string
+  folder?: string
+  rssFeedUrl?: string
+}
+
 /**
  * Enqueues the task for the article content parsing with Puppeteer by URL
  * @param url - URL address of the article to parse
@@ -329,88 +355,27 @@ * @param priority - Priority of the task
  * @param queue - Queue name
  * @returns Name of the task created
  */
-export const enqueueParseRequest = async ({
-  url,
-  userId,
-  saveRequestId,
-  priority = 'high',
-  queue = env.queue.name,
-  state,
-  labels,
-  locale,
-  timezone,
-  savedAt,
-  publishedAt,
-  folder,
-  rssFeedUrl,
-}: {
-  url: string
-  userId: string
-  saveRequestId: string
-  priority?: 'low' | 'high'
-  queue?: string
-  state?: ArticleSavingRequestStatus
-  labels?: CreateLabelInput[]
-  locale?: string
-  timezone?: string
-  savedAt?: Date
-  publishedAt?: Date
-  folder?: string
-  rssFeedUrl?: string
-}): Promise<string> => {
-  const { GOOGLE_CLOUD_PROJECT } = process.env
-  const payload = {
-    url,
-    userId,
-    saveRequestId,
-    state,
-    labels,
-    locale,
-    timezone,
-    savedAt,
-    publishedAt,
-    folder,
-    rssFeedUrl,
-    priority,
+export const enqueueFetchContentJob = async (
+  data: fetchContentJobData
+): Promise<string> => {
+  const priority = data.priority || 'high'
+
+  const queue = await getQueue(CONTENT_FETCH_QUEUE_NAME)
+  if (!queue) {
+    throw new Error('No queue found')
   }
 
-  // If there is no Google Cloud Project Id exposed, it means that we are in local environment
-  if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) {
-    if (env.queue.contentFetchUrl) {
-      // Calling the handler function directly.
-      setTimeout(() => {
-        axios.post(env.queue.contentFetchUrl, payload).catch((error) => {
-          logError(error)
-          logger.error(
-            `Error occurred while requesting local puppeteer-parse function\nPlease, ensure your function is set up properly and running using "yarn start" from the "/pkg/gcf/puppeteer-parse" folder`
-          )
-        })
-      }, 0)
-    }
-    return ''
-  }
-
-  // use GCF url for low priority tasks
-  const taskHandlerUrl =
-    priority === 'low'
-      ? env.queue.contentFetchGCFUrl
-      : env.queue.contentFetchUrl
-
-  const createdTasks = await createHttpTaskWithToken({
-    project: GOOGLE_CLOUD_PROJECT,
-    payload,
-    priority,
-    taskHandlerUrl,
-    queue,
+  const job = await queue.add(FETCH_CONTENT_JOB, data, {
+    priority: getJobPriority(`${FETCH_CONTENT_JOB}_${priority}`),
+    attempts: priority === 'high' ? 5 : 2,
   })
-  if (!createdTasks || !createdTasks[0].name) {
-    logger.error(`Unable to get the name of the task`, {
-      payload,
-      createdTasks,
-    })
-    throw new CreateTaskError(`Unable to get the name of the task`)
+
+  if (!job || !job.id) {
+    logger.error('Error while enqueuing fetch-content job', data)
+    throw new Error('Error while enqueuing fetch-content job')
   }
-  return createdTasks[0].name
+
+  return job.id
 }
 
 export const enqueueReminder = async (
@@ -629,7 +594,7 @@ export const enqueueExportToIntegration = async (
   integrationId: string,
   userId: string
 ) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -647,7 +612,7 @@ export const enqueueThumbnailJob = async (
   userId: string,
   libraryItemId: string
 ) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -714,7 +679,7 @@ export const enqueueRssFeedFetch = async (
 }
 
 export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -726,7 +691,7 @@
 }
 
 export const enqueueWebhookJob = async (data: CallWebhookJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -738,7 +703,7 @@
 }
 
 export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -752,7 +717,7 @@
 export const enqueueProcessYouTubeVideo = async (
   data: ProcessYouTubeVideoJobData
 ) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -767,7 +732,7 @@
 export const enqueueProcessYouTubeTranscript = async (
   data: ProcessYouTubeTranscriptJobData
 ) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -780,7 +745,7 @@
 }
 
 export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return []
   }
@@ -806,7 +771,7 @@
 }
 
 export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -825,7 +790,7 @@
 }
 
 export const enqueueBulkAction = async (data: BulkActionData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -846,7 +811,7 @@
 }
 
 export const enqueueExportItem = async (jobData: ExportItemJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -862,7 +827,7 @@ export const enqueueExportItem = async (jobData: ExportItemJobData) => {
 }
 
 export const enqueueSendEmail = async (jobData: SendEmailJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -881,7 +846,7 @@ export const scheduledDigestJobOptions = (
 })
 
 export const removeDigestJobs = async (userId: string) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     throw new Error('No queue found')
   }
@@ -911,7 +876,7 @@ export const enqueueCreateDigest = async (
   data: CreateDigestData,
   schedule?: CreateDigestJobSchedule
 ): Promise<string> => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     throw new Error('No queue found')
   }
@@ -974,7 +939,7 @@
 export const enqueueBulkUploadContentJob = async (
   data: UploadContentJobData[]
 ) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return ''
   }
@@ -998,7 +963,7 @@ export const updateHomeJobId = (userId: string) =>
   `${UPDATE_HOME_JOB}_${userId}_${JOB_VERSION}`
 
 export const enqueueUpdateHomeJob = async (data: UpdateHomeJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -1016,7 +981,7 @@ export const updateScoreJobId = (userId: string) =>
   `${SCORE_LIBRARY_ITEM_JOB}_${userId}_${JOB_VERSION}`
 
 export const enqueueScoreJob = async (data: ScoreLibraryItemJobData) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -1034,7 +999,7 @@ export const enqueueGeneratePreviewContentJob = async (
   libraryItemId: string,
   userId: string
 ) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -1056,7 +1021,7 @@
 }
 
 export const enqueuePruneTrashJob = async (numDays: number) => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
@@ -1075,7 +1040,7 @@
 }
 
 export const enqueueExpireFoldersJob = async () => {
-  const queue = await getBackendQueue()
+  const queue = await getQueue()
   if (!queue) {
     return undefined
   }
diff --git a/packages/api/test/util.ts b/packages/api/test/util.ts
index 6f043d98a..a20c12a76 100644
--- a/packages/api/test/util.ts
+++ b/packages/api/test/util.ts
@@ -4,7 +4,7 @@ import { nanoid } from 'nanoid'
 import supertest from 'supertest'
 import { v4 } from 'uuid'
 import { makeApolloServer } from '../src/apollo'
-import { createWorker, QUEUE_NAME } from '../src/queue-processor'
+import { BACKEND_QUEUE_NAME, createWorker } from '../src/queue-processor'
 import { createApp } from '../src/server'
 import { corsConfig } from '../src/utils/corsConfig'
@@ -26,7 +26,7 @@ export const stopApolloServer = async () => {
 
 export const startWorker = (connection: ConnectionOptions) => {
   worker = createWorker(connection)
-  queueEvents = new QueueEvents(QUEUE_NAME, {
+  queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, {
     connection,
   })
 }
diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts
index 72fe6a736..7f3dfc29b 100644
--- a/packages/content-fetch/src/worker.ts
+++ b/packages/content-fetch/src/worker.ts
@@ -38,7 +38,10 @@ const createWorker = (redisDataSource: RedisDataSource) => {
     {
       connection: redisDataSource.queueRedisClient,
       autorun: true, // start processing jobs immediately
-      lockDuration: 60_000, // 1 minute
+      limiter: {
+        max: 50,
+        duration: 1000, // 1 second
+      },
     }
   )
 }
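
Note: taken together, this patch moves content-fetch work off Cloud Tasks HTTP pushes and onto a shared BullMQ queue between the api and content-fetch packages. Below is a minimal TypeScript sketch of the two ends of that queue. The Redis connection settings, the sample payload, the import path, and the inline job handler are illustrative assumptions, not code from this commit:

    import { Job, Queue, Worker } from 'bullmq'
    // interface added by this patch; path assumed for the sketch
    import { fetchContentJobData } from './createTask'

    const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue'
    const connection = { host: 'localhost', port: 6379 } // assumed Redis endpoint

    // Producer side (packages/api): enqueueFetchContentJob reduces to a
    // queue.add() call, with the priority taken from getJobPriority and
    // more retry attempts granted to high-priority jobs.
    const enqueue = async (data: fetchContentJobData) => {
      const queue = new Queue(CONTENT_FETCH_QUEUE_NAME, { connection })
      return queue.add('fetch-content', data, {
        priority: 1, // getJobPriority('fetch-content_high'); low maps to 5
        attempts: 5, // high priority retries up to 5 times, low priority twice
      })
    }

    // Consumer side (packages/content-fetch): the worker is now throttled
    // to at most 50 jobs per second instead of holding a 60-second job lock.
    const worker = new Worker(
      CONTENT_FETCH_QUEUE_NAME,
      async (job: Job<fetchContentJobData>) => {
        // fetch and parse job.data.url for each user in job.data.users
      },
      {
        connection,
        autorun: true, // start processing jobs immediately
        limiter: { max: 50, duration: 1000 }, // 50 jobs per 1000 ms
      }
    )

BullMQ treats lower numbers as higher priority, which is why getJobPriority maps `fetch-content_high` to 1 and `fetch-content_low` to 5; the worker-side limiter replaces the old per-job lockDuration as the throttling mechanism.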