From 7f441b4ff37225a303ea5aefced98b9701591abe Mon Sep 17 00:00:00 2001
From: Hongbo Wu
Date: Tue, 23 Apr 2024 21:44:25 +0800
Subject: [PATCH] dedupe save-page job

---
Notes (review commentary; text after the three-dash line is not part of the
commit):
    What the diff does:
    * Renames the `savePageJob` interface to `SavePageJob` and replaces its
      `data: unknown` member with a typed `SavePageJobData` payload.
      `userId` is declared on both the job and its payload.
    * `getOpts` now sets `jobId` to
      `save-page_${job.userId}_${job.data.finalUrl}` and turns on
      `removeOnComplete` / `removeOnFail`; all three options were previously
      commented out. `attempts`, `priority` and `backoff` are untouched.
    * `getPriority`, `getAttempts` and `queueSavePageJob` only change the
      name of their parameter type.

    NOTE(review): the de-duplication relies on the queue not adding a second
    job whose id already exists -- confirm against the pinned BullMQ
    version. Presumably, because finished and failed jobs are removed, the
    same user/URL pair can be queued again once the earlier job is gone.

    NOTE(review): `finalUrl` normally contains ':' (e.g. "https://...").
    Some BullMQ releases reject custom job ids containing ':' -- verify
    with the version in use, and consider hashing the URL if so.

 packages/content-fetch/src/job.ts | 33 ++++++++++++++++++++++++---------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts
index 8d88472f9..88c263765 100644
--- a/packages/content-fetch/src/job.ts
+++ b/packages/content-fetch/src/job.ts
@@ -4,9 +4,24 @@ import { redisDataSource } from './redis_data_source'
 const QUEUE_NAME = 'omnivore-backend-queue'
 const JOB_NAME = 'save-page'
 
-interface savePageJob {
+interface SavePageJobData {
   userId: string
-  data: unknown
+  url: string
+  finalUrl: string
+  articleSavingRequestId: string
+  state?: string
+  labels?: string[]
+  source: string
+  folder?: string
+  rssFeedUrl?: string
+  savedAt?: string
+  publishedAt?: string
+  taskId?: string
+}
+
+interface SavePageJob {
+  userId: string
+  data: SavePageJobData
   isRss: boolean
   isImport: boolean
   priority: 'low' | 'high'
@@ -16,7 +31,7 @@ const queue = new Queue(QUEUE_NAME, {
   connection: redisDataSource.queueRedisClient,
 })
 
-const getPriority = (job: savePageJob): number => {
+const getPriority = (job: SavePageJob): number => {
   // we want to prioritized jobs by the expected time to complete
   // lower number means higher priority
   // priority 1: jobs that are expected to finish immediately
@@ -33,7 +48,7 @@ const getPriority = (job: savePageJob): number => {
   return job.priority === 'low' ? 10 : 1
 }
 
-const getAttempts = (job: savePageJob): number => {
+const getAttempts = (job: SavePageJob): number => {
   if (job.isRss || job.isImport) {
     // we don't want to retry rss or import jobs
     return 1
@@ -42,11 +57,11 @@ const getAttempts = (job: savePageJob): number => {
   return 3
 }
 
-const getOpts = (job: savePageJob): BulkJobOptions => {
+const getOpts = (job: SavePageJob): BulkJobOptions => {
   return {
-    // jobId: `${job.userId}-${job.url}`,
-    // removeOnComplete: true,
-    // removeOnFail: true,
+    jobId: `save-page_${job.userId}_${job.data.finalUrl}`, // make sure we don't have duplicate jobs
+    removeOnComplete: true,
+    removeOnFail: true,
     attempts: getAttempts(job),
     priority: getPriority(job),
     backoff: {
@@ -56,7 +71,7 @@ const getOpts = (job: savePageJob): BulkJobOptions => {
   }
 }
 
-export const queueSavePageJob = async (savePageJobs: savePageJob[]) => {
+export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => {
   const jobs = savePageJobs.map((job) => ({
     name: JOB_NAME,
     data: job.data,