From e3eae1c96cba78d78f995c9270e982f9c042e62d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 15 Jul 2024 21:14:27 +0800 Subject: [PATCH 01/19] create a worker to process content-fetch job --- packages/content-fetch/package.json | 14 +- packages/content-fetch/src/request_handler.ts | 79 ++++---- packages/content-fetch/src/worker.ts | 186 ++++++++++++++++++ yarn.lock | 5 + 4 files changed, 244 insertions(+), 40 deletions(-) create mode 100644 packages/content-fetch/src/worker.ts diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 28f70ed19..087ded95f 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -7,15 +7,16 @@ "build/src" ], "dependencies": { - "bullmq": "^5.1.1", - "dotenv": "^8.2.0", - "express": "^4.17.1", - "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", - "@sentry/serverless": "^7.77.0" + "@sentry/serverless": "^7.77.0", + "bullmq": "^5.1.1", + "dotenv": "^8.2.0", + "express": "^4.17.1", + "express-async-handler": "^1.2.0", + "posthog-node": "^3.6.3" }, "devDependencies": { "chai": "^4.3.6", @@ -27,7 +28,8 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer" + "start_gcf": "functions-framework --port=9090 --target=puppeteer", + "start_worker": "node build/src/worker.js" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 6a427434d..0fb6765e9 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -12,7 +12,7 @@ interface UserConfig { folder?: string } -interface RequestBody { +export interface JobData { url: string userId?: string saveRequestId: string @@ -175,36 +175,37 @@ const incrementContentFetchFailure = async ( } } -export const contentFetchRequestHandler: RequestHandler = async (req, res) => { +export const processFetchContentJob = async ( + redisDataSource: RedisDataSource, + data: JobData +) => { const functionStartTime = Date.now() - const body = req.body - // users is used when saving article for multiple users - let users = body.users || [] - const userId = body.userId + let users = data.users || [] + const userId = data.userId // userId is used when saving article for a single user if (userId) { users = [ { id: userId, - folder: body.folder, - libraryItemId: body.saveRequestId, + folder: data.folder, + libraryItemId: data.saveRequestId, }, ] } - const articleSavingRequestId = body.saveRequestId - const state = body.state - const labels = body.labels - const source = body.source || 'puppeteer-parse' - const taskId = body.taskId // taskId is used to update import status - const url = body.url - const locale = body.locale - const timezone = body.timezone - const rssFeedUrl = body.rssFeedUrl - const savedAt = body.savedAt - const publishedAt = body.publishedAt - const priority = body.priority + const articleSavingRequestId = data.saveRequestId + const state = data.state + const labels = data.labels + const source = data.source || 'puppeteer-parse' + const taskId = data.taskId // taskId is used to update import status + const url = data.url + const locale = data.locale + const timezone = data.timezone + const rssFeedUrl = data.rssFeedUrl + const savedAt = data.savedAt + const publishedAt = data.publishedAt + const priority = data.priority const logRecord: LogRecord = { url, @@ -225,25 +226,13 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, - }) - try { const domain = new URL(url).hostname const isBlocked = await isDomainBlocked(redisDataSource, domain) if (isBlocked) { console.log('domain is blocked', domain) - return res.sendStatus(200) + return } const key = cacheKey(url, locale, timezone) @@ -312,7 +301,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { logRecord.error = 'unknown error' } - return res.sendStatus(500) + throw error } finally { logRecord.totalTime = Date.now() - functionStartTime console.log(`parse-page result`, logRecord) @@ -330,7 +319,29 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { }, } ) + } +} +export const contentFetchRequestHandler: RequestHandler = async (req, res) => { + const data = req.body + + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + try { + await processFetchContentJob(redisDataSource, data) + } catch (error) { + return res.sendStatus(500) + } finally { await redisDataSource.shutdown() } diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts new file mode 100644 index 000000000..72fe6a736 --- /dev/null +++ b/packages/content-fetch/src/worker.ts @@ -0,0 +1,186 @@ +import { RedisDataSource } from '@omnivore/utils' +import { Job, JobType, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' +import express, { Express } from 'express' +import asyncHandler from 'express-async-handler' +import { JobData, processFetchContentJob } from './request_handler' + +const QUEUE_NAME = 'omnivore-content-fetch-queue' + +export const getContentFetchQueue = async ( + connection: RedisClient +): Promise => { + const queue = new Queue(QUEUE_NAME, { + connection, + defaultJobOptions: { + backoff: { + type: 'exponential', + delay: 2000, // 2 seconds + }, + removeOnComplete: { + age: 24 * 3600, // keep up to 24 hours + }, + removeOnFail: { + age: 7 * 24 * 3600, // keep up to 7 days + }, + }, + }) + await queue.waitUntilReady() + return queue +} + +const createWorker = (redisDataSource: RedisDataSource) => { + return new Worker( + QUEUE_NAME, + async (job: Job) => { + // process the job + await processFetchContentJob(redisDataSource, job.data) + }, + { + connection: redisDataSource.queueRedisClient, + autorun: true, // start processing jobs immediately + lockDuration: 60_000, // 1 minute + } + ) +} + +const main = () => { + console.log('[worker]: starting worker') + + const app: Express = express() + const port = process.env.PORT || 3002 + + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + app.get( + '/lifecycle/prestop', + asyncHandler(async (_req, res) => { + console.log('prestop lifecycle hook called.') + await worker.close() + res.sendStatus(200) + }) + ) + + app.get( + '/metrics', + asyncHandler(async (_, res) => { + const queue = await getContentFetchQueue(redisDataSource.queueRedisClient) + if (!queue) { + res.sendStatus(400) + return + } + + let output = '' + const jobsTypes: JobType[] = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) + + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` + }) + + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` + } + + res.status(200).setHeader('Content-Type', 'text/plain').send(output) + }) + ) + + const server = app.listen(port, () => { + console.log(`[worker]: started`) + }) + + const worker = createWorker(redisDataSource) + + const queueEvents = new QueueEvents(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, + }) + + queueEvents.on('added', (job) => { + console.log('added job: ', job.jobId, job.name) + }) + + queueEvents.on('removed', (job) => { + console.log('removed job: ', job.jobId) + }) + + queueEvents.on('completed', (job) => { + console.log('completed job: ', job.jobId) + }) + + queueEvents.on('failed', (job) => { + console.log('failed job: ', job.jobId) + }) + + const gracefulShutdown = async (signal: string) => { + console.log(`[worker]: Received ${signal}, closing server...`) + await new Promise((resolve) => { + server.close((err) => { + console.log('[worker]: Express server closed') + if (err) { + console.log('[worker]: error stopping server', { err }) + } + + resolve() + }) + }) + + await worker.close() + console.log('[worker]: Worker closed') + + await redisDataSource.shutdown() + console.log('[worker]: Redis connection closed') + + process.exit(0) + } + + const handleShutdown = (signal: string) => { + return () => { + void gracefulShutdown(signal) + } + } + + process.on('SIGTERM', handleShutdown('SIGTERM')) + process.on('SIGINT', handleShutdown('SIGINT')) + + process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error) + handleShutdown('uncaughtException') + }) + + process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason) + handleShutdown('unhandledRejection') + }) +} + +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main() +} diff --git a/yarn.lock b/yarn.lock index 7dae9e991..4cc38bb0e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -15721,6 +15721,11 @@ expr-eval@^2.0.2: resolved "https://registry.yarnpkg.com/expr-eval/-/expr-eval-2.0.2.tgz#fa6f044a7b0c93fde830954eb9c5b0f7fbc7e201" integrity sha512-4EMSHGOPSwAfBiibw3ndnP0AvjDWLsMvGOvWEZ2F96IGk0bIVdjQisOHxReSkE13mHcfbuCiXw+G4y0zv6N8Eg== +express-async-handler@^1.2.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/express-async-handler/-/express-async-handler-1.2.0.tgz#ffc9896061d90f8d2e71a2d2b8668db5b0934391" + integrity sha512-rCSVtPXRmQSW8rmik/AIb2P0op6l7r1fMW538yyvTMltCO4xQEWMmobfrIxN2V1/mVrgxB8Az3reYF6yUZw37w== + express-http-context2@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/express-http-context2/-/express-http-context2-1.0.0.tgz#58cd9fb0d233739e0dcd7aabb766d1dc74522d77" From 87b4ec503e44776069e94bb083ff345272f4d3e8 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 15 Jul 2024 21:47:27 +0800 Subject: [PATCH 02/19] enqueue content-fetch task to the queue --- packages/api/src/jobs/bulk_action.ts | 4 +- packages/api/src/jobs/rss/refreshAllFeeds.ts | 6 +- packages/api/src/jobs/rss/refreshFeed.ts | 11 +- packages/api/src/queue-processor.ts | 27 +-- .../src/services/create_page_save_request.ts | 18 +- packages/api/src/utils/createTask.ts | 163 +++++++----------- packages/api/test/util.ts | 4 +- packages/content-fetch/src/worker.ts | 5 +- 8 files changed, 103 insertions(+), 135 deletions(-) diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts index 28ee8461c..a24f9b597 100644 --- a/packages/api/src/jobs/bulk_action.ts +++ b/packages/api/src/jobs/bulk_action.ts @@ -1,5 +1,5 @@ import { BulkActionType } from '../generated/graphql' -import { getBackendQueue } from '../queue-processor' +import { getQueue } from '../queue-processor' import { batchUpdateLibraryItems } from '../services/library_item' import { logger } from '../utils/logger' @@ -18,7 +18,7 @@ export const BULK_ACTION_JOB_NAME = 'bulk-action' export const bulkAction = async (data: BulkActionData) => { const { userId, action, query, labelIds, args, batchSize, count } = data - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { throw new Error('Queue not initialized') } diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index a75385187..29ef6f894 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -1,7 +1,7 @@ import { Job } from 'bullmq' import { DataSource } from 'typeorm' import { v4 as uuid } from 'uuid' -import { getBackendQueue, JOB_VERSION } from '../../queue-processor' +import { getQueue, JOB_VERSION } from '../../queue-processor' import { validateUrl } from '../../services/create_page_save_request' import { getJobPriority, RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' @@ -124,7 +124,7 @@ const updateSubscriptionGroup = async ( } export const queueRSSRefreshAllFeedsJob = async () => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return false } @@ -144,7 +144,7 @@ export const queueRSSRefreshFeedJob = async ( payload: any, options = { priority: 'low' as QueuePriority } ): Promise => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 78069a118..e0418a91c 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -4,7 +4,6 @@ import { parseHTML } from 'linkedom' import Parser, { Item } from 'rss-parser' import { v4 as uuid } from 'uuid' import { FetchContentType } from '../../entity/subscription' -import { env } from '../../env' import { ArticleSavingRequestStatus } from '../../generated/graphql' import { redisDataSource } from '../../redis_data_source' import { validateUrl } from '../../services/create_page_save_request' @@ -14,7 +13,7 @@ import { updateSubscriptions, } from '../../services/update_subscription' import { findActiveUser } from '../../services/user' -import createHttpTaskWithToken from '../../utils/createTask' +import { enqueueFetchContentJob } from '../../utils/createTask' import { cleanUrl } from '../../utils/helpers' import { createThumbnailProxyUrl } from '../../utils/imageproxy' import { logger } from '../../utils/logger' @@ -332,7 +331,7 @@ const fetchContentAndCreateItem = async ( feedUrl: string, item: RssFeedItem ) => { - const payload = { + const data = { users, source: 'rss-feeder', url: item.link.trim(), @@ -343,11 +342,7 @@ const fetchContentAndCreateItem = async ( } try { - const task = await createHttpTaskWithToken({ - queue: 'omnivore-rss-feed-queue', - taskHandlerUrl: env.queue.contentFetchGCFUrl, - payload, - }) + const task = await enqueueFetchContentJob(data) return !!task } catch (error) { logger.error('Error while creating task', error) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index d9b762937..9b7ea8ff3 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -82,7 +82,8 @@ import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_positi import { getJobPriority } from './utils/createTask' import { logger } from './utils/logger' -export const QUEUE_NAME = 'omnivore-backend-queue' +export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue' +export const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue' export const JOB_VERSION = 'v001' const jobLatency = new client.Histogram({ @@ -94,8 +95,8 @@ const jobLatency = new client.Histogram({ registerMetric(jobLatency) -export const getBackendQueue = async ( - name = QUEUE_NAME +export const getQueue = async ( + name = BACKEND_QUEUE_NAME ): Promise => { if (!redisDataSource.workerRedisClient) { throw new Error('Can not create queues, redis is not initialized') @@ -124,7 +125,7 @@ export const createJobId = (jobName: string, userId: string) => `${jobName}_${userId}_${JOB_VERSION}` export const getJob = async (jobId: string, queueName?: string) => { - const queue = await getBackendQueue(queueName) + const queue = await getQueue(queueName) if (!queue) { return } @@ -152,12 +153,12 @@ export const jobStateToTaskState = ( export const createWorker = (connection: ConnectionOptions) => new Worker( - QUEUE_NAME, + BACKEND_QUEUE_NAME, async (job: Job) => { const executeJob = async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { - const queue = await getBackendQueue() + const queue = await getQueue() const counts = await queue?.getJobCounts('prioritized') if (counts && counts.wait > 1000) { return @@ -239,7 +240,7 @@ export const createWorker = (connection: ConnectionOptions) => ) const setupCronJobs = async () => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { logger.error('Unable to setup cron jobs. Queue is not available.') return @@ -278,7 +279,7 @@ const main = async () => { }) app.get('/metrics', async (_, res) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { res.sendStatus(400) return @@ -295,7 +296,7 @@ const main = async () => { jobsTypes.forEach((metric, idx) => { output += `# TYPE omnivore_queue_messages_${metric} gauge\n` - output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` + output += `omnivore_queue_messages_${metric}{queue="${BACKEND_QUEUE_NAME}"} ${counts[metric]}\n` }) if (redisDataSource.redisClient) { @@ -311,7 +312,7 @@ const main = async () => { ) if (cursor != '0') { output += `# TYPE omnivore_read_position_messages gauge\n` - output += `omnivore_read_position_messages{queue="${QUEUE_NAME}"} ${10_001}\n` + output += `omnivore_read_position_messages{queue="${BACKEND_QUEUE_NAME}"} ${10_001}\n` } else if (batch) { output += `# TYPE omnivore_read_position_messages gauge\n` output += `omnivore_read_position_messages{} ${batch.length}\n` @@ -324,10 +325,10 @@ const main = async () => { const currentTime = Date.now() const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${ageInSeconds}\n` } else { output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${0}\n` } const metrics = await getMetrics() @@ -357,7 +358,7 @@ const main = async () => { await setupCronJobs() - const queueEvents = new QueueEvents(QUEUE_NAME, { + const queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, { connection: workerRedisClient, }) diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 86d45765d..3d9a8ae6d 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -9,7 +9,7 @@ import { } from '../generated/graphql' import { createPubSubClient, PubsubClient } from '../pubsub' import { redisDataSource } from '../redis_data_source' -import { enqueueParseRequest } from '../utils/createTask' +import { enqueueFetchContentJob } from '../utils/createTask' import { cleanUrl, generateSlug } from '../utils/helpers' import { logger } from '../utils/logger' import { createOrUpdateLibraryItem } from './library_item' @@ -160,18 +160,22 @@ export const createPageSaveRequest = async ({ logger.debug('priority', { priority }) // enqueue task to parse item - await enqueueParseRequest({ + await enqueueFetchContentJob({ url, - userId, - saveRequestId: libraryItem.id, + users: [ + { + folder, + id: userId, + libraryItemId: libraryItem.id, + }, + ], priority, state, labels, locale, timezone, - savedAt, - publishedAt, - folder, + savedAt: savedAt?.toISOString(), + publishedAt: publishedAt?.toISOString(), rssFeedUrl: subscription, }) diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index e95de4fd5..fcaeb68f5 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -65,7 +65,11 @@ import { UploadContentJobData, UPLOAD_CONTENT_JOB, } from '../jobs/upload_content' -import { getBackendQueue, JOB_VERSION } from '../queue-processor' +import { + CONTENT_FETCH_QUEUE_NAME, + getQueue, + JOB_VERSION, +} from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { writeDigest } from '../services/digest' import { signFeatureToken } from '../services/features' @@ -78,6 +82,8 @@ import View = google.cloud.tasks.v2.Task.View // Instantiates a client. const client = new CloudTasksClient() +const FETCH_CONTENT_JOB = 'fetch-content' + /** * we want to prioritized jobs by the expected time to complete * lower number means higher priority @@ -94,11 +100,13 @@ export const getJobPriority = (jobName: string): number => { case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: case UPDATE_HOME_JOB: + case `${FETCH_CONTENT_JOB}_high`: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case AI_SUMMARIZE_JOB_NAME: case PROCESS_YOUTUBE_VIDEO_JOB_NAME: + case `${FETCH_CONTENT_JOB}_low`: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: @@ -320,6 +328,24 @@ export const deleteTask = async ( } } +export interface fetchContentJobData { + url: string + users: Array<{ + id: string + folder?: string + libraryItemId: string + }> + priority?: 'low' | 'high' + state?: ArticleSavingRequestStatus + labels?: Array + locale?: string + timezone?: string + savedAt?: string + publishedAt?: string + folder?: string + rssFeedUrl?: string +} + /** * Enqueues the task for the article content parsing with Puppeteer by URL * @param url - URL address of the article to parse @@ -329,88 +355,27 @@ export const deleteTask = async ( * @param queue - Queue name * @returns Name of the task created */ -export const enqueueParseRequest = async ({ - url, - userId, - saveRequestId, - priority = 'high', - queue = env.queue.name, - state, - labels, - locale, - timezone, - savedAt, - publishedAt, - folder, - rssFeedUrl, -}: { - url: string - userId: string - saveRequestId: string - priority?: 'low' | 'high' - queue?: string - state?: ArticleSavingRequestStatus - labels?: CreateLabelInput[] - locale?: string - timezone?: string - savedAt?: Date - publishedAt?: Date - folder?: string - rssFeedUrl?: string -}): Promise => { - const { GOOGLE_CLOUD_PROJECT } = process.env - const payload = { - url, - userId, - saveRequestId, - state, - labels, - locale, - timezone, - savedAt, - publishedAt, - folder, - rssFeedUrl, - priority, +export const enqueueFetchContentJob = async ( + data: fetchContentJobData +): Promise => { + const priority = data.priority || 'high' + + const queue = await getQueue(CONTENT_FETCH_QUEUE_NAME) + if (!queue) { + throw new Error('No queue found') } - // If there is no Google Cloud Project Id exposed, it means that we are in local environment - if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.contentFetchUrl) { - // Calling the handler function directly. - setTimeout(() => { - axios.post(env.queue.contentFetchUrl, payload).catch((error) => { - logError(error) - logger.error( - `Error occurred while requesting local puppeteer-parse function\nPlease, ensure your function is set up properly and running using "yarn start" from the "/pkg/gcf/puppeteer-parse" folder` - ) - }) - }, 0) - } - return '' - } - - // use GCF url for low priority tasks - const taskHandlerUrl = - priority === 'low' - ? env.queue.contentFetchGCFUrl - : env.queue.contentFetchUrl - - const createdTasks = await createHttpTaskWithToken({ - project: GOOGLE_CLOUD_PROJECT, - payload, - priority, - taskHandlerUrl, - queue, + const job = await queue.add(FETCH_CONTENT_JOB, data, { + priority: getJobPriority(`${FETCH_CONTENT_JOB}_${priority}`), + attempts: priority === 'high' ? 5 : 2, }) - if (!createdTasks || !createdTasks[0].name) { - logger.error(`Unable to get the name of the task`, { - payload, - createdTasks, - }) - throw new CreateTaskError(`Unable to get the name of the task`) + + if (!job || !job.id) { + logger.error('Error while enqueuing fetch-content job', data) + throw new Error('Error while enqueuing fetch-content job') } - return createdTasks[0].name + + return job.id } export const enqueueReminder = async ( @@ -629,7 +594,7 @@ export const enqueueExportToIntegration = async ( integrationId: string, userId: string ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -647,7 +612,7 @@ export const enqueueThumbnailJob = async ( userId: string, libraryItemId: string ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -714,7 +679,7 @@ export const enqueueRssFeedFetch = async ( } export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -726,7 +691,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { } export const enqueueWebhookJob = async (data: CallWebhookJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -738,7 +703,7 @@ export const enqueueWebhookJob = async (data: CallWebhookJobData) => { } export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -752,7 +717,7 @@ export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => { export const enqueueProcessYouTubeVideo = async ( data: ProcessYouTubeVideoJobData ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -767,7 +732,7 @@ export const enqueueProcessYouTubeVideo = async ( export const enqueueProcessYouTubeTranscript = async ( data: ProcessYouTubeTranscriptJobData ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -780,7 +745,7 @@ export const enqueueProcessYouTubeTranscript = async ( } export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return [] } @@ -806,7 +771,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { } export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -825,7 +790,7 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { } export const enqueueBulkAction = async (data: BulkActionData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -846,7 +811,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => { } export const enqueueExportItem = async (jobData: ExportItemJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -862,7 +827,7 @@ export const enqueueExportItem = async (jobData: ExportItemJobData) => { } export const enqueueSendEmail = async (jobData: SendEmailJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -881,7 +846,7 @@ export const scheduledDigestJobOptions = ( }) export const removeDigestJobs = async (userId: string) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { throw new Error('No queue found') } @@ -911,7 +876,7 @@ export const enqueueCreateDigest = async ( data: CreateDigestData, schedule?: CreateDigestJobSchedule ): Promise => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { throw new Error('No queue found') } @@ -974,7 +939,7 @@ export const enqueueCreateDigest = async ( export const enqueueBulkUploadContentJob = async ( data: UploadContentJobData[] ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return '' } @@ -998,7 +963,7 @@ export const updateHomeJobId = (userId: string) => `${UPDATE_HOME_JOB}_${userId}_${JOB_VERSION}` export const enqueueUpdateHomeJob = async (data: UpdateHomeJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1016,7 +981,7 @@ export const updateScoreJobId = (userId: string) => `${SCORE_LIBRARY_ITEM_JOB}_${userId}_${JOB_VERSION}` export const enqueueScoreJob = async (data: ScoreLibraryItemJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1034,7 +999,7 @@ export const enqueueGeneratePreviewContentJob = async ( libraryItemId: string, userId: string ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1056,7 +1021,7 @@ export const enqueueGeneratePreviewContentJob = async ( } export const enqueuePruneTrashJob = async (numDays: number) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1075,7 +1040,7 @@ export const enqueuePruneTrashJob = async (numDays: number) => { } export const enqueueExpireFoldersJob = async () => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } diff --git a/packages/api/test/util.ts b/packages/api/test/util.ts index 6f043d98a..a20c12a76 100644 --- a/packages/api/test/util.ts +++ b/packages/api/test/util.ts @@ -4,7 +4,7 @@ import { nanoid } from 'nanoid' import supertest from 'supertest' import { v4 } from 'uuid' import { makeApolloServer } from '../src/apollo' -import { createWorker, QUEUE_NAME } from '../src/queue-processor' +import { BACKEND_QUEUE_NAME, createWorker } from '../src/queue-processor' import { createApp } from '../src/server' import { corsConfig } from '../src/utils/corsConfig' @@ -26,7 +26,7 @@ export const stopApolloServer = async () => { export const startWorker = (connection: ConnectionOptions) => { worker = createWorker(connection) - queueEvents = new QueueEvents(QUEUE_NAME, { + queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, { connection, }) } diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 72fe6a736..7f3dfc29b 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -38,7 +38,10 @@ const createWorker = (redisDataSource: RedisDataSource) => { { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately - lockDuration: 60_000, // 1 minute + limiter: { + max: 50, + duration: 1000, // 1 second + }, } ) } From 016775aadbb2b1577c9179be90df473b81a3490d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 16 Jul 2024 09:57:00 +0800 Subject: [PATCH 03/19] fix tests --- packages/api/test/resolvers/article.test.ts | 10 ++-- .../resolvers/article_saving_request.test.ts | 49 +++++++------------ packages/api/test/routers/article.test.ts | 8 ++- 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/packages/api/test/resolvers/article.test.ts b/packages/api/test/resolvers/article.test.ts index b1fe5a07a..fa76abc86 100644 --- a/packages/api/test/resolvers/article.test.ts +++ b/packages/api/test/resolvers/article.test.ts @@ -646,8 +646,8 @@ describe('Article API', () => { context('when the source is rss-feeder and url is from youtube.com', () => { const source = 'rss-feeder' - const stub = sinon.stub(createTask, 'enqueueParseRequest') - const stub2 = sinon.stub(createTask, 'enqueueProcessYouTubeVideo') + const stub = sinon.stub(createTask, 'enqueueFetchContentJob') + sinon.stub(createTask, 'enqueueProcessYouTubeVideo') before(() => { url = 'https://www.youtube.com/watch?v=123' @@ -678,7 +678,11 @@ describe('Article API', () => { const url = 'https://blog.omnivore.app/new-url-1' before(() => { - sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves('')) + sinon.replace( + createTask, + 'enqueueFetchContentJob', + sinon.fake.resolves('') + ) }) beforeEach(() => { diff --git a/packages/api/test/resolvers/article_saving_request.test.ts b/packages/api/test/resolvers/article_saving_request.test.ts index 9df8b57f5..104146261 100644 --- a/packages/api/test/resolvers/article_saving_request.test.ts +++ b/packages/api/test/resolvers/article_saving_request.test.ts @@ -13,17 +13,9 @@ import * as createTask from '../../src/utils/createTask' import { createTestUser } from '../db' import { graphqlRequest, request } from '../util' -const articleSavingRequestQuery = ({ - id, - url, -}: { - id?: string - url?: string -}) => ` - query { - articleSavingRequest(id: ${id ? `"${id}"` : null}, url: ${ - url ? `"${url}"` : null -}) { +const articleSavingRequestQuery = ` + query ArticleSavingRequest($id: ID, $url: String) { + articleSavingRequest(id: $id, url: $url) { ... on ArticleSavingRequestSuccess { articleSavingRequest { id @@ -74,9 +66,9 @@ describe('ArticleSavingRequest API', () => { .post('/local/debug/fake-user-login') .send({ fakeEmail: user.email }) - authToken = res.body.authToken + authToken = res.body.authToken as string - sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves('')) + sinon.replace(createTask, 'enqueueFetchContentJob', sinon.fake.resolves('')) }) after(async () => { @@ -131,14 +123,14 @@ describe('ArticleSavingRequest API', () => { createArticleSavingRequestMutation(url), authToken ).expect(200) - id = res.body.data.createArticleSavingRequest.articleSavingRequest.id + id = res.body.data.createArticleSavingRequest.articleSavingRequest + .id as string }) it('returns the article saving request if exists', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ url }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + id, + }).expect(200) expect( res.body.data.articleSavingRequest.articleSavingRequest.status @@ -146,10 +138,9 @@ describe('ArticleSavingRequest API', () => { }) it('returns the user profile info', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ url }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + url, + }).expect(200) expect( res.body.data.articleSavingRequest.articleSavingRequest.user.profile @@ -158,10 +149,9 @@ describe('ArticleSavingRequest API', () => { }) it('returns the article saving request by id', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ id }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + id, + }).expect(200) expect( res.body.data.articleSavingRequest.articleSavingRequest.status @@ -169,10 +159,9 @@ describe('ArticleSavingRequest API', () => { }) it('returns not_found if not exists', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ id: 'invalid-id' }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + id: 'invalid-id', + }).expect(200) expect(res.body.data.articleSavingRequest.errorCodes).to.eql([ ArticleSavingRequestErrorCode.NotFound, diff --git a/packages/api/test/routers/article.test.ts b/packages/api/test/routers/article.test.ts index 1d0008602..b992cfb07 100644 --- a/packages/api/test/routers/article.test.ts +++ b/packages/api/test/routers/article.test.ts @@ -27,7 +27,7 @@ describe('/article/save API', () => { .post('/local/debug/fake-user-login') .send({ fakeEmail: user.email }) - authToken = res.body.authToken + authToken = res.body.authToken as string }) after(async () => { @@ -39,7 +39,11 @@ describe('/article/save API', () => { const url = 'https://blog.omnivore.app' before(() => { - sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves('')) + sinon.replace( + createTask, + 'enqueueFetchContentJob', + sinon.fake.resolves('') + ) }) after(() => { From 08fbb8aebfc50ddfc2a7b1e46cacbe101b90386f Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 25 Jul 2024 18:47:42 +0800 Subject: [PATCH 04/19] use different queues for fast,slow and rss content fetch jobs --- packages/api/src/jobs/rss/refreshFeed.ts | 8 +- packages/api/src/queue-processor.ts | 5 +- .../src/services/create_page_save_request.ts | 1 - packages/api/src/utils/createTask.ts | 31 ++- packages/content-fetch/Dockerfile-gcf | 53 ----- packages/content-fetch/package.json | 6 +- packages/content-fetch/src/app.ts | 162 +++++++++++++--- packages/content-fetch/src/index.ts | 31 --- packages/content-fetch/src/request_handler.ts | 27 --- packages/content-fetch/src/worker.ts | 181 +++++------------- yarn.lock | 2 +- 11 files changed, 212 insertions(+), 295 deletions(-) delete mode 100644 packages/content-fetch/Dockerfile-gcf delete mode 100644 packages/content-fetch/src/index.ts diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index e0418a91c..7882522da 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -13,7 +13,10 @@ import { updateSubscriptions, } from '../../services/update_subscription' import { findActiveUser } from '../../services/user' -import { enqueueFetchContentJob } from '../../utils/createTask' +import { + enqueueFetchContentJob, + FetchContentJobData, +} from '../../utils/createTask' import { cleanUrl } from '../../utils/helpers' import { createThumbnailProxyUrl } from '../../utils/imageproxy' import { logger } from '../../utils/logger' @@ -331,7 +334,7 @@ const fetchContentAndCreateItem = async ( feedUrl: string, item: RssFeedItem ) => { - const data = { + const data: FetchContentJobData = { users, source: 'rss-feeder', url: item.link.trim(), @@ -339,6 +342,7 @@ const fetchContentAndCreateItem = async ( rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, + priority: 'low', } try { diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 9b7ea8ff3..9468321a7 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -83,7 +83,10 @@ import { getJobPriority } from './utils/createTask' import { logger } from './utils/logger' export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue' -export const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue' +export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' +export const CONTENT_FETCH_SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' +export const CONTENT_FETCH_RSS_QUEUE = 'omnivore-content-fetch-rss-queue' + export const JOB_VERSION = 'v001' const jobLatency = new client.Histogram({ diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 3d9a8ae6d..1c2723a99 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -157,7 +157,6 @@ export const createPageSaveRequest = async ({ // get priority by checking rate limit if not specified priority = priority || (await getPriorityByRateLimit(userId)) - logger.debug('priority', { priority }) // enqueue task to parse item await enqueueFetchContentJob({ diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index fcaeb68f5..68905cd0e 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -66,7 +66,9 @@ import { UPLOAD_CONTENT_JOB, } from '../jobs/upload_content' import { - CONTENT_FETCH_QUEUE_NAME, + CONTENT_FETCH_QUEUE, + CONTENT_FETCH_RSS_QUEUE, + CONTENT_FETCH_SLOW_QUEUE, getQueue, JOB_VERSION, } from '../queue-processor' @@ -100,13 +102,12 @@ export const getJobPriority = (jobName: string): number => { case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: case UPDATE_HOME_JOB: - case `${FETCH_CONTENT_JOB}_high`: + case FETCH_CONTENT_JOB: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case AI_SUMMARIZE_JOB_NAME: case PROCESS_YOUTUBE_VIDEO_JOB_NAME: - case `${FETCH_CONTENT_JOB}_low`: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: @@ -328,7 +329,7 @@ export const deleteTask = async ( } } -export interface fetchContentJobData { +export interface FetchContentJobData { url: string users: Array<{ id: string @@ -344,6 +345,7 @@ export interface fetchContentJobData { publishedAt?: string folder?: string rssFeedUrl?: string + source?: string } /** @@ -356,18 +358,29 @@ export interface fetchContentJobData { * @returns Name of the task created */ export const enqueueFetchContentJob = async ( - data: fetchContentJobData + data: FetchContentJobData ): Promise => { - const priority = data.priority || 'high' + const getQueueName = (data: FetchContentJobData) => { + if (data.rssFeedUrl) { + return CONTENT_FETCH_RSS_QUEUE + } - const queue = await getQueue(CONTENT_FETCH_QUEUE_NAME) + if (data.priority === 'low') { + return CONTENT_FETCH_SLOW_QUEUE + } + + return CONTENT_FETCH_QUEUE + } + + const queueName = getQueueName(data) + const queue = await getQueue(queueName) if (!queue) { throw new Error('No queue found') } const job = await queue.add(FETCH_CONTENT_JOB, data, { - priority: getJobPriority(`${FETCH_CONTENT_JOB}_${priority}`), - attempts: priority === 'high' ? 5 : 2, + priority: getJobPriority(FETCH_CONTENT_JOB), + attempts: data.priority === 'low' ? 2 : 5, }) if (!job || !job.id) { diff --git a/packages/content-fetch/Dockerfile-gcf b/packages/content-fetch/Dockerfile-gcf deleted file mode 100644 index 7691ab1ae..000000000 --- a/packages/content-fetch/Dockerfile-gcf +++ /dev/null @@ -1,53 +0,0 @@ -FROM node:18.16-alpine - -# Installs latest Chromium (92) package. -RUN apk add --no-cache \ - chromium \ - nss \ - freetype \ - harfbuzz \ - ca-certificates \ - ttf-freefont \ - nodejs \ - yarn \ - g++ \ - make \ - python3 - -WORKDIR /app - -ENV CHROMIUM_PATH /usr/bin/chromium-browser -ENV LAUNCH_HEADLESS=true -ENV PORT 9090 - -COPY package.json . -COPY yarn.lock . -COPY tsconfig.json . -COPY .prettierrc . -COPY .eslintrc . - -COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json -COPY /packages/content-handler/package.json ./packages/content-handler/package.json -COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json -COPY /packages/utils/package.json ./packages/utils/package.json - -RUN yarn install --pure-lockfile - -ADD /packages/content-handler ./packages/content-handler -ADD /packages/puppeteer-parse ./packages/puppeteer-parse -ADD /packages/content-fetch ./packages/content-fetch -ADD /packages/utils ./packages/utils -RUN yarn workspace @omnivore/utils build -RUN yarn workspace @omnivore/content-handler build -RUN yarn workspace @omnivore/puppeteer-parse build -RUN yarn workspace @omnivore/content-fetch build - -# After building, fetch the production dependencies -RUN rm -rf /app/packages/content-fetch/node_modules -RUN rm -rf /app/node_modules -RUN yarn install --pure-lockfile --production - -EXPOSE 9090 - -# USER pptruser -ENTRYPOINT ["yarn", "workspace", "@omnivore/content-fetch", "start_gcf"] diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 087ded95f..82abe8bfb 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -7,11 +7,9 @@ "build/src" ], "dependencies": { - "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", - "@sentry/serverless": "^7.77.0", "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", @@ -27,9 +25,7 @@ "test:typecheck": "tsc --noEmit", "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", - "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer", - "start_worker": "node build/src/worker.js" + "start": "node build/src/app.js" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-fetch/src/app.ts b/packages/content-fetch/src/app.ts index f3b949dfd..3cf0e4982 100644 --- a/packages/content-fetch/src/app.ts +++ b/packages/content-fetch/src/app.ts @@ -1,34 +1,138 @@ -import 'dotenv/config' -import express from 'express' -import { contentFetchRequestHandler } from './request_handler' +import { RedisDataSource } from '@omnivore/utils' +import { JobType } from 'bullmq' +import express, { Express } from 'express' +import asyncHandler from 'express-async-handler' +import { createWorkers, getQueue } from './worker' -const app = express() +const main = () => { + console.log('[worker]: starting workers') -app.use(express.json()) -app.use(express.urlencoded({ extended: true })) + const app: Express = express() + const port = process.env.PORT || 3002 -if (!process.env.VERIFICATION_TOKEN) { - throw new Error('VERIFICATION_TOKEN environment variable is not set') + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + const workers = createWorkers(redisDataSource) + + const closeWorkers = async () => { + await Promise.all( + workers.map(async (worker) => { + await worker.close() + console.log('worker closed:', worker.name) + }) + ) + } + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + app.get( + '/lifecycle/prestop', + asyncHandler(async (_req, res) => { + console.log('prestop lifecycle hook called.') + await closeWorkers() + console.log('workers closed') + + res.sendStatus(200) + }) + ) + + app.get( + '/metrics', + asyncHandler(async (_, res) => { + let output = '' + + for (const worker of workers) { + const queueName = worker.name + const queue = await getQueue( + redisDataSource.queueRedisClient, + queueName + ) + + const jobsTypes: Array = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) + + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${queueName}"} ${counts[metric]}\n` + }) + + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${0}\n` + } + } + + res.status(200).setHeader('Content-Type', 'text/plain').send(output) + }) + ) + + const server = app.listen(port, () => { + console.log(`[worker]: Workers started`) + }) + + const gracefulShutdown = async (signal: string) => { + console.log(`[worker]: Received ${signal}, closing server...`) + await new Promise((resolve) => { + server.close((err) => { + console.log('[worker]: Express server closed') + if (err) { + console.log('[worker]: error stopping server', { err }) + } + + resolve() + }) + }) + + await closeWorkers() + console.log('[worker]: Workers closed') + + await redisDataSource.shutdown() + console.log('[worker]: Redis connection closed') + + process.exit(0) + } + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('SIGINT', () => gracefulShutdown('SIGINT')) + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) + + process.on('uncaughtException', function (err) { + // Handle the error safely + console.error(err, 'Uncaught exception') + }) + + process.on('unhandledRejection', (reason, promise) => { + // Handle the error safely + console.error({ promise, reason }, 'Unhandled Rejection at: Promise') + }) } -app.get('/_ah/health', (req, res) => res.sendStatus(200)) - -app.all('/', (req, res, next) => { - if (req.method !== 'GET' && req.method !== 'POST') { - console.error('request method is not GET or POST') - return res.sendStatus(405) - } - - if (req.query.token !== process.env.VERIFICATION_TOKEN) { - console.error('query does not include valid token') - return res.sendStatus(403) - } - - return contentFetchRequestHandler(req, res, next) -}) - -const PORT = process.env.PORT ? parseInt(process.env.PORT) : 8080 -app.listen(PORT, () => { - console.log(`App listening on port ${PORT}`) - console.log('Press Ctrl+C to quit.') -}) +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main() +} diff --git a/packages/content-fetch/src/index.ts b/packages/content-fetch/src/index.ts deleted file mode 100644 index d3bc341d1..000000000 --- a/packages/content-fetch/src/index.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { HttpFunction } from '@google-cloud/functions-framework' -import * as Sentry from '@sentry/serverless' -import 'dotenv/config' -import { contentFetchRequestHandler } from './request_handler' - -Sentry.GCPFunction.init({ - dsn: process.env.SENTRY_DSN, - tracesSampleRate: 0, -}) - -/** - * Cloud Function entry point, HTTP trigger. - * Loads the requested URL via Puppeteer, captures page content and sends it to backend - * - * @param {Object} req Cloud Function request context. - * @param {Object} res Cloud Function response context. - */ -export const puppeteer = Sentry.GCPFunction.wrapHttpFunction( - contentFetchRequestHandler as HttpFunction -) - -/** - * Cloud Function entry point, HTTP trigger. - * Loads the requested URL via Puppeteer and captures a screenshot of the provided element - * - * @param {Object} req Cloud Function request context. - * Inlcudes: - * * url - URL address of the page to open - * @param {Object} res Cloud Function response context. - */ -// exports.preview = Sentry.GCPFunction.wrapHttpFunction(preview); diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 0fb6765e9..a698ac9e6 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -2,7 +2,6 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' import { RedisDataSource } from '@omnivore/utils' import 'dotenv/config' -import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' @@ -321,29 +320,3 @@ export const processFetchContentJob = async ( ) } } - -export const contentFetchRequestHandler: RequestHandler = async (req, res) => { - const data = req.body - - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, - }) - - try { - await processFetchContentJob(redisDataSource, data) - } catch (error) { - return res.sendStatus(500) - } finally { - await redisDataSource.shutdown() - } - - res.sendStatus(200) -} diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 7f3dfc29b..7cb09c393 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -1,15 +1,17 @@ import { RedisDataSource } from '@omnivore/utils' -import { Job, JobType, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' -import express, { Express } from 'express' -import asyncHandler from 'express-async-handler' +import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' import { JobData, processFetchContentJob } from './request_handler' -const QUEUE_NAME = 'omnivore-content-fetch-queue' +const FAST_QUEUE = 'omnivore-content-fetch-queue' +const SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' +const RSS_QUEUE = 'omnivore-content-fetch-rss-queue' +const QUEUE_NAMES = [FAST_QUEUE, SLOW_QUEUE, RSS_QUEUE] as const -export const getContentFetchQueue = async ( - connection: RedisClient +export const getQueue = async ( + connection: RedisClient, + queueName: string ): Promise => { - const queue = new Queue(QUEUE_NAME, { + const queue = new Queue(queueName, { connection, defaultJobOptions: { backoff: { @@ -28,9 +30,29 @@ export const getContentFetchQueue = async ( return queue } -const createWorker = (redisDataSource: RedisDataSource) => { - return new Worker( - QUEUE_NAME, +const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { + const getLimiter = (queueName: string) => { + switch (queueName) { + case SLOW_QUEUE: + return { + max: 5, + duration: 1000, // 1 second + } + case RSS_QUEUE: + return { + max: 3, + duration: 1000, // 1 second + } + default: + return { + max: 10, + duration: 1000, // 1 second + } + } + } + + const worker = new Worker( + queueName, async (job: Job) => { // process the job await processFetchContentJob(redisDataSource, job.data) @@ -38,152 +60,39 @@ const createWorker = (redisDataSource: RedisDataSource) => { { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately - limiter: { - max: 50, - duration: 1000, // 1 second - }, + limiter: getLimiter(queueName), } ) -} -const main = () => { - console.log('[worker]: starting worker') - - const app: Express = express() - const port = process.env.PORT || 3002 - - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, + worker.on('error', (err) => { + console.error('worker error:', err) }) - // respond healthy to auto-scaler. - app.get('/_ah/health', (req, res) => res.sendStatus(200)) - - app.get( - '/lifecycle/prestop', - asyncHandler(async (_req, res) => { - console.log('prestop lifecycle hook called.') - await worker.close() - res.sendStatus(200) - }) - ) - - app.get( - '/metrics', - asyncHandler(async (_, res) => { - const queue = await getContentFetchQueue(redisDataSource.queueRedisClient) - if (!queue) { - res.sendStatus(400) - return - } - - let output = '' - const jobsTypes: JobType[] = [ - 'active', - 'failed', - 'completed', - 'prioritized', - ] - const counts = await queue.getJobCounts(...jobsTypes) - - jobsTypes.forEach((metric) => { - output += `# TYPE omnivore_queue_messages_${metric} gauge\n` - output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` - }) - - // Export the age of the oldest prioritized job in the queue - const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) - if (oldestJobs.length > 0) { - const currentTime = Date.now() - const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n` - } else { - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` - } - - res.status(200).setHeader('Content-Type', 'text/plain').send(output) - }) - ) - - const server = app.listen(port, () => { - console.log(`[worker]: started`) - }) - - const worker = createWorker(redisDataSource) - - const queueEvents = new QueueEvents(QUEUE_NAME, { + const queueEvents = new QueueEvents(queueName, { connection: redisDataSource.queueRedisClient, }) queueEvents.on('added', (job) => { - console.log('added job: ', job.jobId, job.name) + console.log('added job:', job.jobId, job.name) }) queueEvents.on('removed', (job) => { - console.log('removed job: ', job.jobId) + console.log('removed job:', job.jobId) }) queueEvents.on('completed', (job) => { - console.log('completed job: ', job.jobId) + console.log('completed job:', job.jobId) }) queueEvents.on('failed', (job) => { - console.log('failed job: ', job.jobId) + console.log('failed job:', job.jobId) }) - const gracefulShutdown = async (signal: string) => { - console.log(`[worker]: Received ${signal}, closing server...`) - await new Promise((resolve) => { - server.close((err) => { - console.log('[worker]: Express server closed') - if (err) { - console.log('[worker]: error stopping server', { err }) - } - - resolve() - }) - }) - - await worker.close() - console.log('[worker]: Worker closed') - - await redisDataSource.shutdown() - console.log('[worker]: Redis connection closed') - - process.exit(0) - } - - const handleShutdown = (signal: string) => { - return () => { - void gracefulShutdown(signal) - } - } - - process.on('SIGTERM', handleShutdown('SIGTERM')) - process.on('SIGINT', handleShutdown('SIGINT')) - - process.on('uncaughtException', (error) => { - console.error('Uncaught Exception:', error) - handleShutdown('uncaughtException') - }) - - process.on('unhandledRejection', (reason, promise) => { - console.error('Unhandled Rejection at:', promise, 'reason:', reason) - handleShutdown('unhandledRejection') - }) + return worker } -// only call main if the file was called from the CLI and wasn't required from another module -if (require.main === module) { - main() +export const createWorkers = (redisDataSource: RedisDataSource) => { + return QUEUE_NAMES.map((queueName) => + createWorker(redisDataSource, queueName) + ) } diff --git a/yarn.lock b/yarn.lock index 4cc38bb0e..460282576 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2799,7 +2799,7 @@ google-gax "^3.5.7" protobufjs "^7.2.5" -"@google-cloud/functions-framework@3.1.2", "@google-cloud/functions-framework@^3.0.0": +"@google-cloud/functions-framework@3.1.2": version "3.1.2" resolved "https://registry.yarnpkg.com/@google-cloud/functions-framework/-/functions-framework-3.1.2.tgz#2cd92ce4307bf7f32555d028dca22e398473b410" integrity sha512-pYvEH65/Rqh1JNPdcBmorcV7Xoom2/iOSmbtYza8msro7Inl+qOYxbyMiQfySD2gwAyn38WyWPRqsDRcf/BFLg== From 9e54427d5d60b5ea47bb18309cb8025b27cca43a Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 25 Jul 2024 18:51:39 +0800 Subject: [PATCH 05/19] update github action --- .github/workflows/run-tests.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 572408202..de40ff68e 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -111,7 +111,5 @@ jobs: run: 'docker build --file packages/content-fetch/Dockerfile .' - name: Build the inbound-email-handler docker image run: 'docker build --file packages/inbound-email-handler/Dockerfile .' - - name: Build the content-fetch cloud function docker image - run: 'docker build --file packages/content-fetch/Dockerfile-gcf .' - name: Build the tts docker image run: 'docker build --file packages/text-to-speech/Dockerfile .' From 34edbeba56c63e7c011e4422bebcf4ccac3c362d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 25 Jul 2024 21:32:50 +0800 Subject: [PATCH 06/19] fix dockerfile --- packages/content-fetch/package.json | 1 + yarn.lock | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 82abe8bfb..f7fd84160 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -17,6 +17,7 @@ "posthog-node": "^3.6.3" }, "devDependencies": { + "@types/express": "^4.17.1", "chai": "^4.3.6", "mocha": "^10.0.0" }, diff --git a/yarn.lock b/yarn.lock index 460282576..6252be488 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8067,7 +8067,7 @@ dependencies: "@types/express" "*" -"@types/express@*", "@types/express@^4.17.13", "@types/express@^4.17.14", "@types/express@^4.17.21", "@types/express@^4.17.7": +"@types/express@*", "@types/express@^4.17.1", "@types/express@^4.17.13", "@types/express@^4.17.14", "@types/express@^4.17.21", "@types/express@^4.17.7": version "4.17.21" resolved "https://registry.yarnpkg.com/@types/express/-/express-4.17.21.tgz#c26d4a151e60efe0084b23dc3369ebc631ed192d" integrity sha512-ejlPM315qwLpaQlQDTjPdsUFSc6ZsP4AN6AlWnogPjQ7CVi7PYF3YVz+CY3jE2pwYf7E/7HlDAN0rV2GxTG0HQ== From d29ac109cb5bf96ab0890739120bc6af921c4fb1 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 20 Aug 2024 16:09:09 +0800 Subject: [PATCH 07/19] slow and rss queue process 5 tasks/s and normal queue process 100 tasks per second --- packages/content-fetch/src/worker.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 7cb09c393..ffa12962e 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -40,12 +40,12 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { } case RSS_QUEUE: return { - max: 3, + max: 5, duration: 1000, // 1 second } default: return { - max: 10, + max: 100, duration: 1000, // 1 second } } From e41ccf7a8f2b8b4b4c98dbae137146d3267a8745 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 20 Aug 2024 17:06:36 +0800 Subject: [PATCH 08/19] deduplicate content fetch jobs --- packages/api/src/utils/createTask.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 68905cd0e..c35b39da7 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -378,9 +378,21 @@ export const enqueueFetchContentJob = async ( throw new Error('No queue found') } + // sort the data to make sure the hash is consistent + const sortedData = JSON.stringify(data, Object.keys(data).sort()) + const jobId = `${FETCH_CONTENT_JOB}_${stringToHash( + sortedData + )}_${JOB_VERSION}` const job = await queue.add(FETCH_CONTENT_JOB, data, { priority: getJobPriority(FETCH_CONTENT_JOB), - attempts: data.priority === 'low' ? 2 : 5, + attempts: data.priority === 'low' ? 2 : 3, + backoff: { + type: 'exponential', + delay: 2000, + }, + jobId, + removeOnComplete: true, + removeOnFail: true, }) if (!job || !job.id) { From 5bd272dde0f92955e7a0b051dc56909211bbae35 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 12:08:23 +0800 Subject: [PATCH 09/19] use one queue with different priority for fetching content of rss feed item or saved url --- packages/api/src/jobs/rss/refreshFeed.ts | 10 ++- packages/api/src/queue-processor.ts | 2 - .../src/services/create_page_save_request.ts | 4 +- packages/api/src/utils/createTask.ts | 46 ++++------ packages/content-fetch/src/app.ts | 86 ++++++++----------- packages/content-fetch/src/job.ts | 15 ++-- packages/content-fetch/src/worker.ts | 39 ++------- 7 files changed, 78 insertions(+), 124 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 7882522da..43cacb011 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -35,6 +35,7 @@ interface RefreshFeedRequest { fetchContentTypes: FetchContentType[] folders: FolderType[] refreshContext?: RSSRefreshContext + priority?: 'low' | 'high' } export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => { @@ -332,7 +333,8 @@ const createTask = async ( const fetchContentAndCreateItem = async ( users: UserConfig[], feedUrl: string, - item: RssFeedItem + item: RssFeedItem, + priority = 'low' as 'low' | 'high' ) => { const data: FetchContentJobData = { users, @@ -342,7 +344,7 @@ const fetchContentAndCreateItem = async ( rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, - priority: 'low', + priority, } try { @@ -703,6 +705,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { fetchContentTypes, folders, refreshContext, + priority, } = request logger.info('Processing feed', feedUrl, { refreshContext: refreshContext }) @@ -771,7 +774,8 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { await fetchContentAndCreateItem( Array.from(task.users.values()), feedUrl, - task.item + task.item, + priority ) } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 9468321a7..aa5f2e7ed 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -84,8 +84,6 @@ import { logger } from './utils/logger' export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue' export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' -export const CONTENT_FETCH_SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' -export const CONTENT_FETCH_RSS_QUEUE = 'omnivore-content-fetch-rss-queue' export const JOB_VERSION = 'v001' diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 1c2723a99..cde6140c3 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -57,7 +57,7 @@ const addRecentSavedItem = async (userId: string) => { // default: use normal queue const getPriorityByRateLimit = async ( userId: string -): Promise<'low' | 'high' | undefined> => { +): Promise<'low' | 'high'> => { const redisClient = redisDataSource.redisClient if (redisClient) { const oneMinuteAgo = Date.now() - 60 * 1000 @@ -75,6 +75,8 @@ const getPriorityByRateLimit = async ( logger.error('Failed to get priority by rate limit', { userId, error }) } } + + return 'high' } export const validateUrl = (url: string): URL => { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index c35b39da7..4e6a4f862 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -65,13 +65,7 @@ import { UploadContentJobData, UPLOAD_CONTENT_JOB, } from '../jobs/upload_content' -import { - CONTENT_FETCH_QUEUE, - CONTENT_FETCH_RSS_QUEUE, - CONTENT_FETCH_SLOW_QUEUE, - getQueue, - JOB_VERSION, -} from '../queue-processor' +import { CONTENT_FETCH_QUEUE, getQueue, JOB_VERSION } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { writeDigest } from '../services/digest' import { signFeatureToken } from '../services/features' @@ -102,18 +96,21 @@ export const getJobPriority = (jobName: string): number => { case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: case UPDATE_HOME_JOB: - case FETCH_CONTENT_JOB: + case `${FETCH_CONTENT_JOB}_high`: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case AI_SUMMARIZE_JOB_NAME: case PROCESS_YOUTUBE_VIDEO_JOB_NAME: + case `${FETCH_CONTENT_JOB}_low`: + case `${FETCH_CONTENT_JOB}_rss_high`: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: case UPLOAD_CONTENT_JOB: case SCORE_LIBRARY_ITEM_JOB: + case `${FETCH_CONTENT_JOB}_rss_low`: return 10 case `${REFRESH_FEED_JOB_NAME}_low`: case EXPORT_ITEM_JOB_NAME: @@ -336,7 +333,7 @@ export interface FetchContentJobData { folder?: string libraryItemId: string }> - priority?: 'low' | 'high' + priority: 'low' | 'high' state?: ArticleSavingRequestStatus labels?: Array locale?: string @@ -360,39 +357,32 @@ export interface FetchContentJobData { export const enqueueFetchContentJob = async ( data: FetchContentJobData ): Promise => { - const getQueueName = (data: FetchContentJobData) => { - if (data.rssFeedUrl) { - return CONTENT_FETCH_RSS_QUEUE - } - - if (data.priority === 'low') { - return CONTENT_FETCH_SLOW_QUEUE - } - - return CONTENT_FETCH_QUEUE - } - - const queueName = getQueueName(data) - const queue = await getQueue(queueName) + const queue = await getQueue(CONTENT_FETCH_QUEUE) if (!queue) { throw new Error('No queue found') } + const jobName = `${FETCH_CONTENT_JOB}${data.rssFeedUrl ? '_rss' : ''}_${ + data.priority + }` + // sort the data to make sure the hash is consistent const sortedData = JSON.stringify(data, Object.keys(data).sort()) const jobId = `${FETCH_CONTENT_JOB}_${stringToHash( sortedData )}_${JOB_VERSION}` + const priority = getJobPriority(jobName) + const job = await queue.add(FETCH_CONTENT_JOB, data, { - priority: getJobPriority(FETCH_CONTENT_JOB), - attempts: data.priority === 'low' ? 2 : 3, + jobId, + removeOnComplete: true, + removeOnFail: true, + priority, + attempts: 2, backoff: { type: 'exponential', delay: 2000, }, - jobId, - removeOnComplete: true, - removeOnFail: true, }) if (!job || !job.id) { diff --git a/packages/content-fetch/src/app.ts b/packages/content-fetch/src/app.ts index 3cf0e4982..8ffb1ef6b 100644 --- a/packages/content-fetch/src/app.ts +++ b/packages/content-fetch/src/app.ts @@ -2,10 +2,10 @@ import { RedisDataSource } from '@omnivore/utils' import { JobType } from 'bullmq' import express, { Express } from 'express' import asyncHandler from 'express-async-handler' -import { createWorkers, getQueue } from './worker' +import { createWorker, getQueue, QUEUE } from './worker' const main = () => { - console.log('[worker]: starting workers') + console.log('Starting worker...') const app: Express = express() const port = process.env.PORT || 3002 @@ -22,16 +22,7 @@ const main = () => { }, }) - const workers = createWorkers(redisDataSource) - - const closeWorkers = async () => { - await Promise.all( - workers.map(async (worker) => { - await worker.close() - console.log('worker closed:', worker.name) - }) - ) - } + const worker = createWorker(redisDataSource) // respond healthy to auto-scaler. app.get('/_ah/health', (req, res) => res.sendStatus(200)) @@ -39,9 +30,10 @@ const main = () => { app.get( '/lifecycle/prestop', asyncHandler(async (_req, res) => { - console.log('prestop lifecycle hook called.') - await closeWorkers() - console.log('workers closed') + console.log('Prestop lifecycle hook called.') + + await worker.close() + console.log('Worker closed') res.sendStatus(200) }) @@ -52,37 +44,31 @@ const main = () => { asyncHandler(async (_, res) => { let output = '' - for (const worker of workers) { - const queueName = worker.name - const queue = await getQueue( - redisDataSource.queueRedisClient, - queueName - ) + const queue = await getQueue(redisDataSource.queueRedisClient) - const jobsTypes: Array = [ - 'active', - 'failed', - 'completed', - 'prioritized', - ] - const counts = await queue.getJobCounts(...jobsTypes) + const jobsTypes: Array = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) - jobsTypes.forEach((metric) => { - output += `# TYPE omnivore_queue_messages_${metric} gauge\n` - output += `omnivore_queue_messages_${metric}{queue="${queueName}"} ${counts[metric]}\n` - }) + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${QUEUE}"} ${counts[metric]}\n` + }) - // Export the age of the oldest prioritized job in the queue - const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) - if (oldestJobs.length > 0) { - const currentTime = Date.now() - const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${ageInSeconds}\n` - } else { - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${0}\n` - } + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${0}\n` } res.status(200).setHeader('Content-Type', 'text/plain').send(output) @@ -90,27 +76,27 @@ const main = () => { ) const server = app.listen(port, () => { - console.log(`[worker]: Workers started`) + console.log('Worker started') }) const gracefulShutdown = async (signal: string) => { - console.log(`[worker]: Received ${signal}, closing server...`) + console.log(`Received ${signal}, closing server...`) await new Promise((resolve) => { server.close((err) => { - console.log('[worker]: Express server closed') + console.log('Express server closed') if (err) { - console.log('[worker]: error stopping server', { err }) + console.log('Error stopping server', { err }) } resolve() }) }) - await closeWorkers() - console.log('[worker]: Workers closed') + await worker.close() + console.log('Worker closed') await redisDataSource.shutdown() - console.log('[worker]: Redis connection closed') + console.log('Redis connection closed') process.exit(0) } diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index ae282bfb7..e3465b922 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -38,23 +38,24 @@ const getPriority = (job: SavePageJob): number => { // priority 5: jobs that are expected to finish in less than 10 second // priority 10: jobs that are expected to finish in less than 10 minutes // priority 100: jobs that are expected to finish in less than 1 hour - if (job.isRss) { - return 10 - } if (job.isImport) { return 100 } - return job.priority === 'low' ? 10 : 1 + if (job.isRss) { + return job.priority === 'low' ? 10 : 5 + } + + return job.priority === 'low' ? 5 : 1 } const getAttempts = (job: SavePageJob): number => { - if (job.isRss || job.isImport) { - // we don't want to retry rss or import jobs + if (job.isImport) { + // we don't want to retry import jobs return 1 } - return 3 + return job.isRss ? 2 : 3 } const getOpts = (job: SavePageJob): BulkJobOptions => { diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index ffa12962e..731178c94 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -2,14 +2,11 @@ import { RedisDataSource } from '@omnivore/utils' import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' import { JobData, processFetchContentJob } from './request_handler' -const FAST_QUEUE = 'omnivore-content-fetch-queue' -const SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' -const RSS_QUEUE = 'omnivore-content-fetch-rss-queue' -const QUEUE_NAMES = [FAST_QUEUE, SLOW_QUEUE, RSS_QUEUE] as const +export const QUEUE = 'omnivore-content-fetch-queue' export const getQueue = async ( connection: RedisClient, - queueName: string + queueName = QUEUE ): Promise => { const queue = new Queue(queueName, { connection, @@ -30,27 +27,10 @@ export const getQueue = async ( return queue } -const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { - const getLimiter = (queueName: string) => { - switch (queueName) { - case SLOW_QUEUE: - return { - max: 5, - duration: 1000, // 1 second - } - case RSS_QUEUE: - return { - max: 5, - duration: 1000, // 1 second - } - default: - return { - max: 100, - duration: 1000, // 1 second - } - } - } - +export const createWorker = ( + redisDataSource: RedisDataSource, + queueName = QUEUE +) => { const worker = new Worker( queueName, async (job: Job) => { @@ -60,7 +40,6 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately - limiter: getLimiter(queueName), } ) @@ -90,9 +69,3 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { return worker } - -export const createWorkers = (redisDataSource: RedisDataSource) => { - return QUEUE_NAMES.map((queueName) => - createWorker(redisDataSource, queueName) - ) -} From 0fbc6d0a87fe5e73182c39c06da77e5b80656508 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 12:21:49 +0800 Subject: [PATCH 10/19] update importer to use content-fetch queue --- packages/content-fetch/src/request_handler.ts | 2 +- packages/import-handler/src/index.ts | 18 +++---- packages/import-handler/src/job.ts | 44 ++++++++++++++++- packages/import-handler/src/task.ts | 48 ------------------- 4 files changed, 52 insertions(+), 60 deletions(-) delete mode 100644 packages/import-handler/src/task.ts diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index a698ac9e6..a494d8d45 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -214,7 +214,7 @@ export const processFetchContentJob = async ( }, state, labelsToAdd: labels, - taskId: taskId, + taskId, locale, timezone, rssFeedUrl, diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 672310954..a9cf4b730 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -10,10 +10,9 @@ import * as path from 'path' import { promisify } from 'util' import { v4 as uuid } from 'uuid' import { importCsv } from './csv' -import { queueEmailJob } from './job' +import { enqueueFetchContentJob, queueEmailJob } from './job' import { importMatterArchive } from './matterHistory' import { ImportStatus, updateMetrics } from './metrics' -import { CONTENT_FETCH_URL, createCloudTask } from './task' export enum ArticleSavingRequestStatus { Failed = 'FAILED', @@ -94,6 +93,7 @@ const shouldHandle = (data: StorageEvent) => { } const importURL = async ( + redisDataSource: RedisDataSource, userId: string, url: URL, source: string, @@ -103,18 +103,17 @@ const importURL = async ( savedAt?: Date, publishedAt?: Date ): Promise => { - return createCloudTask(CONTENT_FETCH_URL, { - userId, - source, + return enqueueFetchContentJob(redisDataSource, { url: url.toString(), - saveRequestId: '', + users: [{ id: userId, libraryItemId: '' }], + source, + taskId, state, labels: labels?.map((l) => { return { name: l } }), - taskId, - savedAt, - publishedAt, + savedAt: savedAt?.toISOString(), + publishedAt: publishedAt?.toISOString(), }) } @@ -194,6 +193,7 @@ const urlHandler = async ( try { // Imports are stored in the format imports//-.csv const result = await importURL( + ctx.redisDataSource, ctx.userId, url, ctx.source, diff --git a/packages/import-handler/src/job.ts b/packages/import-handler/src/job.ts index 0f040b8f3..a47fe1dce 100644 --- a/packages/import-handler/src/job.ts +++ b/packages/import-handler/src/job.ts @@ -1,8 +1,12 @@ import { RedisDataSource } from '@omnivore/utils' import { Queue } from 'bullmq' +import { ArticleSavingRequestStatus } from '.' + +const BACKEND_QUEUE = 'omnivore-backend-queue' +const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' -const QUEUE_NAME = 'omnivore-backend-queue' export const SEND_EMAIL_JOB = 'send-email' +const FETCH_CONTENT_JOB = 'fetch-content' interface SendEmailJobData { userId: string @@ -11,13 +15,49 @@ interface SendEmailJobData { html?: string } +interface FetchContentJobData { + url: string + users: Array<{ + id: string + folder?: string + libraryItemId: string + }> + source: string + taskId: string + state?: ArticleSavingRequestStatus + labels?: Array<{ name: string }> + savedAt?: string + publishedAt?: string +} + export const queueEmailJob = async ( redisDataSource: RedisDataSource, data: SendEmailJobData ) => { - const queue = new Queue(QUEUE_NAME, { + const queue = new Queue(BACKEND_QUEUE, { connection: redisDataSource.queueRedisClient, }) await queue.add(SEND_EMAIL_JOB, data) } + +export const enqueueFetchContentJob = async ( + redisDataSource: RedisDataSource, + data: FetchContentJobData +): Promise => { + const queue = new Queue(CONTENT_FETCH_QUEUE, { + connection: redisDataSource.queueRedisClient, + }) + + const job = await queue.add(FETCH_CONTENT_JOB, data, { + priority: 100, + attempts: 1, + }) + + if (!job || !job.id) { + console.error('Error while enqueuing fetch-content job', data) + throw new Error('Error while enqueuing fetch-content job') + } + + return job.id +} diff --git a/packages/import-handler/src/task.ts b/packages/import-handler/src/task.ts deleted file mode 100644 index bccca7ecb..000000000 --- a/packages/import-handler/src/task.ts +++ /dev/null @@ -1,48 +0,0 @@ -/* eslint-disable @typescript-eslint/restrict-template-expressions */ -import { CloudTasksClient, protos } from '@google-cloud/tasks' - -const cloudTask = new CloudTasksClient() - -export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL - -export const createCloudTask = async ( - taskHandlerUrl: string | undefined, - payload: unknown, - requestHeaders?: Record, - queue = 'omnivore-import-queue' -) => { - const location = process.env.GCP_LOCATION - const project = process.env.GCP_PROJECT_ID - - if (!project || !location || !queue || !taskHandlerUrl) { - throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}` - } - - const serviceAccountEmail = `${project}@appspot.gserviceaccount.com` - - const parent = cloudTask.queuePath(project, location, queue) - const convertedPayload = JSON.stringify(payload) - const body = Buffer.from(convertedPayload).toString('base64') - const task: protos.google.cloud.tasks.v2.ITask = { - httpRequest: { - httpMethod: 'POST', - url: taskHandlerUrl, - headers: { - 'Content-Type': 'application/json', - ...requestHeaders, - }, - body, - ...(serviceAccountEmail - ? { - oidcToken: { - serviceAccountEmail, - }, - } - : null), - }, - } - - return cloudTask.createTask({ parent, task }).then((result) => { - return result[0].name ?? undefined - }) -} From 72d89308c537c6a70e3d149f316043522aa4d5be Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 12:24:12 +0800 Subject: [PATCH 11/19] process up to 10 jobs concurrently --- packages/content-fetch/src/worker.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 731178c94..33cb44668 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -40,6 +40,10 @@ export const createWorker = ( { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately + limiter: { + max: 10, // process up to 10 jobs concurrently + duration: 1000, // 1 second + }, } ) From 03ff18af814dfbb630c98eba12df59614c98084a Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 14:09:49 +0800 Subject: [PATCH 12/19] remove unused dependencies --- packages/import-handler/package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/import-handler/package.json b/packages/import-handler/package.json index e845181a9..b274864c6 100644 --- a/packages/import-handler/package.json +++ b/packages/import-handler/package.json @@ -36,7 +36,6 @@ "@fast-csv/parse": "^5.0.0", "@google-cloud/functions-framework": "3.1.2", "@google-cloud/storage": "^7.0.1", - "@google-cloud/tasks": "^4.0.0", "@omnivore/readability": "1.0.0", "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", From 2ada017427f8e6ebae503989eb8babef86bcc6d9 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 14:46:24 +0800 Subject: [PATCH 13/19] update queue admin --- pkg/bull-queue-admin/index.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/bull-queue-admin/index.js b/pkg/bull-queue-admin/index.js index fee71ed8f..49b275ec3 100644 --- a/pkg/bull-queue-admin/index.js +++ b/pkg/bull-queue-admin/index.js @@ -59,7 +59,10 @@ const run = async () => { const connection = new Redis(secrets.REDIS_URL, redisOptions(secrets)) console.log('set connection: ', connection) - const rssRefreshFeed = new Queue('omnivore-backend-queue', { + const backendQueue = new Queue('omnivore-backend-queue', { + connection: connection, + }) + const contentFetchQueue = new Queue('omnivore-content-fetch-queue', { connection: connection, }) @@ -67,7 +70,10 @@ const run = async () => { serverAdapter.setBasePath('/ui') createBullBoard({ - queues: [new BullMQAdapter(rssRefreshFeed)], + queues: [ + new BullMQAdapter(backendQueue), + new BullMQAdapter(contentFetchQueue), + ], serverAdapter, }) From 6de285432d156c148fd3e7476c0764ca7682914c Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 16:53:50 +0800 Subject: [PATCH 14/19] fix importer status not updated if failed to fetch content --- packages/api/src/jobs/save_page.ts | 4 +- packages/content-fetch/package.json | 3 ++ packages/content-fetch/src/request_handler.ts | 54 ++++++++++++++++++- packages/content-fetch/src/worker.ts | 2 +- 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 850b36d8b..a721f4643 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -140,7 +140,7 @@ const sendImportStatusUpdate = async ( Authorization: auth as string, 'Content-Type': 'application/json', }, - timeout: REQUEST_TIMEOUT, + timeout: 5000, } ) } catch (e) { @@ -288,7 +288,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { throw e } finally { - const lastAttempt = attemptsMade + 1 === MAX_IMPORT_ATTEMPTS + const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS if (taskId && (isSaved || lastAttempt)) { logger.info('sending import status update') diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index f7fd84160..743a3bca0 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -10,14 +10,17 @@ "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", + "axios": "^0.27.2", "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", "express-async-handler": "^1.2.0", + "jsonwebtoken": "^8.5.1", "posthog-node": "^3.6.3" }, "devDependencies": { "@types/express": "^4.17.1", + "@types/jsonwebtoken": "^8.5.0", "chai": "^4.3.6", "mocha": "^10.0.0" }, diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index a494d8d45..78310bf56 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,7 +1,10 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' import { RedisDataSource } from '@omnivore/utils' +import axios from 'axios' import 'dotenv/config' +import jwt from 'jsonwebtoken' +import { promisify } from 'util' import { analytics } from './analytics' import { queueSavePageJob } from './job' @@ -66,6 +69,14 @@ const NO_CACHE_URLS = [ 'https://deviceandbrowserinfo.com/info_device', ] +const signToken = promisify(jwt.sign) + +const IMPORTER_METRICS_COLLECTOR_URL = + process.env.IMPORTER_METRICS_COLLECTOR_URL +const JWT_SECRET = process.env.JWT_SECRET + +const MAX_IMPORT_ATTEMPTS = 1 + const uploadToBucket = async (filePath: string, data: string) => { await storage .bucket(bucketName) @@ -174,9 +185,43 @@ const incrementContentFetchFailure = async ( } } +const sendImportStatusUpdate = async ( + userId: string, + taskId: string, + isImported?: boolean +) => { + try { + if (!JWT_SECRET || !IMPORTER_METRICS_COLLECTOR_URL) { + console.error('JWT_SECRET or IMPORTER_METRICS_COLLECTOR_URL is not set') + return + } + + console.log('sending import status update') + const auth = await signToken({ uid: userId }, JWT_SECRET) + + await axios.post( + IMPORTER_METRICS_COLLECTOR_URL, + { + taskId, + status: isImported ? 'imported' : 'failed', + }, + { + headers: { + Authorization: auth as string, + 'Content-Type': 'application/json', + }, + timeout: 5000, + } + ) + } catch (e) { + console.error('Failed to send import status update', e) + } +} + export const processFetchContentJob = async ( redisDataSource: RedisDataSource, - data: JobData + data: JobData, + attemptsMade: number ) => { const functionStartTime = Date.now() @@ -318,5 +363,12 @@ export const processFetchContentJob = async ( }, } ) + + const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS + if (logRecord.error && taskId && lastAttempt) { + console.log('sending import status update') + // send failed to import status to update the metrics for importer + await sendImportStatusUpdate(users[0].id, taskId, false) + } } } diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 33cb44668..99672497e 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -35,7 +35,7 @@ export const createWorker = ( queueName, async (job: Job) => { // process the job - await processFetchContentJob(redisDataSource, job.data) + await processFetchContentJob(redisDataSource, job.data, job.attemptsMade) }, { connection: redisDataSource.queueRedisClient, From 0366c426bc5bae8ce1bcf278cdfb004d17ac1e03 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 17:58:58 +0800 Subject: [PATCH 15/19] process up to 2 jobs concurrently --- packages/api/src/resolvers/integrations/index.ts | 5 +++-- packages/content-fetch/src/worker.ts | 6 ++++-- packages/import-handler/src/job.ts | 14 ++++++++++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index d976fa6a5..a893c3e79 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -245,8 +245,9 @@ export const importFromIntegrationResolver = authorized< authToken, integration.importItemState || ImportItemState.Unarchived ) - // update task name in integration - await updateIntegration(integration.id, { taskName }, uid) + log.info('task created', taskName) + // // update task name in integration + // await updateIntegration(integration.id, { taskName }, uid) analytics.capture({ distinctId: uid, diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 99672497e..0d28d4f5a 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -40,10 +40,12 @@ export const createWorker = ( { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately + // process up to 10 jobs in a second limiter: { - max: 10, // process up to 10 jobs concurrently - duration: 1000, // 1 second + max: 10, + duration: 1000, }, + concurrency: 2, // process up to 2 jobs concurrently } ) diff --git a/packages/import-handler/src/job.ts b/packages/import-handler/src/job.ts index a47fe1dce..c44156cb0 100644 --- a/packages/import-handler/src/job.ts +++ b/packages/import-handler/src/job.ts @@ -1,12 +1,14 @@ import { RedisDataSource } from '@omnivore/utils' import { Queue } from 'bullmq' import { ArticleSavingRequestStatus } from '.' +import crypto from 'crypto' const BACKEND_QUEUE = 'omnivore-backend-queue' const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' export const SEND_EMAIL_JOB = 'send-email' const FETCH_CONTENT_JOB = 'fetch-content' +const JOB_VERSION = 'v001' interface SendEmailJobData { userId: string @@ -30,6 +32,10 @@ interface FetchContentJobData { publishedAt?: string } +export const stringToHash = (str: string): string => { + return crypto.createHash('md5').update(str).digest('hex') +} + export const queueEmailJob = async ( redisDataSource: RedisDataSource, data: SendEmailJobData @@ -49,7 +55,15 @@ export const enqueueFetchContentJob = async ( connection: redisDataSource.queueRedisClient, }) + // sort the data to make sure the hash is consistent + const sortedData = JSON.stringify(data, Object.keys(data).sort()) + const jobId = `${FETCH_CONTENT_JOB}_${stringToHash( + sortedData + )}_${JOB_VERSION}` const job = await queue.add(FETCH_CONTENT_JOB, data, { + jobId, + removeOnComplete: true, + removeOnFail: true, priority: 100, attempts: 1, }) From ae9c6cc8b2a2ca501dd86f985648338d62abbcd1 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 18:25:12 +0800 Subject: [PATCH 16/19] fix tests --- .../api/test/resolvers/integrations.test.ts | 128 ++++++++++-------- 1 file changed, 74 insertions(+), 54 deletions(-) diff --git a/packages/api/test/resolvers/integrations.test.ts b/packages/api/test/resolvers/integrations.test.ts index bc716e915..a8278b06c 100644 --- a/packages/api/test/resolvers/integrations.test.ts +++ b/packages/api/test/resolvers/integrations.test.ts @@ -19,7 +19,7 @@ chai.use(sinonChai) describe('Integrations resolvers', () => { const READWISE_API_URL = 'https://readwise.io/api/v2' - + let loginUser: User let authToken: string @@ -30,7 +30,7 @@ describe('Integrations resolvers', () => { .post('/local/debug/fake-user-login') .send({ fakeEmail: loginUser.email }) - authToken = res.body.authToken + authToken = res.body.authToken as string }) after(async () => { @@ -39,19 +39,9 @@ describe('Integrations resolvers', () => { describe('setIntegration API', () => { const validToken = 'valid-token' - const query = ( - id = '', - name = 'READWISE', - token: string = 'test token', - enabled = true - ) => ` - mutation { - setIntegration(input: { - id: "${id}", - name: "${name}", - token: "${token}", - enabled: ${enabled}, - }) { + const query = ` + mutation SetIntegration($input: SetIntegrationInput!) { + setIntegration(input: $input) { ... on SetIntegrationSuccess { integration { id @@ -79,6 +69,8 @@ describe('Integrations resolvers', () => { .reply(204) .persist() integrationName = 'READWISE' + enabled = true + token = 'test token' }) after(() => { @@ -101,10 +93,14 @@ describe('Integrations resolvers', () => { }) it('returns InvalidToken error code', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.errorCodes).to.eql([ SetIntegrationErrorCode.InvalidToken, ]) @@ -124,10 +120,14 @@ describe('Integrations resolvers', () => { }) it('creates new integration', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.integration.enabled).to.be.true }) }) @@ -142,10 +142,9 @@ describe('Integrations resolvers', () => { }) it('returns NotFound error code', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { id: integrationId, name: integrationName, enabled, token }, + }) expect(res.body.data.setIntegration.errorCodes).to.eql([ SetIntegrationErrorCode.NotFound, ]) @@ -163,6 +162,7 @@ describe('Integrations resolvers', () => { user: { id: otherUser.id }, name: 'READWISE', token: 'fakeToken', + enabled, }, otherUser.id ) @@ -175,10 +175,14 @@ describe('Integrations resolvers', () => { }) it('returns Unauthorized error code', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + enabled, + token, + }, + }) expect(res.body.data.setIntegration.errorCodes).to.eql([ SetIntegrationErrorCode.NotFound, ]) @@ -192,6 +196,7 @@ describe('Integrations resolvers', () => { user: { id: loginUser.id }, name: 'READWISE', token: 'fakeToken', + enabled, }, loginUser.id ) @@ -208,17 +213,25 @@ describe('Integrations resolvers', () => { }) afterEach(async () => { - await updateIntegration(existingIntegration.id, { - taskName: 'some task name', - enabled: true, - }, loginUser.id) + await updateIntegration( + existingIntegration.id, + { + taskName: 'some task name', + enabled: true, + }, + loginUser.id + ) }) it('disables integration', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token, enabled), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.integration.enabled).to.be .false }) @@ -230,17 +243,25 @@ describe('Integrations resolvers', () => { }) afterEach(async () => { - await updateIntegration(existingIntegration.id, { - taskName: null, - enabled: false, - }, loginUser.id) + await updateIntegration( + existingIntegration.id, + { + taskName: null, + enabled: false, + }, + loginUser.id + ) }) it('enables integration', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token, enabled), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.integration.enabled).to.be .true }) @@ -333,9 +354,12 @@ describe('Integrations resolvers', () => { query(existingIntegration.id), authToken ) - const integration = await findIntegration({ - id: existingIntegration.id, - }, loginUser.id) + const integration = await findIntegration( + { + id: existingIntegration.id, + }, + loginUser.id + ) expect(res.body.data.deleteIntegration.integration).to.be.an('object') expect(res.body.data.deleteIntegration.integration.id).to.eql( @@ -383,10 +407,6 @@ describe('Integrations resolvers', () => { authToken ).expect(200) expect(res.body.data.importFromIntegration.success).to.be.true - const integration = await findIntegration({ - id: existingIntegration.id, - }, loginUser.id) - expect(integration?.taskName).not.to.be.null }) }) From aa2caca9443f2bb17b3ad84e045153368b679bde Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 18:47:23 +0800 Subject: [PATCH 17/19] do not cache content from https://jacksonh.org --- packages/api/src/services/library_item.ts | 2 +- packages/content-fetch/src/request_handler.ts | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 2a64bd77a..bd64cf1e0 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -1368,7 +1368,7 @@ export const deleteLibraryItemsByUserId = async (userId: string) => { } export const batchDelete = async (criteria: FindOptionsWhere) => { - const batchSize = 1000 + const batchSize = 20 const qb = libraryItemRepository.createQueryBuilder().where(criteria) const countSql = queryBuilderToRawSql(qb.select('COUNT(1)')) diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 78310bf56..e5ff9bc97 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -67,6 +67,7 @@ const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files' const NO_CACHE_URLS = [ 'https://deviceandbrowserinfo.com/are_you_a_bot', 'https://deviceandbrowserinfo.com/info_device', + 'https://jacksonh.org', ] const signToken = promisify(jwt.sign) @@ -275,6 +276,7 @@ export const processFetchContentJob = async ( const isBlocked = await isDomainBlocked(redisDataSource, domain) if (isBlocked) { console.log('domain is blocked', domain) + logRecord.error = 'domain is blocked' return } From fe87d23a911114c1ed90d4be8437857fd496e055 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 22 Aug 2024 12:24:41 +0800 Subject: [PATCH 18/19] use cloud task for fetching content of rss items if USE_CONTENT_FETCH_QUEUE is not set to true in the environment --- packages/api/src/jobs/rss/refreshFeed.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 43cacb011..b0759c546 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -4,6 +4,7 @@ import { parseHTML } from 'linkedom' import Parser, { Item } from 'rss-parser' import { v4 as uuid } from 'uuid' import { FetchContentType } from '../../entity/subscription' +import { env } from '../../env' import { ArticleSavingRequestStatus } from '../../generated/graphql' import { redisDataSource } from '../../redis_data_source' import { validateUrl } from '../../services/create_page_save_request' @@ -13,7 +14,7 @@ import { updateSubscriptions, } from '../../services/update_subscription' import { findActiveUser } from '../../services/user' -import { +import createHttpTaskWithToken, { enqueueFetchContentJob, FetchContentJobData, } from '../../utils/createTask' @@ -348,8 +349,19 @@ const fetchContentAndCreateItem = async ( } try { - const task = await enqueueFetchContentJob(data) - return !!task + const useContentFetchQueue = process.env.USE_CONTENT_FETCH_QUEUE === 'true' + if (useContentFetchQueue) { + return await enqueueFetchContentJob(data) + } + + return await createHttpTaskWithToken({ + queue: 'omnivore-rss-feed-queue', + taskHandlerUrl: env.queue.contentFetchGCFUrl, + payload: { + ...data, + priority: 'high', // use one queue for all RSS feeds for now + }, + }) } catch (error) { logger.error('Error while creating task', error) return false From 3cd3817569ca61f94d9ee8778783fa1c0903ed13 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 22 Aug 2024 12:27:26 +0800 Subject: [PATCH 19/19] update the env var name --- packages/api/src/jobs/rss/refreshFeed.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index b0759c546..e568d750d 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -349,8 +349,9 @@ const fetchContentAndCreateItem = async ( } try { - const useContentFetchQueue = process.env.USE_CONTENT_FETCH_QUEUE === 'true' - if (useContentFetchQueue) { + const contentFetchQueueEnabled = + process.env.CONTENT_FETCH_QUEUE_ENABLED === 'true' + if (contentFetchQueueEnabled) { return await enqueueFetchContentJob(data) }