diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 28f70ed19..087ded95f 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -7,15 +7,16 @@ "build/src" ], "dependencies": { - "bullmq": "^5.1.1", - "dotenv": "^8.2.0", - "express": "^4.17.1", - "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", - "@sentry/serverless": "^7.77.0" + "@sentry/serverless": "^7.77.0", + "bullmq": "^5.1.1", + "dotenv": "^8.2.0", + "express": "^4.17.1", + "express-async-handler": "^1.2.0", + "posthog-node": "^3.6.3" }, "devDependencies": { "chai": "^4.3.6", @@ -27,7 +28,8 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer" + "start_gcf": "functions-framework --port=9090 --target=puppeteer", + "start_worker": "node build/src/worker.js" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 6a427434d..0fb6765e9 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -12,7 +12,7 @@ interface UserConfig { folder?: string } -interface RequestBody { +export interface JobData { url: string userId?: string saveRequestId: string @@ -175,36 +175,37 @@ const incrementContentFetchFailure = async ( } } -export const contentFetchRequestHandler: RequestHandler = async (req, res) => { +export const processFetchContentJob = async ( + redisDataSource: RedisDataSource, + data: JobData +) => { const functionStartTime = Date.now() - const body = req.body - // users is used when saving article for multiple users - let users = body.users || [] - const userId = body.userId + let users = data.users || [] + const userId = data.userId // userId is 
used when saving article for a single user if (userId) { users = [ { id: userId, - folder: body.folder, - libraryItemId: body.saveRequestId, + folder: data.folder, + libraryItemId: data.saveRequestId, }, ] } - const articleSavingRequestId = body.saveRequestId - const state = body.state - const labels = body.labels - const source = body.source || 'puppeteer-parse' - const taskId = body.taskId // taskId is used to update import status - const url = body.url - const locale = body.locale - const timezone = body.timezone - const rssFeedUrl = body.rssFeedUrl - const savedAt = body.savedAt - const publishedAt = body.publishedAt - const priority = body.priority + const articleSavingRequestId = data.saveRequestId + const state = data.state + const labels = data.labels + const source = data.source || 'puppeteer-parse' + const taskId = data.taskId // taskId is used to update import status + const url = data.url + const locale = data.locale + const timezone = data.timezone + const rssFeedUrl = data.rssFeedUrl + const savedAt = data.savedAt + const publishedAt = data.publishedAt + const priority = data.priority const logRecord: LogRecord = { url, @@ -225,25 +226,13 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, - }) - try { const domain = new URL(url).hostname const isBlocked = await isDomainBlocked(redisDataSource, domain) if (isBlocked) { console.log('domain is blocked', domain) - return res.sendStatus(200) + return } const key = cacheKey(url, locale, timezone) @@ -312,7 +301,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { logRecord.error = 'unknown error' } - return res.sendStatus(500) + throw error } finally { 
logRecord.totalTime = Date.now() - functionStartTime console.log(`parse-page result`, logRecord) @@ -330,7 +319,29 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { }, } ) + } +} +export const contentFetchRequestHandler: RequestHandler = async (req, res) => { + const data = req.body + + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + try { + await processFetchContentJob(redisDataSource, data) + } catch (error) { + return res.sendStatus(500) + } finally { await redisDataSource.shutdown() } diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts new file mode 100644 index 000000000..72fe6a736 --- /dev/null +++ b/packages/content-fetch/src/worker.ts @@ -0,0 +1,186 @@ +import { RedisDataSource } from '@omnivore/utils' +import { Job, JobType, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' +import express, { Express } from 'express' +import asyncHandler from 'express-async-handler' +import { JobData, processFetchContentJob } from './request_handler' + +const QUEUE_NAME = 'omnivore-content-fetch-queue' + +export const getContentFetchQueue = async ( + connection: RedisClient +): Promise<Queue> => { + const queue = new Queue(QUEUE_NAME, { + connection, + defaultJobOptions: { + backoff: { + type: 'exponential', + delay: 2000, // 2 seconds + }, + removeOnComplete: { + age: 24 * 3600, // keep up to 24 hours + }, + removeOnFail: { + age: 7 * 24 * 3600, // keep up to 7 days + }, + }, + }) + await queue.waitUntilReady() + return queue +} + +const createWorker = (redisDataSource: RedisDataSource) => { + return new Worker( + QUEUE_NAME, + async (job: Job<JobData>) => { + // process the job + await processFetchContentJob(redisDataSource, job.data) + }, + { + connection: redisDataSource.queueRedisClient, + autorun: true, // start
processing jobs immediately + lockDuration: 60_000, // 1 minute + } + ) +} + +const main = () => { + console.log('[worker]: starting worker') + + const app: Express = express() + const port = process.env.PORT || 3002 + + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + app.get( + '/lifecycle/prestop', + asyncHandler(async (_req, res) => { + console.log('prestop lifecycle hook called.') + await worker.close() + res.sendStatus(200) + }) + ) + + app.get( + '/metrics', + asyncHandler(async (_, res) => { + const queue = await getContentFetchQueue(redisDataSource.queueRedisClient) + if (!queue) { + res.sendStatus(400) + return + } + + let output = '' + const jobsTypes: JobType[] = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) + + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` + }) + + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` + } + + res.status(200).setHeader('Content-Type', 'text/plain').send(output) + }) + ) + 
+ const server = app.listen(port, () => { + console.log(`[worker]: started`) + }) + + const worker = createWorker(redisDataSource) + + const queueEvents = new QueueEvents(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, + }) + + queueEvents.on('added', (job) => { + console.log('added job: ', job.jobId, job.name) + }) + + queueEvents.on('removed', (job) => { + console.log('removed job: ', job.jobId) + }) + + queueEvents.on('completed', (job) => { + console.log('completed job: ', job.jobId) + }) + + queueEvents.on('failed', (job) => { + console.log('failed job: ', job.jobId) + }) + + const gracefulShutdown = async (signal: string) => { + console.log(`[worker]: Received ${signal}, closing server...`) + await new Promise<void>((resolve) => { + server.close((err) => { + console.log('[worker]: Express server closed') + if (err) { + console.log('[worker]: error stopping server', { err }) + } + + resolve() + }) + }) + + await worker.close() + console.log('[worker]: Worker closed') + + await redisDataSource.shutdown() + console.log('[worker]: Redis connection closed') + + process.exit(0) + } + + const handleShutdown = (signal: string) => { + return () => { + void gracefulShutdown(signal) + } + } + + process.on('SIGTERM', handleShutdown('SIGTERM')) + process.on('SIGINT', handleShutdown('SIGINT')) + + process.on('uncaughtException', (error) => { + console.error('Uncaught Exception:', error) + handleShutdown('uncaughtException') + }) + + process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled Rejection at:', promise, 'reason:', reason) + handleShutdown('unhandledRejection') + }) +} + +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main() +} diff --git a/yarn.lock b/yarn.lock index 7dae9e991..4cc38bb0e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -15721,6 +15721,11 @@ expr-eval@^2.0.2: resolved
"https://registry.yarnpkg.com/expr-eval/-/expr-eval-2.0.2.tgz#fa6f044a7b0c93fde830954eb9c5b0f7fbc7e201" integrity sha512-4EMSHGOPSwAfBiibw3ndnP0AvjDWLsMvGOvWEZ2F96IGk0bIVdjQisOHxReSkE13mHcfbuCiXw+G4y0zv6N8Eg== +express-async-handler@^1.2.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/express-async-handler/-/express-async-handler-1.2.0.tgz#ffc9896061d90f8d2e71a2d2b8668db5b0934391" + integrity sha512-rCSVtPXRmQSW8rmik/AIb2P0op6l7r1fMW538yyvTMltCO4xQEWMmobfrIxN2V1/mVrgxB8Az3reYF6yUZw37w== + express-http-context2@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/express-http-context2/-/express-http-context2-1.0.0.tgz#58cd9fb0d233739e0dcd7aabb766d1dc74522d77"