From 08fbb8aebfc50ddfc2a7b1e46cacbe101b90386f Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 25 Jul 2024 18:47:42 +0800 Subject: [PATCH] use different queues for fast,slow and rss content fetch jobs --- packages/api/src/jobs/rss/refreshFeed.ts | 8 +- packages/api/src/queue-processor.ts | 5 +- .../src/services/create_page_save_request.ts | 1 - packages/api/src/utils/createTask.ts | 31 ++- packages/content-fetch/Dockerfile-gcf | 53 ----- packages/content-fetch/package.json | 6 +- packages/content-fetch/src/app.ts | 162 +++++++++++++--- packages/content-fetch/src/index.ts | 31 --- packages/content-fetch/src/request_handler.ts | 27 --- packages/content-fetch/src/worker.ts | 181 +++++------------- yarn.lock | 2 +- 11 files changed, 212 insertions(+), 295 deletions(-) delete mode 100644 packages/content-fetch/Dockerfile-gcf delete mode 100644 packages/content-fetch/src/index.ts diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index e0418a91c..7882522da 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -13,7 +13,10 @@ import { updateSubscriptions, } from '../../services/update_subscription' import { findActiveUser } from '../../services/user' -import { enqueueFetchContentJob } from '../../utils/createTask' +import { + enqueueFetchContentJob, + FetchContentJobData, +} from '../../utils/createTask' import { cleanUrl } from '../../utils/helpers' import { createThumbnailProxyUrl } from '../../utils/imageproxy' import { logger } from '../../utils/logger' @@ -331,7 +334,7 @@ const fetchContentAndCreateItem = async ( feedUrl: string, item: RssFeedItem ) => { - const data = { + const data: FetchContentJobData = { users, source: 'rss-feeder', url: item.link.trim(), @@ -339,6 +342,7 @@ const fetchContentAndCreateItem = async ( rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, + priority: 'low', } try { diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 9b7ea8ff3..9468321a7 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -83,7 +83,10 @@ import { getJobPriority } from './utils/createTask' import { logger } from './utils/logger' export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue' -export const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue' +export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' +export const CONTENT_FETCH_SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' +export const CONTENT_FETCH_RSS_QUEUE = 'omnivore-content-fetch-rss-queue' + export const JOB_VERSION = 'v001' const jobLatency = new client.Histogram({ diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 3d9a8ae6d..1c2723a99 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -157,7 +157,6 @@ export const createPageSaveRequest = async ({ // get priority by checking rate limit if not specified priority = priority || (await getPriorityByRateLimit(userId)) - logger.debug('priority', { priority }) // enqueue task to parse item await enqueueFetchContentJob({ diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index fcaeb68f5..68905cd0e 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -66,7 +66,9 @@ import { UPLOAD_CONTENT_JOB, } from '../jobs/upload_content' import { - CONTENT_FETCH_QUEUE_NAME, + CONTENT_FETCH_QUEUE, + CONTENT_FETCH_RSS_QUEUE, + CONTENT_FETCH_SLOW_QUEUE, getQueue, JOB_VERSION, } from '../queue-processor' @@ -100,13 +102,12 @@ export const getJobPriority = (jobName: string): number => { case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: case UPDATE_HOME_JOB: - case `${FETCH_CONTENT_JOB}_high`: + case FETCH_CONTENT_JOB: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case AI_SUMMARIZE_JOB_NAME: case PROCESS_YOUTUBE_VIDEO_JOB_NAME: - case `${FETCH_CONTENT_JOB}_low`: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: @@ -328,7 +329,7 @@ export const deleteTask = async ( } } -export interface fetchContentJobData { +export interface FetchContentJobData { url: string users: Array<{ id: string @@ -344,6 +345,7 @@ export interface fetchContentJobData { publishedAt?: string folder?: string rssFeedUrl?: string + source?: string } /** @@ -356,18 +358,29 @@ export interface fetchContentJobData { * @returns Name of the task created */ export const enqueueFetchContentJob = async ( - data: fetchContentJobData + data: FetchContentJobData ): Promise => { - const priority = data.priority || 'high' + const getQueueName = (data: FetchContentJobData) => { + if (data.rssFeedUrl) { + return CONTENT_FETCH_RSS_QUEUE + } - const queue = await getQueue(CONTENT_FETCH_QUEUE_NAME) + if (data.priority === 'low') { + return CONTENT_FETCH_SLOW_QUEUE + } + + return CONTENT_FETCH_QUEUE + } + + const queueName = getQueueName(data) + const queue = await getQueue(queueName) if (!queue) { throw new Error('No queue found') } const job = await queue.add(FETCH_CONTENT_JOB, data, { - priority: getJobPriority(`${FETCH_CONTENT_JOB}_${priority}`), - attempts: priority === 'high' ? 5 : 2, + priority: getJobPriority(FETCH_CONTENT_JOB), + attempts: data.priority === 'low' ? 2 : 5, }) if (!job || !job.id) { diff --git a/packages/content-fetch/Dockerfile-gcf b/packages/content-fetch/Dockerfile-gcf deleted file mode 100644 index 7691ab1ae..000000000 --- a/packages/content-fetch/Dockerfile-gcf +++ /dev/null @@ -1,53 +0,0 @@ -FROM node:18.16-alpine - -# Installs latest Chromium (92) package. -RUN apk add --no-cache \ - chromium \ - nss \ - freetype \ - harfbuzz \ - ca-certificates \ - ttf-freefont \ - nodejs \ - yarn \ - g++ \ - make \ - python3 - -WORKDIR /app - -ENV CHROMIUM_PATH /usr/bin/chromium-browser -ENV LAUNCH_HEADLESS=true -ENV PORT 9090 - -COPY package.json . -COPY yarn.lock . -COPY tsconfig.json . -COPY .prettierrc . -COPY .eslintrc . - -COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json -COPY /packages/content-handler/package.json ./packages/content-handler/package.json -COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json -COPY /packages/utils/package.json ./packages/utils/package.json - -RUN yarn install --pure-lockfile - -ADD /packages/content-handler ./packages/content-handler -ADD /packages/puppeteer-parse ./packages/puppeteer-parse -ADD /packages/content-fetch ./packages/content-fetch -ADD /packages/utils ./packages/utils -RUN yarn workspace @omnivore/utils build -RUN yarn workspace @omnivore/content-handler build -RUN yarn workspace @omnivore/puppeteer-parse build -RUN yarn workspace @omnivore/content-fetch build - -# After building, fetch the production dependencies -RUN rm -rf /app/packages/content-fetch/node_modules -RUN rm -rf /app/node_modules -RUN yarn install --pure-lockfile --production - -EXPOSE 9090 - -# USER pptruser -ENTRYPOINT ["yarn", "workspace", "@omnivore/content-fetch", "start_gcf"] diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 087ded95f..82abe8bfb 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -7,11 +7,9 @@ "build/src" ], "dependencies": { - "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", - "@sentry/serverless": "^7.77.0", "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", @@ -27,9 +25,7 @@ "test:typecheck": "tsc --noEmit", "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", - "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer", - "start_worker": "node build/src/worker.js" + "start": "node build/src/app.js" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-fetch/src/app.ts b/packages/content-fetch/src/app.ts index f3b949dfd..3cf0e4982 100644 --- a/packages/content-fetch/src/app.ts +++ b/packages/content-fetch/src/app.ts @@ -1,34 +1,138 @@ -import 'dotenv/config' -import express from 'express' -import { contentFetchRequestHandler } from './request_handler' +import { RedisDataSource } from '@omnivore/utils' +import { JobType } from 'bullmq' +import express, { Express } from 'express' +import asyncHandler from 'express-async-handler' +import { createWorkers, getQueue } from './worker' -const app = express() +const main = () => { + console.log('[worker]: starting workers') -app.use(express.json()) -app.use(express.urlencoded({ extended: true })) + const app: Express = express() + const port = process.env.PORT || 3002 -if (!process.env.VERIFICATION_TOKEN) { - throw new Error('VERIFICATION_TOKEN environment variable is not set') + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + const workers = createWorkers(redisDataSource) + + const closeWorkers = async () => { + await Promise.all( + workers.map(async (worker) => { + await worker.close() + console.log('worker closed:', worker.name) + }) + ) + } + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + app.get( + '/lifecycle/prestop', + asyncHandler(async (_req, res) => { + console.log('prestop lifecycle hook called.') + await closeWorkers() + console.log('workers closed') + + res.sendStatus(200) + }) + ) + + app.get( + '/metrics', + asyncHandler(async (_, res) => { + let output = '' + + for (const worker of workers) { + const queueName = worker.name + const queue = await getQueue( + redisDataSource.queueRedisClient, + queueName + ) + + const jobsTypes: Array = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) + + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${queueName}"} ${counts[metric]}\n` + }) + + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${0}\n` + } + } + + res.status(200).setHeader('Content-Type', 'text/plain').send(output) + }) + ) + + const server = app.listen(port, () => { + console.log(`[worker]: Workers started`) + }) + + const gracefulShutdown = async (signal: string) => { + console.log(`[worker]: Received ${signal}, closing server...`) + await new Promise((resolve) => { + server.close((err) => { + console.log('[worker]: Express server closed') + if (err) { + console.log('[worker]: error stopping server', { err }) + } + + resolve() + }) + }) + + await closeWorkers() + console.log('[worker]: Workers closed') + + await redisDataSource.shutdown() + console.log('[worker]: Redis connection closed') + + process.exit(0) + } + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('SIGINT', () => gracefulShutdown('SIGINT')) + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) + + process.on('uncaughtException', function (err) { + // Handle the error safely + console.error(err, 'Uncaught exception') + }) + + process.on('unhandledRejection', (reason, promise) => { + // Handle the error safely + console.error({ promise, reason }, 'Unhandled Rejection at: Promise') + }) } -app.get('/_ah/health', (req, res) => res.sendStatus(200)) - -app.all('/', (req, res, next) => { - if (req.method !== 'GET' && req.method !== 'POST') { - console.error('request method is not GET or POST') - return res.sendStatus(405) - } - - if (req.query.token !== process.env.VERIFICATION_TOKEN) { - console.error('query does not include valid token') - return res.sendStatus(403) - } - - return contentFetchRequestHandler(req, res, next) -}) - -const PORT = process.env.PORT ? parseInt(process.env.PORT) : 8080 -app.listen(PORT, () => { - console.log(`App listening on port ${PORT}`) - console.log('Press Ctrl+C to quit.') -}) +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main() +} diff --git a/packages/content-fetch/src/index.ts b/packages/content-fetch/src/index.ts deleted file mode 100644 index d3bc341d1..000000000 --- a/packages/content-fetch/src/index.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { HttpFunction } from '@google-cloud/functions-framework' -import * as Sentry from '@sentry/serverless' -import 'dotenv/config' -import { contentFetchRequestHandler } from './request_handler' - -Sentry.GCPFunction.init({ - dsn: process.env.SENTRY_DSN, - tracesSampleRate: 0, -}) - -/** - * Cloud Function entry point, HTTP trigger. - * Loads the requested URL via Puppeteer, captures page content and sends it to backend - * - * @param {Object} req Cloud Function request context. - * @param {Object} res Cloud Function response context. - */ -export const puppeteer = Sentry.GCPFunction.wrapHttpFunction( - contentFetchRequestHandler as HttpFunction -) - -/** - * Cloud Function entry point, HTTP trigger. - * Loads the requested URL via Puppeteer and captures a screenshot of the provided element - * - * @param {Object} req Cloud Function request context. - * Inlcudes: - * * url - URL address of the page to open - * @param {Object} res Cloud Function response context. - */ -// exports.preview = Sentry.GCPFunction.wrapHttpFunction(preview); diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 0fb6765e9..a698ac9e6 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -2,7 +2,6 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' import { RedisDataSource } from '@omnivore/utils' import 'dotenv/config' -import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' @@ -321,29 +320,3 @@ export const processFetchContentJob = async ( ) } } - -export const contentFetchRequestHandler: RequestHandler = async (req, res) => { - const data = req.body - - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, - }) - - try { - await processFetchContentJob(redisDataSource, data) - } catch (error) { - return res.sendStatus(500) - } finally { - await redisDataSource.shutdown() - } - - res.sendStatus(200) -} diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 7f3dfc29b..7cb09c393 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -1,15 +1,17 @@ import { RedisDataSource } from '@omnivore/utils' -import { Job, JobType, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' -import express, { Express } from 'express' -import asyncHandler from 'express-async-handler' +import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' import { JobData, processFetchContentJob } from './request_handler' -const QUEUE_NAME = 'omnivore-content-fetch-queue' +const FAST_QUEUE = 'omnivore-content-fetch-queue' +const SLOW_QUEUE = 'omnivore-content-fetch-slow-queue' +const RSS_QUEUE = 'omnivore-content-fetch-rss-queue' +const QUEUE_NAMES = [FAST_QUEUE, SLOW_QUEUE, RSS_QUEUE] as const -export const getContentFetchQueue = async ( - connection: RedisClient +export const getQueue = async ( + connection: RedisClient, + queueName: string ): Promise => { - const queue = new Queue(QUEUE_NAME, { + const queue = new Queue(queueName, { connection, defaultJobOptions: { backoff: { @@ -28,9 +30,29 @@ export const getContentFetchQueue = async ( return queue } -const createWorker = (redisDataSource: RedisDataSource) => { - return new Worker( - QUEUE_NAME, +const createWorker = (redisDataSource: RedisDataSource, queueName: string) => { + const getLimiter = (queueName: string) => { + switch (queueName) { + case SLOW_QUEUE: + return { + max: 5, + duration: 1000, // 1 second + } + case RSS_QUEUE: + return { + max: 3, + duration: 1000, // 1 second + } + default: + return { + max: 10, + duration: 1000, // 1 second + } + } + } + + const worker = new Worker( + queueName, async (job: Job) => { // process the job await processFetchContentJob(redisDataSource, job.data) @@ -38,152 +60,39 @@ const createWorker = (redisDataSource: RedisDataSource) => { { connection: redisDataSource.queueRedisClient, autorun: true, // start processing jobs immediately - limiter: { - max: 50, - duration: 1000, // 1 second - }, + limiter: getLimiter(queueName), } ) -} -const main = () => { - console.log('[worker]: starting worker') - - const app: Express = express() - const port = process.env.PORT || 3002 - - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, + worker.on('error', (err) => { + console.error('worker error:', err) }) - // respond healthy to auto-scaler. - app.get('/_ah/health', (req, res) => res.sendStatus(200)) - - app.get( - '/lifecycle/prestop', - asyncHandler(async (_req, res) => { - console.log('prestop lifecycle hook called.') - await worker.close() - res.sendStatus(200) - }) - ) - - app.get( - '/metrics', - asyncHandler(async (_, res) => { - const queue = await getContentFetchQueue(redisDataSource.queueRedisClient) - if (!queue) { - res.sendStatus(400) - return - } - - let output = '' - const jobsTypes: JobType[] = [ - 'active', - 'failed', - 'completed', - 'prioritized', - ] - const counts = await queue.getJobCounts(...jobsTypes) - - jobsTypes.forEach((metric) => { - output += `# TYPE omnivore_queue_messages_${metric} gauge\n` - output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` - }) - - // Export the age of the oldest prioritized job in the queue - const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) - if (oldestJobs.length > 0) { - const currentTime = Date.now() - const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n` - } else { - output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` - } - - res.status(200).setHeader('Content-Type', 'text/plain').send(output) - }) - ) - - const server = app.listen(port, () => { - console.log(`[worker]: started`) - }) - - const worker = createWorker(redisDataSource) - - const queueEvents = new QueueEvents(QUEUE_NAME, { + const queueEvents = new QueueEvents(queueName, { connection: redisDataSource.queueRedisClient, }) queueEvents.on('added', (job) => { - console.log('added job: ', job.jobId, job.name) + console.log('added job:', job.jobId, job.name) }) queueEvents.on('removed', (job) => { - console.log('removed job: ', job.jobId) + console.log('removed job:', job.jobId) }) queueEvents.on('completed', (job) => { - console.log('completed job: ', job.jobId) + console.log('completed job:', job.jobId) }) queueEvents.on('failed', (job) => { - console.log('failed job: ', job.jobId) + console.log('failed job:', job.jobId) }) - const gracefulShutdown = async (signal: string) => { - console.log(`[worker]: Received ${signal}, closing server...`) - await new Promise((resolve) => { - server.close((err) => { - console.log('[worker]: Express server closed') - if (err) { - console.log('[worker]: error stopping server', { err }) - } - - resolve() - }) - }) - - await worker.close() - console.log('[worker]: Worker closed') - - await redisDataSource.shutdown() - console.log('[worker]: Redis connection closed') - - process.exit(0) - } - - const handleShutdown = (signal: string) => { - return () => { - void gracefulShutdown(signal) - } - } - - process.on('SIGTERM', handleShutdown('SIGTERM')) - process.on('SIGINT', handleShutdown('SIGINT')) - - process.on('uncaughtException', (error) => { - console.error('Uncaught Exception:', error) - handleShutdown('uncaughtException') - }) - - process.on('unhandledRejection', (reason, promise) => { - console.error('Unhandled Rejection at:', promise, 'reason:', reason) - handleShutdown('unhandledRejection') - }) + return worker } -// only call main if the file was called from the CLI and wasn't required from another module -if (require.main === module) { - main() +export const createWorkers = (redisDataSource: RedisDataSource) => { + return QUEUE_NAMES.map((queueName) => + createWorker(redisDataSource, queueName) + ) } diff --git a/yarn.lock b/yarn.lock index 4cc38bb0e..460282576 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2799,7 +2799,7 @@ google-gax "^3.5.7" protobufjs "^7.2.5" -"@google-cloud/functions-framework@3.1.2", "@google-cloud/functions-framework@^3.0.0": +"@google-cloud/functions-framework@3.1.2": version "3.1.2" resolved "https://registry.yarnpkg.com/@google-cloud/functions-framework/-/functions-framework-3.1.2.tgz#2cd92ce4307bf7f32555d028dca22e398473b410" integrity sha512-pYvEH65/Rqh1JNPdcBmorcV7Xoom2/iOSmbtYza8msro7Inl+qOYxbyMiQfySD2gwAyn38WyWPRqsDRcf/BFLg==