Merge pull request #3484 from omnivore-app/webhook-job

fix: replace webhook endpoint with a bullmq job
This commit is contained in:
Hongbo Wu
2024-02-02 16:28:23 +08:00
committed by GitHub
6 changed files with 109 additions and 5 deletions

View File

@ -0,0 +1,56 @@
import axios, { Method } from 'axios'
import { findWebhooksByEventType } from '../services/webhook'
import { logger } from '../utils/logger'
export interface CallWebhookJobData {
data: unknown
userId: string
type: string
action: string
}
export const CALL_WEBHOOK_JOB_NAME = 'call-webhook'
const TIMEOUT = 5000 // 5s
export const callWebhook = async (jobData: CallWebhookJobData) => {
const { data, type, action, userId } = jobData
const eventType = `${type}_${action}`.toUpperCase()
const webhooks = await findWebhooksByEventType(userId, eventType)
if (webhooks.length <= 0) {
return
}
await Promise.all(
webhooks.map((webhook) => {
const url = webhook.url
const method = webhook.method as Method
const body = {
action,
userId,
[type]: data,
}
logger.info('triggering webhook', { url, method })
return axios
.request({
url,
method,
headers: {
'Content-Type': webhook.contentType,
},
data: body,
timeout: TIMEOUT,
})
.then(() => logger.info('webhook triggered'))
.catch((error) => {
if (axios.isAxiosError(error)) {
logger.info('webhook failed', error.response)
} else {
logger.info('webhook failed', error)
}
})
})
)
}

View File

@ -3,7 +3,7 @@ import express from 'express'
import { RuleEventType } from './entity/rule'
import { env } from './env'
import { ReportType } from './generated/graphql'
import { enqueueTriggerRuleJob } from './utils/createTask'
import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask'
import { deepDelete } from './utils/helpers'
import { buildLogger } from './utils/logger'
@ -63,6 +63,13 @@ export const createPubSubClient = (): PubsubClient => {
[...fieldsToDelete]
)
await enqueueWebhookJob({
userId,
type,
action: 'created',
data,
})
return publish(
'entityCreated',
Buffer.from(JSON.stringify({ type, userId, ...cleanData }))
@ -88,6 +95,13 @@ export const createPubSubClient = (): PubsubClient => {
[...fieldsToDelete]
)
await enqueueWebhookJob({
userId,
type,
action: 'updated',
data,
})
return publish(
'entityUpdated',
Buffer.from(JSON.stringify({ type, userId, ...cleanData }))

View File

@ -15,6 +15,7 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
import { appDataSource } from './data_source'
import { env } from './env'
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
import { CALL_WEBHOOK_JOB_NAME, callWebhook } from './jobs/call_webhook'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
import { refreshFeed } from './jobs/rss/refreshFeed'
@ -87,6 +88,8 @@ export const createWorker = (connection: ConnectionOptions) =>
return syncReadPositionsJob(job.data)
case BULK_ACTION_JOB_NAME:
return bulkAction(job.data)
case CALL_WEBHOOK_JOB_NAME:
return callWebhook(job.data)
}
},
{

View File

@ -1,4 +1,4 @@
import { ArrayContainedBy, ArrayContains, ILike } from 'typeorm'
import { ArrayContains, ILike } from 'typeorm'
import { Rule, RuleAction, RuleEventType } from '../entity/rule'
import { authTrx, getRepository } from '../repository'
@ -61,6 +61,6 @@ export const findEnabledRules = async (
return getRepository(Rule).findBy({
user: { id: userId },
enabled: true,
eventTypes: ArrayContainedBy([eventType]),
eventTypes: ArrayContains([eventType]),
})
}

View File

@ -1,4 +1,4 @@
import { DeepPartial, EntityManager } from 'typeorm'
import { ArrayContains, DeepPartial, EntityManager } from 'typeorm'
import { Webhook } from '../entity/webhook'
import { authTrx } from '../repository'
@ -34,6 +34,22 @@ export const findWebhooks = async (userId: string) => {
)
}
export const findWebhooksByEventType = async (
userId: string,
eventType: string
) => {
return authTrx(
(tx) =>
tx.getRepository(Webhook).findBy({
user: { id: userId },
enabled: true,
eventTypes: ArrayContains([eventType]),
}),
undefined,
userId
)
}
export const findWebhookById = async (id: string, userId: string) => {
return authTrx(
(tx) => tx.getRepository(Webhook).findOneBy({ id, user: { id: userId } }),

View File

@ -15,6 +15,7 @@ import {
CreateLabelInput,
} from '../generated/graphql'
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
@ -663,7 +664,21 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
}
return queue.add(TRIGGER_RULE_JOB_NAME, data, {
priority: 1,
priority: 5,
attempts: 1,
removeOnComplete: true,
removeOnFail: true,
})
}
export const enqueueWebhookJob = async (data: CallWebhookJobData) => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
}
return queue.add(CALL_WEBHOOK_JOB_NAME, data, {
priority: 5,
attempts: 1,
removeOnComplete: true,
removeOnFail: true,