diff --git a/packages/api/src/jobs/trigger_rule.ts b/packages/api/src/jobs/trigger_rule.ts index 3ba72fd4d..72d902a90 100644 --- a/packages/api/src/jobs/trigger_rule.ts +++ b/packages/api/src/jobs/trigger_rule.ts @@ -10,14 +10,10 @@ import { findEnabledRules } from '../services/rules' import { sendPushNotifications } from '../services/user' import { logger } from '../utils/logger' -interface Data { - id: string +export interface TriggerRuleJobData { + libraryItemId: string userId: string ruleEventType: RuleEventType - subscription: string - image: string - content: string - readingProgressPercent: number } interface RuleActionObj { @@ -26,6 +22,8 @@ interface RuleActionObj { libraryItem: LibraryItem } +export const TRIGGER_RULE_JOB_NAME = 'trigger-rule' + type RuleActionFunc = (obj: RuleActionObj) => Promise const addLabels = async (obj: RuleActionObj) => { @@ -43,7 +41,9 @@ const archivePage = async (obj: RuleActionObj) => { return updateLibraryItem( obj.libraryItem.id, { archivedAt: new Date(), state: LibraryItemState.Archived }, - obj.userId + obj.userId, + undefined, + true ) } @@ -55,7 +55,9 @@ const markPageAsRead = async (obj: RuleActionObj) => { readingProgressBottomPercent: 100, readAt: new Date(), }, - obj.userId + obj.userId, + undefined, + true ) } @@ -82,11 +84,15 @@ const getRuleAction = (actionType: RuleActionType): RuleActionFunc => { } } -const triggerActions = async (userId: string, rules: Rule[], data: Data) => { +const triggerActions = async ( + userId: string, + rules: Rule[], + data: TriggerRuleJobData +) => { const actionPromises: Promise[] = [] for (const rule of rules) { - const itemId = data.id + const itemId = data.libraryItemId const searchArgs: SearchArgs = { includeContent: false, includeDeleted: false, @@ -122,7 +128,7 @@ const triggerActions = async (userId: string, rules: Rule[], data: Data) => { } } -export const triggerRule = async (data: Data) => { +export const triggerRule = async (data: TriggerRuleJobData) => { const { userId, ruleEventType } = data // get rules by calling api diff --git a/packages/api/src/pubsub.ts b/packages/api/src/pubsub.ts index ea1bcf987..02c35f3d4 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -1,7 +1,9 @@ import { PubSub } from '@google-cloud/pubsub' import express from 'express' +import { RuleEventType } from './entity/rule' import { env } from './env' import { ReportType } from './generated/graphql' +import { enqueueTriggerRuleJob } from './utils/createTask' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' @@ -41,11 +43,21 @@ export const createPubSubClient = (): PubsubClient => { Buffer.from(JSON.stringify({ userId, email, name, username })) ) }, - entityCreated: ( + entityCreated: async ( type: EntityType, data: T, userId: string ): Promise => { + // queue trigger rule job + if (type === EntityType.PAGE) { + const libraryItemId = (data as T & { id: string }).id + await enqueueTriggerRuleJob({ + userId, + ruleEventType: RuleEventType.PageCreated, + libraryItemId, + }) + } + const cleanData = deepDelete( data as T & Record, [...fieldsToDelete] @@ -56,11 +68,21 @@ export const createPubSubClient = (): PubsubClient => { Buffer.from(JSON.stringify({ type, userId, ...cleanData })) ) }, - entityUpdated: ( + entityUpdated: async ( type: EntityType, data: T, userId: string ): Promise => { + // queue trigger rule job + if (type === EntityType.PAGE) { + const libraryItemId = (data as T & { id: string }).id + await enqueueTriggerRuleJob({ + userId, + ruleEventType: RuleEventType.PageUpdated, + libraryItemId, + }) + } + const cleanData = deepDelete( data as T & Record, [...fieldsToDelete] diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index d99d79f10..cd89e1c69 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -14,6 +14,7 @@ import { savePageJob } from './jobs/save_page' import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' +import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -122,6 +123,8 @@ const main = async () => { } case THUMBNAIL_JOB: return findThumbnail(job.data) + case TRIGGER_RULE_JOB_NAME: + return triggerRule(job.data) } }, { diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 11a7ee92a..73e9fde77 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -698,7 +698,8 @@ export const updateLibraryItem = async ( id: string, libraryItem: QueryDeepPartialEntity, userId: string, - pubsub = createPubSubClient() + pubsub = createPubSubClient(), + skipPubSub = false ): Promise => { const updatedLibraryItem = await authTrx( async (tx) => { @@ -726,6 +727,10 @@ export const updateLibraryItem = async ( userId ) + if (skipPubSub) { + return updatedLibraryItem + } + await pubsub.entityUpdated>( EntityType.PAGE, { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 94233a454..60e74da5d 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -17,6 +17,8 @@ import { import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' import { getBackendQueue } from '../queue-processor' +import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' +import { getBackendQueue } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { signFeatureToken } from '../services/features' import { OmnivoreAuthorizationHeader } from './auth' @@ -648,4 +650,16 @@ export const enqueueRssFeedFetch = async ( } } +export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + return queue.add(TRIGGER_RULE_JOB_NAME, data, { + removeOnComplete: true, + removeOnFail: true, + }) +} + export default createHttpTaskWithToken