From ddddf8234ec37b8699e3559966740499bab340da Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 30 Jan 2024 23:53:05 +0800 Subject: [PATCH 1/8] create a job for db trigger --- packages/api/src/jobs/update_db.ts | 31 ++++++++++++++++++++++++++++ packages/api/src/queue-processor.ts | 10 +++++++-- packages/api/src/services/labels.ts | 6 ++++++ packages/api/src/utils/createTask.ts | 17 +++++++++++++++ 4 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 packages/api/src/jobs/update_db.ts diff --git a/packages/api/src/jobs/update_db.ts b/packages/api/src/jobs/update_db.ts new file mode 100644 index 000000000..096111606 --- /dev/null +++ b/packages/api/src/jobs/update_db.ts @@ -0,0 +1,31 @@ +import { authTrx } from '../repository' + +export const UPDATE_LABELS_IN_LIBRARY_ITEM_JOB = 'update-labels-in-library-item' + +export interface UpdateLabelsInLibraryItemData { + libraryItemId: string + userId: string +} + +export const updateLabelsInLibraryItem = async ( + data: UpdateLabelsInLibraryItemData +) => { + return authTrx( + async (tx) => + tx.query( + `WITH labels_agg AS ( + SELECT array_agg(l.name) AS names_agg + FROM omnivore.labels l + INNER JOIN omnivore.entity_labels el ON el.label_id = l.id + LEFT JOIN omnivore.highlight h ON h.id = el.highlight_id + WHERE el.library_item_id = $1 OR h.library_item_id = $1 + ) + UPDATE omnivore.library_item li + SET label_names = COALESCE((SELECT names_agg FROM labels_agg), ARRAY[]::TEXT[]) + WHERE li.id = $1;`, + [data.libraryItemId] + ), + undefined, + data.userId + ) +} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index cd89e1c69..c2379c432 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, Queue, QueueEvents, Worker, JobType } from 'bullmq' +import { Job, JobType, Queue, QueueEvents, Worker } from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -11,10 +11,14 @@ import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' +import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' +import { + updateLabelsInLibraryItem, + UPDATE_LABELS_IN_LIBRARY_ITEM_JOB, +} from './jobs/update_db' import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' -import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -125,6 +129,8 @@ const main = async () => { return findThumbnail(job.data) case TRIGGER_RULE_JOB_NAME: return triggerRule(job.data) + case UPDATE_LABELS_IN_LIBRARY_ITEM_JOB: + return updateLabelsInLibraryItem(job.data) } }, { diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 564a1e2da..03f9eec7b 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -5,6 +5,8 @@ import { Label } from '../entity/label' import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { CreateLabelInput, labelRepository } from '../repository/label' +import { enqueueUpdateLabelsInLibraryItem } from '../utils/createTask' +import { logger } from '../utils/logger' type AddLabelsToLibraryItemEvent = { pageId: string @@ -120,6 +122,10 @@ export const saveLabelsInLibraryItem = async ( userId ) } + + // update labels in library item + const job = await enqueueUpdateLabelsInLibraryItem({ libraryItemId, userId }) + logger.info('update labels in library item job created', job) } export const addLabelsToLibraryItem = async ( diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 4e3ed31ce..5196f5d31 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -17,6 +17,10 @@ import { import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' +import { + UpdateLabelsInLibraryItemData, + UPDATE_LABELS_IN_LIBRARY_ITEM_JOB, +} from '../jobs/update_db' import { getBackendQueue } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { signFeatureToken } from '../services/features' @@ -660,4 +664,17 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { }) } +export const enqueueUpdateLabelsInLibraryItem = async ( + data: UpdateLabelsInLibraryItemData +) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + return queue.add(UPDATE_LABELS_IN_LIBRARY_ITEM_JOB, data, { + priority: 1, + }) +} + export default createHttpTaskWithToken From 5ed35bbeff5dfd985f5573698881c6fa260de02b Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 31 Jan 2024 13:32:37 +0800 Subject: [PATCH 2/8] create a job for update labels in library item --- packages/api/src/jobs/update_db.ts | 10 ++--- packages/api/src/queue-processor.ts | 9 ++--- packages/api/src/resolvers/labels/index.ts | 12 +++--- packages/api/src/services/labels.ts | 47 ++++++++++++++++++++-- packages/api/src/services/library_item.ts | 33 +++++++++++++++ packages/api/src/utils/createTask.ts | 25 +++++++----- 6 files changed, 103 insertions(+), 33 deletions(-) diff --git a/packages/api/src/jobs/update_db.ts b/packages/api/src/jobs/update_db.ts index 096111606..03e670b25 100644 --- a/packages/api/src/jobs/update_db.ts +++ b/packages/api/src/jobs/update_db.ts @@ -1,20 +1,18 @@ import { authTrx } from '../repository' -export const UPDATE_LABELS_IN_LIBRARY_ITEM_JOB = 'update-labels-in-library-item' +export const UPDATE_LABELS_JOB = 'update-labels' -export interface UpdateLabelsInLibraryItemData { +export interface UpdateLabelsData { libraryItemId: string userId: string } -export const updateLabelsInLibraryItem = async ( - data: UpdateLabelsInLibraryItemData -) => { +export const updateLabels = async (data: UpdateLabelsData) => { return authTrx( async (tx) => tx.query( `WITH labels_agg AS ( - SELECT array_agg(l.name) AS names_agg + SELECT array_agg(DISTINCT l.name) AS names_agg FROM omnivore.labels l INNER JOIN omnivore.entity_labels el ON el.label_id = l.id LEFT JOIN omnivore.highlight h ON h.id = el.highlight_id diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index c2379c432..f4458e53e 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -12,10 +12,7 @@ import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' -import { - updateLabelsInLibraryItem, - UPDATE_LABELS_IN_LIBRARY_ITEM_JOB, -} from './jobs/update_db' +import { updateLabels, UPDATE_LABELS_JOB } from './jobs/update_db' import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' @@ -129,8 +126,8 @@ const main = async () => { return findThumbnail(job.data) case TRIGGER_RULE_JOB_NAME: return triggerRule(job.data) - case UPDATE_LABELS_IN_LIBRARY_ITEM_JOB: - return updateLabelsInLibraryItem(job.data) + case UPDATE_LABELS_JOB: + return updateLabels(job.data) } }, { diff --git a/packages/api/src/resolvers/labels/index.ts b/packages/api/src/resolvers/labels/index.ts index 6a6860940..cf9d6e3b6 100644 --- a/packages/api/src/resolvers/labels/index.ts +++ b/packages/api/src/resolvers/labels/index.ts @@ -31,6 +31,7 @@ import { import { labelRepository } from '../../repository/label' import { userRepository } from '../../repository/user' import { + deleteLabelById, findOrCreateLabels, saveLabelsInHighlight, saveLabelsInLibraryItem, @@ -118,13 +119,10 @@ export const deleteLabelResolver = authorized< DeleteLabelSuccess, DeleteLabelError, MutationDeleteLabelArgs ->(async (_, { id: labelId }, { authTrx, log, uid }) => { +>(async (_, { id: labelId }, { log, uid }) => { try { - const deleteResult = await authTrx(async (tx) => { - return tx.withRepository(labelRepository).deleteById(labelId) - }) - - if (!deleteResult.affected) { + const deleted = await deleteLabelById(labelId, uid) + if (!deleted) { return { errorCodes: [DeleteLabelErrorCode.NotFound], } @@ -281,7 +279,7 @@ export const setLabelsForHighlightResolver = authorized< } } - // save labels in the library item + // save labels in the highlight await saveLabelsInHighlight(labelsSet, input.highlightId, uid, pubsub) analytics.track({ diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 03f9eec7b..d4d1aff16 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -5,8 +5,9 @@ import { Label } from '../entity/label' import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { CreateLabelInput, labelRepository } from '../repository/label' -import { enqueueUpdateLabelsInLibraryItem } from '../utils/createTask' +import { bulkEnqueueUpdateLabels } from '../utils/createTask' import { logger } from '../utils/logger' +import { findLibraryItemIdsByLabelId } from './library_item' type AddLabelsToLibraryItemEvent = { pageId: string @@ -124,8 +125,8 @@ export const saveLabelsInLibraryItem = async ( } // update labels in library item - const job = await enqueueUpdateLabelsInLibraryItem({ libraryItemId, userId }) - logger.info('update labels in library item job created', job) + const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) + logger.info('update labels jobs enqueued', jobs) } export const addLabelsToLibraryItem = async ( @@ -151,6 +152,10 @@ export const addLabelsToLibraryItem = async ( undefined, userId ) + + // update labels in library item + const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) + logger.info('update labels jobs enqueued', jobs) } export const saveLabelsInHighlight = async ( @@ -224,12 +229,33 @@ export const deleteLabels = async ( ) } +export const deleteLabelById = async (labelId: string, userId: string) => { + const libraryItemIds = await findLibraryItemIdsByLabelId(labelId, userId) + + const deleteResult = await authTrx(async (tx) => { + return tx.withRepository(labelRepository).deleteById(labelId) + }) + + if (!deleteResult.affected) { + return false + } + + const data = libraryItemIds.map((libraryItemId) => ({ + libraryItemId, + userId, + })) + const jobs = await bulkEnqueueUpdateLabels(data) + logger.info('update labels jobs enqueued', jobs) + + return true +} + export const updateLabel = async ( id: string, label: QueryDeepPartialEntity