use one queue with different priority for fetching content of rss feed item or saved url
This commit is contained in:
@ -35,6 +35,7 @@ interface RefreshFeedRequest {
|
||||
fetchContentTypes: FetchContentType[]
|
||||
folders: FolderType[]
|
||||
refreshContext?: RSSRefreshContext
|
||||
priority?: 'low' | 'high'
|
||||
}
|
||||
|
||||
export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => {
|
||||
@ -332,7 +333,8 @@ const createTask = async (
|
||||
const fetchContentAndCreateItem = async (
|
||||
users: UserConfig[],
|
||||
feedUrl: string,
|
||||
item: RssFeedItem
|
||||
item: RssFeedItem,
|
||||
priority = 'low' as 'low' | 'high'
|
||||
) => {
|
||||
const data: FetchContentJobData = {
|
||||
users,
|
||||
@ -342,7 +344,7 @@ const fetchContentAndCreateItem = async (
|
||||
rssFeedUrl: feedUrl,
|
||||
savedAt: item.isoDate,
|
||||
publishedAt: item.isoDate,
|
||||
priority: 'low',
|
||||
priority,
|
||||
}
|
||||
|
||||
try {
|
||||
@ -703,6 +705,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
fetchContentTypes,
|
||||
folders,
|
||||
refreshContext,
|
||||
priority,
|
||||
} = request
|
||||
|
||||
logger.info('Processing feed', feedUrl, { refreshContext: refreshContext })
|
||||
@ -771,7 +774,8 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
await fetchContentAndCreateItem(
|
||||
Array.from(task.users.values()),
|
||||
feedUrl,
|
||||
task.item
|
||||
task.item,
|
||||
priority
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -84,8 +84,6 @@ import { logger } from './utils/logger'
|
||||
|
||||
export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue'
|
||||
export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue'
|
||||
export const CONTENT_FETCH_SLOW_QUEUE = 'omnivore-content-fetch-slow-queue'
|
||||
export const CONTENT_FETCH_RSS_QUEUE = 'omnivore-content-fetch-rss-queue'
|
||||
|
||||
export const JOB_VERSION = 'v001'
|
||||
|
||||
|
||||
@ -57,7 +57,7 @@ const addRecentSavedItem = async (userId: string) => {
|
||||
// default: use normal queue
|
||||
const getPriorityByRateLimit = async (
|
||||
userId: string
|
||||
): Promise<'low' | 'high' | undefined> => {
|
||||
): Promise<'low' | 'high'> => {
|
||||
const redisClient = redisDataSource.redisClient
|
||||
if (redisClient) {
|
||||
const oneMinuteAgo = Date.now() - 60 * 1000
|
||||
@ -75,6 +75,8 @@ const getPriorityByRateLimit = async (
|
||||
logger.error('Failed to get priority by rate limit', { userId, error })
|
||||
}
|
||||
}
|
||||
|
||||
return 'high'
|
||||
}
|
||||
|
||||
export const validateUrl = (url: string): URL => {
|
||||
|
||||
@ -65,13 +65,7 @@ import {
|
||||
UploadContentJobData,
|
||||
UPLOAD_CONTENT_JOB,
|
||||
} from '../jobs/upload_content'
|
||||
import {
|
||||
CONTENT_FETCH_QUEUE,
|
||||
CONTENT_FETCH_RSS_QUEUE,
|
||||
CONTENT_FETCH_SLOW_QUEUE,
|
||||
getQueue,
|
||||
JOB_VERSION,
|
||||
} from '../queue-processor'
|
||||
import { CONTENT_FETCH_QUEUE, getQueue, JOB_VERSION } from '../queue-processor'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { writeDigest } from '../services/digest'
|
||||
import { signFeatureToken } from '../services/features'
|
||||
@ -102,18 +96,21 @@ export const getJobPriority = (jobName: string): number => {
|
||||
case SYNC_READ_POSITIONS_JOB_NAME:
|
||||
case SEND_EMAIL_JOB:
|
||||
case UPDATE_HOME_JOB:
|
||||
case FETCH_CONTENT_JOB:
|
||||
case `${FETCH_CONTENT_JOB}_high`:
|
||||
return 1
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
case AI_SUMMARIZE_JOB_NAME:
|
||||
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
|
||||
case `${FETCH_CONTENT_JOB}_low`:
|
||||
case `${FETCH_CONTENT_JOB}_rss_high`:
|
||||
return 5
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
case `${REFRESH_FEED_JOB_NAME}_high`:
|
||||
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
|
||||
case UPLOAD_CONTENT_JOB:
|
||||
case SCORE_LIBRARY_ITEM_JOB:
|
||||
case `${FETCH_CONTENT_JOB}_rss_low`:
|
||||
return 10
|
||||
case `${REFRESH_FEED_JOB_NAME}_low`:
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
@ -336,7 +333,7 @@ export interface FetchContentJobData {
|
||||
folder?: string
|
||||
libraryItemId: string
|
||||
}>
|
||||
priority?: 'low' | 'high'
|
||||
priority: 'low' | 'high'
|
||||
state?: ArticleSavingRequestStatus
|
||||
labels?: Array<CreateLabelInput>
|
||||
locale?: string
|
||||
@ -360,39 +357,32 @@ export interface FetchContentJobData {
|
||||
export const enqueueFetchContentJob = async (
|
||||
data: FetchContentJobData
|
||||
): Promise<string> => {
|
||||
const getQueueName = (data: FetchContentJobData) => {
|
||||
if (data.rssFeedUrl) {
|
||||
return CONTENT_FETCH_RSS_QUEUE
|
||||
}
|
||||
|
||||
if (data.priority === 'low') {
|
||||
return CONTENT_FETCH_SLOW_QUEUE
|
||||
}
|
||||
|
||||
return CONTENT_FETCH_QUEUE
|
||||
}
|
||||
|
||||
const queueName = getQueueName(data)
|
||||
const queue = await getQueue(queueName)
|
||||
const queue = await getQueue(CONTENT_FETCH_QUEUE)
|
||||
if (!queue) {
|
||||
throw new Error('No queue found')
|
||||
}
|
||||
|
||||
const jobName = `${FETCH_CONTENT_JOB}${data.rssFeedUrl ? '_rss' : ''}_${
|
||||
data.priority
|
||||
}`
|
||||
|
||||
// sort the data to make sure the hash is consistent
|
||||
const sortedData = JSON.stringify(data, Object.keys(data).sort())
|
||||
const jobId = `${FETCH_CONTENT_JOB}_${stringToHash(
|
||||
sortedData
|
||||
)}_${JOB_VERSION}`
|
||||
const priority = getJobPriority(jobName)
|
||||
|
||||
const job = await queue.add(FETCH_CONTENT_JOB, data, {
|
||||
priority: getJobPriority(FETCH_CONTENT_JOB),
|
||||
attempts: data.priority === 'low' ? 2 : 3,
|
||||
jobId,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
priority,
|
||||
attempts: 2,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 2000,
|
||||
},
|
||||
jobId,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
})
|
||||
|
||||
if (!job || !job.id) {
|
||||
|
||||
@ -2,10 +2,10 @@ import { RedisDataSource } from '@omnivore/utils'
|
||||
import { JobType } from 'bullmq'
|
||||
import express, { Express } from 'express'
|
||||
import asyncHandler from 'express-async-handler'
|
||||
import { createWorkers, getQueue } from './worker'
|
||||
import { createWorker, getQueue, QUEUE } from './worker'
|
||||
|
||||
const main = () => {
|
||||
console.log('[worker]: starting workers')
|
||||
console.log('Starting worker...')
|
||||
|
||||
const app: Express = express()
|
||||
const port = process.env.PORT || 3002
|
||||
@ -22,16 +22,7 @@ const main = () => {
|
||||
},
|
||||
})
|
||||
|
||||
const workers = createWorkers(redisDataSource)
|
||||
|
||||
const closeWorkers = async () => {
|
||||
await Promise.all(
|
||||
workers.map(async (worker) => {
|
||||
await worker.close()
|
||||
console.log('worker closed:', worker.name)
|
||||
})
|
||||
)
|
||||
}
|
||||
const worker = createWorker(redisDataSource)
|
||||
|
||||
// respond healthy to auto-scaler.
|
||||
app.get('/_ah/health', (req, res) => res.sendStatus(200))
|
||||
@ -39,9 +30,10 @@ const main = () => {
|
||||
app.get(
|
||||
'/lifecycle/prestop',
|
||||
asyncHandler(async (_req, res) => {
|
||||
console.log('prestop lifecycle hook called.')
|
||||
await closeWorkers()
|
||||
console.log('workers closed')
|
||||
console.log('Prestop lifecycle hook called.')
|
||||
|
||||
await worker.close()
|
||||
console.log('Worker closed')
|
||||
|
||||
res.sendStatus(200)
|
||||
})
|
||||
@ -52,37 +44,31 @@ const main = () => {
|
||||
asyncHandler(async (_, res) => {
|
||||
let output = ''
|
||||
|
||||
for (const worker of workers) {
|
||||
const queueName = worker.name
|
||||
const queue = await getQueue(
|
||||
redisDataSource.queueRedisClient,
|
||||
queueName
|
||||
)
|
||||
const queue = await getQueue(redisDataSource.queueRedisClient)
|
||||
|
||||
const jobsTypes: Array<JobType> = [
|
||||
'active',
|
||||
'failed',
|
||||
'completed',
|
||||
'prioritized',
|
||||
]
|
||||
const counts = await queue.getJobCounts(...jobsTypes)
|
||||
const jobsTypes: Array<JobType> = [
|
||||
'active',
|
||||
'failed',
|
||||
'completed',
|
||||
'prioritized',
|
||||
]
|
||||
const counts = await queue.getJobCounts(...jobsTypes)
|
||||
|
||||
jobsTypes.forEach((metric) => {
|
||||
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
|
||||
output += `omnivore_queue_messages_${metric}{queue="${queueName}"} ${counts[metric]}\n`
|
||||
})
|
||||
jobsTypes.forEach((metric) => {
|
||||
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
|
||||
output += `omnivore_queue_messages_${metric}{queue="${QUEUE}"} ${counts[metric]}\n`
|
||||
})
|
||||
|
||||
// Export the age of the oldest prioritized job in the queue
|
||||
const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true)
|
||||
if (oldestJobs.length > 0) {
|
||||
const currentTime = Date.now()
|
||||
const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
|
||||
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
|
||||
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${ageInSeconds}\n`
|
||||
} else {
|
||||
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
|
||||
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${queueName}"} ${0}\n`
|
||||
}
|
||||
// Export the age of the oldest prioritized job in the queue
|
||||
const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true)
|
||||
if (oldestJobs.length > 0) {
|
||||
const currentTime = Date.now()
|
||||
const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
|
||||
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
|
||||
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${ageInSeconds}\n`
|
||||
} else {
|
||||
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
|
||||
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${0}\n`
|
||||
}
|
||||
|
||||
res.status(200).setHeader('Content-Type', 'text/plain').send(output)
|
||||
@ -90,27 +76,27 @@ const main = () => {
|
||||
)
|
||||
|
||||
const server = app.listen(port, () => {
|
||||
console.log(`[worker]: Workers started`)
|
||||
console.log('Worker started')
|
||||
})
|
||||
|
||||
const gracefulShutdown = async (signal: string) => {
|
||||
console.log(`[worker]: Received ${signal}, closing server...`)
|
||||
console.log(`Received ${signal}, closing server...`)
|
||||
await new Promise<void>((resolve) => {
|
||||
server.close((err) => {
|
||||
console.log('[worker]: Express server closed')
|
||||
console.log('Express server closed')
|
||||
if (err) {
|
||||
console.log('[worker]: error stopping server', { err })
|
||||
console.log('Error stopping server', { err })
|
||||
}
|
||||
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
|
||||
await closeWorkers()
|
||||
console.log('[worker]: Workers closed')
|
||||
await worker.close()
|
||||
console.log('Worker closed')
|
||||
|
||||
await redisDataSource.shutdown()
|
||||
console.log('[worker]: Redis connection closed')
|
||||
console.log('Redis connection closed')
|
||||
|
||||
process.exit(0)
|
||||
}
|
||||
|
||||
@ -38,23 +38,24 @@ const getPriority = (job: SavePageJob): number => {
|
||||
// priority 5: jobs that are expected to finish in less than 10 second
|
||||
// priority 10: jobs that are expected to finish in less than 10 minutes
|
||||
// priority 100: jobs that are expected to finish in less than 1 hour
|
||||
if (job.isRss) {
|
||||
return 10
|
||||
}
|
||||
if (job.isImport) {
|
||||
return 100
|
||||
}
|
||||
|
||||
return job.priority === 'low' ? 10 : 1
|
||||
if (job.isRss) {
|
||||
return job.priority === 'low' ? 10 : 5
|
||||
}
|
||||
|
||||
return job.priority === 'low' ? 5 : 1
|
||||
}
|
||||
|
||||
const getAttempts = (job: SavePageJob): number => {
|
||||
if (job.isRss || job.isImport) {
|
||||
// we don't want to retry rss or import jobs
|
||||
if (job.isImport) {
|
||||
// we don't want to retry import jobs
|
||||
return 1
|
||||
}
|
||||
|
||||
return 3
|
||||
return job.isRss ? 2 : 3
|
||||
}
|
||||
|
||||
const getOpts = (job: SavePageJob): BulkJobOptions => {
|
||||
|
||||
@ -2,14 +2,11 @@ import { RedisDataSource } from '@omnivore/utils'
|
||||
import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq'
|
||||
import { JobData, processFetchContentJob } from './request_handler'
|
||||
|
||||
const FAST_QUEUE = 'omnivore-content-fetch-queue'
|
||||
const SLOW_QUEUE = 'omnivore-content-fetch-slow-queue'
|
||||
const RSS_QUEUE = 'omnivore-content-fetch-rss-queue'
|
||||
const QUEUE_NAMES = [FAST_QUEUE, SLOW_QUEUE, RSS_QUEUE] as const
|
||||
export const QUEUE = 'omnivore-content-fetch-queue'
|
||||
|
||||
export const getQueue = async (
|
||||
connection: RedisClient,
|
||||
queueName: string
|
||||
queueName = QUEUE
|
||||
): Promise<Queue> => {
|
||||
const queue = new Queue(queueName, {
|
||||
connection,
|
||||
@ -30,27 +27,10 @@ export const getQueue = async (
|
||||
return queue
|
||||
}
|
||||
|
||||
const createWorker = (redisDataSource: RedisDataSource, queueName: string) => {
|
||||
const getLimiter = (queueName: string) => {
|
||||
switch (queueName) {
|
||||
case SLOW_QUEUE:
|
||||
return {
|
||||
max: 5,
|
||||
duration: 1000, // 1 second
|
||||
}
|
||||
case RSS_QUEUE:
|
||||
return {
|
||||
max: 5,
|
||||
duration: 1000, // 1 second
|
||||
}
|
||||
default:
|
||||
return {
|
||||
max: 100,
|
||||
duration: 1000, // 1 second
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const createWorker = (
|
||||
redisDataSource: RedisDataSource,
|
||||
queueName = QUEUE
|
||||
) => {
|
||||
const worker = new Worker(
|
||||
queueName,
|
||||
async (job: Job<JobData>) => {
|
||||
@ -60,7 +40,6 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => {
|
||||
{
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
autorun: true, // start processing jobs immediately
|
||||
limiter: getLimiter(queueName),
|
||||
}
|
||||
)
|
||||
|
||||
@ -90,9 +69,3 @@ const createWorker = (redisDataSource: RedisDataSource, queueName: string) => {
|
||||
|
||||
return worker
|
||||
}
|
||||
|
||||
export const createWorkers = (redisDataSource: RedisDataSource) => {
|
||||
return QUEUE_NAMES.map((queueName) =>
|
||||
createWorker(redisDataSource, queueName)
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user