From f94267ee1a9510776e1e2623dd76a7fe9b59e15d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 22 Feb 2024 13:38:51 +0800 Subject: [PATCH] enqueue export job --- packages/api/src/jobs/export_item.ts | 17 ++++++--- packages/api/src/pubsub.ts | 44 ++++++++++++++++++----- packages/api/src/queue-processor.ts | 3 ++ packages/api/src/services/highlights.ts | 2 +- packages/api/src/services/labels.ts | 23 +++++++----- packages/api/src/services/library_item.ts | 2 ++ packages/api/src/utils/createTask.ts | 14 ++++++++ 7 files changed, 82 insertions(+), 23 deletions(-) diff --git a/packages/api/src/jobs/export_item.ts b/packages/api/src/jobs/export_item.ts index add1f1061..b2e13ea35 100644 --- a/packages/api/src/jobs/export_item.ts +++ b/packages/api/src/jobs/export_item.ts @@ -1,21 +1,30 @@ import { IntegrationType } from '../entity/integration' -import { LibraryItem } from '../entity/library_item' import { findIntegrations, getIntegrationClient, updateIntegration, } from '../services/integrations' +import { findLibraryItemById } from '../services/library_item' import { logger } from '../utils/logger' export interface ExportItemJobData { userId: string - libraryItem: LibraryItem + libraryItemId: string } export const EXPORT_ITEM_JOB_NAME = 'export-item' export const exportItem = async (jobData: ExportItemJobData) => { - const { libraryItem, userId } = jobData + const { libraryItemId, userId } = jobData + const libraryItem = await findLibraryItemById(libraryItemId, userId) + if (!libraryItem) { + logger.error('library item not found', { + userId, + libraryItemId, + }) + return + } + const integrations = await findIntegrations(userId, { enabled: true, type: IntegrationType.Export, @@ -29,7 +38,7 @@ export const exportItem = async (jobData: ExportItemJobData) => { integrations.map(async (integration) => { const logObject = { userId, - libraryItemId: libraryItem.id, + libraryItemId, integrationId: integration.id, } logger.info('exporting item...', logObject) diff --git a/packages/api/src/pubsub.ts b/packages/api/src/pubsub.ts index 2f5abf639..723fb7947 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -3,7 +3,12 @@ import express from 'express' import { RuleEventType } from './entity/rule' import { env } from './env' import { ReportType } from './generated/graphql' -import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask' +import { Merge } from './util' +import { + enqueueExportItem, + enqueueTriggerRuleJob, + enqueueWebhookJob, +} from './utils/createTask' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' @@ -11,6 +16,8 @@ const logger = buildLogger('pubsub') const client = new PubSub() +type EntityData = Merge + export const createPubSubClient = (): PubsubClient => { const fieldsToDelete = ['user'] as const @@ -45,12 +52,12 @@ export const createPubSubClient = (): PubsubClient => { }, entityCreated: async ( type: EntityType, - data: T, + data: EntityData, userId: string ): Promise => { + const libraryItemId = data.libraryItemId // queue trigger rule job if (type === EntityType.PAGE) { - const libraryItemId = (data as T & { id: string }).id await enqueueTriggerRuleJob({ userId, ruleEventType: RuleEventType.PageCreated, @@ -58,8 +65,13 @@ export const createPubSubClient = (): PubsubClient => { }) } + await enqueueExportItem({ + userId, + libraryItemId, + }) + const cleanData = deepDelete( - data as T & Record, + data as EntityData & Record, [...fieldsToDelete] ) @@ -77,12 +89,13 @@ export const createPubSubClient = (): PubsubClient => { }, entityUpdated: async ( type: EntityType, - data: T, + data: EntityData, userId: string ): Promise => { + const libraryItemId = data.libraryItemId + // queue trigger rule job if (type === EntityType.PAGE) { - const libraryItemId = (data as T & { id: string }).id await enqueueTriggerRuleJob({ userId, ruleEventType: RuleEventType.PageUpdated, @@ -90,8 +103,13 @@ export const createPubSubClient = (): PubsubClient => { }) } + await enqueueExportItem({ + userId, + libraryItemId, + }) + const cleanData = deepDelete( - data as T & Record, + data as EntityData & Record, [...fieldsToDelete] ) @@ -146,8 +164,16 @@ export interface PubsubClient { name: string, username: string ) => Promise - entityCreated: (type: EntityType, data: T, userId: string) => Promise - entityUpdated: (type: EntityType, data: T, userId: string) => Promise + entityCreated: ( + type: EntityType, + data: EntityData, + userId: string + ) => Promise + entityUpdated: ( + type: EntityType, + data: EntityData, + userId: string + ) => Promise entityDeleted: (type: EntityType, id: string, userId: string) => Promise reportSubmitted( submitterId: string | undefined, diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 8d0a9c7f4..f1b5908f9 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -15,6 +15,7 @@ import { appDataSource } from './data_source' import { env } from './env' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' +import { exportItem, EXPORT_ITEM_JOB_NAME } from './jobs/export_item' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' @@ -103,6 +104,8 @@ export const createWorker = (connection: ConnectionOptions) => return bulkAction(job.data) case CALL_WEBHOOK_JOB_NAME: return callWebhook(job.data) + case EXPORT_ITEM_JOB_NAME: + return exportItem(job.data) } }, { diff --git a/packages/api/src/services/highlights.ts b/packages/api/src/services/highlights.ts index ac6b08f0c..5883fb7cb 100644 --- a/packages/api/src/services/highlights.ts +++ b/packages/api/src/services/highlights.ts @@ -139,7 +139,7 @@ export const updateHighlight = async ( const libraryItemId = updatedHighlight.libraryItem.id await pubsub.entityUpdated( EntityType.HIGHLIGHT, - { ...highlight, id: highlightId, pageId: libraryItemId }, + { ...highlight, id: highlightId, pageId: libraryItemId, libraryItemId }, userId ) diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 1bf770a6d..285338745 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -6,15 +6,18 @@ import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { CreateLabelInput, labelRepository } from '../repository/label' import { bulkEnqueueUpdateLabels } from '../utils/createTask' +import { logger } from '../utils/logger' import { findHighlightById } from './highlights' import { findLibraryItemIdsByLabelId } from './library_item' type AddLabelsToLibraryItemEvent = { + libraryItemId: string pageId: string labels: DeepPartial