From 971bbfda7f984239c136eddbc93dfb9abbf18b09 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 3 Aug 2022 18:54:11 +0800 Subject: [PATCH] Add integrations service router to sync pages with integrations --- packages/api/src/routers/svc/integrations.ts | 137 +++++++++++++++++++ packages/api/src/server.ts | 2 + 2 files changed, 139 insertions(+) create mode 100644 packages/api/src/routers/svc/integrations.ts diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts new file mode 100644 index 000000000..77d421667 --- /dev/null +++ b/packages/api/src/routers/svc/integrations.ts @@ -0,0 +1,137 @@ +/* eslint-disable @typescript-eslint/no-misused-promises */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ +import express from 'express' +import { EntityType, readPushSubscription } from '../../datalayer/pubsub' +import { getRepository } from '../../entity/utils' +import { Integration, IntegrationType } from '../../entity/integration' +import { buildLogger } from '../../utils/logger' +import { syncWithIntegration } from '../../services/integrations' +import { getPageById, getPageByParam, searchPages } from '../../elastic/pages' +import { Page } from '../../elastic/types' + +interface Message { + type: EntityType + data: any + userId: string +} + +const logger = buildLogger('app.dispatch') + +export function integrationsServiceRouter() { + const router = express.Router() + + router.post('/:integrationType/:action', async (req, res) => { + logger.info( + req.params.action, + 'with integration', + req.params.integrationType + ) + const { message: msgStr, expired } = readPushSubscription(req) + + if (!msgStr) { + res.status(400).send('Bad Request') + return + } + + if (expired) { + logger.info('discarding expired message') + res.status(200).send('Expired') + return + } + + try { + const { userId, type, data }: Message = JSON.parse(msgStr) + if (!userId) { + console.log('No userId found in message') + res.status(400).send('Bad Request') + return + } + + const integration = await getRepository(Integration).findOneBy({ + user: { id: userId }, + type: req.params.integrationType.toUpperCase() as IntegrationType, + enabled: true, + }) + if (!integration) { + logger.info('No active integration found for user', userId) + res.status(200).send('No integration found') + return + } + + const action = req.params.action.toUpperCase() + if (action === 'SYNC_UPDATED') { + // get updated page by id + const id = data.id + let page: Page | undefined + switch (type) { + case EntityType.PAGE: + page = await getPageById(id) + break + case EntityType.HIGHLIGHT: + page = await getPageByParam({ 'highlights.id': id }) + break + case EntityType.LABEL: + page = await getPageByParam({ 'labels.id': id }) + break + } + if (!page) { + logger.info('No page found for id', id) + res.status(200).send('No page found') + return + } + // sync updated page with integration + logger.info('syncing page', page.id, 'with integration', integration.id) + + const synced = await syncWithIntegration(integration, [page]) + if (!synced) { + logger.info( + 'failed to sync page', + page.id, + 'with integration', + integration.id + ) + res.status(400).send('Failed to sync') + return + } + } else if (action === 'SYNC_ALL') { + // sync all pages of the user + const size = 50 + + for ( + let hasNextPage = true, count = 0, after = 0, pages: Page[] = []; + hasNextPage; + after += size, hasNextPage = count > after + ) { + ;[pages, count] = (await searchPages({ from: after, size }, userId))! + const pageIds = pages.map((p) => p.id) + + logger.info('syncing pages', pageIds) + + const synced = await syncWithIntegration(integration, pages) + if (!synced) { + logger.info( + 'failed to sync pages', + pageIds, + 'with integration', + integration.id + ) + res.status(400).send('Failed to sync') + return + } + } + } else { + logger.info('unknown action', action) + res.status(200).send('Unknown action') + return + } + + res.status(200).send('OK') + } catch (err) { + logger.error('sync with integrations failed', err) + res.status(500).send(err) + } + }) + + return router +} diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 64f0326c0..619a4fda5 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -44,6 +44,7 @@ import { initElasticsearch } from './elastic' import { uploadServiceRouter } from './routers/svc/upload' import rateLimit from 'express-rate-limit' import { webhooksServiceRouter } from './routers/svc/webhooks' +import { integrationsServiceRouter } from './routers/svc/integrations' const PORT = process.env.PORT || 4000 @@ -115,6 +116,7 @@ export const createApp = (): { app.use('/svc/pubsub/emails', emailsServiceRouter()) app.use('/svc/pubsub/upload', uploadServiceRouter()) app.use('/svc/pubsub/webhooks', webhooksServiceRouter()) + app.use('/svc/pubsub/integrations', integrationsServiceRouter()) app.use('/svc/reminders', remindersServiceRouter()) app.use('/svc/pdf-attachments', pdfAttachmentsRouter())