diff --git a/packages/api/src/jobs/ai/create_digest.ts b/packages/api/src/jobs/ai/create_digest.ts index 81558f179..a3be248a0 100644 --- a/packages/api/src/jobs/ai/create_digest.ts +++ b/packages/api/src/jobs/ai/create_digest.ts @@ -75,9 +75,9 @@ interface RankedTitle { export const CREATE_DIGEST_JOB = 'create-digest' export const CRON_PATTERNS = { // every day at 10:30 UTC - daily: '0 30 10 * * *', + daily: '30 10 * * *', // every Sunday at 10:30 UTC - weekly: '0 30 10 * * 7', + weekly: '30 10 * * 7', } let digestDefinition: DigestDefinition diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index e6b02456f..b59914dc4 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -872,43 +872,14 @@ export const enqueueCreateDigest = async ( throw new Error('No queue found') } + // enqueue create digest job immediately const jobId = `${CREATE_DIGEST_JOB}_${data.userId}` - - let repeatOptions - - if (schedule) { - const pattern = getCronPattern(schedule) - repeatOptions = { - immediately: true, // run immediately - pattern, - } - - await Promise.all( - Object.keys(CRON_PATTERNS).map(async (key) => { - // remove existing repeated job if any - const isDeleted = await queue.removeRepeatable( - CREATE_DIGEST_JOB, - { - immediately: true, // run immediately - pattern: CRON_PATTERNS[key as keyof typeof CRON_PATTERNS], - }, - jobId - ) - - if (isDeleted) { - logger.info('existing repeated job removed', { jobId, schedule: key }) - } - }) - ) - } - const job = await queue.add(CREATE_DIGEST_JOB, data, { jobId, // dedupe by job id removeOnComplete: true, removeOnFail: true, attempts: 1, priority: getJobPriority(CREATE_DIGEST_JOB), - repeat: repeatOptions, }) if (!job || !job.id) { @@ -916,7 +887,7 @@ export const enqueueCreateDigest = async ( throw new Error('Error while enqueuing create digest job') } - logger.info('create digest job enqueued', { jobId, schedule }) + logger.info('create digest job enqueued', { jobId }) const digest = { id: data.id, @@ -926,6 +897,44 @@ export const enqueueCreateDigest = async ( // update digest job state in redis await writeDigest(data.userId, digest) + if (schedule) { + await Promise.all( + Object.keys(CRON_PATTERNS).map(async (key) => { + // remove existing repeated job if any + const isDeleted = await queue.removeRepeatable( + CREATE_DIGEST_JOB, + { + pattern: CRON_PATTERNS[key as keyof typeof CRON_PATTERNS], + tz: 'UTC', + }, + jobId + ) + + if (isDeleted) { + logger.info('existing repeated job removed', { jobId, schedule: key }) + } + }) + ) + + // schedule repeated job + const job = await queue.add(CREATE_DIGEST_JOB, data, { + attempts: 1, + priority: getJobPriority(CREATE_DIGEST_JOB), + repeat: { + pattern: getCronPattern(schedule), + jobId, + tz: 'UTC', + }, + }) + + if (!job || !job.id) { + logger.error('Error while scheduling create digest job', data) + throw new Error('Error while scheduling create digest job') + } + + logger.info('create digest job scheduled', { jobId, schedule }) + } + return { jobId: digest.id, jobState: digest.jobState, diff --git a/packages/api/test/util.ts b/packages/api/test/util.ts index ee1c30bdc..42b5ecd60 100644 --- a/packages/api/test/util.ts +++ b/packages/api/test/util.ts @@ -1,4 +1,5 @@ import { ConnectionOptions, Job, QueueEvents, Worker } from 'bullmq' +import { createServer } from 'http' import { nanoid } from 'nanoid' import supertest from 'supertest' import { v4 } from 'uuid' @@ -8,7 +9,8 @@ import { createApp } from '../src/server' import { corsConfig } from '../src/utils/corsConfig' const app = createApp() -const apollo = makeApolloServer(app) +const httpServer = createServer(app) +const apollo = makeApolloServer(app, httpServer) export const request = supertest(app) let worker: Worker let queueEvents: QueueEvents