enqueue trigger rule job once page is created or updated

This commit is contained in:
Hongbo Wu
2024-01-26 17:40:51 +08:00
parent 7556c6da5f
commit e03895c8b7
5 changed files with 64 additions and 14 deletions

View File

@ -10,14 +10,10 @@ import { findEnabledRules } from '../services/rules'
import { sendPushNotifications } from '../services/user'
import { logger } from '../utils/logger'
interface Data {
id: string
export interface TriggerRuleJobData {
libraryItemId: string
userId: string
ruleEventType: RuleEventType
subscription: string
image: string
content: string
readingProgressPercent: number
}
interface RuleActionObj {
@ -26,6 +22,8 @@ interface RuleActionObj {
libraryItem: LibraryItem
}
export const TRIGGER_RULE_JOB_NAME = 'trigger-rule'
type RuleActionFunc = (obj: RuleActionObj) => Promise<unknown>
const addLabels = async (obj: RuleActionObj) => {
@ -43,7 +41,9 @@ const archivePage = async (obj: RuleActionObj) => {
return updateLibraryItem(
obj.libraryItem.id,
{ archivedAt: new Date(), state: LibraryItemState.Archived },
obj.userId
obj.userId,
undefined,
true
)
}
@ -55,7 +55,9 @@ const markPageAsRead = async (obj: RuleActionObj) => {
readingProgressBottomPercent: 100,
readAt: new Date(),
},
obj.userId
obj.userId,
undefined,
true
)
}
@ -82,11 +84,15 @@ const getRuleAction = (actionType: RuleActionType): RuleActionFunc => {
}
}
const triggerActions = async (userId: string, rules: Rule[], data: Data) => {
const triggerActions = async (
userId: string,
rules: Rule[],
data: TriggerRuleJobData
) => {
const actionPromises: Promise<unknown>[] = []
for (const rule of rules) {
const itemId = data.id
const itemId = data.libraryItemId
const searchArgs: SearchArgs = {
includeContent: false,
includeDeleted: false,
@ -122,7 +128,7 @@ const triggerActions = async (userId: string, rules: Rule[], data: Data) => {
}
}
export const triggerRule = async (data: Data) => {
export const triggerRule = async (data: TriggerRuleJobData) => {
const { userId, ruleEventType } = data
// get rules by calling api

View File

@ -1,7 +1,9 @@
import { PubSub } from '@google-cloud/pubsub'
import express from 'express'
import { RuleEventType } from './entity/rule'
import { env } from './env'
import { ReportType } from './generated/graphql'
import { enqueueTriggerRuleJob } from './utils/createTask'
import { deepDelete } from './utils/helpers'
import { buildLogger } from './utils/logger'
@ -41,11 +43,21 @@ export const createPubSubClient = (): PubsubClient => {
Buffer.from(JSON.stringify({ userId, email, name, username }))
)
},
entityCreated: <T>(
entityCreated: async <T>(
type: EntityType,
data: T,
userId: string
): Promise<void> => {
// queue trigger rule job
if (type === EntityType.PAGE) {
const libraryItemId = (data as T & { id: string }).id
await enqueueTriggerRuleJob({
userId,
ruleEventType: RuleEventType.PageCreated,
libraryItemId,
})
}
const cleanData = deepDelete(
data as T & Record<typeof fieldsToDelete[number], unknown>,
[...fieldsToDelete]
@ -56,11 +68,21 @@ export const createPubSubClient = (): PubsubClient => {
Buffer.from(JSON.stringify({ type, userId, ...cleanData }))
)
},
entityUpdated: <T>(
entityUpdated: async <T>(
type: EntityType,
data: T,
userId: string
): Promise<void> => {
// queue trigger rule job
if (type === EntityType.PAGE) {
const libraryItemId = (data as T & { id: string }).id
await enqueueTriggerRuleJob({
userId,
ruleEventType: RuleEventType.PageUpdated,
libraryItemId,
})
}
const cleanData = deepDelete(
data as T & Record<typeof fieldsToDelete[number], unknown>,
[...fieldsToDelete]

View File

@ -14,6 +14,7 @@ import { savePageJob } from './jobs/save_page'
import { updatePDFContentJob } from './jobs/update_pdf_content'
import { redisDataSource } from './redis_data_source'
import { CustomTypeOrmLogger } from './utils/logger'
import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule'
export const QUEUE_NAME = 'omnivore-backend-queue'
@ -122,6 +123,8 @@ const main = async () => {
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
}
},
{

View File

@ -698,7 +698,8 @@ export const updateLibraryItem = async (
id: string,
libraryItem: QueryDeepPartialEntity<LibraryItem>,
userId: string,
pubsub = createPubSubClient()
pubsub = createPubSubClient(),
skipPubSub = false
): Promise<LibraryItem> => {
const updatedLibraryItem = await authTrx(
async (tx) => {
@ -726,6 +727,10 @@ export const updateLibraryItem = async (
userId
)
if (skipPubSub) {
return updatedLibraryItem
}
await pubsub.entityUpdated<QueryDeepPartialEntity<LibraryItem>>(
EntityType.PAGE,
{

View File

@ -17,6 +17,8 @@ import {
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
import { getBackendQueue } from '../queue-processor'
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
import { getBackendQueue } from '../queue-processor'
import { redisDataSource } from '../redis_data_source'
import { signFeatureToken } from '../services/features'
import { OmnivoreAuthorizationHeader } from './auth'
@ -648,4 +650,16 @@ export const enqueueRssFeedFetch = async (
}
}
export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
}
return queue.add(TRIGGER_RULE_JOB_NAME, data, {
removeOnComplete: true,
removeOnFail: true,
})
}
export default createHttpTaskWithToken