From 485caec803661ff33e9d5736931d42c08e830716 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 7 Feb 2024 18:42:10 +0800 Subject: [PATCH] append job version to the job id --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 4 ++-- packages/api/src/queue-processor.ts | 1 + packages/api/src/utils/createTask.ts | 8 ++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 8f0f69ad1..00c39c1d1 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -1,7 +1,7 @@ import { Job } from 'bullmq' import { DataSource } from 'typeorm' import { v4 as uuid } from 'uuid' -import { getBackendQueue } from '../../queue-processor' +import { getBackendQueue, JOB_VERSION } from '../../queue-processor' import { validateUrl } from '../../services/create_page_save_request' import { RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' @@ -136,7 +136,7 @@ export const queueRSSRefreshFeedJob = async ( return undefined } return queue.add('refresh-feed', payload, { - jobId: jobid, + jobId: `${jobid}_${JOB_VERSION}`, priority: options.priority == 'low' ? 10 : 50, removeOnComplete: true, removeOnFail: true, diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index d8dc2af3a..cb5598ece 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -37,6 +37,7 @@ import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_positi import { CustomTypeOrmLogger, logger } from './utils/logger' export const QUEUE_NAME = 'omnivore-backend-queue' +export const JOB_VERSION = 'v001' let backendQueue: Queue | undefined export const getBackendQueue = async (): Promise => { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 27d3ec3ee..e4f2f6ab1 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -25,7 +25,7 @@ import { UPDATE_HIGHLIGHT_JOB, UPDATE_LABELS_JOB, } from '../jobs/update_db' -import { getBackendQueue } from '../queue-processor' +import { getBackendQueue, JOB_VERSION } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { signFeatureToken } from '../services/features' import { OmnivoreAuthorizationHeader } from './auth' @@ -691,7 +691,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { name: UPDATE_LABELS_JOB, data: d, opts: { - jobId: `${UPDATE_LABELS_JOB}_${d.libraryItemId}`, + jobId: `${UPDATE_LABELS_JOB}_${d.libraryItemId}_${JOB_VERSION}`, attempts: 6, priority: 1, removeOnComplete: true, @@ -715,7 +715,7 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { try { return queue.add(UPDATE_HIGHLIGHT_JOB, data, { - jobId: `${UPDATE_HIGHLIGHT_JOB}_${data.libraryItemId}`, + jobId: `${UPDATE_HIGHLIGHT_JOB}_${data.libraryItemId}_${JOB_VERSION}`, attempts: 6, priority: 1, removeOnComplete: true, @@ -732,7 +732,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => { return undefined } - const jobId = `${BULK_ACTION_JOB_NAME}-${data.userId}` + const jobId = `${BULK_ACTION_JOB_NAME}_${data.userId}_${JOB_VERSION}` try { return queue.add(BULK_ACTION_JOB_NAME, data, {