From 07eb97e7cc992a1419fee1998971dfa788173b90 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 27 Oct 2023 12:10:59 +0800 Subject: [PATCH] remove unused code --- .../api/src/resolvers/integrations/index.ts | 51 ++- packages/api/src/routers/auth/auth_types.ts | 5 + packages/api/src/routers/auth/jwt_helpers.ts | 15 +- packages/api/src/routers/svc/integrations.ts | 295 +----------- .../api/src/services/integrations/index.ts | 16 +- .../src/services/integrations/integration.ts | 21 +- .../api/src/services/integrations/pocket.ts | 10 +- .../api/src/services/integrations/readwise.ts | 123 +---- packages/api/src/utils/createTask.ts | 61 +-- .../api/test/resolvers/integrations.test.ts | 3 +- .../api/test/routers/integrations.test.ts | 423 ------------------ packages/api/tsconfig.json | 6 +- packages/integration-handler/.dockerignore | 5 + packages/integration-handler/.gcloudignore | 16 + .../integration-handler/Dockerfile-exporter | 26 ++ .../integration-handler/Dockerfile-importer | 26 ++ packages/integration-handler/package.json | 1 + packages/integration-handler/src/index.ts | 43 +- .../src/integrations/index.ts | 16 +- packages/integration-handler/src/item.ts | 11 +- 20 files changed, 220 insertions(+), 953 deletions(-) delete mode 100644 packages/api/test/routers/integrations.test.ts create mode 100644 packages/integration-handler/.dockerignore create mode 100644 packages/integration-handler/.gcloudignore create mode 100644 packages/integration-handler/Dockerfile-exporter create mode 100644 packages/integration-handler/Dockerfile-importer diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index 5f0dce401..82a67e6b5 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -18,10 +18,11 @@ import { SetIntegrationErrorCode, SetIntegrationSuccess, } from '../../generated/graphql' +import { createIntegrationToken } from '../../routers/auth/jwt_helpers' import { findIntegration, findIntegrations, - getIntegrationService, + getIntegrationClient, removeIntegration, saveIntegration, updateIntegration, @@ -29,8 +30,8 @@ import { import { analytics } from '../../utils/analytics' import { deleteTask, + enqueueExportToIntegration, enqueueImportFromIntegration, - enqueueSyncWithIntegration, } from '../../utils/createTask' import { authorized } from '../../utils/helpers' @@ -60,7 +61,7 @@ export const setIntegrationResolver = authorized< integrationToSave.taskName = existingIntegration.taskName } else { // Create - const integrationService = getIntegrationService(input.name) + const integrationService = getIntegrationClient(input.name) // authorize and get access token const token = await integrationService.accessToken(input.token) if (!token) { @@ -74,12 +75,27 @@ export const setIntegrationResolver = authorized< // save integration const integration = await saveIntegration(integrationToSave, uid) - if ( - integrationToSave.type === IntegrationType.Export && - (!integrationToSave.id || integrationToSave.enabled) - ) { + if (integrationToSave.type === IntegrationType.Export && !input.id) { + const authToken = await createIntegrationToken({ + uid, + token: integration.token, + }) + if (!authToken) { + log.error('failed to create auth token', { + integrationId: integration.id, + }) + return { + errorCodes: [SetIntegrationErrorCode.BadRequest], + } + } + // create a task to sync all the pages if new integration or enable integration (export type) - const taskName = await enqueueSyncWithIntegration(uid, input.name) + const taskName = await enqueueExportToIntegration( + integration.id, + integration.name, + 0, + authToken + ) log.info('enqueued task', taskName) // update task name in integration @@ -191,7 +207,7 @@ export const importFromIntegrationResolver = authorized< ImportFromIntegrationSuccess, ImportFromIntegrationError, MutationImportFromIntegrationArgs ->(async (_, { integrationId }, { claims: { uid }, log, signToken }) => { +>(async (_, { integrationId }, { claims: { uid }, log }) => { log.info('importFromIntegrationResolver') try { @@ -203,14 +219,21 @@ export const importFromIntegrationResolver = authorized< } } - const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day - const authToken = (await signToken( - { uid, exp }, - env.server.jwtSecret - )) as string + const authToken = await createIntegrationToken({ + uid: integration.user.id, + token: integration.token, + }) + if (!authToken) { + return { + errorCodes: [ImportFromIntegrationErrorCode.BadRequest], + } + } + // create a task to import all the pages const taskName = await enqueueImportFromIntegration( integration.id, + integration.name, + integration.syncedAt?.getTime() || 0, authToken ) // update task name in integration diff --git a/packages/api/src/routers/auth/auth_types.ts b/packages/api/src/routers/auth/auth_types.ts index b9e07e64c..56e0139af 100644 --- a/packages/api/src/routers/auth/auth_types.ts +++ b/packages/api/src/routers/auth/auth_types.ts @@ -40,3 +40,8 @@ export function isPendingUserTokenPayload( 'username' in object ) } + +export type IntegrationTokenPayload = { + uid: string + token: string +} diff --git a/packages/api/src/routers/auth/jwt_helpers.ts b/packages/api/src/routers/auth/jwt_helpers.ts index f43c155af..941982f54 100644 --- a/packages/api/src/routers/auth/jwt_helpers.ts +++ b/packages/api/src/routers/auth/jwt_helpers.ts @@ -4,6 +4,7 @@ import { promisify } from 'util' import { env } from '../../env' import { logger } from '../../utils/logger' import { + IntegrationTokenPayload, isPendingUserTokenPayload, PendingUserTokenPayload, } from './auth_types' @@ -86,11 +87,19 @@ export function suggestedUsername(name: string): string { return `${prefix}${suffix}` } -export async function createWebAuthTokenWithPayload( - payload: Record +export async function createIntegrationToken( + payload: IntegrationTokenPayload ): Promise { try { - const authToken = await signToken(payload, env.server.jwtSecret) + const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day + const authToken = await signToken( + { + ...payload, + exp, + }, + env.server.jwtSecret + ) + logger.info('createIntegrationToken', payload) return authToken as string } catch { return undefined diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index ae7a2c128..68db6b846 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -1,45 +1,13 @@ /* eslint-disable @typescript-eslint/no-misused-promises */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ -import { stringify } from 'csv-stringify' import express from 'express' -import { DateTime } from 'luxon' -import { v4 as uuidv4 } from 'uuid' import { Integration, IntegrationType } from '../../entity/integration' -import { LibraryItem } from '../../entity/library_item' -import { EntityType, readPushSubscription } from '../../pubsub' +import { readPushSubscription } from '../../pubsub' import { getRepository } from '../../repository' -import { Claims } from '../../resolvers/types' -import { - findIntegration, - getIntegrationService, - updateIntegration, -} from '../../services/integrations' -import { - findLibraryItemById, - searchLibraryItems, -} from '../../services/library_item' -import { getClaimsByToken } from '../../utils/auth' import { enqueueExportToIntegration } from '../../utils/createTask' import { logger } from '../../utils/logger' -import { DateFilter } from '../../utils/search' -import { createGCSFile } from '../../utils/uploads' -import { createWebAuthTokenWithPayload } from '../auth/jwt_helpers' - -export interface Message { - type?: EntityType - id?: string - userId: string - pageId?: string - articleId?: string -} - -interface ImportEvent { - integrationId: string -} - -const isImportEvent = (event: any): event is ImportEvent => - 'integrationId' in event +import { createIntegrationToken } from '../auth/jwt_helpers' export function integrationsServiceRouter() { const router = express.Router() @@ -70,10 +38,8 @@ export function integrationsServiceRouter() { // create a task to sync with each integration await Promise.all( integrations.map(async (integration) => { - const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day - const authToken = await createWebAuthTokenWithPayload({ + const authToken = await createIntegrationToken({ uid: integration.user.id, - exp, token: integration.token, }) @@ -101,260 +67,5 @@ export function integrationsServiceRouter() { res.status(200).send('OK') }) - router.post('/:integrationName/:action', async (req, res) => { - logger.info('start to sync with integration', { - action: req.params.action, - integrationName: req.params.integrationName, - }) - - try { - const { message: msgStr, expired } = readPushSubscription(req) - - if (!msgStr) { - return res.status(200).send('Bad Request') - } - - if (expired) { - logger.info('discarding expired message') - return res.status(200).send('Expired') - } - - const data: Message = JSON.parse(msgStr) - const userId = data.userId - const type = data.type - if (!userId) { - logger.info('No userId found in message') - res.status(200).send('Bad Request') - return - } - - const integration = await findIntegration( - { - name: req.params.integrationName.toUpperCase(), - type: IntegrationType.Export, - enabled: true, - }, - userId - ) - if (!integration) { - logger.info('No active integration found for user', { userId }) - res.status(200).send('No integration found') - return - } - - const action = req.params.action.toUpperCase() - const integrationService = getIntegrationService(integration.name) - if (action === 'SYNC_UPDATED') { - // get updated page by id - let id: string | undefined - switch (type) { - case EntityType.PAGE: - id = data.id - break - case EntityType.HIGHLIGHT: - id = data.articleId - break - case EntityType.LABEL: - id = data.pageId - break - } - if (!id) { - logger.info('No id found in message') - res.status(200).send('Bad Request') - return - } - const item = await findLibraryItemById(id, userId) - if (!item) { - logger.info('No item found for id', { id }) - res.status(200).send('No page found') - return - } - - // sync updated item with integration - logger.info('syncing updated item with integration', { - integrationId: integration.id, - itemId: item.id, - }) - - const synced = await integrationService.export(integration, [item]) - if (!synced) { - logger.info('failed to sync item', { - integrationId: integration.id, - itemId: item.id, - }) - return res.status(400).send('Failed to sync') - } - } else if (action === 'SYNC_ALL') { - // sync all pages of the user - const size = 50 - - for ( - let hasNextPage = true, - count = 0, - after = 0, - items: LibraryItem[] = []; - hasNextPage; - after += size, hasNextPage = count > after - ) { - const syncedAt = integration.syncedAt - // only sync pages that were updated after syncedAt - const dateFilters: DateFilter[] = [] - syncedAt && - dateFilters.push({ field: 'updatedAt', startDate: syncedAt }) - const { libraryItems } = await searchLibraryItems( - { from: after, size, dateFilters }, - userId - ) - items = libraryItems - const itemIds = items.map((p) => p.id) - - logger.info('syncing items', { pageIds: itemIds }) - - const synced = await integrationService.export(integration, items) - if (!synced) { - logger.error('failed to sync items', { - pageIds: itemIds, - integrationId: integration.id, - }) - return res.status(400).send('Failed to sync') - } - } - // delete task name if completed - await updateIntegration( - integration.id, - { - taskName: null, - }, - userId - ) - } else { - logger.info('unknown action', { action }) - res.status(200).send('Unknown action') - return - } - } catch (err) { - logger.error('sync with integrations failed', err) - return res.status(500).send(err) - } - - res.status(200).send('OK') - }) - - // import pages from integration task handler - router.post('/import', async (req, res) => { - logger.info('start cloud task to import pages from integration') - const token = req.cookies?.auth || req.headers?.authorization - let claims: Claims | undefined - try { - claims = await getClaimsByToken(token) - if (!claims) { - return res.status(401).send('UNAUTHORIZED') - } - } catch (err) { - logger.error('failed to get claims from token', err) - return res.status(401).send('UNAUTHORIZED') - } - - if (!isImportEvent(req.body)) { - logger.info('Invalid message') - return res.status(400).send('Bad Request') - } - - let writeStream: NodeJS.WritableStream | undefined - try { - const userId = claims.uid - const integration = await findIntegration( - { - id: req.body.integrationId, - enabled: true, - type: IntegrationType.Import, - }, - userId - ) - if (!integration) { - logger.info('No active integration found for user', { userId }) - return res.status(200).send('No integration found') - } - - const integrationService = getIntegrationService(integration.name) - // import pages from integration - logger.info('importing pages from integration', { - integrationId: integration.id, - }) - - let offset = 0 - const since = integration.syncedAt?.getTime() || 0 - let syncedAt = since - - // get pages from integration - const retrieved = await integrationService.retrieve({ - token: integration.token, - since, - offset, - }) - syncedAt = retrieved.since || Date.now() - - let retrievedData = retrieved.data - // if there are pages to import - if (retrievedData.length > 0) { - // write the list of urls to a csv file and upload it to gcs - // path style: imports///-.csv - const dateStr = DateTime.now().toISODate() - const fileUuid = uuidv4() - const fullPath = `imports/${userId}/${dateStr}/${integrationService.name}-${fileUuid}.csv` - // open a write_stream to the file - const file = createGCSFile(fullPath) - writeStream = file.createWriteStream({ - contentType: 'text/csv', - }) - // stringify the data and pipe it to the write_stream - const stringifier = stringify({ - header: true, - columns: ['url', 'state', 'labels'], - }) - stringifier.pipe(writeStream) - - // paginate api calls to the integration - do { - // write the list of urls, state and labels to the stream - retrievedData.forEach((row) => stringifier.write(row)) - - // get next pages from the integration - offset += retrievedData.length - - const retrieved = await integrationService.retrieve({ - token: integration.token, - since, - offset, - }) - syncedAt = retrieved.since || Date.now() - retrievedData = retrieved.data - - logger.info('retrieved data', { - total: offset, - size: retrievedData.length, - }) - } while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages - } - - // update the integration's syncedAt and remove taskName - await updateIntegration( - integration.id, - { - syncedAt: new Date(syncedAt), - taskName: null, - }, - userId - ) - } catch (err) { - logger.error('import pages from integration failed', err) - return res.status(500).send(err) - } finally { - writeStream?.end() - } - - res.status(200).send('OK') - }) - return router } diff --git a/packages/api/src/services/integrations/index.ts b/packages/api/src/services/integrations/index.ts index 5927af9ac..286ac59e7 100644 --- a/packages/api/src/services/integrations/index.ts +++ b/packages/api/src/services/integrations/index.ts @@ -1,19 +1,19 @@ import { DeepPartial, FindOptionsWhere } from 'typeorm' import { Integration } from '../../entity/integration' import { authTrx } from '../../repository' -import { IntegrationService } from './integration' -import { PocketIntegration } from './pocket' -import { ReadwiseIntegration } from './readwise' +import { IntegrationClient } from './integration' +import { PocketClient } from './pocket' +import { ReadwiseClient } from './readwise' -const integrations: IntegrationService[] = [ - new ReadwiseIntegration(), - new PocketIntegration(), +const integrations: IntegrationClient[] = [ + new ReadwiseClient(), + new PocketClient(), ] -export const getIntegrationService = (name: string): IntegrationService => { +export const getIntegrationClient = (name: string): IntegrationClient => { const service = integrations.find((s) => s.name === name) if (!service) { - throw new Error(`Integration service not found: ${name}`) + throw new Error(`Integration client not found: ${name}`) } return service } diff --git a/packages/api/src/services/integrations/integration.ts b/packages/api/src/services/integrations/integration.ts index 7c9454543..6aa51a49f 100644 --- a/packages/api/src/services/integrations/integration.ts +++ b/packages/api/src/services/integrations/integration.ts @@ -1,5 +1,4 @@ -import { Integration } from '../../entity/integration' -import { LibraryItem, LibraryItemState } from '../../entity/library_item' +import { LibraryItemState } from '../../entity/library_item' export interface RetrievedData { url: string @@ -19,19 +18,9 @@ export interface RetrieveRequest { offset?: number } -export abstract class IntegrationService { - abstract name: string +export interface IntegrationClient { + name: string + apiUrl: string - accessToken = async (token: string): Promise => { - return Promise.resolve(null) - } - export = async ( - integration: Integration, - items: LibraryItem[] - ): Promise => { - return Promise.resolve(false) - } - retrieve = async (req: RetrieveRequest): Promise => { - return Promise.resolve({ data: [] }) - } + accessToken(token: string): Promise } diff --git a/packages/api/src/services/integrations/pocket.ts b/packages/api/src/services/integrations/pocket.ts index 0a57bb9a6..86a99f8ef 100644 --- a/packages/api/src/services/integrations/pocket.ts +++ b/packages/api/src/services/integrations/pocket.ts @@ -3,7 +3,7 @@ import { LibraryItemState } from '../../entity/library_item' import { env } from '../../env' import { logger } from '../../utils/logger' import { - IntegrationService, + IntegrationClient, RetrievedResult, RetrieveRequest, } from './integration' @@ -51,16 +51,16 @@ interface Author { name: string } -export class PocketIntegration extends IntegrationService { +export class PocketClient implements IntegrationClient { name = 'POCKET' - POCKET_API_URL = 'https://getpocket.com/v3' + apiUrl = 'https://getpocket.com/v3' headers = { 'Content-Type': 'application/json', 'X-Accept': 'application/json', } accessToken = async (token: string): Promise => { - const url = `${this.POCKET_API_URL}/oauth/authorize` + const url = `${this.apiUrl}/oauth/authorize` try { const response = await axios.post<{ access_token: string }>( url, @@ -90,7 +90,7 @@ export class PocketIntegration extends IntegrationService { count = 100, offset = 0 ): Promise => { - const url = `${this.POCKET_API_URL}/get` + const url = `${this.apiUrl}/get` try { const response = await axios.post( url, diff --git a/packages/api/src/services/integrations/readwise.ts b/packages/api/src/services/integrations/readwise.ts index cfbbf421e..42f50f744 100644 --- a/packages/api/src/services/integrations/readwise.ts +++ b/packages/api/src/services/integrations/readwise.ts @@ -1,13 +1,6 @@ import axios from 'axios' -import { updateIntegration } from '.' -import { HighlightType } from '../../entity/highlight' -import { Integration } from '../../entity/integration' -import { LibraryItem } from '../../entity/library_item' -import { env } from '../../env' -import { wait } from '../../utils/helpers' import { logger } from '../../utils/logger' -import { findHighlightsByLibraryItemId, getHighlightUrl } from '../highlights' -import { IntegrationService } from './integration' +import { IntegrationClient } from './integration' interface ReadwiseHighlight { // The highlight text, (technically the only field required in a highlight object) @@ -36,12 +29,12 @@ interface ReadwiseHighlight { highlight_url?: string } -export const READWISE_API_URL = 'https://readwise.io/api/v2' - -export class ReadwiseIntegration extends IntegrationService { +export class ReadwiseClient implements IntegrationClient { name = 'READWISE' + apiUrl = 'https://readwise.io/api/v2' + accessToken = async (token: string): Promise => { - const authUrl = `${env.readwise.apiUrl || READWISE_API_URL}/auth` + const authUrl = `${this.apiUrl}/auth` try { const response = await axios.get(authUrl, { headers: { @@ -58,110 +51,4 @@ export class ReadwiseIntegration extends IntegrationService { return null } } - export = async ( - integration: Integration, - items: LibraryItem[] - ): Promise => { - let result = true - - const highlights = await Promise.all( - items.map((item) => - this.libraryItemToReadwiseHighlight(item, integration.user.id) - ) - ) - // If there are no highlights, we will skip the sync - if (highlights.length > 0) { - result = await this.syncWithReadwise(integration.token, highlights.flat()) - } - - // update integration syncedAt if successful - if (result) { - logger.info('updating integration syncedAt') - await updateIntegration( - integration.id, - { - syncedAt: new Date(), - }, - integration.user.id - ) - } - return result - } - - libraryItemToReadwiseHighlight = async ( - item: LibraryItem, - userId: string - ): Promise => { - let highlights = item.highlights - if (!highlights) { - highlights = await findHighlightsByLibraryItemId(item.id, userId) - } - - const category = item.siteName === 'Twitter' ? 'tweets' : 'articles' - return highlights - .map((highlight) => { - // filter out highlights that are not of type highlight or have no quote - if ( - highlight.highlightType !== HighlightType.Highlight || - !highlight.quote - ) { - return undefined - } - - return { - text: highlight.quote, - title: item.title, - author: item.author || undefined, - highlight_url: getHighlightUrl(item.slug, highlight.id), - highlighted_at: new Date(highlight.createdAt).toISOString(), - category, - image_url: item.thumbnail || undefined, - // location: highlight.highlightPositionAnchorIndex || undefined, - location_type: 'order', - note: highlight.annotation || undefined, - source_type: 'omnivore', - source_url: item.originalUrl, - } - }) - .filter((highlight) => highlight !== undefined) as ReadwiseHighlight[] - } - - syncWithReadwise = async ( - token: string, - highlights: ReadwiseHighlight[], - retryCount = 0 - ): Promise => { - const url = `${env.readwise.apiUrl || READWISE_API_URL}/highlights` - try { - const response = await axios.post( - url, - { - highlights, - }, - { - headers: { - Authorization: `Token ${token}`, - 'Content-Type': 'application/json', - }, - timeout: 5000, // 5 seconds - } - ) - return response.status === 200 - } catch (error) { - logger.error(error) - - if (axios.isAxiosError(error)) { - if (error.response?.status === 429 && retryCount < 3) { - logger.info('Readwise API rate limit exceeded, retrying...') - // wait for Retry-After seconds in the header if rate limited - // max retry count is 3 - const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds - await wait(parseInt(retryAfter, 10) * 1000) - return this.syncWithReadwise(token, highlights, retryCount + 1) - } - } - - return false - } - } } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 2e6b95bf1..be71a64a3 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -328,47 +328,6 @@ export const enqueueReminder = async ( return createdTasks[0].name } -export const enqueueSyncWithIntegration = async ( - userId: string, - integrationName: string -): Promise => { - const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env - // use pubsub data format to send the userId to the task handler - const payload = { - message: { - data: Buffer.from( - JSON.stringify({ - userId, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - - // If there is no Google Cloud Project Id exposed, it means that we are in local environment - if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - return nanoid() - } - - const createdTasks = await createHttpTaskWithToken({ - project: GOOGLE_CLOUD_PROJECT, - payload, - taskHandlerUrl: `${ - env.queue.integrationTaskHandlerUrl - }/${integrationName.toLowerCase()}/sync_all?token=${PUBSUB_VERIFICATION_TOKEN}`, - priority: 'low', - }) - - if (!createdTasks || !createdTasks[0].name) { - logger.error(`Unable to get the name of the task`, { - payload, - createdTasks, - }) - throw new CreateTaskError(`Unable to get the name of the task`) - } - return createdTasks[0].name -} - export const enqueueTextToSpeech = async ({ userId, text, @@ -498,23 +457,27 @@ export const enqueueRecommendation = async ( export const enqueueImportFromIntegration = async ( integrationId: string, + integrationName: string, + syncAt: number, // unix timestamp in milliseconds authToken: string ): Promise => { const { GOOGLE_CLOUD_PROJECT } = process.env const payload = { integrationId, + integrationName, + syncAt, } const headers = { - Cookie: `auth=${authToken}`, + [OmnivoreAuthorizationHeader]: authToken, } // If there is no Google Cloud Project Id exposed, it means that we are in local environment if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.integrationTaskHandlerUrl) { + if (env.queue.integrationImporterUrl) { // Calling the handler function directly. setTimeout(() => { axios - .post(`${env.queue.integrationTaskHandlerUrl}/import`, payload, { + .post(env.queue.integrationImporterUrl, payload, { headers, }) .catch((error) => { @@ -528,7 +491,7 @@ export const enqueueImportFromIntegration = async ( const createdTasks = await createHttpTaskWithToken({ project: GOOGLE_CLOUD_PROJECT, payload, - taskHandlerUrl: `${env.queue.integrationTaskHandlerUrl}/import`, + taskHandlerUrl: env.queue.integrationImporterUrl, priority: 'low', requestHeaders: headers, }) @@ -557,15 +520,15 @@ export const enqueueExportToIntegration = async ( } const headers = { - Cookie: `auth=${authToken}`, + [OmnivoreAuthorizationHeader]: authToken, } // If there is no Google Cloud Project Id exposed, it means that we are in local environment if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.integrationTaskHandlerUrl) { + if (env.queue.integrationExporterUrl) { // Calling the handler function directly. setTimeout(() => { axios - .post(`${env.queue.integrationExporterUrl}`, payload, { + .post(env.queue.integrationExporterUrl, payload, { headers, }) .catch((error) => { @@ -579,7 +542,7 @@ export const enqueueExportToIntegration = async ( const createdTasks = await createHttpTaskWithToken({ project: GOOGLE_CLOUD_PROJECT, payload, - taskHandlerUrl: `${env.queue.integrationExporterUrl}`, + taskHandlerUrl: env.queue.integrationExporterUrl, priority: 'low', requestHeaders: headers, }) diff --git a/packages/api/test/resolvers/integrations.test.ts b/packages/api/test/resolvers/integrations.test.ts index c170dedb4..bb9e56929 100644 --- a/packages/api/test/resolvers/integrations.test.ts +++ b/packages/api/test/resolvers/integrations.test.ts @@ -11,7 +11,6 @@ import { saveIntegration, updateIntegration, } from '../../src/services/integrations' -import { READWISE_API_URL } from '../../src/services/integrations/readwise' import { deleteUser } from '../../src/services/user' import { createTestUser } from '../db' import { generateFakeUuid, graphqlRequest, request } from '../util' @@ -19,6 +18,8 @@ import { generateFakeUuid, graphqlRequest, request } from '../util' chai.use(sinonChai) describe('Integrations resolvers', () => { + const READWISE_API_URL = 'https://readwise.io/api/v2' + let loginUser: User let authToken: string diff --git a/packages/api/test/routers/integrations.test.ts b/packages/api/test/routers/integrations.test.ts deleted file mode 100644 index 9081ef2f3..000000000 --- a/packages/api/test/routers/integrations.test.ts +++ /dev/null @@ -1,423 +0,0 @@ -import { Storage } from '@google-cloud/storage' -import { expect } from 'chai' -import { DateTime } from 'luxon' -import 'mocha' -import nock from 'nock' -import sinon from 'sinon' -import { Highlight } from '../../src/entity/highlight' -import { Integration, IntegrationType } from '../../src/entity/integration' -import { LibraryItem } from '../../src/entity/library_item' -import { User } from '../../src/entity/user' -import { env } from '../../src/env' -import { PubSubRequestBody } from '../../src/pubsub' -import { createHighlight, getHighlightUrl } from '../../src/services/highlights' -import { - deleteIntegrations, - saveIntegration, - updateIntegration, -} from '../../src/services/integrations' -import { READWISE_API_URL } from '../../src/services/integrations/readwise' -import { deleteLibraryItemById } from '../../src/services/library_item' -import { deleteUser } from '../../src/services/user' -import { createTestLibraryItem, createTestUser } from '../db' -import { MockBucket } from '../mock_storage' -import { request } from '../util' - -describe('Integrations routers', () => { - const baseUrl = '/svc/pubsub/integrations' - let token: string - let user: User - let authToken: string - - before(async () => { - user = await createTestUser('fakeUser') - const res = await request - .post('/local/debug/fake-user-login') - .send({ fakeEmail: user.email }) - - const body = res.body as { authToken: string } - authToken = body.authToken - }) - - after(async () => { - await deleteUser(user.id) - }) - - describe('sync with integrations', () => { - const endpoint = (token: string, name = 'name', action = 'action') => - `${baseUrl}/${name}/${action}?token=${token}` - let action: string - let data: PubSubRequestBody - let integrationName: string - - context('when token is invalid', () => { - before(() => { - token = 'invalid-token' - }) - - it('returns 200', async () => { - return request.post(endpoint(token)).send(data).expect(200) - }) - }) - - context('when token is valid', () => { - before(() => { - token = process.env.PUBSUB_VERIFICATION_TOKEN as string - }) - - context('when data is expired', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ userId: 'userId', type: 'page' }) - ).toString('base64'), - publishTime: DateTime.now().minus({ hours: 12 }).toISO(), - }, - } - }) - - it('returns 200 with Expired', async () => { - const res = await request.post(endpoint(token)).send(data).expect(200) - expect(res.text).to.eql('Expired') - }) - }) - - context('when userId is empty', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ userId: '', type: 'page' }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - }) - - it('returns 200', async () => { - return request.post(endpoint(token)).send(data).expect(200) - }) - }) - - context('when user exists', () => { - context('when integration not found', () => { - before(() => { - integrationName = 'READWISE' - data = { - message: { - data: Buffer.from( - JSON.stringify({ userId: user.id, type: 'page' }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - }) - - it('returns 200 with No integration found', async () => { - const res = await request - .post(endpoint(token, integrationName)) - .send(data) - .expect(200) - expect(res.text).to.eql('No integration found') - }) - }) - - context('when integration is readwise and enabled', () => { - let integration: Integration - let item: LibraryItem - let highlight: Highlight - let highlightsData: any - - before(async () => { - integration = await saveIntegration( - { - user, - name: 'READWISE', - token: 'token', - }, - user.id - ) - integrationName = integration.name - // create page - item = await createTestLibraryItem(user.id) - - // create highlight - const highlightPositionPercent = 25 - highlight = await createHighlight( - { - patch: 'test patch', - quote: 'test quote', - shortId: 'test shortId', - highlightPositionPercent, - user, - libraryItem: item, - }, - item.id, - user.id - ) - // create highlights data for integration request - highlightsData = { - highlights: [ - { - text: highlight.quote, - title: item.title, - author: item.author ?? undefined, - highlight_url: getHighlightUrl(item.slug, highlight.id), - highlighted_at: highlight.createdAt.toISOString(), - category: 'articles', - image_url: item.thumbnail ?? undefined, - // location: highlightPositionPercent, - location_type: 'order', - note: highlight.annotation ?? undefined, - source_type: 'omnivore', - source_url: item.originalUrl, - }, - ], - } - }) - - after(async () => { - await deleteIntegrations(user.id, [integration.id]) - await deleteLibraryItemById(item.id) - }) - - context('when action is sync_updated', () => { - before(() => { - action = 'sync_updated' - }) - - context('when entity type is page', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ - userId: user.id, - type: 'page', - id: item.id, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - // mock Readwise Highlight API - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights', highlightsData) - .reply(200) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - - context('when readwise highlight API reaches rate limits', () => { - before(() => { - // mock Readwise Highlight API with rate limits - // retry after 1 second - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights') - .reply(429, 'Rate Limited', { 'Retry-After': '1' }) - // mock Readwise Highlight API after 1 second - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights') - .delay(1000) - .reply(200) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - }) - }) - - context('when entity type is highlight', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ - userId: user.id, - type: 'highlight', - articleId: item.id, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - // mock Readwise Highlight API - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights', highlightsData) - .reply(200) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - }) - }) - - context('when action is sync_all', () => { - before(async () => { - action = 'sync_all' - data = { - message: { - data: Buffer.from( - JSON.stringify({ - userId: user.id, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - // mock Readwise Highlight API - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights', highlightsData) - .reply(200) - await updateIntegration( - integration.id, - { - syncedAt: null, - taskName: 'some task name', - }, - user.id - ) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - }) - }) - }) - }) - }) - - describe('import from integrations router', () => { - let integration: Integration - - before(async () => { - token = 'test token' - // create integration - integration = await saveIntegration( - { - user: { id: user.id }, - name: 'POCKET', - token, - type: IntegrationType.Import, - }, - user.id - ) - - // mock Pocket API - const reqBody = { - access_token: token, - consumer_key: env.pocket.consumerKey, - state: 'all', - detailType: 'complete', - since: 0, - sort: 'oldest', - count: 100, - offset: 0, - } - nock('https://getpocket.com', { - reqheaders: { - 'content-type': 'application/json', - 'x-accept': 'application/json', - }, - }) - .post('/v3/get', reqBody) - .reply(200, { - complete: 1, - list: { - '123': { - given_url: 'https://omnivore.app/pocket-import-test,test', - state: '0', - tags: { - '1234': { - tag: 'test', - }, - '1235': { - tag: 'new', - }, - }, - }, - }, - since: Date.now() / 1000, - }) - .post('/v3/get', { - ...reqBody, - offset: 1, - }) - .reply(200, { - list: {}, - }) - - // mock cloud storage - const mockBucket = new MockBucket('test') - sinon.replace( - Storage.prototype, - 'bucket', - sinon.fake.returns(mockBucket as never) - ) - }) - - after(async () => { - sinon.restore() - await deleteIntegrations(user.id, [integration.id]) - }) - - context('when integration is pocket', () => { - it('returns 200 with OK', async () => { - return request - .post(`${baseUrl}/import`) - .send({ - integrationId: integration.id, - }) - .set('Cookie', `auth=${authToken}`) - .expect(200) - }) - }) - }) -}) diff --git a/packages/api/tsconfig.json b/packages/api/tsconfig.json index 7c8caecfd..ffa966215 100644 --- a/packages/api/tsconfig.json +++ b/packages/api/tsconfig.json @@ -6,6 +6,10 @@ "compilerOptions": { "outDir": "dist" }, - "include": ["src", "test"], + "include": [ + "src", + "test", + "../integration-handler/test/integrations.test.ts" + ], "exclude": ["./src/generated", "./test"] } diff --git a/packages/integration-handler/.dockerignore b/packages/integration-handler/.dockerignore new file mode 100644 index 000000000..d8aea4ee6 --- /dev/null +++ b/packages/integration-handler/.dockerignore @@ -0,0 +1,5 @@ +node_modules +build +.env* +Dockerfile +.dockerignore diff --git a/packages/integration-handler/.gcloudignore b/packages/integration-handler/.gcloudignore new file mode 100644 index 000000000..ccc4eb240 --- /dev/null +++ b/packages/integration-handler/.gcloudignore @@ -0,0 +1,16 @@ +# This file specifies files that are *not* uploaded to Google Cloud Platform +# using gcloud. It follows the same syntax as .gitignore, with the addition of +# "#!include" directives (which insert the entries of the given .gitignore-style +# file at that point). +# +# For more information, run: +# $ gcloud topic gcloudignore +# +.gcloudignore +# If you would like to upload your .git directory, .gitignore file or files +# from your .gitignore file, remove the corresponding line +# below: +.git +.gitignore + +node_modules diff --git a/packages/integration-handler/Dockerfile-exporter b/packages/integration-handler/Dockerfile-exporter new file mode 100644 index 000000000..60eee9d5f --- /dev/null +++ b/packages/integration-handler/Dockerfile-exporter @@ -0,0 +1,26 @@ +FROM node:18.16-alpine + +# Run everything after as non-privileged user. +WORKDIR /app + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/integration-handler/package.json ./packages/integration-handler/package.json + +RUN yarn install --pure-lockfile + +COPY /packages/integration-handler ./packages/integration-handler +RUN yarn workspace @omnivore/integration-handler build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/integration-handler/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +CMD ["yarn", "workspace", "@omnivore/integration-handler", "start_exporter"] + diff --git a/packages/integration-handler/Dockerfile-importer b/packages/integration-handler/Dockerfile-importer new file mode 100644 index 000000000..e3a790b8a --- /dev/null +++ b/packages/integration-handler/Dockerfile-importer @@ -0,0 +1,26 @@ +FROM node:18.16-alpine + +# Run everything after as non-privileged user. +WORKDIR /app + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/integration-handler/package.json ./packages/integration-handler/package.json + +RUN yarn install --pure-lockfile + +COPY /packages/integration-handler ./packages/integration-handler +RUN yarn workspace @omnivore/integration-handler build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/integration-handler/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +CMD ["yarn", "workspace", "@omnivore/integration-handler", "start_importer"] + diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json index cf8a5fe0b..e1ccc1fcb 100644 --- a/packages/integration-handler/package.json +++ b/packages/integration-handler/package.json @@ -30,6 +30,7 @@ "@sentry/serverless": "^7.30.0", "axios": "^1.2.2", "csv-stringify": "^6.4.0", + "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1", "luxon": "^3.2.1", "uuid": "^9.0.0" diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index 73a3c23e2..d32ef8a08 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -1,11 +1,12 @@ +import { File, Storage } from '@google-cloud/storage' import * as Sentry from '@sentry/serverless' -import * as jwt from 'jsonwebtoken' -import { getIntegrationClient, updateIntegration } from './integrations' -import { search } from './item' import { stringify } from 'csv-stringify' +import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import +import * as jwt from 'jsonwebtoken' import { DateTime } from 'luxon' import { v4 as uuidv4 } from 'uuid' -import { File, Storage } from '@google-cloud/storage' +import { getIntegrationClient, updateIntegration } from './integrations' +import { search } from './item' interface IntegrationRequest { integrationId: string @@ -18,6 +19,8 @@ interface Claims { token: string } +dotenv.config() + Sentry.GCPFunction.init({ dsn: process.env.SENTRY_DSN, tracesSampleRate: 0, @@ -43,6 +46,8 @@ const createGCSFile = (bucket: string, filename: string): File => { export const exporter = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { + console.log('start to export to integration') + const JWT_SECRET = process.env.JWT_SECRET const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT @@ -50,8 +55,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( return res.status(500).send('Environment not configured correctly') } - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const token = (req.cookies?.token || req.headers.authorization) as string + const token = req.get('Omnivore-Authorization') if (!token) { return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) } @@ -77,9 +81,10 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( let hasMore = true let after = '0' while (hasMore) { + console.log('searching for items...') const response = await search( REST_BACKEND_ENDPOINT, - claims.token, + token, client.highlightOnly, new Date(syncAt), '50', @@ -100,6 +105,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( break } + console.log('exporting items...') const synced = await client.export(claims.token, items) if (!synced) { console.error('failed to export item', { @@ -108,6 +114,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('Failed to sync') } + console.log('updating integration...') // update integration syncedAt if successful const updated = await updateIntegration( REST_BACKEND_ENDPOINT, @@ -115,7 +122,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( items[items.length - 1].updatedAt, integrationName, claims.token, - token + token, + 'EXPORT' ) if (!updated) { @@ -128,6 +136,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( // avoid rate limiting await wait(500) } + + console.log('done') } catch (err) { console.error('export with integration failed', err) return res.status(500).send(err) @@ -139,16 +149,17 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( export const importer = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { + console.log('start to import from integration') + const JWT_SECRET = process.env.JWT_SECRET const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT - const GCS_BUCKET = process.env.GCS_BUCKET + const GCS_BUCKET = process.env.GCS_UPLOAD_BUCKET if (!JWT_SECRET || !REST_BACKEND_ENDPOINT || !GCS_BUCKET) { return res.status(500).send('Environment not configured correctly') } - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const token = (req.cookies?.token || req.headers.authorization) as string + const token = req.get('Omnivore-Authorization') if (!token) { return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) } @@ -175,6 +186,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( let syncedAt = req.body.syncAt const since = syncedAt + console.log('importing pages from integration...') // get pages from integration const retrieved = await integrationClient.retrieve({ token: claims.token, @@ -183,6 +195,8 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( }) syncedAt = retrieved.since || Date.now() + console.log('uploading items...') + let retrievedData = retrieved.data // if there are pages to import if (retrievedData.length > 0) { @@ -224,6 +238,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( size: retrievedData.length, }) + console.log('uploading integration...') // update the integration's syncedAt and remove taskName const result = await updateIntegration( REST_BACKEND_ENDPOINT, @@ -231,7 +246,8 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( new Date(syncedAt), req.body.integrationName, claims.token, - token + token, + 'IMPORT' ) if (!result) { console.error('failed to update integration', { @@ -241,10 +257,13 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( } } while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages } + + console.log('done') } catch (err) { console.error('import pages from integration failed', err) return res.status(500).send(err) } finally { + console.log('closing write stream') writeStream?.end() } diff --git a/packages/integration-handler/src/integrations/index.ts b/packages/integration-handler/src/integrations/index.ts index f3aa66913..65c35a047 100644 --- a/packages/integration-handler/src/integrations/index.ts +++ b/packages/integration-handler/src/integrations/index.ts @@ -30,7 +30,8 @@ export const updateIntegration = async ( syncedAt: Date, name: string, integrationToken: string, - token: string + token: string, + type: string ): Promise => { const requestData = JSON.stringify({ query: ` @@ -48,11 +49,14 @@ export const updateIntegration = async ( } }`, variables: { - id, - syncedAt, - name, - token: integrationToken, - enabled: true, + input: { + id, + syncedAt, + name, + token: integrationToken, + enabled: true, + type, + }, }, }) diff --git a/packages/integration-handler/src/item.ts b/packages/integration-handler/src/item.ts index 6d09b4451..cb535904c 100644 --- a/packages/integration-handler/src/item.ts +++ b/packages/integration-handler/src/item.ts @@ -59,11 +59,8 @@ export const search = async ( node { id slug - labels { - id - } - isArchived - readingProgressPercent + url + updatedAt title image author @@ -77,6 +74,10 @@ export const search = async ( } } } + pageInfo { + hasNextPage + endCursor + } } ... on SearchError { errorCodes