Add integrations service router to sync pages with integrations
This commit is contained in:
137
packages/api/src/routers/svc/integrations.ts
Normal file
137
packages/api/src/routers/svc/integrations.ts
Normal file
@ -0,0 +1,137 @@
|
||||
/* eslint-disable @typescript-eslint/no-misused-promises */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
import express from 'express'
|
||||
import { EntityType, readPushSubscription } from '../../datalayer/pubsub'
|
||||
import { getRepository } from '../../entity/utils'
|
||||
import { Integration, IntegrationType } from '../../entity/integration'
|
||||
import { buildLogger } from '../../utils/logger'
|
||||
import { syncWithIntegration } from '../../services/integrations'
|
||||
import { getPageById, getPageByParam, searchPages } from '../../elastic/pages'
|
||||
import { Page } from '../../elastic/types'
|
||||
|
||||
interface Message {
|
||||
type: EntityType
|
||||
data: any
|
||||
userId: string
|
||||
}
|
||||
|
||||
const logger = buildLogger('app.dispatch')
|
||||
|
||||
export function integrationsServiceRouter() {
|
||||
const router = express.Router()
|
||||
|
||||
router.post('/:integrationType/:action', async (req, res) => {
|
||||
logger.info(
|
||||
req.params.action,
|
||||
'with integration',
|
||||
req.params.integrationType
|
||||
)
|
||||
const { message: msgStr, expired } = readPushSubscription(req)
|
||||
|
||||
if (!msgStr) {
|
||||
res.status(400).send('Bad Request')
|
||||
return
|
||||
}
|
||||
|
||||
if (expired) {
|
||||
logger.info('discarding expired message')
|
||||
res.status(200).send('Expired')
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const { userId, type, data }: Message = JSON.parse(msgStr)
|
||||
if (!userId) {
|
||||
console.log('No userId found in message')
|
||||
res.status(400).send('Bad Request')
|
||||
return
|
||||
}
|
||||
|
||||
const integration = await getRepository(Integration).findOneBy({
|
||||
user: { id: userId },
|
||||
type: req.params.integrationType.toUpperCase() as IntegrationType,
|
||||
enabled: true,
|
||||
})
|
||||
if (!integration) {
|
||||
logger.info('No active integration found for user', userId)
|
||||
res.status(200).send('No integration found')
|
||||
return
|
||||
}
|
||||
|
||||
const action = req.params.action.toUpperCase()
|
||||
if (action === 'SYNC_UPDATED') {
|
||||
// get updated page by id
|
||||
const id = data.id
|
||||
let page: Page | undefined
|
||||
switch (type) {
|
||||
case EntityType.PAGE:
|
||||
page = await getPageById(id)
|
||||
break
|
||||
case EntityType.HIGHLIGHT:
|
||||
page = await getPageByParam({ 'highlights.id': id })
|
||||
break
|
||||
case EntityType.LABEL:
|
||||
page = await getPageByParam({ 'labels.id': id })
|
||||
break
|
||||
}
|
||||
if (!page) {
|
||||
logger.info('No page found for id', id)
|
||||
res.status(200).send('No page found')
|
||||
return
|
||||
}
|
||||
// sync updated page with integration
|
||||
logger.info('syncing page', page.id, 'with integration', integration.id)
|
||||
|
||||
const synced = await syncWithIntegration(integration, [page])
|
||||
if (!synced) {
|
||||
logger.info(
|
||||
'failed to sync page',
|
||||
page.id,
|
||||
'with integration',
|
||||
integration.id
|
||||
)
|
||||
res.status(400).send('Failed to sync')
|
||||
return
|
||||
}
|
||||
} else if (action === 'SYNC_ALL') {
|
||||
// sync all pages of the user
|
||||
const size = 50
|
||||
|
||||
for (
|
||||
let hasNextPage = true, count = 0, after = 0, pages: Page[] = [];
|
||||
hasNextPage;
|
||||
after += size, hasNextPage = count > after
|
||||
) {
|
||||
;[pages, count] = (await searchPages({ from: after, size }, userId))!
|
||||
const pageIds = pages.map((p) => p.id)
|
||||
|
||||
logger.info('syncing pages', pageIds)
|
||||
|
||||
const synced = await syncWithIntegration(integration, pages)
|
||||
if (!synced) {
|
||||
logger.info(
|
||||
'failed to sync pages',
|
||||
pageIds,
|
||||
'with integration',
|
||||
integration.id
|
||||
)
|
||||
res.status(400).send('Failed to sync')
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.info('unknown action', action)
|
||||
res.status(200).send('Unknown action')
|
||||
return
|
||||
}
|
||||
|
||||
res.status(200).send('OK')
|
||||
} catch (err) {
|
||||
logger.error('sync with integrations failed', err)
|
||||
res.status(500).send(err)
|
||||
}
|
||||
})
|
||||
|
||||
return router
|
||||
}
|
||||
@ -44,6 +44,7 @@ import { initElasticsearch } from './elastic'
|
||||
import { uploadServiceRouter } from './routers/svc/upload'
|
||||
import rateLimit from 'express-rate-limit'
|
||||
import { webhooksServiceRouter } from './routers/svc/webhooks'
|
||||
import { integrationsServiceRouter } from './routers/svc/integrations'
|
||||
|
||||
const PORT = process.env.PORT || 4000
|
||||
|
||||
@ -115,6 +116,7 @@ export const createApp = (): {
|
||||
app.use('/svc/pubsub/emails', emailsServiceRouter())
|
||||
app.use('/svc/pubsub/upload', uploadServiceRouter())
|
||||
app.use('/svc/pubsub/webhooks', webhooksServiceRouter())
|
||||
app.use('/svc/pubsub/integrations', integrationsServiceRouter())
|
||||
app.use('/svc/reminders', remindersServiceRouter())
|
||||
app.use('/svc/pdf-attachments', pdfAttachmentsRouter())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user