From 41b9fef3163566a51811badfac6149bb5e467dc9 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 27 May 2024 11:45:51 +0800 Subject: [PATCH] score library item after saved --- packages/api/src/pubsub.ts | 6 ++++++ packages/api/src/queue-processor.ts | 6 ++++++ packages/api/src/utils/createTask.ts | 22 ++++++++++++++++++++++ 3 files changed, 34 insertions(+) diff --git a/packages/api/src/pubsub.ts b/packages/api/src/pubsub.ts index fea9139b7..3e6fea7fd 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -5,6 +5,7 @@ import { env } from './env' import { ReportType } from './generated/graphql' import { enqueueProcessYouTubeVideo, + enqueueScoreJob, enqueueTriggerRuleJob, } from './utils/createTask' import { logger } from './utils/logger' @@ -74,6 +75,11 @@ export const createPubSubClient = (): PubsubClient => { libraryItemId: data.id, }) } + + await enqueueScoreJob({ + userId, + libraryItemId: data.id, + }) } }, entityUpdated: async ( diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index edb634e70..ea7a8fe16 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -48,6 +48,10 @@ import { import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' +import { + scoreLibraryItem, + SCORE_LIBRARY_ITEM_JOB, +} from './jobs/score_library_item' import { syncReadPositionsJob, SYNC_READ_POSITIONS_JOB_NAME, @@ -191,6 +195,8 @@ export const createWorker = (connection: ConnectionOptions) => return uploadContentJob(job.data) case UPDATE_JUST_READ_FEED_JOB: return updateJustReadFeed(job.data) + case SCORE_LIBRARY_ITEM_JOB: + return scoreLibraryItem(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 3f4f12061..7b8342aad 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -45,6 +45,10 @@ import { REFRESH_ALL_FEEDS_JOB_NAME, REFRESH_FEED_JOB_NAME, } from '../jobs/rss/refreshAllFeeds' +import { + ScoreLibraryItemJobData, + SCORE_LIBRARY_ITEM_JOB, +} from '../jobs/score_library_item' import { SYNC_READ_POSITIONS_JOB_NAME } from '../jobs/sync_read_positions' import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' import { @@ -1006,4 +1010,22 @@ export const enqueueUpdateJustReadFeed = async ( }) } +export const updateScoreJobId = (userId: string) => + `${SCORE_LIBRARY_ITEM_JOB}_${userId}_${JOB_VERSION}` + +export const enqueueScoreJob = async (data: ScoreLibraryItemJobData) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + return queue.add(SCORE_LIBRARY_ITEM_JOB, data, { + jobId: updateScoreJobId(data.userId), + removeOnComplete: true, + removeOnFail: true, + priority: getJobPriority(SCORE_LIBRARY_ITEM_JOB), + attempts: 3, + }) +} + export default createHttpTaskWithToken