diff --git a/packages/api/src/jobs/ai/create_digest.ts b/packages/api/src/jobs/ai/create_digest.ts index e424b679c..f5d636709 100644 --- a/packages/api/src/jobs/ai/create_digest.ts +++ b/packages/api/src/jobs/ai/create_digest.ts @@ -22,6 +22,7 @@ import { TaskState } from '../../generated/graphql' export type CreateDigestJobSchedule = 'daily' | 'weekly' export interface CreateDigestJobData { + id: string userId: string voices?: string[] language?: string @@ -30,6 +31,7 @@ export interface CreateDigestJobData { export interface CreateDigestJobResponse { jobId: string + jobState: TaskState } interface Selector { query: string @@ -385,7 +387,7 @@ export const createDigestJob = async (jobData: CreateDigestJobData) => { }) const title = generateTitle(summaries) const digest: Digest = { - id: uuid(), + id: jobData.id, title, content: generateContent(summaries), urlsToAudio: [], diff --git a/packages/api/src/routers/digest_router.ts b/packages/api/src/routers/digest_router.ts index 0bf492d0a..24f2bb103 100644 --- a/packages/api/src/routers/digest_router.ts +++ b/packages/api/src/routers/digest_router.ts @@ -1,11 +1,8 @@ import cors from 'cors' import express from 'express' import { env } from '../env' -import { - CreateDigestJobSchedule, - CREATE_DIGEST_JOB, -} from '../jobs/ai/create_digest' -import { createJobId, getJob, jobStateToTaskState } from '../queue-processor' +import { TaskState } from '../generated/graphql' +import { CreateDigestJobSchedule } from '../jobs/ai/create_digest' import { getDigest } from '../services/digest' import { FeatureName, findGrantedFeatureByName } from '../services/features' import { findActiveUser } from '../services/user' @@ -14,6 +11,7 @@ import { getClaimsByToken, getTokenByRequest } from '../utils/auth' import { corsConfig } from '../utils/corsConfig' import { enqueueCreateDigest } from '../utils/createTask' import { logger } from '../utils/logger' +import { v4 as uuid } from 'uuid' interface Feedback { digestRating: number @@ -82,21 +80,21 @@ export function digestRouter() { return res.sendStatus(403) } - // check if job is already in queue + const data = req.body as CreateDigestRequest + + // check if job is running // if yes then return 202 accepted // else enqueue job - const jobId = createJobId(CREATE_DIGEST_JOB, userId) - const existingJob = await getJob(jobId) - if (existingJob) { - logger.info(`Job already in queue: ${jobId}`) + const digest = await getDigest(userId) + if (digest?.jobState === TaskState.Running) { + logger.info(`Digest job is running: ${userId}`) return res.sendStatus(202) } - const data = req.body as CreateDigestRequest - // enqueue job and return job id const result = await enqueueCreateDigest( { + id: uuid(), // generate job id userId, ...data, }, @@ -147,26 +145,22 @@ export function digestRouter() { return res.sendStatus(403) } - // get job by user id - const jobId = createJobId(CREATE_DIGEST_JOB, userId) - const job = await getJob(jobId) - if (job) { - // if job is in queue then return job state - const jobState = await job.getState() - return res.send({ - jobId: job.id, - jobState: jobStateToTaskState(jobState), - }) - } - - // if job is done and removed then get the digest from redis + // get the digest from redis const digest = await getDigest(userId) if (!digest) { logger.info(`Digest not found: ${userId}`) return res.sendStatus(404) } - // return digest + if (digest.jobState === TaskState.Running) { + // if job is running then return job state + return res.send({ + jobId: digest.id, + jobState: digest.jobState, + }) + } + + // if job is done then return the digest return res.send(digest) } catch (error) { logger.error('Error while getting digest', error) diff --git a/packages/api/src/services/digest.ts b/packages/api/src/services/digest.ts index 24be606cc..811202efd 100644 --- a/packages/api/src/services/digest.ts +++ b/packages/api/src/services/digest.ts @@ -14,10 +14,10 @@ interface Chapter { export interface Digest { id: string jobState: TaskState - createdAt: Date - description: string - byline: string + createdAt?: Date + description?: string + byline?: string url?: string title?: string content?: string diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 70306ff76..eeefd3692 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -14,6 +14,7 @@ import { env } from '../env' import { ArticleSavingRequestStatus, CreateLabelInput, + TaskState, } from '../generated/graphql' import { AISummarizeJobData, AI_SUMMARIZE_JOB_NAME } from '../jobs/ai-summarize' import { @@ -50,7 +51,7 @@ import { UPDATE_HIGHLIGHT_JOB, UPDATE_LABELS_JOB, } from '../jobs/update_db' -import { createJobId, getBackendQueue, JOB_VERSION } from '../queue-processor' +import { getBackendQueue, JOB_VERSION } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { signFeatureToken } from '../services/features' import { OmnivoreAuthorizationHeader } from './auth' @@ -58,6 +59,7 @@ import { CreateTaskError } from './errors' import { stringToHash } from './helpers' import { logger } from './logger' import View = google.cloud.tasks.v2.Task.View +import { writeDigest } from '../services/digest' // Instantiates a client. const client = new CloudTasksClient() @@ -868,9 +870,8 @@ export const enqueueCreateDigest = async ( throw new Error('No queue found') } - const jobId = createJobId(CREATE_DIGEST_JOB, data.userId) const job = await queue.add(CREATE_DIGEST_JOB, data, { - jobId, // dedupe by userId + jobId: data.id, // dedupe by job id removeOnComplete: true, removeOnFail: true, attempts: 3, @@ -886,8 +887,17 @@ export const enqueueCreateDigest = async ( logger.info('create digest job enqueued', { jobId: job.id }) + const digest = { + id: data.id, + jobState: TaskState.Running, + } + + // update digest job state in redis + await writeDigest(data.userId, digest) + return { - jobId, + jobId: digest.id, + jobState: digest.jobState, } }