From d589c18b68d5ded8eb8dd29bdac80f56977b5430 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 30 Apr 2024 12:43:24 +0800 Subject: [PATCH] generate a unique id for each scheduled digest to avoid duplication --- packages/api/src/jobs/ai/create_digest.ts | 15 +++++++++------ packages/api/src/routers/digest_router.ts | 7 ++++--- packages/api/src/utils/createTask.ts | 7 ++++++- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/packages/api/src/jobs/ai/create_digest.ts b/packages/api/src/jobs/ai/create_digest.ts index 4ab46c679..bd2c29497 100644 --- a/packages/api/src/jobs/ai/create_digest.ts +++ b/packages/api/src/jobs/ai/create_digest.ts @@ -9,6 +9,7 @@ import { } from '@omnivore/text-to-speech-handler' import axios from 'axios' import showdown from 'showdown' +import { v4 as uuid } from 'uuid' import yaml from 'yaml' import { LibraryItem } from '../../entity/library_item' import { TaskState } from '../../generated/graphql' @@ -27,7 +28,7 @@ import { sendMulticastPushNotifications } from '../../utils/sendNotification' export type CreateDigestJobSchedule = 'daily' | 'weekly' export interface CreateDigestData { - id: string + id?: string userId: string voices?: string[] language?: string @@ -489,9 +490,11 @@ const generateByline = (summaries: RankedItem[]): string => .join(', ') export const createDigest = async (jobData: CreateDigestData) => { - try { - console.time('createDigestJob') + console.time('createDigestJob') + // generate a unique id for the digest if not provided for scheduled jobs + const digestId = jobData.id ?? uuid() + try { digestDefinition = await fetchDigestDefinition() const candidates = await getCandidatesList( @@ -501,7 +504,7 @@ export const createDigest = async (jobData: CreateDigestData) => { if (candidates.length === 0) { logger.info('No candidates found') return writeDigest(jobData.userId, { - id: jobData.id, + id: digestId, jobState: TaskState.Succeeded, title: 'No articles found', }) @@ -528,7 +531,7 @@ export const createDigest = async (jobData: CreateDigestData) => { }) const title = generateTitle(summaries) const digest: Digest = { - id: jobData.id, + id: digestId, title, content: generateContent(summaries), jobState: TaskState.Succeeded, @@ -552,7 +555,7 @@ export const createDigest = async (jobData: CreateDigestData) => { logger.error('createDigestJob error', error) await writeDigest(jobData.userId, { - id: jobData.id, + id: digestId, jobState: TaskState.Failed, }) } finally { diff --git a/packages/api/src/routers/digest_router.ts b/packages/api/src/routers/digest_router.ts index 5f7a38fe8..6cbdd1e1d 100644 --- a/packages/api/src/routers/digest_router.ts +++ b/packages/api/src/routers/digest_router.ts @@ -1,6 +1,5 @@ import cors from 'cors' import express from 'express' -import { v4 as uuid } from 'uuid' import { env } from '../env' import { TaskState } from '../generated/graphql' import { CreateDigestJobSchedule } from '../jobs/ai/create_digest' @@ -89,9 +88,11 @@ export function digestRouter() { // enqueue job and return job id const result = await enqueueCreateDigest( { - id: uuid(), // generate job id userId, - ...data, + voices: data.voices, + language: data.language, + rate: data.rate, + libraryItemIds: data.libraryItemIds, }, data.schedule ) diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 2383df98c..d74f2ae10 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -864,6 +864,8 @@ export const enqueueCreateDigest = async ( throw new Error('No queue found') } + // generate unique id for the digest + data.id = uuid() // enqueue create digest job immediately const jobId = `${CREATE_DIGEST_JOB}_${data.userId}` const job = await queue.add(CREATE_DIGEST_JOB, data, { @@ -890,9 +892,9 @@ export const enqueueCreateDigest = async ( await writeDigest(data.userId, digest) if (schedule) { + // remove existing repeated job if any await Promise.all( Object.keys(CRON_PATTERNS).map(async (key) => { - // remove existing repeated job if any const isDeleted = await queue.removeRepeatable( CREATE_DIGEST_JOB, { @@ -909,6 +911,9 @@ export const enqueueCreateDigest = async ( ) // schedule repeated job + // delete the digest id to avoid duplication + delete data.id + const job = await queue.add(CREATE_DIGEST_JOB, data, { attempts: 1, priority: getJobPriority(CREATE_DIGEST_JOB),