diff --git a/packages/api/src/jobs/create_digest.ts b/packages/api/src/jobs/create_digest.ts new file mode 100644 index 000000000..1e06132c0 --- /dev/null +++ b/packages/api/src/jobs/create_digest.ts @@ -0,0 +1,18 @@ +import { logger } from '../utils/logger' + +export const CREATE_DIGEST_JOB = 'CREATE_DIGEST_JOB' + +export interface CreateDigestJobData { + userId: string +} + +export interface CreateDigestJobResponse { + jobId: string +} + +export const processCreateDigestJob = async (data: CreateDigestJobData) => { + logger.info('processing create digest job', data) + + // do something + await Promise.resolve() +} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 136a854f4..2670e2777 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -18,6 +18,7 @@ import { TaskState } from './generated/graphql' import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' +import { CREATE_DIGEST_JOB, processCreateDigestJob } from './jobs/create_digest' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { exportAllItems, @@ -178,6 +179,8 @@ export const createWorker = (connection: ConnectionOptions) => return saveNewsletterJob(job.data) case FORWARD_EMAIL_JOB: return forwardEmailJob(job.data) + case CREATE_DIGEST_JOB: + return processCreateDigestJob(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 001daee64..d8fd2bbf5 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -18,6 +18,11 @@ import { import { AISummarizeJobData, AI_SUMMARIZE_JOB_NAME } from '../jobs/ai-summarize' import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' +import { + CreateDigestJobData, + CreateDigestJobResponse, + CREATE_DIGEST_JOB, +} from '../jobs/create_digest' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items' import { @@ -81,6 +86,7 @@ export const getJobPriority = (jobName: string): number => { case `${REFRESH_FEED_JOB_NAME}_high`: return 10 case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: + case CREATE_DIGEST_JOB: return 20 case `${REFRESH_FEED_JOB_NAME}_low`: case EXPORT_ITEM_JOB_NAME: @@ -853,4 +859,28 @@ export const enqueueSendEmail = async (jobData: SendEmailJobData) => { }) } +export const enqueueCreateDigest = async ( + data: CreateDigestJobData +): Promise => { + const queue = await getBackendQueue() + if (!queue) { + throw new Error('No queue found') + } + + const jobId = `create-digest-${data.userId}` + const job = await queue.add(CREATE_DIGEST_JOB, data, { + jobId, // dedupe by userId + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + priority: getJobPriority(CREATE_DIGEST_JOB), + }) + + logger.info('create digest job enqueued', { jobId: job.id }) + + return { + jobId, + } +} + export default createHttpTaskWithToken