From 10c01c12f27d8e55c0e28d13ed819c4358bef2ce Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 1 Mar 2023 23:15:03 +0800 Subject: [PATCH] Add test case for integration import task handler --- .../api/src/resolvers/integrations/index.ts | 13 ++- packages/api/src/routers/svc/integrations.ts | 100 +++++++++--------- .../src/services/integrations/integration.ts | 4 +- .../api/src/services/integrations/pocket.ts | 16 +-- packages/api/src/utils/createTask.ts | 8 +- .../api/test/resolvers/integrations.test.ts | 28 ----- .../api/test/routers/integrations.test.ts | 97 ++++++++++++++--- 7 files changed, 163 insertions(+), 103 deletions(-) diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index d7f402602..808affbe5 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -228,7 +228,7 @@ export const importFromIntegrationResolver = authorized< ImportFromIntegrationSuccess, ImportFromIntegrationError, MutationImportFromIntegrationArgs ->(async (_, { integrationId }, { claims: { uid }, log }) => { +>(async (_, { integrationId }, { claims: { uid }, log, signToken }) => { log.info('importFromIntegrationResolver') try { @@ -243,8 +243,17 @@ export const importFromIntegrationResolver = authorized< } } + const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day + const authToken = (await signToken( + { uid, exp }, + env.server.jwtSecret + )) as string // create a task to import all the pages - const taskName = await enqueueImportFromIntegration(uid, integration.id) + const taskName = await enqueueImportFromIntegration( + uid, + integration.id, + authToken + ) // update task name in integration await getRepository(Integration).update(integration.id, { taskName }) diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index 269eb1163..2cd89229d 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -13,6 +13,8 @@ import { DateFilter } from '../../utils/search' import { DateTime } from 'luxon' import { createGCSFile } from '../../utils/uploads' import { v4 as uuidv4 } from 'uuid' +import { getClaimsByToken } from '../../utils/auth' +import { Claims } from '../../resolvers/types' export interface Message { type?: EntityType @@ -23,12 +25,11 @@ export interface Message { } interface ImportEvent { - userId: string integrationId: string } const isImportEvent = (event: any): event is ImportEvent => - 'userId' in event && 'integrationId' in event + 'integrationId' in event const logger = buildLogger('app.dispatch') @@ -169,56 +170,57 @@ export function integrationsServiceRouter() { res.status(500).send(err) } }) - // import pages from integration + // import pages from integration task handler router.post('/import', async (req, res) => { - logger.info('start to import pages from integration') - const { message: msgStr, expired } = readPushSubscription(req) - - if (!msgStr) { - return res.status(400).send('Bad Request') + logger.info('start cloud task to import pages from integration') + const token = req.cookies?.auth || req.headers?.authorization + let claims: Claims | undefined + try { + claims = await getClaimsByToken(token) + if (!claims) { + return res.status(401).send('UNAUTHORIZED') + } + } catch (err) { + logger.error('failed to get claims from token', err) + return res.status(401).send('UNAUTHORIZED') } - if (expired) { - logger.info('discarding expired message') - return res.status(200).send('Expired') - } - - const data = JSON.parse(msgStr) - if (!isImportEvent(data)) { + if (!isImportEvent(req.body)) { logger.info('Invalid message') return res.status(400).send('Bad Request') } - const userId = data.userId - const integration = await getRepository(Integration).findOneBy({ - user: { id: userId }, - id: data.integrationId, - enabled: true, - type: IntegrationType.Import, - }) - if (!integration) { - logger.info('No active integration found for user', { userId }) - return res.status(200).send('No integration found') - } - - const integrationService = getIntegrationService(integration.name) - // import pages from integration - logger.info('importing pages from integration', { - integrationId: integration.id, - }) - - // write the list of urls to a csv file and upload it to gcs - // path style: imports///-.csv - const dateStr = DateTime.now().toISODate() - const fileUuid = uuidv4() - const fullPath = `imports/${integration.user.id}/${dateStr}/URL_LIST-${fileUuid}.csv` - // open a write_stream to the file - const file = createGCSFile(fullPath) - const writeStream = file.createWriteStream({ - contentType: 'text/csv', - }) - + let writeStream: NodeJS.WritableStream | undefined try { + const userId = claims.uid + const integration = await getRepository(Integration).findOneBy({ + user: { id: userId }, + id: req.body.integrationId, + enabled: true, + type: IntegrationType.Import, + }) + if (!integration) { + logger.info('No active integration found for user', { userId }) + return res.status(200).send('No integration found') + } + + const integrationService = getIntegrationService(integration.name) + // import pages from integration + logger.info('importing pages from integration', { + integrationId: integration.id, + }) + + // write the list of urls to a csv file and upload it to gcs + // path style: imports///-.csv + const dateStr = DateTime.now().toISODate() + const fileUuid = uuidv4() + const fullPath = `imports/${userId}/${dateStr}/URL_LIST-${fileUuid}.csv` + // open a write_stream to the file + const file = createGCSFile(fullPath) + writeStream = file.createWriteStream({ + contentType: 'text/csv', + }) + let hasMore = true let offset = 0 let since = integration.syncedAt?.getTime() || 0 @@ -246,16 +248,16 @@ export function integrationsServiceRouter() { } // update the integration's syncedAt await getRepository(Integration).update(integration.id, { - syncedAt: since, + syncedAt: new Date(since), }) - - res.status(200).send('OK') } catch (err) { logger.error('import pages from integration failed', err) - res.status(500).send(err) + return res.status(500).send(err) } finally { - writeStream.end() + writeStream?.end() } + + res.status(200).send('OK') }) return router diff --git a/packages/api/src/services/integrations/integration.ts b/packages/api/src/services/integrations/integration.ts index d91e63030..e4c17051c 100644 --- a/packages/api/src/services/integrations/integration.ts +++ b/packages/api/src/services/integrations/integration.ts @@ -10,12 +10,12 @@ export interface RetrievedData { export interface RetrievedResult { data: RetrievedData[] hasMore?: boolean - since?: number + since?: number // unix timestamp in milliseconds } export interface RetrieveRequest { token: string - since?: number + since?: number // unix timestamp in milliseconds count?: number offset?: number } diff --git a/packages/api/src/services/integrations/pocket.ts b/packages/api/src/services/integrations/pocket.ts index f16b4fd48..43e3646f2 100644 --- a/packages/api/src/services/integrations/pocket.ts +++ b/packages/api/src/services/integrations/pocket.ts @@ -8,12 +8,12 @@ import axios from 'axios' import { env } from '../../env' interface PocketResponse { - status: number - complete: number + status: number // 1 if success + complete: number // 1 if all items have been returned list: { [key: string]: PocketItem } - since: number + since: number // unix timestamp in seconds search_meta: { search_type: string } @@ -31,10 +31,10 @@ interface PocketItem { status: string excerpt: string word_count: string - tags: { + tags?: { [key: string]: Tag } - authors: { + authors?: { [key: string]: Author } } @@ -80,7 +80,7 @@ export class PocketIntegration extends IntegrationService { retrievePocketData = async ( accessToken: string, - since: number, + since: number, // unix timestamp in seconds count = 100, offset = 0 ): Promise => { @@ -106,7 +106,7 @@ export class PocketIntegration extends IntegrationService { return response.data } catch (error) { console.log('error retrieving pocket data', error) - throw error + throw new Error('Error retrieving pocket data') } } @@ -130,7 +130,7 @@ export class PocketIntegration extends IntegrationService { } const data = pocketItems.map((item) => ({ url: item.given_url, - labels: Object.values(item.tags).map((tag) => tag.tag), + labels: Object.values(item.tags ?? {}).map((tag) => tag.tag), state: statusToState[item.status], })) return { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index f314b256d..b6b16262b 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -443,14 +443,17 @@ export const enqueueRecommendation = async ( export const enqueueImportFromIntegration = async ( userId: string, - integrationId: string + integrationId: string, + authToken: string ): Promise => { const { GOOGLE_CLOUD_PROJECT } = process.env const payload = { - userId, integrationId, } + const headers = { + Cookie: `auth=${authToken}`, + } // If there is no Google Cloud Project Id exposed, it means that we are in local environment if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { return nanoid() @@ -461,6 +464,7 @@ export const enqueueImportFromIntegration = async ( payload, taskHandlerUrl: `${env.queue.integrationTaskHandlerUrl}/import`, priority: 'low', + requestHeaders: headers, }) if (!createdTasks || !createdTasks[0].name) { diff --git a/packages/api/test/resolvers/integrations.test.ts b/packages/api/test/resolvers/integrations.test.ts index c25b31715..22c3e5f37 100644 --- a/packages/api/test/resolvers/integrations.test.ts +++ b/packages/api/test/resolvers/integrations.test.ts @@ -377,38 +377,10 @@ describe('Integrations resolvers', () => { name: 'POCKET', token: 'fakeToken', }) - - // nock('https://getpocket.com', { - // reqheaders: { - // 'content-type': 'application/json', - // }, - // }) - // .post('/v3/get', { - // access_token: existingIntegration.token, - // consumer_key: '', - // state: 'all', - // detailType: 'complete', - // since: 0, - // sort: 'oldest', - // count: 100, - // offset: 0, - // }) - // .reply(200, { - // list: { - // '123': { - // given_url: 'https://omnivore.app/pocket-import-test', - // }, - // }, - // }) - // - // sinon.replace(uploads, 'uploadToBucket', () => { - // return Promise.resolve() - // }) }) after(async () => { await deleteTestIntegrations(loginUser.id, [existingIntegration.id]) - // sinon.restore() }) it('returns success and starts cloud task', async () => { diff --git a/packages/api/test/routers/integrations.test.ts b/packages/api/test/routers/integrations.test.ts index 9ff06905d..961aec012 100644 --- a/packages/api/test/routers/integrations.test.ts +++ b/packages/api/test/routers/integrations.test.ts @@ -8,7 +8,7 @@ import { } from '../../src/datalayer/pubsub' import { User } from '../../src/entity/user' import { createTestUser, deleteTestIntegrations, deleteTestUser } from '../db' -import { Integration } from '../../src/entity/integration' +import { Integration, IntegrationType } from '../../src/entity/integration' import { getRepository } from '../../src/entity/utils' import { Highlight, @@ -21,13 +21,33 @@ import { addHighlightToPage } from '../../src/elastic/highlights' import { getHighlightUrl } from '../../src/services/highlights' import { deletePage } from '../../src/elastic/pages' import { READWISE_API_URL } from '../../src/services/integrations/readwise' +import sinon from 'sinon' +import { Storage } from '@google-cloud/storage' +import { MockBucket } from '../mock_storage' describe('Integrations routers', () => { + const baseUrl = '/svc/pubsub/integrations' let token: string + let user: User + let authToken: string + + before(async () => { + user = await createTestUser('fakeUser') + const res = await request + .post('/local/debug/fake-user-login') + .send({ fakeEmail: user.email }) + + const body = res.body as { authToken: string } + authToken = body.authToken + }) + + after(async () => { + await deleteTestUser(user.id) + }) describe('sync with integrations', () => { const endpoint = (token: string, name = 'name', action = 'action') => - `/svc/pubsub/integrations/${name}/${action}?token=${token}` + `${baseUrl}/${name}/${action}?token=${token}` let action: string let data: PubSubRequestBody let integrationName: string @@ -83,16 +103,6 @@ describe('Integrations routers', () => { }) context('when user exists', () => { - let user: User - - before(async () => { - user = await createTestUser('fakeUser') - }) - - after(async () => { - await deleteTestUser(user.id) - }) - context('when integration not found', () => { before(() => { integrationName = 'READWISE' @@ -323,4 +333,67 @@ describe('Integrations routers', () => { }) }) }) + + describe('import from integrations router', () => { + let integration: Integration + + before(async () => { + token = 'test token' + // create integration + integration = await getRepository(Integration).save({ + user: { id: user.id }, + name: 'POCKET', + token, + type: IntegrationType.Import, + }) + // mock cloud storage bucket + sinon.stub(Storage, 'Bucket').returns(MockBucket) + // mock Pocket API + nock('https://getpocket.com', { + reqheaders: { + 'content-type': 'application/json', + 'x-accept': 'application/json', + }, + }) + .post('/v3/get', { + access_token: token, + consumer_key: process.env.POCKET_CONSUMER_KEY, + state: 'all', + detailType: 'complete', + since: 0, + sort: 'oldest', + count: 100, + offset: 0, + }) + .reply(200, { + complete: 1, + list: { + '123': { + given_url: 'https://omnivore.app/pocket-import-test', + state: '0', + }, + }, + since: Date.now() / 1000, + }) + }) + + after(async () => { + sinon.restore() + await deleteTestIntegrations(user.id, [integration.id]) + }) + + context('when integration is pocket', () => { + it('returns 200 with OK', async () => { + const res = await request + .post(`${baseUrl}/import`) + .send({ + integrationId: integration.id, + }) + .set('Cookie', `auth=${authToken}`) + .expect(200) + + expect(res.text).to.eql('OK') + }) + }) + }) })