create a worker to process content-fetch job
This commit is contained in:
@ -7,15 +7,16 @@
|
||||
"build/src"
|
||||
],
|
||||
"dependencies": {
|
||||
"bullmq": "^5.1.1",
|
||||
"dotenv": "^8.2.0",
|
||||
"express": "^4.17.1",
|
||||
"posthog-node": "^3.6.3",
|
||||
"@google-cloud/functions-framework": "^3.0.0",
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@omnivore/puppeteer-parse": "^1.0.0",
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"@sentry/serverless": "^7.77.0"
|
||||
"@sentry/serverless": "^7.77.0",
|
||||
"bullmq": "^5.1.1",
|
||||
"dotenv": "^8.2.0",
|
||||
"express": "^4.17.1",
|
||||
"express-async-handler": "^1.2.0",
|
||||
"posthog-node": "^3.6.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
"chai": "^4.3.6",
|
||||
@ -27,7 +28,8 @@
|
||||
"lint": "eslint src --ext ts,js,tsx,jsx",
|
||||
"build": "tsc",
|
||||
"start": "node build/src/app.js",
|
||||
"start_gcf": "functions-framework --port=9090 --target=puppeteer"
|
||||
"start_gcf": "functions-framework --port=9090 --target=puppeteer",
|
||||
"start_worker": "node build/src/worker.js"
|
||||
},
|
||||
"volta": {
|
||||
"extends": "../../package.json"
|
||||
|
||||
@ -12,7 +12,7 @@ interface UserConfig {
|
||||
folder?: string
|
||||
}
|
||||
|
||||
interface RequestBody {
|
||||
export interface JobData {
|
||||
url: string
|
||||
userId?: string
|
||||
saveRequestId: string
|
||||
@ -175,36 +175,37 @@ const incrementContentFetchFailure = async (
|
||||
}
|
||||
}
|
||||
|
||||
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
export const processFetchContentJob = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
data: JobData
|
||||
) => {
|
||||
const functionStartTime = Date.now()
|
||||
|
||||
const body = <RequestBody>req.body
|
||||
|
||||
// users is used when saving article for multiple users
|
||||
let users = body.users || []
|
||||
const userId = body.userId
|
||||
let users = data.users || []
|
||||
const userId = data.userId
|
||||
// userId is used when saving article for a single user
|
||||
if (userId) {
|
||||
users = [
|
||||
{
|
||||
id: userId,
|
||||
folder: body.folder,
|
||||
libraryItemId: body.saveRequestId,
|
||||
folder: data.folder,
|
||||
libraryItemId: data.saveRequestId,
|
||||
},
|
||||
]
|
||||
}
|
||||
const articleSavingRequestId = body.saveRequestId
|
||||
const state = body.state
|
||||
const labels = body.labels
|
||||
const source = body.source || 'puppeteer-parse'
|
||||
const taskId = body.taskId // taskId is used to update import status
|
||||
const url = body.url
|
||||
const locale = body.locale
|
||||
const timezone = body.timezone
|
||||
const rssFeedUrl = body.rssFeedUrl
|
||||
const savedAt = body.savedAt
|
||||
const publishedAt = body.publishedAt
|
||||
const priority = body.priority
|
||||
const articleSavingRequestId = data.saveRequestId
|
||||
const state = data.state
|
||||
const labels = data.labels
|
||||
const source = data.source || 'puppeteer-parse'
|
||||
const taskId = data.taskId // taskId is used to update import status
|
||||
const url = data.url
|
||||
const locale = data.locale
|
||||
const timezone = data.timezone
|
||||
const rssFeedUrl = data.rssFeedUrl
|
||||
const savedAt = data.savedAt
|
||||
const publishedAt = data.publishedAt
|
||||
const priority = data.priority
|
||||
|
||||
const logRecord: LogRecord = {
|
||||
url,
|
||||
@ -225,25 +226,13 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
|
||||
console.log(`Article parsing request`, logRecord)
|
||||
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
try {
|
||||
const domain = new URL(url).hostname
|
||||
const isBlocked = await isDomainBlocked(redisDataSource, domain)
|
||||
if (isBlocked) {
|
||||
console.log('domain is blocked', domain)
|
||||
|
||||
return res.sendStatus(200)
|
||||
return
|
||||
}
|
||||
|
||||
const key = cacheKey(url, locale, timezone)
|
||||
@ -312,7 +301,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
logRecord.error = 'unknown error'
|
||||
}
|
||||
|
||||
return res.sendStatus(500)
|
||||
throw error
|
||||
} finally {
|
||||
logRecord.totalTime = Date.now() - functionStartTime
|
||||
console.log(`parse-page result`, logRecord)
|
||||
@ -330,7 +319,29 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
const data = <JobData>req.body
|
||||
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
try {
|
||||
await processFetchContentJob(redisDataSource, data)
|
||||
} catch (error) {
|
||||
return res.sendStatus(500)
|
||||
} finally {
|
||||
await redisDataSource.shutdown()
|
||||
}
|
||||
|
||||
|
||||
186
packages/content-fetch/src/worker.ts
Normal file
186
packages/content-fetch/src/worker.ts
Normal file
@ -0,0 +1,186 @@
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import { Job, JobType, Queue, QueueEvents, RedisClient, Worker } from 'bullmq'
|
||||
import express, { Express } from 'express'
|
||||
import asyncHandler from 'express-async-handler'
|
||||
import { JobData, processFetchContentJob } from './request_handler'
|
||||
|
||||
const QUEUE_NAME = 'omnivore-content-fetch-queue'
|
||||
|
||||
export const getContentFetchQueue = async (
|
||||
connection: RedisClient
|
||||
): Promise<Queue> => {
|
||||
const queue = new Queue(QUEUE_NAME, {
|
||||
connection,
|
||||
defaultJobOptions: {
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 2000, // 2 seconds
|
||||
},
|
||||
removeOnComplete: {
|
||||
age: 24 * 3600, // keep up to 24 hours
|
||||
},
|
||||
removeOnFail: {
|
||||
age: 7 * 24 * 3600, // keep up to 7 days
|
||||
},
|
||||
},
|
||||
})
|
||||
await queue.waitUntilReady()
|
||||
return queue
|
||||
}
|
||||
|
||||
const createWorker = (redisDataSource: RedisDataSource) => {
|
||||
return new Worker(
|
||||
QUEUE_NAME,
|
||||
async (job: Job<JobData>) => {
|
||||
// process the job
|
||||
await processFetchContentJob(redisDataSource, job.data)
|
||||
},
|
||||
{
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
autorun: true, // start processing jobs immediately
|
||||
lockDuration: 60_000, // 1 minute
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
const main = () => {
|
||||
console.log('[worker]: starting worker')
|
||||
|
||||
const app: Express = express()
|
||||
const port = process.env.PORT || 3002
|
||||
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
// respond healthy to auto-scaler.
|
||||
app.get('/_ah/health', (req, res) => res.sendStatus(200))
|
||||
|
||||
app.get(
|
||||
'/lifecycle/prestop',
|
||||
asyncHandler(async (_req, res) => {
|
||||
console.log('prestop lifecycle hook called.')
|
||||
await worker.close()
|
||||
res.sendStatus(200)
|
||||
})
|
||||
)
|
||||
|
||||
app.get(
|
||||
'/metrics',
|
||||
asyncHandler(async (_, res) => {
|
||||
const queue = await getContentFetchQueue(redisDataSource.queueRedisClient)
|
||||
if (!queue) {
|
||||
res.sendStatus(400)
|
||||
return
|
||||
}
|
||||
|
||||
let output = ''
|
||||
const jobsTypes: JobType[] = [
|
||||
'active',
|
||||
'failed',
|
||||
'completed',
|
||||
'prioritized',
|
||||
]
|
||||
const counts = await queue.getJobCounts(...jobsTypes)
|
||||
|
||||
jobsTypes.forEach((metric) => {
|
||||
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
|
||||
output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n`
|
||||
})
|
||||
|
||||
// Export the age of the oldest prioritized job in the queue
|
||||
const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true)
|
||||
if (oldestJobs.length > 0) {
|
||||
const currentTime = Date.now()
|
||||
const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
|
||||
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
|
||||
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n`
|
||||
} else {
|
||||
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
|
||||
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n`
|
||||
}
|
||||
|
||||
res.status(200).setHeader('Content-Type', 'text/plain').send(output)
|
||||
})
|
||||
)
|
||||
|
||||
const server = app.listen(port, () => {
|
||||
console.log(`[worker]: started`)
|
||||
})
|
||||
|
||||
const worker = createWorker(redisDataSource)
|
||||
|
||||
const queueEvents = new QueueEvents(QUEUE_NAME, {
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
})
|
||||
|
||||
queueEvents.on('added', (job) => {
|
||||
console.log('added job: ', job.jobId, job.name)
|
||||
})
|
||||
|
||||
queueEvents.on('removed', (job) => {
|
||||
console.log('removed job: ', job.jobId)
|
||||
})
|
||||
|
||||
queueEvents.on('completed', (job) => {
|
||||
console.log('completed job: ', job.jobId)
|
||||
})
|
||||
|
||||
queueEvents.on('failed', (job) => {
|
||||
console.log('failed job: ', job.jobId)
|
||||
})
|
||||
|
||||
const gracefulShutdown = async (signal: string) => {
|
||||
console.log(`[worker]: Received ${signal}, closing server...`)
|
||||
await new Promise<void>((resolve) => {
|
||||
server.close((err) => {
|
||||
console.log('[worker]: Express server closed')
|
||||
if (err) {
|
||||
console.log('[worker]: error stopping server', { err })
|
||||
}
|
||||
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
|
||||
await worker.close()
|
||||
console.log('[worker]: Worker closed')
|
||||
|
||||
await redisDataSource.shutdown()
|
||||
console.log('[worker]: Redis connection closed')
|
||||
|
||||
process.exit(0)
|
||||
}
|
||||
|
||||
const handleShutdown = (signal: string) => {
|
||||
return () => {
|
||||
void gracefulShutdown(signal)
|
||||
}
|
||||
}
|
||||
|
||||
process.on('SIGTERM', handleShutdown('SIGTERM'))
|
||||
process.on('SIGINT', handleShutdown('SIGINT'))
|
||||
|
||||
process.on('uncaughtException', (error) => {
|
||||
console.error('Uncaught Exception:', error)
|
||||
handleShutdown('uncaughtException')
|
||||
})
|
||||
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
console.error('Unhandled Rejection at:', promise, 'reason:', reason)
|
||||
handleShutdown('unhandledRejection')
|
||||
})
|
||||
}
|
||||
|
||||
// only call main if the file was called from the CLI and wasn't required from another module
|
||||
if (require.main === module) {
|
||||
main()
|
||||
}
|
||||
Reference in New Issue
Block a user