prioritized jobs

This commit is contained in:
Hongbo Wu
2024-01-18 16:39:03 +08:00
parent 0015a946b2
commit 43df5d30a8
3 changed files with 37 additions and 33 deletions

View File

@ -1,12 +1,32 @@
import { Queue } from 'bullmq'
import { redis } from './redis'
const QUEUE_NAME = 'omnivore-content-fetch'
const QUEUE_NAME = 'omnivore-backend-queue'
const JOB_NAME = 'save-page'
interface savePageJob {
url: string
userId: string
data: unknown
isRss: boolean
isImport: boolean
}
const getPriority = (job: savePageJob): number => {
// we want to prioritized jobs by the expected time to complete
// lower number means higher priority
// priority 1: jobs that are expected to finish immediately
// priority 5: jobs that are expected to finish in less than 10 second
// priority 10: jobs that are expected to finish in less than 10 minutes
// priority 100: jobs that are expected to finish in less than 1 hour
if (job.isRss) {
return 10
}
if (job.isImport) {
return 100
}
return 5
}
const createQueue = (): Queue | undefined => {
@ -22,12 +42,13 @@ export const queueSavePageJob = async (savePageJobs: savePageJob[]) => {
}
const jobs = savePageJobs.map((job) => ({
name: 'save-page',
name: JOB_NAME,
data: job.data,
opts: {
jobId: `${job.userId}-${job.url}`,
removeOnComplete: true,
removeOnFail: true,
priority: getPriority(job),
},
}))

View File

@ -11,35 +11,14 @@ export const redis = new Redis(url || 'redis://localhost:6379', {
rejectUnauthorized: false, // for self-signed certs
}
: undefined,
reconnectOnError: (err) => {
const targetErrors = [/READONLY/, /ETIMEDOUT/]
targetErrors.forEach((targetError) => {
if (targetError.test(err.message)) {
// Only reconnect when the error contains the keyword
return true
}
})
return false
},
retryStrategy: (times) => {
if (times > 10) {
// End reconnecting after a specific number of tries and flush all commands with a individual error
return null
}
// reconnect after
return Math.min(times * 50, 2000)
},
maxRetriesPerRequest: null,
offlineQueue: false,
})
// graceful shutdown
process.on('SIGINT', () => {
;(async () => {
console.log('SIGINT signal received: closing HTTP server')
await redis.quit()
console.log('redis connection closed')
process.exit()
})()
redis.on('connect', () => {
console.log('Redis connected')
})
redis.on('error', (err) => {
console.error('Redis error', err)
})

View File

@ -52,16 +52,18 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
const body = <RequestBody>req.body
let users = body.users || [] // users is used when saving article for multiple users
// users is used when saving article for multiple users
let users = body.users || []
const userId = body.userId
const folder = body.folder
// userId is used when saving article for a single user
if (userId) {
users = [
{
id: userId,
folder: body.folder,
},
] // userId is used when saving article for a single user
]
}
const articleSavingRequestId = body.saveRequestId
const state = body.state
@ -125,6 +127,8 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
savedAt,
publishedAt,
},
isRss: !!rssFeedUrl,
isImport: !!taskId,
}))
const result = await queueSavePageJob(savePageJobs)