use different queues for fast, slow and rss content fetch jobs

This commit is contained in:
Hongbo Wu
2024-07-25 18:47:42 +08:00
parent 016775aadb
commit 08fbb8aebf
11 changed files with 212 additions and 295 deletions

View File

@@ -13,7 +13,10 @@ import {
   updateSubscriptions,
 } from '../../services/update_subscription'
 import { findActiveUser } from '../../services/user'
-import { enqueueFetchContentJob } from '../../utils/createTask'
+import {
+  enqueueFetchContentJob,
+  FetchContentJobData,
+} from '../../utils/createTask'
 import { cleanUrl } from '../../utils/helpers'
 import { createThumbnailProxyUrl } from '../../utils/imageproxy'
 import { logger } from '../../utils/logger'
@@ -331,7 +334,7 @@ const fetchContentAndCreateItem = async (
   feedUrl: string,
   item: RssFeedItem
 ) => {
-  const data = {
+  const data: FetchContentJobData = {
     users,
     source: 'rss-feeder',
     url: item.link.trim(),
@@ -339,6 +342,7 @@ const fetchContentAndCreateItem = async (
     rssFeedUrl: feedUrl,
     savedAt: item.isoDate,
     publishedAt: item.isoDate,
+    priority: 'low',
   }
   try {

View File

@@ -83,7 +83,10 @@ import { getJobPriority } from './utils/createTask'
 import { logger } from './utils/logger'

 export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue'
-export const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue'
+export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue'
+export const CONTENT_FETCH_SLOW_QUEUE = 'omnivore-content-fetch-slow-queue'
+export const CONTENT_FETCH_RSS_QUEUE = 'omnivore-content-fetch-rss-queue'
 export const JOB_VERSION = 'v001'

 const jobLatency = new client.Histogram({

View File

@@ -157,7 +157,6 @@ export const createPageSaveRequest = async ({
   // get priority by checking rate limit if not specified
   priority = priority || (await getPriorityByRateLimit(userId))

-  logger.debug('priority', { priority })

   // enqueue task to parse item
   await enqueueFetchContentJob({

View File

@@ -66,7 +66,9 @@ import {
   UPLOAD_CONTENT_JOB,
 } from '../jobs/upload_content'
 import {
-  CONTENT_FETCH_QUEUE_NAME,
+  CONTENT_FETCH_QUEUE,
+  CONTENT_FETCH_RSS_QUEUE,
+  CONTENT_FETCH_SLOW_QUEUE,
   getQueue,
   JOB_VERSION,
 } from '../queue-processor'
@@ -100,13 +102,12 @@ export const getJobPriority = (jobName: string): number => {
     case SYNC_READ_POSITIONS_JOB_NAME:
     case SEND_EMAIL_JOB:
     case UPDATE_HOME_JOB:
-    case `${FETCH_CONTENT_JOB}_high`:
+    case FETCH_CONTENT_JOB:
       return 1
     case TRIGGER_RULE_JOB_NAME:
     case CALL_WEBHOOK_JOB_NAME:
     case AI_SUMMARIZE_JOB_NAME:
     case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
-    case `${FETCH_CONTENT_JOB}_low`:
       return 5
     case BULK_ACTION_JOB_NAME:
     case `${REFRESH_FEED_JOB_NAME}_high`:
@@ -328,7 +329,7 @@ export const deleteTask = async (
   }
 }

-export interface fetchContentJobData {
+export interface FetchContentJobData {
   url: string
   users: Array<{
     id: string
@@ -344,6 +345,7 @@ export interface fetchContentJobData {
   publishedAt?: string
   folder?: string
   rssFeedUrl?: string
+  source?: string
 }

 /**
@@ -356,18 +358,29 @@ export interface fetchContentJobData {
  * @returns Name of the task created
  */
 export const enqueueFetchContentJob = async (
-  data: fetchContentJobData
+  data: FetchContentJobData
 ): Promise<string> => {
-  const priority = data.priority || 'high'
+  const getQueueName = (data: FetchContentJobData) => {
+    if (data.rssFeedUrl) {
+      return CONTENT_FETCH_RSS_QUEUE
+    }

-  const queue = await getQueue(CONTENT_FETCH_QUEUE_NAME)
+    if (data.priority === 'low') {
+      return CONTENT_FETCH_SLOW_QUEUE
+    }
+
+    return CONTENT_FETCH_QUEUE
+  }
+
+  const queueName = getQueueName(data)
+  const queue = await getQueue(queueName)
   if (!queue) {
     throw new Error('No queue found')
   }

   const job = await queue.add(FETCH_CONTENT_JOB, data, {
-    priority: getJobPriority(`${FETCH_CONTENT_JOB}_${priority}`),
-    attempts: priority === 'high' ? 5 : 2,
+    priority: getJobPriority(FETCH_CONTENT_JOB),
+    attempts: data.priority === 'low' ? 2 : 5,
   })

   if (!job || !job.id) {
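A minimal sketch of how callers land on the three queues after this change. The argument values below are hypothetical, and the shape of the users entries is assumed where the FetchContentJobData interface is truncated in this diff:

// rssFeedUrl present -> omnivore-content-fetch-rss-queue (this is what the rss-feeder does)
await enqueueFetchContentJob({
  url: 'https://example.com/feed-item',
  users: [{ id: 'user-1' }], // additional per-user fields elided in this diff
  rssFeedUrl: 'https://example.com/rss.xml',
  source: 'rss-feeder',
  priority: 'low',
})

// priority 'low' without an rssFeedUrl -> omnivore-content-fetch-slow-queue
await enqueueFetchContentJob({
  url: 'https://example.com/article',
  users: [{ id: 'user-1' }],
  priority: 'low',
})

// everything else -> omnivore-content-fetch-queue, the fast queue
await enqueueFetchContentJob({
  url: 'https://example.com/article',
  users: [{ id: 'user-1' }],
})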

View File

@@ -1,53 +0,0 @@
-FROM node:18.16-alpine
-
-# Installs latest Chromium (92) package.
-RUN apk add --no-cache \
-  chromium \
-  nss \
-  freetype \
-  harfbuzz \
-  ca-certificates \
-  ttf-freefont \
-  nodejs \
-  yarn \
-  g++ \
-  make \
-  python3
-
-WORKDIR /app
-
-ENV CHROMIUM_PATH /usr/bin/chromium-browser
-ENV LAUNCH_HEADLESS=true
-
-ENV PORT 9090
-
-COPY package.json .
-COPY yarn.lock .
-COPY tsconfig.json .
-COPY .prettierrc .
-COPY .eslintrc .
-COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
-COPY /packages/content-handler/package.json ./packages/content-handler/package.json
-COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
-COPY /packages/utils/package.json ./packages/utils/package.json
-
-RUN yarn install --pure-lockfile
-
-ADD /packages/content-handler ./packages/content-handler
-ADD /packages/puppeteer-parse ./packages/puppeteer-parse
-ADD /packages/content-fetch ./packages/content-fetch
-ADD /packages/utils ./packages/utils
-
-RUN yarn workspace @omnivore/utils build
-RUN yarn workspace @omnivore/content-handler build
-RUN yarn workspace @omnivore/puppeteer-parse build
-RUN yarn workspace @omnivore/content-fetch build
-
-# After building, fetch the production dependencies
-RUN rm -rf /app/packages/content-fetch/node_modules
-RUN rm -rf /app/node_modules
-RUN yarn install --pure-lockfile --production
-
-EXPOSE 9090
-
-# USER pptruser
-ENTRYPOINT ["yarn", "workspace", "@omnivore/content-fetch", "start_gcf"]

View File

@@ -7,11 +7,9 @@
     "build/src"
   ],
   "dependencies": {
-    "@google-cloud/functions-framework": "^3.0.0",
     "@google-cloud/storage": "^7.0.1",
     "@omnivore/puppeteer-parse": "^1.0.0",
     "@omnivore/utils": "1.0.0",
-    "@sentry/serverless": "^7.77.0",
     "bullmq": "^5.1.1",
     "dotenv": "^8.2.0",
     "express": "^4.17.1",
@@ -27,9 +25,7 @@
     "test:typecheck": "tsc --noEmit",
     "lint": "eslint src --ext ts,js,tsx,jsx",
     "build": "tsc",
-    "start": "node build/src/app.js",
-    "start_gcf": "functions-framework --port=9090 --target=puppeteer",
-    "start_worker": "node build/src/worker.js"
+    "start": "node build/src/app.js"
   },
   "volta": {
     "extends": "../../package.json"

View File

@@ -1,34 +1,138 @@
-import 'dotenv/config'
-import express from 'express'
-import { contentFetchRequestHandler } from './request_handler'
-
-const app = express()
-
-app.use(express.json())
-app.use(express.urlencoded({ extended: true }))
-
-if (!process.env.VERIFICATION_TOKEN) {
-  throw new Error('VERIFICATION_TOKEN environment variable is not set')
-}
-
-app.get('/_ah/health', (req, res) => res.sendStatus(200))
-
-app.all('/', (req, res, next) => {
-  if (req.method !== 'GET' && req.method !== 'POST') {
-    console.error('request method is not GET or POST')
-    return res.sendStatus(405)
-  }
-
-  if (req.query.token !== process.env.VERIFICATION_TOKEN) {
-    console.error('query does not include valid token')
-    return res.sendStatus(403)
-  }
-
-  return contentFetchRequestHandler(req, res, next)
-})
-
-const PORT = process.env.PORT ? parseInt(process.env.PORT) : 8080
-app.listen(PORT, () => {
-  console.log(`App listening on port ${PORT}`)
-  console.log('Press Ctrl+C to quit.')
-})
+import { RedisDataSource } from '@omnivore/utils'
+import { JobType } from 'bullmq'
+import express, { Express } from 'express'
+import asyncHandler from 'express-async-handler'
+import { createWorkers, getQueue } from './worker'
+
+const main = () => {
+  console.log('[worker]: starting workers')
+
+  const app: Express = express()
+  const port = process.env.PORT || 3002
+
+  // create redis source
+  const redisDataSource = new RedisDataSource({
+    cache: {
+      url: process.env.REDIS_URL,
+      cert: process.env.REDIS_CERT,
+    },
+    mq: {
+      url: process.env.MQ_REDIS_URL,
+      cert: process.env.MQ_REDIS_CERT,
+    },
+  })
+
+  const workers = createWorkers(redisDataSource)
+  const closeWorkers = async () => {
+    await Promise.all(
+      workers.map(async (worker) => {
+        await worker.close()
+        console.log('worker closed:', worker.name)
+      })
+    )
+  }
+
+  // respond healthy to auto-scaler.
+  app.get('/_ah/health', (req, res) => res.sendStatus(200))
+
+  app.get(
+    '/lifecycle/prestop',
+    asyncHandler(async (_req, res) => {
+      console.log('prestop lifecycle hook called.')
+      await closeWorkers()
+      console.log('workers closed')
+      res.sendStatus(200)
+    })
+  )
+
+  app.get(
+    '/metrics',
+    asyncHandler(async (_, res) => {
+      let output = ''
+      for (const worker of workers) {
+        const queueName = worker.name
+        const queue = await getQueue(
+          redisDataSource.queueRedisClient,
+          queueName
+        )
+
+        const jobsTypes: Array<JobType> = [
+          'active',
+          'failed',
+          'completed',
+          'prioritized',
+        ]
+        const counts = await queue.getJobCounts(...jobsTypes)
+
+        jobsTypes.forEach((metric) => {
+          output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
+          output += `omnivore_queue_messages_${metric}{queue="${queueName}"} ${counts[metric]}\n`
+        })
+
+        // Export the age of the oldest prioritized job in the queue
+        const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true)
+        if (oldestJobs.length > 0) {
+          const currentTime = Date.now()
+          const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
+          output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
+          output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${ageInSeconds}\n`
+        } else {
+          output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
+          output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${0}\n`
+        }
+      }
+
+      res.status(200).setHeader('Content-Type', 'text/plain').send(output)
+    })
+  )
+
+  const server = app.listen(port, () => {
+    console.log(`[worker]: Workers started`)
+  })
+
+  const gracefulShutdown = async (signal: string) => {
+    console.log(`[worker]: Received ${signal}, closing server...`)
+    await new Promise<void>((resolve) => {
+      server.close((err) => {
+        console.log('[worker]: Express server closed')
+        if (err) {
+          console.log('[worker]: error stopping server', { err })
+        }
+        resolve()
+      })
+    })
+    await closeWorkers()
+    console.log('[worker]: Workers closed')
+    await redisDataSource.shutdown()
+    console.log('[worker]: Redis connection closed')
+    process.exit(0)
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-misused-promises
+  process.on('SIGINT', () => gracefulShutdown('SIGINT'))
+  // eslint-disable-next-line @typescript-eslint/no-misused-promises
+  process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
+
+  process.on('uncaughtException', function (err) {
+    // Handle the error safely
+    console.error(err, 'Uncaught exception')
+  })
+
+  process.on('unhandledRejection', (reason, promise) => {
+    // Handle the error safely
+    console.error({ promise, reason }, 'Unhandled Rejection at: Promise')
+  })
+}
+
+// only call main if the file was called from the CLI and wasn't required from another module
+if (require.main === module) {
+  main()
+}
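For reference, the /metrics handler above emits one block per queue in Prometheus text exposition format. With hypothetical counts, the block for the fast queue would look roughly like this, and the same lines repeat for the slow and rss queues:

# TYPE omnivore_queue_messages_active gauge
omnivore_queue_messages_active{queue="omnivore-content-fetch-queue"} 4
# TYPE omnivore_queue_messages_failed gauge
omnivore_queue_messages_failed{queue="omnivore-content-fetch-queue"} 0
# TYPE omnivore_queue_messages_completed gauge
omnivore_queue_messages_completed{queue="omnivore-content-fetch-queue"} 128
# TYPE omnivore_queue_messages_prioritized gauge
omnivore_queue_messages_prioritized{queue="omnivore-content-fetch-queue"} 7
# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge
omnivore_queue_messages_oldest_job_age_seconds{queue="omnivore-content-fetch-queue"} 2.5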

View File

@@ -1,31 +0,0 @@
-import { HttpFunction } from '@google-cloud/functions-framework'
-import * as Sentry from '@sentry/serverless'
-import 'dotenv/config'
-import { contentFetchRequestHandler } from './request_handler'
-
-Sentry.GCPFunction.init({
-  dsn: process.env.SENTRY_DSN,
-  tracesSampleRate: 0,
-})
-
-/**
- * Cloud Function entry point, HTTP trigger.
- * Loads the requested URL via Puppeteer, captures page content and sends it to backend
- *
- * @param {Object} req Cloud Function request context.
- * @param {Object} res Cloud Function response context.
- */
-export const puppeteer = Sentry.GCPFunction.wrapHttpFunction(
-  contentFetchRequestHandler as HttpFunction
-)
-
-/**
- * Cloud Function entry point, HTTP trigger.
- * Loads the requested URL via Puppeteer and captures a screenshot of the provided element
- *
- * @param {Object} req Cloud Function request context.
- * Inlcudes:
- * * url - URL address of the page to open
- * @param {Object} res Cloud Function response context.
- */
-// exports.preview = Sentry.GCPFunction.wrapHttpFunction(preview);

View File

@@ -2,7 +2,6 @@ import { Storage } from '@google-cloud/storage'
 import { fetchContent } from '@omnivore/puppeteer-parse'
 import { RedisDataSource } from '@omnivore/utils'
 import 'dotenv/config'
-import { RequestHandler } from 'express'

 import { analytics } from './analytics'
 import { queueSavePageJob } from './job'
@@ -321,29 +320,3 @@ export const processFetchContentJob = async (
     )
   }
 }
-
-export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
-  const data = <JobData>req.body
-
-  // create redis source
-  const redisDataSource = new RedisDataSource({
-    cache: {
-      url: process.env.REDIS_URL,
-      cert: process.env.REDIS_CERT,
-    },
-    mq: {
-      url: process.env.MQ_REDIS_URL,
-      cert: process.env.MQ_REDIS_CERT,
-    },
-  })
-
-  try {
-    await processFetchContentJob(redisDataSource, data)
-  } catch (error) {
-    return res.sendStatus(500)
-  } finally {
-    await redisDataSource.shutdown()
-  }
-
-  res.sendStatus(200)
-}

View File

@@ -1,15 +1,17 @@
 import { RedisDataSource } from '@omnivore/utils'
-import { Job, JobType, Queue, QueueEvents, RedisClient, Worker } from 'bullmq'
-import express, { Express } from 'express'
-import asyncHandler from 'express-async-handler'
+import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq'
 import { JobData, processFetchContentJob } from './request_handler'

-const QUEUE_NAME = 'omnivore-content-fetch-queue'
+const FAST_QUEUE = 'omnivore-content-fetch-queue'
+const SLOW_QUEUE = 'omnivore-content-fetch-slow-queue'
+const RSS_QUEUE = 'omnivore-content-fetch-rss-queue'
+const QUEUE_NAMES = [FAST_QUEUE, SLOW_QUEUE, RSS_QUEUE] as const

-export const getContentFetchQueue = async (
-  connection: RedisClient
+export const getQueue = async (
+  connection: RedisClient,
+  queueName: string
 ): Promise<Queue> => {
-  const queue = new Queue(QUEUE_NAME, {
+  const queue = new Queue(queueName, {
     connection,
     defaultJobOptions: {
       backoff: {
@@ -28,9 +30,29 @@ export const getContentFetchQueue = async (
   return queue
 }

-const createWorker = (redisDataSource: RedisDataSource) => {
-  return new Worker(
-    QUEUE_NAME,
+const createWorker = (redisDataSource: RedisDataSource, queueName: string) => {
+  const getLimiter = (queueName: string) => {
+    switch (queueName) {
+      case SLOW_QUEUE:
+        return {
+          max: 5,
+          duration: 1000, // 1 second
+        }
+      case RSS_QUEUE:
+        return {
+          max: 3,
+          duration: 1000, // 1 second
+        }
+      default:
+        return {
+          max: 10,
+          duration: 1000, // 1 second
+        }
+    }
+  }
+
+  const worker = new Worker(
+    queueName,
     async (job: Job<JobData>) => {
       // process the job
       await processFetchContentJob(redisDataSource, job.data)
@@ -38,152 +60,39 @@ const createWorker = (redisDataSource: RedisDataSource) => {
     {
       connection: redisDataSource.queueRedisClient,
       autorun: true, // start processing jobs immediately
-      limiter: {
-        max: 50,
-        duration: 1000, // 1 second
-      },
+      limiter: getLimiter(queueName),
     }
   )
-}
-
-const main = () => {
-  console.log('[worker]: starting worker')
-
-  const app: Express = express()
-  const port = process.env.PORT || 3002
-
-  // create redis source
-  const redisDataSource = new RedisDataSource({
-    cache: {
-      url: process.env.REDIS_URL,
-      cert: process.env.REDIS_CERT,
-    },
-    mq: {
-      url: process.env.MQ_REDIS_URL,
-      cert: process.env.MQ_REDIS_CERT,
-    },
-  })
-
-  // respond healthy to auto-scaler.
-  app.get('/_ah/health', (req, res) => res.sendStatus(200))
-
-  app.get(
-    '/lifecycle/prestop',
-    asyncHandler(async (_req, res) => {
-      console.log('prestop lifecycle hook called.')
-      await worker.close()
-      res.sendStatus(200)
-    })
-  )
-
-  app.get(
-    '/metrics',
-    asyncHandler(async (_, res) => {
-      const queue = await getContentFetchQueue(redisDataSource.queueRedisClient)
-      if (!queue) {
-        res.sendStatus(400)
-        return
-      }
-
-      let output = ''
-      const jobsTypes: JobType[] = [
-        'active',
-        'failed',
-        'completed',
-        'prioritized',
-      ]
-      const counts = await queue.getJobCounts(...jobsTypes)
-
-      jobsTypes.forEach((metric) => {
-        output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
-        output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n`
-      })
-
-      // Export the age of the oldest prioritized job in the queue
-      const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true)
-      if (oldestJobs.length > 0) {
-        const currentTime = Date.now()
-        const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
-        output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
-        output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n`
-      } else {
-        output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
-        output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n`
-      }
-
-      res.status(200).setHeader('Content-Type', 'text/plain').send(output)
-    })
-  )
-
-  const server = app.listen(port, () => {
-    console.log(`[worker]: started`)
-  })
-
-  const worker = createWorker(redisDataSource)
-
-  const queueEvents = new QueueEvents(QUEUE_NAME, {
-    connection: redisDataSource.queueRedisClient,
-  })
-
-  queueEvents.on('added', (job) => {
-    console.log('added job: ', job.jobId, job.name)
-  })
-
-  queueEvents.on('removed', (job) => {
-    console.log('removed job: ', job.jobId)
-  })
-
-  queueEvents.on('completed', (job) => {
-    console.log('completed job: ', job.jobId)
-  })
-
-  queueEvents.on('failed', (job) => {
-    console.log('failed job: ', job.jobId)
-  })
-
-  const gracefulShutdown = async (signal: string) => {
-    console.log(`[worker]: Received ${signal}, closing server...`)
-    await new Promise<void>((resolve) => {
-      server.close((err) => {
-        console.log('[worker]: Express server closed')
-        if (err) {
-          console.log('[worker]: error stopping server', { err })
-        }
-        resolve()
-      })
-    })
-    await worker.close()
-    console.log('[worker]: Worker closed')
-    await redisDataSource.shutdown()
-    console.log('[worker]: Redis connection closed')
-    process.exit(0)
-  }
-
-  const handleShutdown = (signal: string) => {
-    return () => {
-      void gracefulShutdown(signal)
-    }
-  }
-
-  process.on('SIGTERM', handleShutdown('SIGTERM'))
-  process.on('SIGINT', handleShutdown('SIGINT'))
-
-  process.on('uncaughtException', (error) => {
-    console.error('Uncaught Exception:', error)
-    handleShutdown('uncaughtException')
-  })
-
-  process.on('unhandledRejection', (reason, promise) => {
-    console.error('Unhandled Rejection at:', promise, 'reason:', reason)
-    handleShutdown('unhandledRejection')
-  })
-}
-
-// only call main if the file was called from the CLI and wasn't required from another module
-if (require.main === module) {
-  main()
-}
+
+  worker.on('error', (err) => {
+    console.error('worker error:', err)
+  })
+
+  const queueEvents = new QueueEvents(queueName, {
+    connection: redisDataSource.queueRedisClient,
+  })
+
+  queueEvents.on('added', (job) => {
+    console.log('added job:', job.jobId, job.name)
+  })
+
+  queueEvents.on('removed', (job) => {
+    console.log('removed job:', job.jobId)
+  })
+
+  queueEvents.on('completed', (job) => {
+    console.log('completed job:', job.jobId)
+  })
+
+  queueEvents.on('failed', (job) => {
+    console.log('failed job:', job.jobId)
+  })
+
+  return worker
+}
+
+export const createWorkers = (redisDataSource: RedisDataSource) => {
+  return QUEUE_NAMES.map((queueName) =>
+    createWorker(redisDataSource, queueName)
+  )
+}
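A minimal sketch of how these exports could be wired up from a one-off script: the RedisDataSource options mirror the ones app.ts uses above; the queue name string and everything else here is illustrative, not part of this commit:

import { RedisDataSource } from '@omnivore/utils'
import { createWorkers, getQueue } from './worker'

const inspect = async () => {
  const redisDataSource = new RedisDataSource({
    cache: { url: process.env.REDIS_URL, cert: process.env.REDIS_CERT },
    mq: { url: process.env.MQ_REDIS_URL, cert: process.env.MQ_REDIS_CERT },
  })

  // one worker per queue: fast (10 jobs/s), slow (5 jobs/s), rss (3 jobs/s)
  const workers = createWorkers(redisDataSource)

  // peek at the slow queue's backlog
  const slowQueue = await getQueue(
    redisDataSource.queueRedisClient,
    'omnivore-content-fetch-slow-queue'
  )
  console.log(await slowQueue.getJobCounts('active', 'prioritized', 'failed'))

  await Promise.all(workers.map((w) => w.close()))
  await redisDataSource.shutdown()
}

void inspect()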

View File

@@ -2799,7 +2799,7 @@
     google-gax "^3.5.7"
     protobufjs "^7.2.5"

-"@google-cloud/functions-framework@3.1.2", "@google-cloud/functions-framework@^3.0.0":
+"@google-cloud/functions-framework@3.1.2":
   version "3.1.2"
   resolved "https://registry.yarnpkg.com/@google-cloud/functions-framework/-/functions-framework-3.1.2.tgz#2cd92ce4307bf7f32555d028dca22e398473b410"
   integrity sha512-pYvEH65/Rqh1JNPdcBmorcV7Xoom2/iOSmbtYza8msro7Inl+qOYxbyMiQfySD2gwAyn38WyWPRqsDRcf/BFLg==