diff --git a/packages/api/src/jobs/create_digest.ts b/packages/api/src/jobs/create_digest.ts deleted file mode 100644 index 43550ef32..000000000 --- a/packages/api/src/jobs/create_digest.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { logger } from '../utils/logger' - -export const CREATE_DIGEST_JOB = 'create-digest' - -export interface CreateDigestJobData { - userId: string -} - -export interface CreateDigestJobResponse { - jobId: string -} - -export const processCreateDigestJob = async (data: CreateDigestJobData) => { - logger.info('processing create digest job', data) - - // simulate long running task - await new Promise((resolve) => setTimeout(resolve, 5000)) - - logger.info('digest created') - - return true -} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 86712a67b..dac1f74d9 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -18,7 +18,6 @@ import { TaskState } from './generated/graphql' import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' -import { CREATE_DIGEST_JOB, processCreateDigestJob } from './jobs/create_digest' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { exportAllItems, @@ -66,18 +65,17 @@ import { } from './jobs/email/inbound_emails' export const QUEUE_NAME = 'omnivore-backend-queue' +export const LONG_RUNNING_QUEUE_NAME = 'omnivore-backend-long-running-queue' export const JOB_VERSION = 'v001' -let backendQueue: Queue | undefined -export const getBackendQueue = async (): Promise => { - if (backendQueue) { - await backendQueue.waitUntilReady() - return backendQueue - } +export const getBackendQueue = async ( + name = QUEUE_NAME +): Promise => { if (!redisDataSource.workerRedisClient) { throw new Error('Can not create queues, redis is not initialized') } - backendQueue = new Queue(QUEUE_NAME, { + + const backendQueue = new Queue(name, { connection: redisDataSource.workerRedisClient, defaultJobOptions: { backoff: { @@ -99,8 +97,8 @@ export const getBackendQueue = async (): Promise => { export const createJobId = (jobName: string, userId: string) => `${jobName}_${userId}_${JOB_VERSION}` -export const getJob = async (jobId: string) => { - const queue = await getBackendQueue() +export const getJob = async (jobId: string, queueName?: string) => { + const queue = await getBackendQueue(queueName) if (!queue) { return } @@ -182,8 +180,6 @@ export const createWorker = (connection: ConnectionOptions) => return saveNewsletterJob(job.data) case FORWARD_EMAIL_JOB: return forwardEmailJob(job.data) - case CREATE_DIGEST_JOB: - return processCreateDigestJob(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/routers/digest_router.ts b/packages/api/src/routers/digest_router.ts index 7f9ab327b..122097c09 100644 --- a/packages/api/src/routers/digest_router.ts +++ b/packages/api/src/routers/digest_router.ts @@ -1,11 +1,18 @@ import cors from 'cors' import express from 'express' -import { TaskState } from '../generated/graphql' -import { getDigest, setDigest } from '../services/digest' +import { env } from '../env' +import { + createJobId, + getJob, + jobStateToTaskState, + LONG_RUNNING_QUEUE_NAME, +} from '../queue-processor' +import { CREATE_DIGEST_JOB, getDigest } from '../services/digest' import { findActiveUser } from '../services/user' import { analytics } from '../utils/analytics' import { getClaimsByToken, getTokenByRequest } from '../utils/auth' import { corsConfig } from '../utils/corsConfig' +import { enqueueCreateDigest } from '../utils/createTask' import { logger } from '../utils/logger' interface Feedback { @@ -59,24 +66,23 @@ export function digestRouter() { return res.sendStatus(401) } - // check if digest job is running + // check if job is already in queue // if yes then return 202 accepted // else enqueue job - const digest = await getDigest(userId) - if (digest?.jobState === TaskState.Running) { - logger.info(`job is running: ${userId}`) + const jobId = createJobId(CREATE_DIGEST_JOB, userId) + const existingJob = await getJob(jobId, LONG_RUNNING_QUEUE_NAME) + if (existingJob) { + logger.info(`Job already in queue: ${jobId}`) return res.sendStatus(202) } // enqueue job and return job id - const jobId = await setDigest(userId, { - jobState: TaskState.Running, + const result = await enqueueCreateDigest({ + userId, }) // return job id - return res.status(201).send({ - jobId, - }) + return res.status(201).send(result) } catch (error) { logger.error('Error while enqueuing create digest task', error) return res.sendStatus(500) @@ -110,20 +116,25 @@ export function digestRouter() { return res.sendStatus(401) } - // get digest from redis + // get job by user id + const jobId = createJobId(CREATE_DIGEST_JOB, userId) + const job = await getJob(jobId, LONG_RUNNING_QUEUE_NAME) + if (job) { + // if job is in queue then return job state + const jobState = await job.getState() + return res.send({ + jobId: job.id, + jobState: jobStateToTaskState(jobState), + }) + } + + // if job is done and removed then get the digest from redis const digest = await getDigest(userId) if (!digest) { logger.info(`Digest not found: ${userId}`) return res.sendStatus(404) } - if (digest.jobState === TaskState.Running) { - // if job is in queue then return job state - return res.status(202).send({ - jobState: TaskState.Running, - }) - } - // return digest return res.send(digest) } catch (error) { @@ -177,7 +188,11 @@ export function digestRouter() { analytics.capture({ distinctId: userId, event: 'digest_feedback', - properties: feedback, + properties: { + ...feedback, + env: env.server.apiEnv, + userId, + }, }) // return success diff --git a/packages/api/src/services/digest.ts b/packages/api/src/services/digest.ts index 41dac920f..de2ca2909 100644 --- a/packages/api/src/services/digest.ts +++ b/packages/api/src/services/digest.ts @@ -1,5 +1,15 @@ import { redisDataSource } from '../redis_data_source' +export const CREATE_DIGEST_JOB = 'create-digest' + +export interface CreateDigestJobData { + userId: string +} + +export interface CreateDigestJobResponse { + jobId: string +} + export interface Digest { url?: string title?: string @@ -19,22 +29,3 @@ export const getDigest = async (userId: string): Promise => { const digest = await redisDataSource.redisClient?.get(digestKey(userId)) return digest ? (JSON.parse(digest) as Digest) : null } - -export const setDigest = async ( - userId: string, - digest: Digest -): Promise => { - const key = digestKey(userId) - const result = await redisDataSource.redisClient?.set( - digestKey(userId), - JSON.stringify(digest), - 'EX', - 60 * 60 * 24 // 1 day - ) - - if (result != 'OK') { - throw new Error('Failed to set digest') - } - - return key -} diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 7a8a2d23b..d64471252 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -18,11 +18,6 @@ import { import { AISummarizeJobData, AI_SUMMARIZE_JOB_NAME } from '../jobs/ai-summarize' import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' -import { - CreateDigestJobData, - CreateDigestJobResponse, - CREATE_DIGEST_JOB, -} from '../jobs/create_digest' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items' import { @@ -49,8 +44,18 @@ import { UPDATE_HIGHLIGHT_JOB, UPDATE_LABELS_JOB, } from '../jobs/update_db' -import { createJobId, getBackendQueue, JOB_VERSION } from '../queue-processor' +import { + createJobId, + getBackendQueue, + JOB_VERSION, + LONG_RUNNING_QUEUE_NAME, +} from '../queue-processor' import { redisDataSource } from '../redis_data_source' +import { + CreateDigestJobData, + CreateDigestJobResponse, + CREATE_DIGEST_JOB, +} from '../services/digest' import { signFeatureToken } from '../services/features' import { OmnivoreAuthorizationHeader } from './auth' import { CreateTaskError } from './errors' @@ -86,8 +91,6 @@ export const getJobPriority = (jobName: string): number => { case `${REFRESH_FEED_JOB_NAME}_high`: return 10 case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: - case CREATE_DIGEST_JOB: - return 20 case `${REFRESH_FEED_JOB_NAME}_low`: case EXPORT_ITEM_JOB_NAME: return 50 @@ -862,7 +865,7 @@ export const enqueueSendEmail = async (jobData: SendEmailJobData) => { export const enqueueCreateDigest = async ( data: CreateDigestJobData ): Promise => { - const queue = await getBackendQueue() + const queue = await getBackendQueue(LONG_RUNNING_QUEUE_NAME) if (!queue) { throw new Error('No queue found') } @@ -873,7 +876,6 @@ export const enqueueCreateDigest = async ( removeOnComplete: true, removeOnFail: true, attempts: 3, - priority: getJobPriority(CREATE_DIGEST_JOB), }) logger.info('create digest job enqueued', { jobId: job.id })