fix cron schedule bug with bull

This commit is contained in:
Hongbo Wu
2024-04-18 22:09:04 +08:00
parent ac775ae8f2
commit fa4b3bf462
3 changed files with 45 additions and 34 deletions

View File

@ -872,43 +872,14 @@ export const enqueueCreateDigest = async (
throw new Error('No queue found')
}
// enqueue create digest job immediately
const jobId = `${CREATE_DIGEST_JOB}_${data.userId}`
let repeatOptions
if (schedule) {
const pattern = getCronPattern(schedule)
repeatOptions = {
immediately: true, // run immediately
pattern,
}
await Promise.all(
Object.keys(CRON_PATTERNS).map(async (key) => {
// remove existing repeated job if any
const isDeleted = await queue.removeRepeatable(
CREATE_DIGEST_JOB,
{
immediately: true, // run immediately
pattern: CRON_PATTERNS[key as keyof typeof CRON_PATTERNS],
},
jobId
)
if (isDeleted) {
logger.info('existing repeated job removed', { jobId, schedule: key })
}
})
)
}
const job = await queue.add(CREATE_DIGEST_JOB, data, {
jobId, // dedupe by job id
removeOnComplete: true,
removeOnFail: true,
attempts: 1,
priority: getJobPriority(CREATE_DIGEST_JOB),
repeat: repeatOptions,
})
if (!job || !job.id) {
@ -916,7 +887,7 @@ export const enqueueCreateDigest = async (
throw new Error('Error while enqueuing create digest job')
}
logger.info('create digest job enqueued', { jobId, schedule })
logger.info('create digest job enqueued', { jobId })
const digest = {
id: data.id,
@ -926,6 +897,44 @@ export const enqueueCreateDigest = async (
// update digest job state in redis
await writeDigest(data.userId, digest)
if (schedule) {
await Promise.all(
Object.keys(CRON_PATTERNS).map(async (key) => {
// remove existing repeated job if any
const isDeleted = await queue.removeRepeatable(
CREATE_DIGEST_JOB,
{
pattern: CRON_PATTERNS[key as keyof typeof CRON_PATTERNS],
tz: 'UTC',
},
jobId
)
if (isDeleted) {
logger.info('existing repeated job removed', { jobId, schedule: key })
}
})
)
// schedule repeated job
const job = await queue.add(CREATE_DIGEST_JOB, data, {
attempts: 1,
priority: getJobPriority(CREATE_DIGEST_JOB),
repeat: {
pattern: getCronPattern(schedule),
jobId,
tz: 'UTC',
},
})
if (!job || !job.id) {
logger.error('Error while scheduling create digest job', data)
throw new Error('Error while scheduling create digest job')
}
logger.info('create digest job scheduled', { jobId, schedule })
}
return {
jobId: digest.id,
jobState: digest.jobState,