move trigger rule into a job
This commit is contained in:
138
packages/api/src/jobs/trigger_rule.ts
Normal file
138
packages/api/src/jobs/trigger_rule.ts
Normal file
@ -0,0 +1,138 @@
|
||||
import { LibraryItem, LibraryItemState } from '../entity/library_item'
|
||||
import { Rule, RuleAction, RuleActionType, RuleEventType } from '../entity/rule'
|
||||
import { addLabelsToLibraryItem } from '../services/labels'
|
||||
import {
|
||||
SearchArgs,
|
||||
searchLibraryItems,
|
||||
updateLibraryItem,
|
||||
} from '../services/library_item'
|
||||
import { findEnabledRules } from '../services/rules'
|
||||
import { sendPushNotifications } from '../services/user'
|
||||
import { logger } from '../utils/logger'
|
||||
|
||||
interface Data {
|
||||
id: string
|
||||
userId: string
|
||||
ruleEventType: RuleEventType
|
||||
subscription: string
|
||||
image: string
|
||||
content: string
|
||||
readingProgressPercent: number
|
||||
}
|
||||
|
||||
interface RuleActionObj {
|
||||
userId: string
|
||||
action: RuleAction
|
||||
libraryItem: LibraryItem
|
||||
}
|
||||
|
||||
type RuleActionFunc = (obj: RuleActionObj) => Promise<unknown>
|
||||
|
||||
const addLabels = async (obj: RuleActionObj) => {
|
||||
const labelIds = obj.action.params
|
||||
|
||||
return addLabelsToLibraryItem(
|
||||
labelIds,
|
||||
obj.libraryItem.id,
|
||||
obj.userId,
|
||||
'system'
|
||||
)
|
||||
}
|
||||
|
||||
const archivePage = async (obj: RuleActionObj) => {
|
||||
return updateLibraryItem(
|
||||
obj.libraryItem.id,
|
||||
{ archivedAt: new Date(), state: LibraryItemState.Archived },
|
||||
obj.userId
|
||||
)
|
||||
}
|
||||
|
||||
const markPageAsRead = async (obj: RuleActionObj) => {
|
||||
return updateLibraryItem(
|
||||
obj.libraryItem.id,
|
||||
{
|
||||
readingProgressTopPercent: 100,
|
||||
readingProgressBottomPercent: 100,
|
||||
readAt: new Date(),
|
||||
},
|
||||
obj.userId
|
||||
)
|
||||
}
|
||||
|
||||
const sendNotification = async (obj: RuleActionObj) => {
|
||||
const item = obj.libraryItem
|
||||
const message = {
|
||||
title: item.author || item.siteName || 'Omnivore',
|
||||
body: item.title,
|
||||
}
|
||||
|
||||
return sendPushNotifications(obj.userId, message, 'rule')
|
||||
}
|
||||
|
||||
const getRuleAction = (actionType: RuleActionType): RuleActionFunc => {
|
||||
switch (actionType) {
|
||||
case RuleActionType.AddLabel:
|
||||
return addLabels
|
||||
case RuleActionType.Archive:
|
||||
return archivePage
|
||||
case RuleActionType.MarkAsRead:
|
||||
return markPageAsRead
|
||||
case RuleActionType.SendNotification:
|
||||
return sendNotification
|
||||
}
|
||||
}
|
||||
|
||||
const triggerActions = async (userId: string, rules: Rule[], data: Data) => {
|
||||
const actionPromises: Promise<unknown>[] = []
|
||||
|
||||
for (const rule of rules) {
|
||||
const itemId = data.id
|
||||
const searchArgs: SearchArgs = {
|
||||
includeContent: false,
|
||||
includeDeleted: false,
|
||||
includePending: false,
|
||||
size: 1,
|
||||
query: `(${rule.filter}) AND includes:${itemId}`,
|
||||
}
|
||||
|
||||
const libraryItems = await searchLibraryItems(searchArgs, userId)
|
||||
if (libraryItems.count === 0) {
|
||||
logger.info(`No pages found for rule ${rule.id}`)
|
||||
continue
|
||||
}
|
||||
|
||||
const libraryItem = libraryItems.libraryItems[0]
|
||||
|
||||
for (const action of rule.actions) {
|
||||
const actionFunc = getRuleAction(action.type)
|
||||
const actionObj: RuleActionObj = {
|
||||
userId,
|
||||
action,
|
||||
libraryItem,
|
||||
}
|
||||
|
||||
actionPromises.push(actionFunc(actionObj))
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await Promise.all(actionPromises)
|
||||
} catch (error) {
|
||||
logger.error(error)
|
||||
}
|
||||
}
|
||||
|
||||
export const triggerRule = async (data: Data) => {
|
||||
const { userId, ruleEventType } = data
|
||||
|
||||
// get rules by calling api
|
||||
const rules = await findEnabledRules(userId, ruleEventType)
|
||||
if (rules.length === 0) {
|
||||
console.log('No rules found')
|
||||
return false
|
||||
}
|
||||
|
||||
await triggerActions(userId, rules, data)
|
||||
|
||||
return true
|
||||
}
|
||||
@ -92,6 +92,7 @@ import {
|
||||
import { traceAs } from '../../tracing'
|
||||
import { analytics } from '../../utils/analytics'
|
||||
import { isSiteBlockedForParse } from '../../utils/blocked'
|
||||
import { authorized } from '../../utils/gql-utils'
|
||||
import {
|
||||
cleanUrl,
|
||||
errorHandler,
|
||||
@ -102,7 +103,6 @@ import {
|
||||
titleForFilePath,
|
||||
userDataToUser,
|
||||
} from '../../utils/helpers'
|
||||
import { authorized } from '../../utils/gql-utils'
|
||||
import {
|
||||
contentConverter,
|
||||
getDistillerResult,
|
||||
@ -908,7 +908,12 @@ export const setFavoriteArticleResolver = authorized<
|
||||
|
||||
const labels = await findOrCreateLabels([label], uid)
|
||||
// adds Favorites label to item
|
||||
await addLabelsToLibraryItem(labels, id, uid)
|
||||
await addLabelsToLibraryItem(
|
||||
labels.map((l) => l.id),
|
||||
id,
|
||||
uid,
|
||||
'user'
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
|
||||
@ -5,7 +5,6 @@ import { Label } from '../entity/label'
|
||||
import { createPubSubClient, EntityType, PubsubClient } from '../pubsub'
|
||||
import { authTrx } from '../repository'
|
||||
import { CreateLabelInput, labelRepository } from '../repository/label'
|
||||
import { libraryItemRepository } from '../repository/library_item'
|
||||
|
||||
type AddLabelsToLibraryItemEvent = {
|
||||
pageId: string
|
||||
@ -124,43 +123,28 @@ export const saveLabelsInLibraryItem = async (
|
||||
}
|
||||
|
||||
export const addLabelsToLibraryItem = async (
|
||||
labels: Label[],
|
||||
labelIds: string[],
|
||||
libraryItemId: string,
|
||||
userId: string,
|
||||
source: LabelSource = 'user',
|
||||
pubsub = createPubSubClient()
|
||||
source: LabelSource = 'user'
|
||||
) => {
|
||||
await authTrx(
|
||||
async (tx) => {
|
||||
const libraryItem = await tx
|
||||
.withRepository(libraryItemRepository)
|
||||
.findOneByOrFail({ id: libraryItemId, user: { id: userId } })
|
||||
|
||||
if (libraryItem.labels) {
|
||||
labels.push(...libraryItem.labels)
|
||||
}
|
||||
|
||||
// save new labels
|
||||
await tx.getRepository(EntityLabel).save(
|
||||
labels.map((l) => ({
|
||||
labelId: l.id,
|
||||
libraryItemId,
|
||||
source,
|
||||
}))
|
||||
await tx.query(
|
||||
`INSERT INTO omnivore.entity_labels (label_id, library_item_id, source)
|
||||
SELECT id, $1, $2 FROM omnivore.labels
|
||||
WHERE id = ANY($3)
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM omnivore.entity_labels
|
||||
WHERE label_id = labels.id
|
||||
AND library_item_id = $1
|
||||
)`,
|
||||
[libraryItemId, source, labelIds]
|
||||
)
|
||||
},
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
if (source === 'user') {
|
||||
// create pubsub event
|
||||
await pubsub.entityCreated<AddLabelsToLibraryItemEvent>(
|
||||
EntityType.LABEL,
|
||||
{ pageId: libraryItemId, labels, source },
|
||||
userId
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
export const saveLabelsInHighlight = async (
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { ILike } from 'typeorm'
|
||||
import { Rule, RuleAction } from '../entity/rule'
|
||||
import { authTrx } from '../repository'
|
||||
import { ArrayContainedBy, ArrayContains, ILike } from 'typeorm'
|
||||
import { Rule, RuleAction, RuleEventType } from '../entity/rule'
|
||||
import { authTrx, getRepository } from '../repository'
|
||||
|
||||
export const createRule = async (
|
||||
userId: string,
|
||||
@ -53,3 +53,14 @@ export const deleteRules = async (userId: string) => {
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
export const findEnabledRules = async (
|
||||
userId: string,
|
||||
eventType: RuleEventType
|
||||
) => {
|
||||
return getRepository(Rule).findBy({
|
||||
user: { id: userId },
|
||||
enabled: true,
|
||||
eventTypes: ArrayContainedBy([eventType]),
|
||||
})
|
||||
}
|
||||
|
||||
@ -1,9 +1,15 @@
|
||||
import { Notification } from 'firebase-admin/messaging'
|
||||
import { DeepPartial, FindOptionsWhere, In } from 'typeorm'
|
||||
import { Profile } from '../entity/profile'
|
||||
import { StatusType, User } from '../entity/user'
|
||||
import { authTrx, getRepository, queryBuilderToRawSql } from '../repository'
|
||||
import { userRepository } from '../repository/user'
|
||||
import { SetClaimsRole } from '../utils/dictionary'
|
||||
import {
|
||||
PushNotificationType,
|
||||
sendMulticastPushNotifications,
|
||||
} from '../utils/sendNotification'
|
||||
import { findDeviceTokensByUserId } from './user_device_tokens'
|
||||
|
||||
export const deleteUser = async (userId: string) => {
|
||||
await authTrx(
|
||||
@ -120,3 +126,23 @@ export const batchDelete = async (criteria: FindOptionsWhere<User>) => {
|
||||
SetClaimsRole.ADMIN
|
||||
)
|
||||
}
|
||||
|
||||
export const sendPushNotifications = async (
|
||||
userId: string,
|
||||
notification: Notification,
|
||||
notificationType: PushNotificationType,
|
||||
data?: { [key: string]: string }
|
||||
) => {
|
||||
const tokens = await findDeviceTokensByUserId(userId)
|
||||
if (tokens.length === 0) {
|
||||
throw new Error('No device tokens found')
|
||||
}
|
||||
|
||||
const message = {
|
||||
notification,
|
||||
data,
|
||||
tokens: tokens.map((token) => token.token),
|
||||
}
|
||||
|
||||
return sendMulticastPushNotifications(userId, message, notificationType)
|
||||
}
|
||||
|
||||
@ -60,7 +60,7 @@ export const sendMulticastPushNotifications = async (
|
||||
})
|
||||
|
||||
logger.info('sending multicast message: ', message)
|
||||
const res = await getMessaging().sendMulticast(message)
|
||||
const res = await getMessaging().sendEachForMulticast(message)
|
||||
logger.info('send notification result: ', res.responses)
|
||||
|
||||
return res
|
||||
@ -75,7 +75,7 @@ export const sendBatchPushNotifications = async (
|
||||
messages: Message[]
|
||||
): Promise<BatchResponse | undefined> => {
|
||||
try {
|
||||
const res = await getMessaging().sendAll(messages)
|
||||
const res = await getMessaging().sendEach(messages)
|
||||
logger.info(`success count: ${res.successCount}`)
|
||||
|
||||
return res
|
||||
|
||||
Reference in New Issue
Block a user