Add integration tests for integration router
This commit is contained in:
@ -111,7 +111,7 @@ interface PubSubRequestMessage {
|
||||
publishTime: string
|
||||
}
|
||||
|
||||
interface PubSubRequestBody {
|
||||
export interface PubSubRequestBody {
|
||||
message: PubSubRequestMessage
|
||||
}
|
||||
|
||||
|
||||
@ -11,7 +11,7 @@ import { getPageById, searchPages } from '../../elastic/pages'
|
||||
import { Page } from '../../elastic/types'
|
||||
import { DateFilter } from '../../utils/search'
|
||||
|
||||
interface Message {
|
||||
export interface Message {
|
||||
type?: EntityType
|
||||
data?: any
|
||||
userId: string
|
||||
@ -23,11 +23,10 @@ export function integrationsServiceRouter() {
|
||||
const router = express.Router()
|
||||
|
||||
router.post('/:integrationType/:action', async (req, res) => {
|
||||
logger.info(
|
||||
req.params.action,
|
||||
'with integration',
|
||||
req.params.integrationType
|
||||
)
|
||||
logger.info('start to sync with integration', {
|
||||
action: req.params.action,
|
||||
integrationType: req.params.integrationType,
|
||||
})
|
||||
const { message: msgStr, expired } = readPushSubscription(req)
|
||||
|
||||
if (!msgStr) {
|
||||
@ -44,7 +43,7 @@ export function integrationsServiceRouter() {
|
||||
try {
|
||||
const { userId, type, data }: Message = JSON.parse(msgStr)
|
||||
if (!userId) {
|
||||
console.log('No userId found in message')
|
||||
logger.info('No userId found in message')
|
||||
res.status(400).send('Bad Request')
|
||||
return
|
||||
}
|
||||
@ -55,7 +54,7 @@ export function integrationsServiceRouter() {
|
||||
enabled: true,
|
||||
})
|
||||
if (!integration) {
|
||||
logger.info('No active integration found for user', userId)
|
||||
logger.info('No active integration found for user', { userId })
|
||||
res.status(200).send('No integration found')
|
||||
return
|
||||
}
|
||||
@ -77,21 +76,22 @@ export function integrationsServiceRouter() {
|
||||
}
|
||||
const page = await getPageById(id)
|
||||
if (!page) {
|
||||
logger.info('No page found for id', id)
|
||||
logger.info('No page found for id', { id })
|
||||
res.status(200).send('No page found')
|
||||
return
|
||||
}
|
||||
// sync updated page with integration
|
||||
logger.info('syncing page', page.id, 'with integration', integration.id)
|
||||
logger.info('syncing updated page with integration', {
|
||||
integrationId: integration.id,
|
||||
pageId: page.id,
|
||||
})
|
||||
|
||||
const synced = await syncWithIntegration(integration, [page])
|
||||
if (!synced) {
|
||||
logger.info(
|
||||
'failed to sync page',
|
||||
page.id,
|
||||
'with integration',
|
||||
integration.id
|
||||
)
|
||||
logger.info('failed to sync page', {
|
||||
integrationId: integration.id,
|
||||
pageId: page.id,
|
||||
})
|
||||
res.status(400).send('Failed to sync')
|
||||
return
|
||||
}
|
||||
@ -115,25 +115,25 @@ export function integrationsServiceRouter() {
|
||||
))!
|
||||
const pageIds = pages.map((p) => p.id)
|
||||
|
||||
logger.info('syncing pages', pageIds)
|
||||
logger.info('syncing pages', { pageIds })
|
||||
|
||||
const synced = await syncWithIntegration(integration, pages)
|
||||
if (!synced) {
|
||||
logger.info(
|
||||
'failed to sync pages',
|
||||
logger.info('failed to sync pages', {
|
||||
pageIds,
|
||||
'with integration',
|
||||
integration.id
|
||||
)
|
||||
integrationId: integration.id,
|
||||
})
|
||||
res.status(400).send('Failed to sync')
|
||||
return
|
||||
}
|
||||
}
|
||||
// update integration syncedAt if successful
|
||||
await getRepository(Integration).update(integration.id, {
|
||||
taskName: null,
|
||||
syncedAt: new Date(),
|
||||
})
|
||||
} else {
|
||||
logger.info('unknown action', action)
|
||||
logger.info('unknown action', { action })
|
||||
res.status(200).send('Unknown action')
|
||||
return
|
||||
}
|
||||
|
||||
@ -5,7 +5,6 @@ import { wait } from '../utils/helpers'
|
||||
import { Page } from '../elastic/types'
|
||||
import { getHighlightLocation, getHighlightUrl } from './highlights'
|
||||
import { Integration } from '../entity/integration'
|
||||
import { getRepository } from '../entity/utils'
|
||||
|
||||
interface ReadwiseHighlight {
|
||||
// The highlight text, (technically the only field required in a highlight object)
|
||||
@ -72,7 +71,7 @@ const pageToReadwiseHighlight = (page: Page): ReadwiseHighlight[] => {
|
||||
title: page.title,
|
||||
author: page.author,
|
||||
highlight_url: getHighlightUrl(page.slug, highlight.id),
|
||||
highlighted_at: highlight.createdAt.toISOString(),
|
||||
highlighted_at: new Date(highlight.createdAt).toISOString(),
|
||||
category: 'articles',
|
||||
image_url: page.image,
|
||||
location,
|
||||
@ -99,13 +98,6 @@ export const syncWithIntegration = async (
|
||||
default:
|
||||
return false
|
||||
}
|
||||
// update integration syncedAt if successful
|
||||
if (result) {
|
||||
console.log('updating integration syncedAt')
|
||||
await getRepository(Integration).update(integration.id, {
|
||||
syncedAt: new Date(),
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@ -138,7 +130,7 @@ export const syncWithReadwise = async (
|
||||
console.log('Readwise API rate limit exceeded, retrying...')
|
||||
// wait for Retry-After seconds in the header if rate limited
|
||||
// max retry count is 3
|
||||
const retryAfter = error.response?.headers['Retry-After'] || '10' // default to 10 seconds
|
||||
const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds
|
||||
await wait(parseInt(retryAfter, 10) * 1000)
|
||||
return syncWithReadwise(token, highlights, retryCount + 1)
|
||||
}
|
||||
|
||||
@ -288,9 +288,17 @@ export const enqueueSyncWithIntegration = async (
|
||||
userId: string,
|
||||
integrationType: IntegrationType
|
||||
): Promise<string> => {
|
||||
const { GOOGLE_CLOUD_PROJECT } = process.env
|
||||
const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env
|
||||
// use pubsub data format to send the userId to the task handler
|
||||
const payload = {
|
||||
userId,
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({
|
||||
userId,
|
||||
})
|
||||
).toString('base64'),
|
||||
publishTime: new Date().toISOString(),
|
||||
},
|
||||
}
|
||||
|
||||
// If there is no Google Cloud Project Id exposed, it means that we are in local environment
|
||||
@ -303,7 +311,7 @@ export const enqueueSyncWithIntegration = async (
|
||||
payload,
|
||||
taskHandlerUrl: `${
|
||||
env.queue.integrationTaskHandlerUrl
|
||||
}/${integrationType.toLowerCase()}/sync_all`,
|
||||
}/${integrationType.toLowerCase()}/sync_all?token=${PUBSUB_VERIFICATION_TOKEN}`,
|
||||
priority: 'low',
|
||||
})
|
||||
|
||||
|
||||
@ -0,0 +1,323 @@
|
||||
import 'mocha'
|
||||
import { createTestElasticPage, request } from '../util'
|
||||
import { expect } from 'chai'
|
||||
import { DateTime } from 'luxon'
|
||||
import {
|
||||
createPubSubClient,
|
||||
PubSubRequestBody,
|
||||
} from '../../src/datalayer/pubsub'
|
||||
import { User } from '../../src/entity/user'
|
||||
import { createTestUser, deleteTestUser } from '../db'
|
||||
import { Integration, IntegrationType } from '../../src/entity/integration'
|
||||
import { getRepository } from '../../src/entity/utils'
|
||||
import { Highlight, Page, PageContext } from '../../src/elastic/types'
|
||||
import nock from 'nock'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations'
|
||||
import { addHighlightToPage } from '../../src/elastic/highlights'
|
||||
import { getHighlightUrl } from '../../src/services/highlights'
|
||||
import { deletePage } from '../../src/elastic/pages'
|
||||
|
||||
describe('Integrations routers', () => {
|
||||
let token: string
|
||||
|
||||
describe('sync with integrations', () => {
|
||||
const endpoint = (token: string, type = 'type', action = 'action') =>
|
||||
`/svc/pubsub/integrations/${type}/${action}?token=${token}`
|
||||
let action: string
|
||||
let data: PubSubRequestBody
|
||||
let integrationType: string
|
||||
|
||||
context('when token is invalid', () => {
|
||||
before(() => {
|
||||
token = 'invalid-token'
|
||||
})
|
||||
|
||||
it('returns 400', async () => {
|
||||
return request.post(endpoint(token)).send(data).expect(400)
|
||||
})
|
||||
})
|
||||
|
||||
context('when token is valid', () => {
|
||||
before(() => {
|
||||
token = process.env.PUBSUB_VERIFICATION_TOKEN!
|
||||
})
|
||||
|
||||
context('when data is expired', () => {
|
||||
before(() => {
|
||||
data = {
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({ userId: 'userId', type: 'page' })
|
||||
).toString('base64'),
|
||||
publishTime: DateTime.now().minus({ hours: 12 }).toISO(),
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
it('returns 200 with Expired', async () => {
|
||||
const res = await request.post(endpoint(token)).send(data).expect(200)
|
||||
expect(res.text).to.eql('Expired')
|
||||
})
|
||||
})
|
||||
|
||||
context('when userId is empty', () => {
|
||||
before(() => {
|
||||
data = {
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({ userId: '', type: 'page' })
|
||||
).toString('base64'),
|
||||
publishTime: new Date().toISOString(),
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
it('returns 400', async () => {
|
||||
return request.post(endpoint(token)).send(data).expect(400)
|
||||
})
|
||||
})
|
||||
|
||||
context('when user exists', () => {
|
||||
let user: User
|
||||
|
||||
before(async () => {
|
||||
user = await createTestUser('fakeUser')
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
await deleteTestUser(user.name)
|
||||
})
|
||||
|
||||
context('when integration not found', () => {
|
||||
before(() => {
|
||||
integrationType = IntegrationType.Readwise
|
||||
data = {
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({ userId: user.id, type: 'page' })
|
||||
).toString('base64'),
|
||||
publishTime: new Date().toISOString(),
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
it('returns 200 with No integration found', async () => {
|
||||
const res = await request
|
||||
.post(endpoint(token, integrationType))
|
||||
.send(data)
|
||||
.expect(200)
|
||||
expect(res.text).to.eql('No integration found')
|
||||
})
|
||||
})
|
||||
|
||||
context('when integration is readwise and enabled', () => {
|
||||
let integration: Integration
|
||||
let ctx: PageContext
|
||||
let page: Page
|
||||
let highlight: Highlight
|
||||
let highlightsData: string
|
||||
|
||||
before(async () => {
|
||||
integration = await getRepository(Integration).save({
|
||||
user: { id: user.id },
|
||||
type: IntegrationType.Readwise,
|
||||
token: 'token',
|
||||
})
|
||||
integrationType = integration.type
|
||||
// create page
|
||||
page = await createTestElasticPage(user.id)
|
||||
ctx = {
|
||||
uid: user.id,
|
||||
pubsub: createPubSubClient(),
|
||||
refresh: true,
|
||||
}
|
||||
// create highlight
|
||||
const location = 109
|
||||
const patch = `@@ -${location + 1},16 +${location + 1},36 @@
|
||||
. We're
|
||||
+%3Comnivore_highlight%3E
|
||||
humbled
|
||||
@@ -254,16 +254,37 @@
|
||||
h in the
|
||||
+%3C/omnivore_highlight%3E
|
||||
coming`
|
||||
highlight = {
|
||||
createdAt: new Date(),
|
||||
id: 'test id',
|
||||
patch,
|
||||
quote: 'test quote',
|
||||
shortId: 'test shortId',
|
||||
updatedAt: new Date(),
|
||||
userId: user.id,
|
||||
}
|
||||
await addHighlightToPage(page.id, highlight, ctx)
|
||||
// create highlights data for integration request
|
||||
highlightsData = JSON.stringify({
|
||||
highlights: [
|
||||
{
|
||||
text: highlight.quote,
|
||||
title: page.title,
|
||||
author: page.author,
|
||||
highlight_url: getHighlightUrl(page.slug, highlight.id),
|
||||
highlighted_at: highlight.createdAt.toISOString(),
|
||||
category: 'articles',
|
||||
image_url: page.image,
|
||||
location,
|
||||
location_type: 'page',
|
||||
note: highlight.annotation,
|
||||
source_type: 'omnivore',
|
||||
source_url: page.url,
|
||||
},
|
||||
],
|
||||
})
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
await getRepository(Integration).delete(integration.id)
|
||||
await deletePage(page.id, ctx)
|
||||
})
|
||||
|
||||
context('when action is sync_updated', () => {
|
||||
before(async () => {
|
||||
action = 'sync_updated'
|
||||
})
|
||||
|
||||
context('when entity type is page', () => {
|
||||
before(() => {
|
||||
data = {
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({
|
||||
userId: user.id,
|
||||
type: 'page',
|
||||
data: { id: page.id },
|
||||
})
|
||||
).toString('base64'),
|
||||
publishTime: new Date().toISOString(),
|
||||
},
|
||||
}
|
||||
// mock Readwise Highlight API
|
||||
nock(READWISE_API_URL, {
|
||||
reqheaders: {
|
||||
Authorization: `Token ${integration.token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
})
|
||||
.post('/highlights', highlightsData)
|
||||
.reply(200)
|
||||
})
|
||||
|
||||
it('returns 200 with OK', async () => {
|
||||
const res = await request
|
||||
.post(endpoint(token, integrationType, action))
|
||||
.send(data)
|
||||
.expect(200)
|
||||
expect(res.text).to.eql('OK')
|
||||
})
|
||||
|
||||
context('when readwise highlight API reaches rate limits', () => {
|
||||
before(() => {
|
||||
// mock Readwise Highlight API with rate limits
|
||||
// retry after 1 second
|
||||
nock(READWISE_API_URL, {
|
||||
reqheaders: {
|
||||
Authorization: `Token ${integration.token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
})
|
||||
.post('/highlights')
|
||||
.reply(429, 'Rate Limited', { 'Retry-After': '1' })
|
||||
// mock Readwise Highlight API after 1 second
|
||||
nock(READWISE_API_URL, {
|
||||
reqheaders: {
|
||||
Authorization: `Token ${integration.token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
})
|
||||
.post('/highlights')
|
||||
.delay(1000)
|
||||
.reply(200)
|
||||
})
|
||||
|
||||
it('returns 200 with OK', async () => {
|
||||
const res = await request
|
||||
.post(endpoint(token, integrationType, action))
|
||||
.send(data)
|
||||
.expect(200)
|
||||
expect(res.text).to.eql('OK')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
context('when entity type is highlight', () => {
|
||||
before(() => {
|
||||
data = {
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({
|
||||
userId: user.id,
|
||||
type: 'highlight',
|
||||
data: { articleId: page.id },
|
||||
})
|
||||
).toString('base64'),
|
||||
publishTime: new Date().toISOString(),
|
||||
},
|
||||
}
|
||||
// mock Readwise Highlight API
|
||||
nock(READWISE_API_URL, {
|
||||
reqheaders: {
|
||||
Authorization: `Token ${integration.token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
})
|
||||
.post('/highlights', highlightsData)
|
||||
.reply(200)
|
||||
})
|
||||
|
||||
it('returns 200 with OK', async () => {
|
||||
const res = await request
|
||||
.post(endpoint(token, integrationType, action))
|
||||
.send(data)
|
||||
.expect(200)
|
||||
expect(res.text).to.eql('OK')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
context('when action is sync_all', () => {
|
||||
before(() => {
|
||||
action = 'sync_all'
|
||||
data = {
|
||||
message: {
|
||||
data: Buffer.from(
|
||||
JSON.stringify({
|
||||
userId: user.id,
|
||||
})
|
||||
).toString('base64'),
|
||||
publishTime: new Date().toISOString(),
|
||||
},
|
||||
}
|
||||
// mock Readwise Highlight API
|
||||
nock(READWISE_API_URL, {
|
||||
reqheaders: {
|
||||
Authorization: `Token ${integration.token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
})
|
||||
.post('/highlights', highlightsData)
|
||||
.reply(200)
|
||||
})
|
||||
|
||||
it('returns 200 with OK', async () => {
|
||||
const res = await request
|
||||
.post(endpoint(token, integrationType, action))
|
||||
.send(data)
|
||||
.expect(200)
|
||||
expect(res.text).to.eql('OK')
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user