diff --git a/packages/api/src/jobs/call_webhook.ts b/packages/api/src/jobs/call_webhook.ts new file mode 100644 index 000000000..de8e986cb --- /dev/null +++ b/packages/api/src/jobs/call_webhook.ts @@ -0,0 +1,56 @@ +import axios, { Method } from 'axios' +import { findWebhooksByEventTypes } from '../services/webhook' +import { logger } from '../utils/logger' + +export interface CallWebhookJobData { + data: unknown + userId: string + type: string + action: string +} + +export const CALL_WEBHOOK_JOB_NAME = 'call-webhook' +const TIMEOUT = 5000 // 5s + +export const callWebhook = async (jobData: CallWebhookJobData) => { + const { data, type, action, userId } = jobData + const eventType = `${type}_${action}`.toUpperCase() + const webhooks = await findWebhooksByEventTypes(userId, [eventType]) + + if (webhooks.length <= 0) { + return + } + + await Promise.all( + webhooks.map((webhook) => { + const url = webhook.url + const method = webhook.method as Method + const body = { + action, + userId, + [type]: data, + } + + logger.info('triggering webhook', { url, method }) + + return axios + .request({ + url, + method, + headers: { + 'Content-Type': webhook.contentType, + }, + data: body, + timeout: TIMEOUT, + }) + .then(() => logger.info('webhook triggered')) + .catch((error) => { + if (axios.isAxiosError(error)) { + logger.info('webhook failed', error.response) + } else { + logger.info('webhook failed', error) + } + }) + }) + ) +} diff --git a/packages/api/src/pubsub.ts b/packages/api/src/pubsub.ts index 02c35f3d4..2f5abf639 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -3,7 +3,7 @@ import express from 'express' import { RuleEventType } from './entity/rule' import { env } from './env' import { ReportType } from './generated/graphql' -import { enqueueTriggerRuleJob } from './utils/createTask' +import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' @@ -63,6 +63,13 @@ export const createPubSubClient = (): PubsubClient => { [...fieldsToDelete] ) + await enqueueWebhookJob({ + userId, + type, + action: 'created', + data, + }) + return publish( 'entityCreated', Buffer.from(JSON.stringify({ type, userId, ...cleanData })) @@ -88,6 +95,13 @@ export const createPubSubClient = (): PubsubClient => { [...fieldsToDelete] ) + await enqueueWebhookJob({ + userId, + type, + action: 'updated', + data, + }) + return publish( 'entityUpdated', Buffer.from(JSON.stringify({ type, userId, ...cleanData })) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 162611742..b93b327d9 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -15,6 +15,7 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' import { env } from './env' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' +import { CALL_WEBHOOK_JOB_NAME, callWebhook } from './jobs/call_webhook' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' @@ -87,6 +88,8 @@ export const createWorker = (connection: ConnectionOptions) => return syncReadPositionsJob(job.data) case BULK_ACTION_JOB_NAME: return bulkAction(job.data) + case CALL_WEBHOOK_JOB_NAME: + return callWebhook(job.data) } }, { diff --git a/packages/api/src/services/webhook.ts b/packages/api/src/services/webhook.ts index 4414f44a1..149560fb5 100644 --- a/packages/api/src/services/webhook.ts +++ b/packages/api/src/services/webhook.ts @@ -1,4 +1,4 @@ -import { DeepPartial, EntityManager } from 'typeorm' +import { ArrayContainedBy, DeepPartial, EntityManager } from 'typeorm' import { Webhook } from '../entity/webhook' import { authTrx } from '../repository' @@ -34,6 +34,22 @@ export const findWebhooks = async (userId: string) => { ) } +export const findWebhooksByEventTypes = async ( + userId: string, + eventTypes: string[] +) => { + return authTrx( + (tx) => + tx.getRepository(Webhook).findBy({ + user: { id: userId }, + enabled: true, + eventTypes: ArrayContainedBy(eventTypes), + }), + undefined, + userId + ) +} + export const findWebhookById = async (id: string, userId: string) => { return authTrx( (tx) => tx.getRepository(Webhook).findOneBy({ id, user: { id: userId } }), diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index da1115200..53a15e041 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -15,6 +15,7 @@ import { CreateLabelInput, } from '../generated/graphql' import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' +import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' @@ -670,6 +671,20 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { }) } +export const enqueueWebhookJob = async (data: CallWebhookJobData) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + return queue.add(CALL_WEBHOOK_JOB_NAME, data, { + priority: 1, + attempts: 1, + removeOnComplete: true, + removeOnFail: true, + }) +} + export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { const queue = await getBackendQueue() if (!queue) {