From 47f67c237dc81637b10a03839821b3f626bebac4 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 26 Oct 2023 15:42:20 +0800 Subject: [PATCH 01/16] change source to pocket if item is imported from pocket --- packages/api/src/routers/svc/integrations.ts | 2 +- packages/api/src/services/save_page.ts | 10 +++++++-- packages/import-handler/src/csv.ts | 2 +- packages/import-handler/src/index.ts | 23 +++++++++++++++++--- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index d06c2d44e..1fc201d2d 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -241,7 +241,7 @@ export function integrationsServiceRouter() { // path style: imports///-.csv const dateStr = DateTime.now().toISODate() const fileUuid = uuidv4() - const fullPath = `imports/${userId}/${dateStr}/URL_LIST-${fileUuid}.csv` + const fullPath = `imports/${userId}/${dateStr}/${integrationService.name}-${fileUuid}.csv` // open a write_stream to the file const file = createGCSFile(fullPath) writeStream = file.createWriteStream({ diff --git a/packages/api/src/services/save_page.ts b/packages/api/src/services/save_page.ts index 4f436c5fe..6e6b7b954 100644 --- a/packages/api/src/services/save_page.ts +++ b/packages/api/src/services/save_page.ts @@ -36,7 +36,12 @@ const FORCE_PUPPETEER_URLS = [ TWEET_URL_REGEX, /^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu.be))(\/(?:[\w-]+\?v=|embed\/|v\/)?)([\w-]+)(\S+)?$/, ] -const ALREADY_PARSED_SOURCES = ['puppeteer-parse', 'csv-importer', 'rss-feeder'] +const ALREADY_PARSED_SOURCES = [ + 'puppeteer-parse', + 'csv-importer', + 'rss-feeder', + 'pocket', +] const createSlug = (url: string, title?: Maybe | undefined) => { const { pathname } = new URL(url) @@ -93,7 +98,8 @@ export const savePage = async ( state: input.state || undefined, rssFeedUrl: input.rssFeedUrl, }) - const isImported = input.source === 'csv-importer' + const isImported = + input.source === 'csv-importer' || input.source === 'pocket' // always parse in backend if the url is in the force puppeteer list if (shouldParseInBackend(input)) { diff --git a/packages/import-handler/src/csv.ts b/packages/import-handler/src/csv.ts index d04852c75..fa3d209e1 100644 --- a/packages/import-handler/src/csv.ts +++ b/packages/import-handler/src/csv.ts @@ -46,7 +46,7 @@ const parseDate = (date: string): Date => { export const importCsv = async (ctx: ImportContext, stream: Stream) => { // create metrics in redis - await createMetrics(ctx.redisClient, ctx.userId, ctx.taskId, 'csv-importer') + await createMetrics(ctx.redisClient, ctx.userId, ctx.taskId, ctx.source) const parser = parse({ headers: true, diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 9fa090f78..599219168 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -61,6 +61,7 @@ export type ImportContext = { contentHandler: ContentHandler redisClient: RedisClient taskId: string + source: string } type importHandlerFunc = (ctx: ImportContext, stream: Stream) => Promise @@ -176,13 +177,28 @@ const handlerForFile = (name: string): importHandlerFunc | undefined => { const fileName = path.parse(name).name if (fileName.startsWith('MATTER')) { return importMatterArchive - } else if (fileName.startsWith('URL_LIST')) { + } else if (fileName.startsWith('URL_LIST') || fileName.startsWith('POCKET')) { return importCsv } return undefined } +const importSource = (name: string): string => { + const fileName = path.parse(name).name + if (fileName.startsWith('MATTER')) { + return 'matter-history' + } + if (fileName.startsWith('URL_LIST')) { + return 'csv-importer' + } + if (fileName.startsWith('POCKET')) { + return 'pocket' + } + + return 'unknown' +} + const urlHandler = async ( ctx: ImportContext, url: URL, @@ -196,7 +212,7 @@ const urlHandler = async ( const result = await importURL( ctx.userId, url, - 'csv-importer', + ctx.source, ctx.taskId, state, labels && labels.length > 0 ? labels : undefined, @@ -309,7 +325,7 @@ const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => { .file(data.name) .createReadStream() - const ctx = { + const ctx: ImportContext = { userId, countImported: 0, countFailed: 0, @@ -317,6 +333,7 @@ const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => { contentHandler, redisClient, taskId: data.name, + source: importSource(data.name), } await handler(ctx, stream) From 5edba30e232cae1da41ddc2c9e27eeb200ef45ed Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 26 Oct 2023 18:38:39 +0800 Subject: [PATCH 02/16] create an integration handler for syncing with readwise --- packages/api/src/generated/graphql.ts | 1 + packages/api/src/generated/schema.graphql | 1 + .../api/src/resolvers/integrations/index.ts | 1 + packages/api/src/schema.ts | 1 + packages/integration-handler/.eslintignore | 4 + packages/integration-handler/.eslintrc | 6 + .../integration-handler/mocha-config.json | 5 + packages/integration-handler/package.json | 56 ++++++++ packages/integration-handler/src/index.ts | 127 +++++++++++++++++ .../src/integrations/index.ts | 82 +++++++++++ .../src/integrations/integration.ts | 33 +++++ .../src/integrations/pocket.ts | 132 ++++++++++++++++++ .../src/integrations/readwise.ts | 114 +++++++++++++++ packages/integration-handler/src/item.ts | 114 +++++++++++++++ .../test/babel-register.js | 3 + .../integration-handler/test/stub.test.ts | 8 ++ packages/integration-handler/tsconfig.json | 11 ++ 17 files changed, 699 insertions(+) create mode 100644 packages/integration-handler/.eslintignore create mode 100644 packages/integration-handler/.eslintrc create mode 100644 packages/integration-handler/mocha-config.json create mode 100644 packages/integration-handler/package.json create mode 100644 packages/integration-handler/src/index.ts create mode 100644 packages/integration-handler/src/integrations/index.ts create mode 100644 packages/integration-handler/src/integrations/integration.ts create mode 100644 packages/integration-handler/src/integrations/pocket.ts create mode 100644 packages/integration-handler/src/integrations/readwise.ts create mode 100644 packages/integration-handler/src/item.ts create mode 100644 packages/integration-handler/test/babel-register.js create mode 100644 packages/integration-handler/test/stub.test.ts create mode 100644 packages/integration-handler/tsconfig.json diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index fc5e22609..e42282815 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -2376,6 +2376,7 @@ export type SetIntegrationInput = { enabled: Scalars['Boolean']; id?: InputMaybe; name: Scalars['String']; + syncedAt?: InputMaybe; token: Scalars['String']; type?: InputMaybe; }; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index 378c8bce1..dfb7c1677 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -1835,6 +1835,7 @@ input SetIntegrationInput { enabled: Boolean! id: ID name: String! + syncedAt: Date token: String! type: IntegrationType } diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index 8bf1b396a..5f0dce401 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -45,6 +45,7 @@ export const setIntegrationResolver = authorized< user: { id: uid }, id: input.id || undefined, type: input.type || IntegrationType.Export, + syncedAt: input.syncedAt ? new Date(input.syncedAt) : undefined, } if (input.id) { // Update diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index ac13b36b9..d257b5069 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -1978,6 +1978,7 @@ const schema = gql` type: IntegrationType token: String! enabled: Boolean! + syncedAt: Date } union IntegrationsResult = IntegrationsSuccess | IntegrationsError diff --git a/packages/integration-handler/.eslintignore b/packages/integration-handler/.eslintignore new file mode 100644 index 000000000..b741470fc --- /dev/null +++ b/packages/integration-handler/.eslintignore @@ -0,0 +1,4 @@ +node_modules/ +dist/ +readabilityjs/ +src/generated/ diff --git a/packages/integration-handler/.eslintrc b/packages/integration-handler/.eslintrc new file mode 100644 index 000000000..e006282a6 --- /dev/null +++ b/packages/integration-handler/.eslintrc @@ -0,0 +1,6 @@ +{ + "extends": "../../.eslintrc", + "parserOptions": { + "project": "tsconfig.json" + } +} \ No newline at end of file diff --git a/packages/integration-handler/mocha-config.json b/packages/integration-handler/mocha-config.json new file mode 100644 index 000000000..44d1d24c1 --- /dev/null +++ b/packages/integration-handler/mocha-config.json @@ -0,0 +1,5 @@ +{ + "extension": ["ts"], + "spec": "test/**/*.test.ts", + "require": "test/babel-register.js" + } \ No newline at end of file diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json new file mode 100644 index 000000000..c25454038 --- /dev/null +++ b/packages/integration-handler/package.json @@ -0,0 +1,56 @@ +{ + "name": "@omnivore/integration-handler", + "version": "1.0.0", + "description": "", + "main": "build/src/index.js", + "files": [ + "build/src" + ], + "keywords": [], + "license": "Apache-2.0", + "scripts": { + "test": "yarn mocha -r ts-node/register --config mocha-config.json", + "lint": "eslint src --ext ts,js,tsx,jsx", + "compile": "tsc", + "build": "tsc", + "start_exporter": "functions-framework --target=exporter" + }, + "devDependencies": { + "@types/chai": "^4.3.4", + "@types/chai-string": "^1.4.2", + "@types/dompurify": "^2.4.0", + "@types/fs-extra": "^11.0.1", + "@types/glob": "^8.0.1", + "@types/jsonwebtoken": "^8.5.0", + "@types/mocha": "^10.0.1", + "@types/node": "^14.11.2", + "@types/unzip-stream": "^0.3.1", + "@types/urlsafe-base64": "^1.0.28", + "@types/uuid": "^9.0.0", + "copyfiles": "^2.4.1", + "eslint-plugin-prettier": "^4.0.0" + }, + "dependencies": { + "@fast-csv/parse": "^4.3.6", + "@google-cloud/functions-framework": "3.1.2", + "@google-cloud/storage": "^7.0.1", + "@google-cloud/tasks": "^4.0.0", + "@omnivore/readability": "1.0.0", + "@sentry/serverless": "^7.30.0", + "@types/express": "^4.17.13", + "axios": "^1.2.2", + "dompurify": "^2.4.3", + "fs-extra": "^11.1.0", + "glob": "^8.1.0", + "jsonwebtoken": "^8.5.1", + "linkedom": "^0.14.21", + "nodemon": "^2.0.15", + "redis": "^4.3.1", + "unzip-stream": "^0.3.1", + "urlsafe-base64": "^1.0.0", + "uuid": "^9.0.0" + }, + "volta": { + "extends": "../../package.json" + } +} diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts new file mode 100644 index 000000000..349c7c65d --- /dev/null +++ b/packages/integration-handler/src/index.ts @@ -0,0 +1,127 @@ +import * as Sentry from '@sentry/serverless' +import * as jwt from 'jsonwebtoken' +import { getIntegrationClient, updateIntegration } from './integrations' +import { search } from './item' + +interface ExportRequest { + integrationId: string + syncAt: number // unix timestamp in milliseconds + integrationName: string +} + +interface Claims { + uid: string + token: string +} + +Sentry.GCPFunction.init({ + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 0, +}) + +export const wait = (ms: number): Promise => { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} + +function isExportRequest(body: any): body is ExportRequest { + return ( + 'integrationId' in body && 'syncAt' in body && 'integrationName' in body + ) +} + +export const exporter = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + const JWT_SECRET = process.env.JWT_SECRET + const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT + + if (!JWT_SECRET || !REST_BACKEND_ENDPOINT) { + return res.status(500).send('Environment not configured correctly') + } + + const token = (req.query.token || req.headers.authorization) as string + if (!token) { + return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) + } + + let claims: Claims + try { + claims = jwt.verify(token, JWT_SECRET) as Claims + } catch (e) { + console.error(e) + return res.status(401).send('UNAUTHORIZED') + } + + try { + if (!isExportRequest(req.body)) { + console.error('Invalid message') + return res.status(200).send('Bad Request') + } + + const { integrationId, syncAt, integrationName } = req.body + const client = getIntegrationClient(integrationName) + + // get paginated items from the backend + let hasMore = true + let after = '0' + while (hasMore) { + const response = await search( + REST_BACKEND_ENDPOINT, + claims.token, + client.highlightOnly, + new Date(syncAt), + '50', + after + ) + + if (!response) { + console.error('failed to search for items', { + integrationId, + }) + return res.status(400).send('Failed to search') + } + + hasMore = response.data.search.pageInfo.hasNextPage + after = response.data.search.pageInfo.endCursor + const items = response.data.search.edges.map((edge) => edge.node) + if (items.length === 0) { + break + } + + const synced = await client.export(claims.token, items) + if (!synced) { + console.error('failed to export item', { + integrationId, + }) + return res.status(400).send('Failed to sync') + } + + // update integration syncedAt if successful + const updated = await updateIntegration( + REST_BACKEND_ENDPOINT, + integrationId, + items[items.length - 1].updatedAt, + integrationName, + claims.token, + token + ) + + if (!updated) { + console.error('failed to update integration', { + integrationId, + }) + return res.status(400).send('Failed to update integration') + } + + // avoid rate limiting + await wait(500) + } + } catch (err) { + console.error('export with integration failed', err) + return res.status(500).send(err) + } + + res.sendStatus(200) + } +) diff --git a/packages/integration-handler/src/integrations/index.ts b/packages/integration-handler/src/integrations/index.ts new file mode 100644 index 000000000..f3aa66913 --- /dev/null +++ b/packages/integration-handler/src/integrations/index.ts @@ -0,0 +1,82 @@ +import axios from 'axios' +import { IntegrationClient } from './integration' +import { PocketClient } from './pocket' +import { ReadwiseClient } from './readwise' + +interface SetIntegrationResponse { + data: { + setIntegration: { + integration: { + id: string + } + errorCodes: string[] + } + } +} + +const clients: IntegrationClient[] = [new ReadwiseClient(), new PocketClient()] + +export const getIntegrationClient = (name: string): IntegrationClient => { + const client = clients.find((s) => s.name === name) + if (!client) { + throw new Error(`Integration client not found: ${name}`) + } + return client +} + +export const updateIntegration = async ( + apiEndpoint: string, + id: string, + syncedAt: Date, + name: string, + integrationToken: string, + token: string +): Promise => { + const requestData = JSON.stringify({ + query: ` + mutation SetIntegration($input: SetIntegrationInput!) { + setIntegration(input: $input) { + ... on SetIntegrationSuccess { + integration { + id + enabled + } + } + ... on SetIntegrationError { + errorCodes + } + } + }`, + variables: { + id, + syncedAt, + name, + token: integrationToken, + enabled: true, + }, + }) + + try { + const response = await axios.post( + `${apiEndpoint}/graphql`, + requestData, + { + headers: { + Cookie: `auth=${token};`, + 'Content-Type': 'application/json', + 'X-OmnivoreClient': 'integration-handler', + }, + } + ) + + if (response.data.data.setIntegration.errorCodes) { + console.error(response.data.data.setIntegration.errorCodes) + return false + } + + return true + } catch (error) { + console.error(error) + return false + } +} diff --git a/packages/integration-handler/src/integrations/integration.ts b/packages/integration-handler/src/integrations/integration.ts new file mode 100644 index 000000000..78b035db9 --- /dev/null +++ b/packages/integration-handler/src/integrations/integration.ts @@ -0,0 +1,33 @@ +import { Item } from '../item' + +export interface RetrievedData { + url: string + labels?: string[] + state?: string +} +export interface RetrievedResult { + data: RetrievedData[] + hasMore?: boolean + since?: number // unix timestamp in milliseconds +} + +export interface RetrieveRequest { + token: string + since?: number // unix timestamp in milliseconds + count?: number + offset?: number +} + +export abstract class IntegrationClient { + abstract name: string + abstract apiUrl: string + highlightOnly = true + + export = async (token: string, items: Item[]): Promise => { + return Promise.resolve(false) + } + + retrieve = async (req: RetrieveRequest): Promise => { + return Promise.resolve({ data: [] }) + } +} diff --git a/packages/integration-handler/src/integrations/pocket.ts b/packages/integration-handler/src/integrations/pocket.ts new file mode 100644 index 000000000..fa4003c4d --- /dev/null +++ b/packages/integration-handler/src/integrations/pocket.ts @@ -0,0 +1,132 @@ +import axios from 'axios' +import { + IntegrationClient, + RetrievedResult, + RetrieveRequest, +} from './integration' + +interface PocketResponse { + status: number // 1 if success + complete: number // 1 if all items have been returned + list: { + [key: string]: PocketItem + } + since: number // unix timestamp in seconds + search_meta: { + search_type: string + } + error: string +} + +interface PocketItem { + item_id: string + resolved_id: string + given_url: string + resolved_url: string + given_title: string + resolved_title: string + favorite: string + status: string + excerpt: string + word_count: string + tags?: { + [key: string]: Tag + } + authors?: { + [key: string]: Author + } +} + +interface Tag { + item_id: string + tag: string +} + +interface Author { + item_id: string + author_id: string + name: string +} + +export class PocketClient extends IntegrationClient { + name = 'POCKET' + apiUrl = 'https://getpocket.com/v3' + headers = { + 'Content-Type': 'application/json', + 'X-Accept': 'application/json', + } + + retrievePocketData = async ( + accessToken: string, + since: number, // unix timestamp in seconds + count = 100, + offset = 0 + ): Promise => { + const url = `${this.apiUrl}/get` + try { + const response = await axios.post( + url, + { + consumer_key: process.env.POCKET_CONSUMER_KEY, + access_token: accessToken, + state: 'all', + detailType: 'complete', + since, + sort: 'oldest', + count, + offset, + }, + { + headers: this.headers, + timeout: 10000, // 10 seconds + } + ) + + return response.data + } catch (error) { + console.error('error retrievePocketData: ', error) + + return null + } + } + + retrieve = async ({ + token, + since = 0, + count = 100, + offset = 0, + }: RetrieveRequest): Promise => { + const pocketData = await this.retrievePocketData( + token, + since / 1000, + count, + offset + ) + if (!pocketData) { + throw new Error('Error retrieving pocket data') + } + + const pocketItems = Object.values(pocketData.list) + const statusToState: Record = { + '0': 'SUCCEEDED', + '1': 'ARCHIVED', + '2': 'DELETED', + } + const data = pocketItems.map((item) => ({ + url: item.given_url, + labels: item.tags + ? Object.values(item.tags).map((tag) => tag.tag) + : undefined, + state: statusToState[item.status], + })) + + if (pocketData.error) { + throw new Error(`Error retrieving pocket data: ${pocketData.error}`) + } + + return { + data, + since: pocketData.since * 1000, + } + } +} diff --git a/packages/integration-handler/src/integrations/readwise.ts b/packages/integration-handler/src/integrations/readwise.ts new file mode 100644 index 000000000..95bd3503d --- /dev/null +++ b/packages/integration-handler/src/integrations/readwise.ts @@ -0,0 +1,114 @@ +import axios from 'axios' +import { wait } from '..' +import { highlightUrl, Item } from '../item' +import { IntegrationClient } from './integration' + +interface ReadwiseHighlight { + // The highlight text, (technically the only field required in a highlight object) + text: string + // The title of the page the highlight is on + title?: string + // The author of the page the highlight is on + author?: string + // The URL of the page image + image_url?: string + // The URL of the page + source_url?: string + // A meaningful unique identifier for your app + source_type?: string + // One of: books, articles, tweets or podcasts + category?: string + // Annotation note attached to the specific highlight + note?: string + // Highlight's location in the source text. Used to order the highlights + location?: number + // One of: page, order or time_offset + location_type?: string + // A datetime representing when the highlight was taken in the ISO 8601 format + highlighted_at?: string + // Unique url of the specific highlight + highlight_url?: string +} + +export class ReadwiseClient extends IntegrationClient { + name = 'READWISE' + apiUrl = 'https://readwise.io/api/v2' + + export = async (token: string, items: Item[]): Promise => { + let result = true + + const highlights = items.flatMap(this.itemToReadwiseHighlight) + + // If there are no highlights, we will skip the sync + if (highlights.length > 0) { + result = await this.syncWithReadwise(token, highlights) + } + + return result + } + + itemToReadwiseHighlight = (item: Item): ReadwiseHighlight[] => { + const category = item.siteName === 'Twitter' ? 'tweets' : 'articles' + return item.highlights + .map((highlight) => { + // filter out highlights that are not of type highlight or have no quote + if (highlight.type !== 'HIGHLIGHT' || !highlight.quote) { + return undefined + } + + return { + text: highlight.quote, + title: item.title, + author: item.author || undefined, + highlight_url: highlightUrl(item.slug, highlight.id), + highlighted_at: new Date(highlight.createdAt).toISOString(), + category, + image_url: item.image || undefined, + location_type: 'order', + note: highlight.annotation || undefined, + source_type: 'omnivore', + source_url: item.url, + } + }) + .filter((highlight) => highlight !== undefined) as ReadwiseHighlight[] + } + + syncWithReadwise = async ( + token: string, + highlights: ReadwiseHighlight[], + retryCount = 0 + ): Promise => { + const url = `${this.apiUrl}/highlights` + try { + const response = await axios.post( + url, + { + highlights, + }, + { + headers: { + Authorization: `Token ${token}`, + 'Content-Type': 'application/json', + }, + timeout: 5000, // 5 seconds + } + ) + return response.status === 200 + } catch (error) { + console.error(error) + + if (axios.isAxiosError(error)) { + if (error.response?.status === 429 && retryCount < 3) { + console.log('Readwise API rate limit exceeded, retrying...') + // wait for Retry-After seconds in the header if rate limited + // max retry count is 3 + const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds + await wait(parseInt(retryAfter, 10) * 1000) + return this.syncWithReadwise(token, highlights, retryCount + 1) + } + } + + return false + } + } +} diff --git a/packages/integration-handler/src/item.ts b/packages/integration-handler/src/item.ts new file mode 100644 index 000000000..6d09b4451 --- /dev/null +++ b/packages/integration-handler/src/item.ts @@ -0,0 +1,114 @@ +import axios from 'axios' + +interface SearchResponse { + data: { + search: { + edges: Edge[] + pageInfo: { + hasNextPage: boolean + endCursor: string + } + } + } + errors?: { + message: string + }[] +} + +interface Edge { + node: Item +} + +export interface Item { + id: string + title: string + image: string | null + author: string | null + siteName: string | null + highlights: Highlight[] + slug: string + url: string + updatedAt: Date +} + +interface Highlight { + id: string + quote: string + annotation: string | null + type: string + createdAt: string +} + +export const search = async ( + apiEndpoint: string, + token: string, + highlightOnly: boolean, + updatedSince: Date, + first = '50', + after = '0' +): Promise => { + const query = `updated:${updatedSince.toISOString()} ${ + highlightOnly ? 'has:highlights' : '' + } sort:updated-asc` + + const requestData = JSON.stringify({ + query: `query Search($query: String) { + search(query: $query) { + ... on SearchSuccess { + edges { + node { + id + slug + labels { + id + } + isArchived + readingProgressPercent + title + image + author + siteName + highlights { + id + quote + annotation + type + createdAt + } + } + } + } + ... on SearchError { + errorCodes + } + } + }`, + variables: { + query, + first, + after, + }, + }) + + try { + const response = await axios.post( + `${apiEndpoint}/graphql`, + requestData, + { + headers: { + Cookie: `auth=${token};`, + 'Content-Type': 'application/json', + 'X-OmnivoreClient': 'integration-handler', + }, + } + ) + + return response.data + } catch (error) { + console.error(error) + return null + } +} + +export const highlightUrl = (slug: string, highlightId: string): string => + `https://omnivore.app/me/${slug}#${highlightId}` diff --git a/packages/integration-handler/test/babel-register.js b/packages/integration-handler/test/babel-register.js new file mode 100644 index 000000000..9e872798e --- /dev/null +++ b/packages/integration-handler/test/babel-register.js @@ -0,0 +1,3 @@ +const register = require('@babel/register').default; + +register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] }); \ No newline at end of file diff --git a/packages/integration-handler/test/stub.test.ts b/packages/integration-handler/test/stub.test.ts new file mode 100644 index 000000000..24ad25c8f --- /dev/null +++ b/packages/integration-handler/test/stub.test.ts @@ -0,0 +1,8 @@ +import 'mocha' +import { expect } from 'chai' + +describe('stub test', () => { + it('should pass', () => { + expect(true).to.be.true + }) +}) diff --git a/packages/integration-handler/tsconfig.json b/packages/integration-handler/tsconfig.json new file mode 100644 index 000000000..ea2e20c34 --- /dev/null +++ b/packages/integration-handler/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "./../../tsconfig.json", + "ts-node": { "files": true }, + "compilerOptions": { + "outDir": "build", + "rootDir": ".", + // Generate d.ts files + "declaration": true + }, + "include": ["src/**/*", "test/**/*"] +} From b8b20905e46eaf78e89cda9254a600c5e7d418cc Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 26 Oct 2023 18:42:34 +0800 Subject: [PATCH 03/16] update eslint --- packages/integration-handler/.eslintignore | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/integration-handler/.eslintignore b/packages/integration-handler/.eslintignore index b741470fc..b94707787 100644 --- a/packages/integration-handler/.eslintignore +++ b/packages/integration-handler/.eslintignore @@ -1,4 +1,2 @@ node_modules/ dist/ -readabilityjs/ -src/generated/ From 58d5eb6bc039ef64a6aebfb28bec6bf73c1697a8 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 26 Oct 2023 22:50:58 +0800 Subject: [PATCH 04/16] create an API to create a cloud task for each active integration when the cronjob triggers --- packages/api/src/routers/auth/jwt_helpers.ts | 11 ++++ packages/api/src/routers/svc/integrations.ts | 62 +++++++++++++++++++- packages/api/src/util.ts | 6 ++ packages/api/src/utils/createTask.ts | 51 ++++++++++++++++ 4 files changed, 129 insertions(+), 1 deletion(-) diff --git a/packages/api/src/routers/auth/jwt_helpers.ts b/packages/api/src/routers/auth/jwt_helpers.ts index 11f65f327..f43c155af 100644 --- a/packages/api/src/routers/auth/jwt_helpers.ts +++ b/packages/api/src/routers/auth/jwt_helpers.ts @@ -85,3 +85,14 @@ export function suggestedUsername(name: string): string { const suffix = Math.floor(Math.random() * 10000) return `${prefix}${suffix}` } + +export async function createWebAuthTokenWithPayload( + payload: Record +): Promise { + try { + const authToken = await signToken(payload, env.server.jwtSecret) + return authToken as string + } catch { + return undefined + } +} diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index 1fc201d2d..ae7a2c128 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -5,9 +5,10 @@ import { stringify } from 'csv-stringify' import express from 'express' import { DateTime } from 'luxon' import { v4 as uuidv4 } from 'uuid' -import { IntegrationType } from '../../entity/integration' +import { Integration, IntegrationType } from '../../entity/integration' import { LibraryItem } from '../../entity/library_item' import { EntityType, readPushSubscription } from '../../pubsub' +import { getRepository } from '../../repository' import { Claims } from '../../resolvers/types' import { findIntegration, @@ -19,9 +20,11 @@ import { searchLibraryItems, } from '../../services/library_item' import { getClaimsByToken } from '../../utils/auth' +import { enqueueExportToIntegration } from '../../utils/createTask' import { logger } from '../../utils/logger' import { DateFilter } from '../../utils/search' import { createGCSFile } from '../../utils/uploads' +import { createWebAuthTokenWithPayload } from '../auth/jwt_helpers' export interface Message { type?: EntityType @@ -41,6 +44,63 @@ const isImportEvent = (event: any): event is ImportEvent => export function integrationsServiceRouter() { const router = express.Router() + router.post('/export', async (req, res) => { + logger.info('start to sync with integration') + + try { + const { message: msgStr, expired } = readPushSubscription(req) + if (!msgStr) { + return res.status(200).send('Bad Request') + } + + if (expired) { + logger.info('discarding expired message') + return res.status(200).send('Expired') + } + + // find all active integrations + const integrations = await getRepository(Integration).find({ + where: { + enabled: true, + type: IntegrationType.Export, + }, + relations: ['user'], + }) + + // create a task to sync with each integration + await Promise.all( + integrations.map(async (integration) => { + const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day + const authToken = await createWebAuthTokenWithPayload({ + uid: integration.user.id, + exp, + token: integration.token, + }) + + if (!authToken) { + logger.error('failed to create auth token', { + integrationId: integration.id, + }) + return + } + + const syncAt = integration.syncedAt?.getTime() || 0 + return enqueueExportToIntegration( + integration.id, + integration.name, + syncAt, + authToken + ) + }) + ) + } catch (err) { + logger.error('sync with integrations failed', err) + return res.status(500).send(err) + } + + res.status(200).send('OK') + }) + router.post('/:integrationName/:action', async (req, res) => { logger.info('start to sync with integration', { action: req.params.action, diff --git a/packages/api/src/util.ts b/packages/api/src/util.ts index 0c62cd6e6..71f3978b5 100755 --- a/packages/api/src/util.ts +++ b/packages/api/src/util.ts @@ -70,6 +70,8 @@ interface BackendEnv { recommendationTaskHandlerUrl: string thumbnailTaskHandlerUrl: string rssFeedTaskHandlerUrl: string + integrationExporterUrl: string + integrationImporterUrl: string } fileUpload: { gcsUploadBucket: string @@ -163,6 +165,8 @@ const nullableEnvVars = [ 'SENDGRID_VERIFICATION_TEMPLATE_ID', 'REMINDER_TASK_HANDLER_URL', 'TRUST_PROXY', + 'INTEGRATION_EXPORTER_URL', + 'INTEGRATION_IMPORTER_URL', ] // Allow some vars to be null/empty /* If not in GAE and Prod/QA/Demo env (f.e. on localhost/dev env), allow following env vars to be null */ @@ -253,6 +257,8 @@ export function getEnv(): BackendEnv { recommendationTaskHandlerUrl: parse('RECOMMENDATION_TASK_HANDLER_URL'), thumbnailTaskHandlerUrl: parse('THUMBNAIL_TASK_HANDLER_URL'), rssFeedTaskHandlerUrl: parse('RSS_FEED_TASK_HANDLER_URL'), + integrationExporterUrl: parse('INTEGRATION_EXPORTER_URL'), + integrationImporterUrl: parse('INTEGRATION_IMPORTER_URL'), } const imageProxy = { url: parse('IMAGE_PROXY_URL'), diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index f6f2ac6dc..2e6b95bf1 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -543,6 +543,57 @@ export const enqueueImportFromIntegration = async ( return createdTasks[0].name } +export const enqueueExportToIntegration = async ( + integrationId: string, + integrationName: string, + syncAt: number, // unix timestamp in milliseconds + authToken: string +): Promise => { + const { GOOGLE_CLOUD_PROJECT } = process.env + const payload = { + integrationId, + integrationName, + syncAt, + } + + const headers = { + Cookie: `auth=${authToken}`, + } + // If there is no Google Cloud Project Id exposed, it means that we are in local environment + if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { + if (env.queue.integrationTaskHandlerUrl) { + // Calling the handler function directly. + setTimeout(() => { + axios + .post(`${env.queue.integrationExporterUrl}`, payload, { + headers, + }) + .catch((error) => { + logError(error) + }) + }, 0) + } + return nanoid() + } + + const createdTasks = await createHttpTaskWithToken({ + project: GOOGLE_CLOUD_PROJECT, + payload, + taskHandlerUrl: `${env.queue.integrationExporterUrl}`, + priority: 'low', + requestHeaders: headers, + }) + + if (!createdTasks || !createdTasks[0].name) { + logger.error(`Unable to get the name of the task`, { + payload, + createdTasks, + }) + throw new CreateTaskError(`Unable to get the name of the task`) + } + return createdTasks[0].name +} + export const enqueueThumbnailTask = async ( userId: string, slug: string From 5cc9474b73c8f9dbf64aeaa5c26f4edf8336be61 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 26 Oct 2023 23:14:48 +0800 Subject: [PATCH 05/16] create an integration handler for importing from pocket --- packages/integration-handler/package.json | 24 +--- packages/integration-handler/src/index.ts | 134 +++++++++++++++++++++- 2 files changed, 134 insertions(+), 24 deletions(-) diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json index c25454038..cf8a5fe0b 100644 --- a/packages/integration-handler/package.json +++ b/packages/integration-handler/package.json @@ -13,41 +13,25 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "compile": "tsc", "build": "tsc", - "start_exporter": "functions-framework --target=exporter" + "start_exporter": "functions-framework --target=exporter", + "start_importer": "functions-framework --target=importer" }, "devDependencies": { "@types/chai": "^4.3.4", - "@types/chai-string": "^1.4.2", - "@types/dompurify": "^2.4.0", - "@types/fs-extra": "^11.0.1", - "@types/glob": "^8.0.1", "@types/jsonwebtoken": "^8.5.0", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", - "@types/unzip-stream": "^0.3.1", - "@types/urlsafe-base64": "^1.0.28", "@types/uuid": "^9.0.0", - "copyfiles": "^2.4.1", "eslint-plugin-prettier": "^4.0.0" }, "dependencies": { - "@fast-csv/parse": "^4.3.6", "@google-cloud/functions-framework": "3.1.2", "@google-cloud/storage": "^7.0.1", - "@google-cloud/tasks": "^4.0.0", - "@omnivore/readability": "1.0.0", "@sentry/serverless": "^7.30.0", - "@types/express": "^4.17.13", "axios": "^1.2.2", - "dompurify": "^2.4.3", - "fs-extra": "^11.1.0", - "glob": "^8.1.0", + "csv-stringify": "^6.4.0", "jsonwebtoken": "^8.5.1", - "linkedom": "^0.14.21", - "nodemon": "^2.0.15", - "redis": "^4.3.1", - "unzip-stream": "^0.3.1", - "urlsafe-base64": "^1.0.0", + "luxon": "^3.2.1", "uuid": "^9.0.0" }, "volta": { diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index 349c7c65d..73a3c23e2 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -2,8 +2,12 @@ import * as Sentry from '@sentry/serverless' import * as jwt from 'jsonwebtoken' import { getIntegrationClient, updateIntegration } from './integrations' import { search } from './item' +import { stringify } from 'csv-stringify' +import { DateTime } from 'luxon' +import { v4 as uuidv4 } from 'uuid' +import { File, Storage } from '@google-cloud/storage' -interface ExportRequest { +interface IntegrationRequest { integrationId: string syncAt: number // unix timestamp in milliseconds integrationName: string @@ -19,18 +23,24 @@ Sentry.GCPFunction.init({ tracesSampleRate: 0, }) +const storage = new Storage() + export const wait = (ms: number): Promise => { return new Promise((resolve) => { setTimeout(resolve, ms) }) } -function isExportRequest(body: any): body is ExportRequest { +function isIntegrationRequest(body: any): body is IntegrationRequest { return ( 'integrationId' in body && 'syncAt' in body && 'integrationName' in body ) } +const createGCSFile = (bucket: string, filename: string): File => { + return storage.bucket(bucket).file(filename) +} + export const exporter = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { const JWT_SECRET = process.env.JWT_SECRET @@ -40,7 +50,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( return res.status(500).send('Environment not configured correctly') } - const token = (req.query.token || req.headers.authorization) as string + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const token = (req.cookies?.token || req.headers.authorization) as string if (!token) { return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) } @@ -54,7 +65,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( } try { - if (!isExportRequest(req.body)) { + if (!isIntegrationRequest(req.body)) { console.error('Invalid message') return res.status(200).send('Bad Request') } @@ -125,3 +136,118 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( res.sendStatus(200) } ) + +export const importer = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + const JWT_SECRET = process.env.JWT_SECRET + const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT + const GCS_BUCKET = process.env.GCS_BUCKET + + if (!JWT_SECRET || !REST_BACKEND_ENDPOINT || !GCS_BUCKET) { + return res.status(500).send('Environment not configured correctly') + } + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const token = (req.cookies?.token || req.headers.authorization) as string + if (!token) { + return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) + } + + let claims: Claims + try { + claims = jwt.verify(token, JWT_SECRET) as Claims + } catch (e) { + console.error(e) + return res.status(401).send('UNAUTHORIZED') + } + + if (!isIntegrationRequest(req.body)) { + console.error('Invalid message') + return res.status(200).send('Bad Request') + } + + let writeStream: NodeJS.WritableStream | undefined + try { + const userId = claims.uid + const integrationClient = getIntegrationClient(req.body.integrationName) + + let offset = 0 + let syncedAt = req.body.syncAt + const since = syncedAt + + // get pages from integration + const retrieved = await integrationClient.retrieve({ + token: claims.token, + since, + offset, + }) + syncedAt = retrieved.since || Date.now() + + let retrievedData = retrieved.data + // if there are pages to import + if (retrievedData.length > 0) { + // write the list of urls to a csv file and upload it to gcs + // path style: imports///-.csv + const dateStr = DateTime.now().toISODate() + const fileUuid = uuidv4() + const fullPath = `imports/${userId}/${dateStr}/${integrationClient.name}-${fileUuid}.csv` + // open a write_stream to the file + const file = createGCSFile(GCS_BUCKET, fullPath) + writeStream = file.createWriteStream({ + contentType: 'text/csv', + }) + // stringify the data and pipe it to the write_stream + const stringifier = stringify({ + header: true, + columns: ['url', 'state', 'labels'], + }) + stringifier.pipe(writeStream) + + // paginate api calls to the integration + do { + // write the list of urls, state and labels to the stream + retrievedData.forEach((row) => stringifier.write(row)) + + // get next pages from the integration + offset += retrievedData.length + + const retrieved = await integrationClient.retrieve({ + token: claims.token, + since, + offset, + }) + syncedAt = retrieved.since || Date.now() + retrievedData = retrieved.data + + console.log('retrieved data', { + total: offset, + size: retrievedData.length, + }) + + // update the integration's syncedAt and remove taskName + const result = await updateIntegration( + REST_BACKEND_ENDPOINT, + req.body.integrationId, + new Date(syncedAt), + req.body.integrationName, + claims.token, + token + ) + if (!result) { + console.error('failed to update integration', { + integrationId: req.body.integrationId, + }) + return res.status(400).send('Failed to update integration') + } + } while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages + } + } catch (err) { + console.error('import pages from integration failed', err) + return res.status(500).send(err) + } finally { + writeStream?.end() + } + + res.sendStatus(200) + } +) From 07eb97e7cc992a1419fee1998971dfa788173b90 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 27 Oct 2023 12:10:59 +0800 Subject: [PATCH 06/16] remove unused code --- .../api/src/resolvers/integrations/index.ts | 51 ++- packages/api/src/routers/auth/auth_types.ts | 5 + packages/api/src/routers/auth/jwt_helpers.ts | 15 +- packages/api/src/routers/svc/integrations.ts | 295 +----------- .../api/src/services/integrations/index.ts | 16 +- .../src/services/integrations/integration.ts | 21 +- .../api/src/services/integrations/pocket.ts | 10 +- .../api/src/services/integrations/readwise.ts | 123 +---- packages/api/src/utils/createTask.ts | 61 +-- .../api/test/resolvers/integrations.test.ts | 3 +- .../api/test/routers/integrations.test.ts | 423 ------------------ packages/api/tsconfig.json | 6 +- packages/integration-handler/.dockerignore | 5 + packages/integration-handler/.gcloudignore | 16 + .../integration-handler/Dockerfile-exporter | 26 ++ .../integration-handler/Dockerfile-importer | 26 ++ packages/integration-handler/package.json | 1 + packages/integration-handler/src/index.ts | 43 +- .../src/integrations/index.ts | 16 +- packages/integration-handler/src/item.ts | 11 +- 20 files changed, 220 insertions(+), 953 deletions(-) delete mode 100644 packages/api/test/routers/integrations.test.ts create mode 100644 packages/integration-handler/.dockerignore create mode 100644 packages/integration-handler/.gcloudignore create mode 100644 packages/integration-handler/Dockerfile-exporter create mode 100644 packages/integration-handler/Dockerfile-importer diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index 5f0dce401..82a67e6b5 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -18,10 +18,11 @@ import { SetIntegrationErrorCode, SetIntegrationSuccess, } from '../../generated/graphql' +import { createIntegrationToken } from '../../routers/auth/jwt_helpers' import { findIntegration, findIntegrations, - getIntegrationService, + getIntegrationClient, removeIntegration, saveIntegration, updateIntegration, @@ -29,8 +30,8 @@ import { import { analytics } from '../../utils/analytics' import { deleteTask, + enqueueExportToIntegration, enqueueImportFromIntegration, - enqueueSyncWithIntegration, } from '../../utils/createTask' import { authorized } from '../../utils/helpers' @@ -60,7 +61,7 @@ export const setIntegrationResolver = authorized< integrationToSave.taskName = existingIntegration.taskName } else { // Create - const integrationService = getIntegrationService(input.name) + const integrationService = getIntegrationClient(input.name) // authorize and get access token const token = await integrationService.accessToken(input.token) if (!token) { @@ -74,12 +75,27 @@ export const setIntegrationResolver = authorized< // save integration const integration = await saveIntegration(integrationToSave, uid) - if ( - integrationToSave.type === IntegrationType.Export && - (!integrationToSave.id || integrationToSave.enabled) - ) { + if (integrationToSave.type === IntegrationType.Export && !input.id) { + const authToken = await createIntegrationToken({ + uid, + token: integration.token, + }) + if (!authToken) { + log.error('failed to create auth token', { + integrationId: integration.id, + }) + return { + errorCodes: [SetIntegrationErrorCode.BadRequest], + } + } + // create a task to sync all the pages if new integration or enable integration (export type) - const taskName = await enqueueSyncWithIntegration(uid, input.name) + const taskName = await enqueueExportToIntegration( + integration.id, + integration.name, + 0, + authToken + ) log.info('enqueued task', taskName) // update task name in integration @@ -191,7 +207,7 @@ export const importFromIntegrationResolver = authorized< ImportFromIntegrationSuccess, ImportFromIntegrationError, MutationImportFromIntegrationArgs ->(async (_, { integrationId }, { claims: { uid }, log, signToken }) => { +>(async (_, { integrationId }, { claims: { uid }, log }) => { log.info('importFromIntegrationResolver') try { @@ -203,14 +219,21 @@ export const importFromIntegrationResolver = authorized< } } - const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day - const authToken = (await signToken( - { uid, exp }, - env.server.jwtSecret - )) as string + const authToken = await createIntegrationToken({ + uid: integration.user.id, + token: integration.token, + }) + if (!authToken) { + return { + errorCodes: [ImportFromIntegrationErrorCode.BadRequest], + } + } + // create a task to import all the pages const taskName = await enqueueImportFromIntegration( integration.id, + integration.name, + integration.syncedAt?.getTime() || 0, authToken ) // update task name in integration diff --git a/packages/api/src/routers/auth/auth_types.ts b/packages/api/src/routers/auth/auth_types.ts index b9e07e64c..56e0139af 100644 --- a/packages/api/src/routers/auth/auth_types.ts +++ b/packages/api/src/routers/auth/auth_types.ts @@ -40,3 +40,8 @@ export function isPendingUserTokenPayload( 'username' in object ) } + +export type IntegrationTokenPayload = { + uid: string + token: string +} diff --git a/packages/api/src/routers/auth/jwt_helpers.ts b/packages/api/src/routers/auth/jwt_helpers.ts index f43c155af..941982f54 100644 --- a/packages/api/src/routers/auth/jwt_helpers.ts +++ b/packages/api/src/routers/auth/jwt_helpers.ts @@ -4,6 +4,7 @@ import { promisify } from 'util' import { env } from '../../env' import { logger } from '../../utils/logger' import { + IntegrationTokenPayload, isPendingUserTokenPayload, PendingUserTokenPayload, } from './auth_types' @@ -86,11 +87,19 @@ export function suggestedUsername(name: string): string { return `${prefix}${suffix}` } -export async function createWebAuthTokenWithPayload( - payload: Record +export async function createIntegrationToken( + payload: IntegrationTokenPayload ): Promise { try { - const authToken = await signToken(payload, env.server.jwtSecret) + const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day + const authToken = await signToken( + { + ...payload, + exp, + }, + env.server.jwtSecret + ) + logger.info('createIntegrationToken', payload) return authToken as string } catch { return undefined diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index ae7a2c128..68db6b846 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -1,45 +1,13 @@ /* eslint-disable @typescript-eslint/no-misused-promises */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ -import { stringify } from 'csv-stringify' import express from 'express' -import { DateTime } from 'luxon' -import { v4 as uuidv4 } from 'uuid' import { Integration, IntegrationType } from '../../entity/integration' -import { LibraryItem } from '../../entity/library_item' -import { EntityType, readPushSubscription } from '../../pubsub' +import { readPushSubscription } from '../../pubsub' import { getRepository } from '../../repository' -import { Claims } from '../../resolvers/types' -import { - findIntegration, - getIntegrationService, - updateIntegration, -} from '../../services/integrations' -import { - findLibraryItemById, - searchLibraryItems, -} from '../../services/library_item' -import { getClaimsByToken } from '../../utils/auth' import { enqueueExportToIntegration } from '../../utils/createTask' import { logger } from '../../utils/logger' -import { DateFilter } from '../../utils/search' -import { createGCSFile } from '../../utils/uploads' -import { createWebAuthTokenWithPayload } from '../auth/jwt_helpers' - -export interface Message { - type?: EntityType - id?: string - userId: string - pageId?: string - articleId?: string -} - -interface ImportEvent { - integrationId: string -} - -const isImportEvent = (event: any): event is ImportEvent => - 'integrationId' in event +import { createIntegrationToken } from '../auth/jwt_helpers' export function integrationsServiceRouter() { const router = express.Router() @@ -70,10 +38,8 @@ export function integrationsServiceRouter() { // create a task to sync with each integration await Promise.all( integrations.map(async (integration) => { - const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day - const authToken = await createWebAuthTokenWithPayload({ + const authToken = await createIntegrationToken({ uid: integration.user.id, - exp, token: integration.token, }) @@ -101,260 +67,5 @@ export function integrationsServiceRouter() { res.status(200).send('OK') }) - router.post('/:integrationName/:action', async (req, res) => { - logger.info('start to sync with integration', { - action: req.params.action, - integrationName: req.params.integrationName, - }) - - try { - const { message: msgStr, expired } = readPushSubscription(req) - - if (!msgStr) { - return res.status(200).send('Bad Request') - } - - if (expired) { - logger.info('discarding expired message') - return res.status(200).send('Expired') - } - - const data: Message = JSON.parse(msgStr) - const userId = data.userId - const type = data.type - if (!userId) { - logger.info('No userId found in message') - res.status(200).send('Bad Request') - return - } - - const integration = await findIntegration( - { - name: req.params.integrationName.toUpperCase(), - type: IntegrationType.Export, - enabled: true, - }, - userId - ) - if (!integration) { - logger.info('No active integration found for user', { userId }) - res.status(200).send('No integration found') - return - } - - const action = req.params.action.toUpperCase() - const integrationService = getIntegrationService(integration.name) - if (action === 'SYNC_UPDATED') { - // get updated page by id - let id: string | undefined - switch (type) { - case EntityType.PAGE: - id = data.id - break - case EntityType.HIGHLIGHT: - id = data.articleId - break - case EntityType.LABEL: - id = data.pageId - break - } - if (!id) { - logger.info('No id found in message') - res.status(200).send('Bad Request') - return - } - const item = await findLibraryItemById(id, userId) - if (!item) { - logger.info('No item found for id', { id }) - res.status(200).send('No page found') - return - } - - // sync updated item with integration - logger.info('syncing updated item with integration', { - integrationId: integration.id, - itemId: item.id, - }) - - const synced = await integrationService.export(integration, [item]) - if (!synced) { - logger.info('failed to sync item', { - integrationId: integration.id, - itemId: item.id, - }) - return res.status(400).send('Failed to sync') - } - } else if (action === 'SYNC_ALL') { - // sync all pages of the user - const size = 50 - - for ( - let hasNextPage = true, - count = 0, - after = 0, - items: LibraryItem[] = []; - hasNextPage; - after += size, hasNextPage = count > after - ) { - const syncedAt = integration.syncedAt - // only sync pages that were updated after syncedAt - const dateFilters: DateFilter[] = [] - syncedAt && - dateFilters.push({ field: 'updatedAt', startDate: syncedAt }) - const { libraryItems } = await searchLibraryItems( - { from: after, size, dateFilters }, - userId - ) - items = libraryItems - const itemIds = items.map((p) => p.id) - - logger.info('syncing items', { pageIds: itemIds }) - - const synced = await integrationService.export(integration, items) - if (!synced) { - logger.error('failed to sync items', { - pageIds: itemIds, - integrationId: integration.id, - }) - return res.status(400).send('Failed to sync') - } - } - // delete task name if completed - await updateIntegration( - integration.id, - { - taskName: null, - }, - userId - ) - } else { - logger.info('unknown action', { action }) - res.status(200).send('Unknown action') - return - } - } catch (err) { - logger.error('sync with integrations failed', err) - return res.status(500).send(err) - } - - res.status(200).send('OK') - }) - - // import pages from integration task handler - router.post('/import', async (req, res) => { - logger.info('start cloud task to import pages from integration') - const token = req.cookies?.auth || req.headers?.authorization - let claims: Claims | undefined - try { - claims = await getClaimsByToken(token) - if (!claims) { - return res.status(401).send('UNAUTHORIZED') - } - } catch (err) { - logger.error('failed to get claims from token', err) - return res.status(401).send('UNAUTHORIZED') - } - - if (!isImportEvent(req.body)) { - logger.info('Invalid message') - return res.status(400).send('Bad Request') - } - - let writeStream: NodeJS.WritableStream | undefined - try { - const userId = claims.uid - const integration = await findIntegration( - { - id: req.body.integrationId, - enabled: true, - type: IntegrationType.Import, - }, - userId - ) - if (!integration) { - logger.info('No active integration found for user', { userId }) - return res.status(200).send('No integration found') - } - - const integrationService = getIntegrationService(integration.name) - // import pages from integration - logger.info('importing pages from integration', { - integrationId: integration.id, - }) - - let offset = 0 - const since = integration.syncedAt?.getTime() || 0 - let syncedAt = since - - // get pages from integration - const retrieved = await integrationService.retrieve({ - token: integration.token, - since, - offset, - }) - syncedAt = retrieved.since || Date.now() - - let retrievedData = retrieved.data - // if there are pages to import - if (retrievedData.length > 0) { - // write the list of urls to a csv file and upload it to gcs - // path style: imports///-.csv - const dateStr = DateTime.now().toISODate() - const fileUuid = uuidv4() - const fullPath = `imports/${userId}/${dateStr}/${integrationService.name}-${fileUuid}.csv` - // open a write_stream to the file - const file = createGCSFile(fullPath) - writeStream = file.createWriteStream({ - contentType: 'text/csv', - }) - // stringify the data and pipe it to the write_stream - const stringifier = stringify({ - header: true, - columns: ['url', 'state', 'labels'], - }) - stringifier.pipe(writeStream) - - // paginate api calls to the integration - do { - // write the list of urls, state and labels to the stream - retrievedData.forEach((row) => stringifier.write(row)) - - // get next pages from the integration - offset += retrievedData.length - - const retrieved = await integrationService.retrieve({ - token: integration.token, - since, - offset, - }) - syncedAt = retrieved.since || Date.now() - retrievedData = retrieved.data - - logger.info('retrieved data', { - total: offset, - size: retrievedData.length, - }) - } while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages - } - - // update the integration's syncedAt and remove taskName - await updateIntegration( - integration.id, - { - syncedAt: new Date(syncedAt), - taskName: null, - }, - userId - ) - } catch (err) { - logger.error('import pages from integration failed', err) - return res.status(500).send(err) - } finally { - writeStream?.end() - } - - res.status(200).send('OK') - }) - return router } diff --git a/packages/api/src/services/integrations/index.ts b/packages/api/src/services/integrations/index.ts index 5927af9ac..286ac59e7 100644 --- a/packages/api/src/services/integrations/index.ts +++ b/packages/api/src/services/integrations/index.ts @@ -1,19 +1,19 @@ import { DeepPartial, FindOptionsWhere } from 'typeorm' import { Integration } from '../../entity/integration' import { authTrx } from '../../repository' -import { IntegrationService } from './integration' -import { PocketIntegration } from './pocket' -import { ReadwiseIntegration } from './readwise' +import { IntegrationClient } from './integration' +import { PocketClient } from './pocket' +import { ReadwiseClient } from './readwise' -const integrations: IntegrationService[] = [ - new ReadwiseIntegration(), - new PocketIntegration(), +const integrations: IntegrationClient[] = [ + new ReadwiseClient(), + new PocketClient(), ] -export const getIntegrationService = (name: string): IntegrationService => { +export const getIntegrationClient = (name: string): IntegrationClient => { const service = integrations.find((s) => s.name === name) if (!service) { - throw new Error(`Integration service not found: ${name}`) + throw new Error(`Integration client not found: ${name}`) } return service } diff --git a/packages/api/src/services/integrations/integration.ts b/packages/api/src/services/integrations/integration.ts index 7c9454543..6aa51a49f 100644 --- a/packages/api/src/services/integrations/integration.ts +++ b/packages/api/src/services/integrations/integration.ts @@ -1,5 +1,4 @@ -import { Integration } from '../../entity/integration' -import { LibraryItem, LibraryItemState } from '../../entity/library_item' +import { LibraryItemState } from '../../entity/library_item' export interface RetrievedData { url: string @@ -19,19 +18,9 @@ export interface RetrieveRequest { offset?: number } -export abstract class IntegrationService { - abstract name: string +export interface IntegrationClient { + name: string + apiUrl: string - accessToken = async (token: string): Promise => { - return Promise.resolve(null) - } - export = async ( - integration: Integration, - items: LibraryItem[] - ): Promise => { - return Promise.resolve(false) - } - retrieve = async (req: RetrieveRequest): Promise => { - return Promise.resolve({ data: [] }) - } + accessToken(token: string): Promise } diff --git a/packages/api/src/services/integrations/pocket.ts b/packages/api/src/services/integrations/pocket.ts index 0a57bb9a6..86a99f8ef 100644 --- a/packages/api/src/services/integrations/pocket.ts +++ b/packages/api/src/services/integrations/pocket.ts @@ -3,7 +3,7 @@ import { LibraryItemState } from '../../entity/library_item' import { env } from '../../env' import { logger } from '../../utils/logger' import { - IntegrationService, + IntegrationClient, RetrievedResult, RetrieveRequest, } from './integration' @@ -51,16 +51,16 @@ interface Author { name: string } -export class PocketIntegration extends IntegrationService { +export class PocketClient implements IntegrationClient { name = 'POCKET' - POCKET_API_URL = 'https://getpocket.com/v3' + apiUrl = 'https://getpocket.com/v3' headers = { 'Content-Type': 'application/json', 'X-Accept': 'application/json', } accessToken = async (token: string): Promise => { - const url = `${this.POCKET_API_URL}/oauth/authorize` + const url = `${this.apiUrl}/oauth/authorize` try { const response = await axios.post<{ access_token: string }>( url, @@ -90,7 +90,7 @@ export class PocketIntegration extends IntegrationService { count = 100, offset = 0 ): Promise => { - const url = `${this.POCKET_API_URL}/get` + const url = `${this.apiUrl}/get` try { const response = await axios.post( url, diff --git a/packages/api/src/services/integrations/readwise.ts b/packages/api/src/services/integrations/readwise.ts index cfbbf421e..42f50f744 100644 --- a/packages/api/src/services/integrations/readwise.ts +++ b/packages/api/src/services/integrations/readwise.ts @@ -1,13 +1,6 @@ import axios from 'axios' -import { updateIntegration } from '.' -import { HighlightType } from '../../entity/highlight' -import { Integration } from '../../entity/integration' -import { LibraryItem } from '../../entity/library_item' -import { env } from '../../env' -import { wait } from '../../utils/helpers' import { logger } from '../../utils/logger' -import { findHighlightsByLibraryItemId, getHighlightUrl } from '../highlights' -import { IntegrationService } from './integration' +import { IntegrationClient } from './integration' interface ReadwiseHighlight { // The highlight text, (technically the only field required in a highlight object) @@ -36,12 +29,12 @@ interface ReadwiseHighlight { highlight_url?: string } -export const READWISE_API_URL = 'https://readwise.io/api/v2' - -export class ReadwiseIntegration extends IntegrationService { +export class ReadwiseClient implements IntegrationClient { name = 'READWISE' + apiUrl = 'https://readwise.io/api/v2' + accessToken = async (token: string): Promise => { - const authUrl = `${env.readwise.apiUrl || READWISE_API_URL}/auth` + const authUrl = `${this.apiUrl}/auth` try { const response = await axios.get(authUrl, { headers: { @@ -58,110 +51,4 @@ export class ReadwiseIntegration extends IntegrationService { return null } } - export = async ( - integration: Integration, - items: LibraryItem[] - ): Promise => { - let result = true - - const highlights = await Promise.all( - items.map((item) => - this.libraryItemToReadwiseHighlight(item, integration.user.id) - ) - ) - // If there are no highlights, we will skip the sync - if (highlights.length > 0) { - result = await this.syncWithReadwise(integration.token, highlights.flat()) - } - - // update integration syncedAt if successful - if (result) { - logger.info('updating integration syncedAt') - await updateIntegration( - integration.id, - { - syncedAt: new Date(), - }, - integration.user.id - ) - } - return result - } - - libraryItemToReadwiseHighlight = async ( - item: LibraryItem, - userId: string - ): Promise => { - let highlights = item.highlights - if (!highlights) { - highlights = await findHighlightsByLibraryItemId(item.id, userId) - } - - const category = item.siteName === 'Twitter' ? 'tweets' : 'articles' - return highlights - .map((highlight) => { - // filter out highlights that are not of type highlight or have no quote - if ( - highlight.highlightType !== HighlightType.Highlight || - !highlight.quote - ) { - return undefined - } - - return { - text: highlight.quote, - title: item.title, - author: item.author || undefined, - highlight_url: getHighlightUrl(item.slug, highlight.id), - highlighted_at: new Date(highlight.createdAt).toISOString(), - category, - image_url: item.thumbnail || undefined, - // location: highlight.highlightPositionAnchorIndex || undefined, - location_type: 'order', - note: highlight.annotation || undefined, - source_type: 'omnivore', - source_url: item.originalUrl, - } - }) - .filter((highlight) => highlight !== undefined) as ReadwiseHighlight[] - } - - syncWithReadwise = async ( - token: string, - highlights: ReadwiseHighlight[], - retryCount = 0 - ): Promise => { - const url = `${env.readwise.apiUrl || READWISE_API_URL}/highlights` - try { - const response = await axios.post( - url, - { - highlights, - }, - { - headers: { - Authorization: `Token ${token}`, - 'Content-Type': 'application/json', - }, - timeout: 5000, // 5 seconds - } - ) - return response.status === 200 - } catch (error) { - logger.error(error) - - if (axios.isAxiosError(error)) { - if (error.response?.status === 429 && retryCount < 3) { - logger.info('Readwise API rate limit exceeded, retrying...') - // wait for Retry-After seconds in the header if rate limited - // max retry count is 3 - const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds - await wait(parseInt(retryAfter, 10) * 1000) - return this.syncWithReadwise(token, highlights, retryCount + 1) - } - } - - return false - } - } } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 2e6b95bf1..be71a64a3 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -328,47 +328,6 @@ export const enqueueReminder = async ( return createdTasks[0].name } -export const enqueueSyncWithIntegration = async ( - userId: string, - integrationName: string -): Promise => { - const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env - // use pubsub data format to send the userId to the task handler - const payload = { - message: { - data: Buffer.from( - JSON.stringify({ - userId, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - - // If there is no Google Cloud Project Id exposed, it means that we are in local environment - if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - return nanoid() - } - - const createdTasks = await createHttpTaskWithToken({ - project: GOOGLE_CLOUD_PROJECT, - payload, - taskHandlerUrl: `${ - env.queue.integrationTaskHandlerUrl - }/${integrationName.toLowerCase()}/sync_all?token=${PUBSUB_VERIFICATION_TOKEN}`, - priority: 'low', - }) - - if (!createdTasks || !createdTasks[0].name) { - logger.error(`Unable to get the name of the task`, { - payload, - createdTasks, - }) - throw new CreateTaskError(`Unable to get the name of the task`) - } - return createdTasks[0].name -} - export const enqueueTextToSpeech = async ({ userId, text, @@ -498,23 +457,27 @@ export const enqueueRecommendation = async ( export const enqueueImportFromIntegration = async ( integrationId: string, + integrationName: string, + syncAt: number, // unix timestamp in milliseconds authToken: string ): Promise => { const { GOOGLE_CLOUD_PROJECT } = process.env const payload = { integrationId, + integrationName, + syncAt, } const headers = { - Cookie: `auth=${authToken}`, + [OmnivoreAuthorizationHeader]: authToken, } // If there is no Google Cloud Project Id exposed, it means that we are in local environment if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.integrationTaskHandlerUrl) { + if (env.queue.integrationImporterUrl) { // Calling the handler function directly. setTimeout(() => { axios - .post(`${env.queue.integrationTaskHandlerUrl}/import`, payload, { + .post(env.queue.integrationImporterUrl, payload, { headers, }) .catch((error) => { @@ -528,7 +491,7 @@ export const enqueueImportFromIntegration = async ( const createdTasks = await createHttpTaskWithToken({ project: GOOGLE_CLOUD_PROJECT, payload, - taskHandlerUrl: `${env.queue.integrationTaskHandlerUrl}/import`, + taskHandlerUrl: env.queue.integrationImporterUrl, priority: 'low', requestHeaders: headers, }) @@ -557,15 +520,15 @@ export const enqueueExportToIntegration = async ( } const headers = { - Cookie: `auth=${authToken}`, + [OmnivoreAuthorizationHeader]: authToken, } // If there is no Google Cloud Project Id exposed, it means that we are in local environment if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.integrationTaskHandlerUrl) { + if (env.queue.integrationExporterUrl) { // Calling the handler function directly. setTimeout(() => { axios - .post(`${env.queue.integrationExporterUrl}`, payload, { + .post(env.queue.integrationExporterUrl, payload, { headers, }) .catch((error) => { @@ -579,7 +542,7 @@ export const enqueueExportToIntegration = async ( const createdTasks = await createHttpTaskWithToken({ project: GOOGLE_CLOUD_PROJECT, payload, - taskHandlerUrl: `${env.queue.integrationExporterUrl}`, + taskHandlerUrl: env.queue.integrationExporterUrl, priority: 'low', requestHeaders: headers, }) diff --git a/packages/api/test/resolvers/integrations.test.ts b/packages/api/test/resolvers/integrations.test.ts index c170dedb4..bb9e56929 100644 --- a/packages/api/test/resolvers/integrations.test.ts +++ b/packages/api/test/resolvers/integrations.test.ts @@ -11,7 +11,6 @@ import { saveIntegration, updateIntegration, } from '../../src/services/integrations' -import { READWISE_API_URL } from '../../src/services/integrations/readwise' import { deleteUser } from '../../src/services/user' import { createTestUser } from '../db' import { generateFakeUuid, graphqlRequest, request } from '../util' @@ -19,6 +18,8 @@ import { generateFakeUuid, graphqlRequest, request } from '../util' chai.use(sinonChai) describe('Integrations resolvers', () => { + const READWISE_API_URL = 'https://readwise.io/api/v2' + let loginUser: User let authToken: string diff --git a/packages/api/test/routers/integrations.test.ts b/packages/api/test/routers/integrations.test.ts deleted file mode 100644 index 9081ef2f3..000000000 --- a/packages/api/test/routers/integrations.test.ts +++ /dev/null @@ -1,423 +0,0 @@ -import { Storage } from '@google-cloud/storage' -import { expect } from 'chai' -import { DateTime } from 'luxon' -import 'mocha' -import nock from 'nock' -import sinon from 'sinon' -import { Highlight } from '../../src/entity/highlight' -import { Integration, IntegrationType } from '../../src/entity/integration' -import { LibraryItem } from '../../src/entity/library_item' -import { User } from '../../src/entity/user' -import { env } from '../../src/env' -import { PubSubRequestBody } from '../../src/pubsub' -import { createHighlight, getHighlightUrl } from '../../src/services/highlights' -import { - deleteIntegrations, - saveIntegration, - updateIntegration, -} from '../../src/services/integrations' -import { READWISE_API_URL } from '../../src/services/integrations/readwise' -import { deleteLibraryItemById } from '../../src/services/library_item' -import { deleteUser } from '../../src/services/user' -import { createTestLibraryItem, createTestUser } from '../db' -import { MockBucket } from '../mock_storage' -import { request } from '../util' - -describe('Integrations routers', () => { - const baseUrl = '/svc/pubsub/integrations' - let token: string - let user: User - let authToken: string - - before(async () => { - user = await createTestUser('fakeUser') - const res = await request - .post('/local/debug/fake-user-login') - .send({ fakeEmail: user.email }) - - const body = res.body as { authToken: string } - authToken = body.authToken - }) - - after(async () => { - await deleteUser(user.id) - }) - - describe('sync with integrations', () => { - const endpoint = (token: string, name = 'name', action = 'action') => - `${baseUrl}/${name}/${action}?token=${token}` - let action: string - let data: PubSubRequestBody - let integrationName: string - - context('when token is invalid', () => { - before(() => { - token = 'invalid-token' - }) - - it('returns 200', async () => { - return request.post(endpoint(token)).send(data).expect(200) - }) - }) - - context('when token is valid', () => { - before(() => { - token = process.env.PUBSUB_VERIFICATION_TOKEN as string - }) - - context('when data is expired', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ userId: 'userId', type: 'page' }) - ).toString('base64'), - publishTime: DateTime.now().minus({ hours: 12 }).toISO(), - }, - } - }) - - it('returns 200 with Expired', async () => { - const res = await request.post(endpoint(token)).send(data).expect(200) - expect(res.text).to.eql('Expired') - }) - }) - - context('when userId is empty', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ userId: '', type: 'page' }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - }) - - it('returns 200', async () => { - return request.post(endpoint(token)).send(data).expect(200) - }) - }) - - context('when user exists', () => { - context('when integration not found', () => { - before(() => { - integrationName = 'READWISE' - data = { - message: { - data: Buffer.from( - JSON.stringify({ userId: user.id, type: 'page' }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - }) - - it('returns 200 with No integration found', async () => { - const res = await request - .post(endpoint(token, integrationName)) - .send(data) - .expect(200) - expect(res.text).to.eql('No integration found') - }) - }) - - context('when integration is readwise and enabled', () => { - let integration: Integration - let item: LibraryItem - let highlight: Highlight - let highlightsData: any - - before(async () => { - integration = await saveIntegration( - { - user, - name: 'READWISE', - token: 'token', - }, - user.id - ) - integrationName = integration.name - // create page - item = await createTestLibraryItem(user.id) - - // create highlight - const highlightPositionPercent = 25 - highlight = await createHighlight( - { - patch: 'test patch', - quote: 'test quote', - shortId: 'test shortId', - highlightPositionPercent, - user, - libraryItem: item, - }, - item.id, - user.id - ) - // create highlights data for integration request - highlightsData = { - highlights: [ - { - text: highlight.quote, - title: item.title, - author: item.author ?? undefined, - highlight_url: getHighlightUrl(item.slug, highlight.id), - highlighted_at: highlight.createdAt.toISOString(), - category: 'articles', - image_url: item.thumbnail ?? undefined, - // location: highlightPositionPercent, - location_type: 'order', - note: highlight.annotation ?? undefined, - source_type: 'omnivore', - source_url: item.originalUrl, - }, - ], - } - }) - - after(async () => { - await deleteIntegrations(user.id, [integration.id]) - await deleteLibraryItemById(item.id) - }) - - context('when action is sync_updated', () => { - before(() => { - action = 'sync_updated' - }) - - context('when entity type is page', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ - userId: user.id, - type: 'page', - id: item.id, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - // mock Readwise Highlight API - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights', highlightsData) - .reply(200) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - - context('when readwise highlight API reaches rate limits', () => { - before(() => { - // mock Readwise Highlight API with rate limits - // retry after 1 second - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights') - .reply(429, 'Rate Limited', { 'Retry-After': '1' }) - // mock Readwise Highlight API after 1 second - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights') - .delay(1000) - .reply(200) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - }) - }) - - context('when entity type is highlight', () => { - before(() => { - data = { - message: { - data: Buffer.from( - JSON.stringify({ - userId: user.id, - type: 'highlight', - articleId: item.id, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - // mock Readwise Highlight API - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights', highlightsData) - .reply(200) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - }) - }) - - context('when action is sync_all', () => { - before(async () => { - action = 'sync_all' - data = { - message: { - data: Buffer.from( - JSON.stringify({ - userId: user.id, - }) - ).toString('base64'), - publishTime: new Date().toISOString(), - }, - } - // mock Readwise Highlight API - nock(READWISE_API_URL, { - reqheaders: { - Authorization: `Token ${integration.token}`, - 'Content-Type': 'application/json', - }, - }) - .post('/highlights', highlightsData) - .reply(200) - await updateIntegration( - integration.id, - { - syncedAt: null, - taskName: 'some task name', - }, - user.id - ) - }) - - it('returns 200 with OK', async () => { - const res = await request - .post(endpoint(token, integrationName, action)) - .send(data) - .expect(200) - expect(res.text).to.eql('OK') - }) - }) - }) - }) - }) - }) - - describe('import from integrations router', () => { - let integration: Integration - - before(async () => { - token = 'test token' - // create integration - integration = await saveIntegration( - { - user: { id: user.id }, - name: 'POCKET', - token, - type: IntegrationType.Import, - }, - user.id - ) - - // mock Pocket API - const reqBody = { - access_token: token, - consumer_key: env.pocket.consumerKey, - state: 'all', - detailType: 'complete', - since: 0, - sort: 'oldest', - count: 100, - offset: 0, - } - nock('https://getpocket.com', { - reqheaders: { - 'content-type': 'application/json', - 'x-accept': 'application/json', - }, - }) - .post('/v3/get', reqBody) - .reply(200, { - complete: 1, - list: { - '123': { - given_url: 'https://omnivore.app/pocket-import-test,test', - state: '0', - tags: { - '1234': { - tag: 'test', - }, - '1235': { - tag: 'new', - }, - }, - }, - }, - since: Date.now() / 1000, - }) - .post('/v3/get', { - ...reqBody, - offset: 1, - }) - .reply(200, { - list: {}, - }) - - // mock cloud storage - const mockBucket = new MockBucket('test') - sinon.replace( - Storage.prototype, - 'bucket', - sinon.fake.returns(mockBucket as never) - ) - }) - - after(async () => { - sinon.restore() - await deleteIntegrations(user.id, [integration.id]) - }) - - context('when integration is pocket', () => { - it('returns 200 with OK', async () => { - return request - .post(`${baseUrl}/import`) - .send({ - integrationId: integration.id, - }) - .set('Cookie', `auth=${authToken}`) - .expect(200) - }) - }) - }) -}) diff --git a/packages/api/tsconfig.json b/packages/api/tsconfig.json index 7c8caecfd..ffa966215 100644 --- a/packages/api/tsconfig.json +++ b/packages/api/tsconfig.json @@ -6,6 +6,10 @@ "compilerOptions": { "outDir": "dist" }, - "include": ["src", "test"], + "include": [ + "src", + "test", + "../integration-handler/test/integrations.test.ts" + ], "exclude": ["./src/generated", "./test"] } diff --git a/packages/integration-handler/.dockerignore b/packages/integration-handler/.dockerignore new file mode 100644 index 000000000..d8aea4ee6 --- /dev/null +++ b/packages/integration-handler/.dockerignore @@ -0,0 +1,5 @@ +node_modules +build +.env* +Dockerfile +.dockerignore diff --git a/packages/integration-handler/.gcloudignore b/packages/integration-handler/.gcloudignore new file mode 100644 index 000000000..ccc4eb240 --- /dev/null +++ b/packages/integration-handler/.gcloudignore @@ -0,0 +1,16 @@ +# This file specifies files that are *not* uploaded to Google Cloud Platform +# using gcloud. It follows the same syntax as .gitignore, with the addition of +# "#!include" directives (which insert the entries of the given .gitignore-style +# file at that point). +# +# For more information, run: +# $ gcloud topic gcloudignore +# +.gcloudignore +# If you would like to upload your .git directory, .gitignore file or files +# from your .gitignore file, remove the corresponding line +# below: +.git +.gitignore + +node_modules diff --git a/packages/integration-handler/Dockerfile-exporter b/packages/integration-handler/Dockerfile-exporter new file mode 100644 index 000000000..60eee9d5f --- /dev/null +++ b/packages/integration-handler/Dockerfile-exporter @@ -0,0 +1,26 @@ +FROM node:18.16-alpine + +# Run everything after as non-privileged user. +WORKDIR /app + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/integration-handler/package.json ./packages/integration-handler/package.json + +RUN yarn install --pure-lockfile + +COPY /packages/integration-handler ./packages/integration-handler +RUN yarn workspace @omnivore/integration-handler build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/integration-handler/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +CMD ["yarn", "workspace", "@omnivore/integration-handler", "start_exporter"] + diff --git a/packages/integration-handler/Dockerfile-importer b/packages/integration-handler/Dockerfile-importer new file mode 100644 index 000000000..e3a790b8a --- /dev/null +++ b/packages/integration-handler/Dockerfile-importer @@ -0,0 +1,26 @@ +FROM node:18.16-alpine + +# Run everything after as non-privileged user. +WORKDIR /app + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/integration-handler/package.json ./packages/integration-handler/package.json + +RUN yarn install --pure-lockfile + +COPY /packages/integration-handler ./packages/integration-handler +RUN yarn workspace @omnivore/integration-handler build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/integration-handler/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +CMD ["yarn", "workspace", "@omnivore/integration-handler", "start_importer"] + diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json index cf8a5fe0b..e1ccc1fcb 100644 --- a/packages/integration-handler/package.json +++ b/packages/integration-handler/package.json @@ -30,6 +30,7 @@ "@sentry/serverless": "^7.30.0", "axios": "^1.2.2", "csv-stringify": "^6.4.0", + "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1", "luxon": "^3.2.1", "uuid": "^9.0.0" diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index 73a3c23e2..d32ef8a08 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -1,11 +1,12 @@ +import { File, Storage } from '@google-cloud/storage' import * as Sentry from '@sentry/serverless' -import * as jwt from 'jsonwebtoken' -import { getIntegrationClient, updateIntegration } from './integrations' -import { search } from './item' import { stringify } from 'csv-stringify' +import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import +import * as jwt from 'jsonwebtoken' import { DateTime } from 'luxon' import { v4 as uuidv4 } from 'uuid' -import { File, Storage } from '@google-cloud/storage' +import { getIntegrationClient, updateIntegration } from './integrations' +import { search } from './item' interface IntegrationRequest { integrationId: string @@ -18,6 +19,8 @@ interface Claims { token: string } +dotenv.config() + Sentry.GCPFunction.init({ dsn: process.env.SENTRY_DSN, tracesSampleRate: 0, @@ -43,6 +46,8 @@ const createGCSFile = (bucket: string, filename: string): File => { export const exporter = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { + console.log('start to export to integration') + const JWT_SECRET = process.env.JWT_SECRET const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT @@ -50,8 +55,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( return res.status(500).send('Environment not configured correctly') } - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const token = (req.cookies?.token || req.headers.authorization) as string + const token = req.get('Omnivore-Authorization') if (!token) { return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) } @@ -77,9 +81,10 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( let hasMore = true let after = '0' while (hasMore) { + console.log('searching for items...') const response = await search( REST_BACKEND_ENDPOINT, - claims.token, + token, client.highlightOnly, new Date(syncAt), '50', @@ -100,6 +105,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( break } + console.log('exporting items...') const synced = await client.export(claims.token, items) if (!synced) { console.error('failed to export item', { @@ -108,6 +114,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('Failed to sync') } + console.log('updating integration...') // update integration syncedAt if successful const updated = await updateIntegration( REST_BACKEND_ENDPOINT, @@ -115,7 +122,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( items[items.length - 1].updatedAt, integrationName, claims.token, - token + token, + 'EXPORT' ) if (!updated) { @@ -128,6 +136,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( // avoid rate limiting await wait(500) } + + console.log('done') } catch (err) { console.error('export with integration failed', err) return res.status(500).send(err) @@ -139,16 +149,17 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( export const importer = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { + console.log('start to import from integration') + const JWT_SECRET = process.env.JWT_SECRET const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT - const GCS_BUCKET = process.env.GCS_BUCKET + const GCS_BUCKET = process.env.GCS_UPLOAD_BUCKET if (!JWT_SECRET || !REST_BACKEND_ENDPOINT || !GCS_BUCKET) { return res.status(500).send('Environment not configured correctly') } - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const token = (req.cookies?.token || req.headers.authorization) as string + const token = req.get('Omnivore-Authorization') if (!token) { return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) } @@ -175,6 +186,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( let syncedAt = req.body.syncAt const since = syncedAt + console.log('importing pages from integration...') // get pages from integration const retrieved = await integrationClient.retrieve({ token: claims.token, @@ -183,6 +195,8 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( }) syncedAt = retrieved.since || Date.now() + console.log('uploading items...') + let retrievedData = retrieved.data // if there are pages to import if (retrievedData.length > 0) { @@ -224,6 +238,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( size: retrievedData.length, }) + console.log('uploading integration...') // update the integration's syncedAt and remove taskName const result = await updateIntegration( REST_BACKEND_ENDPOINT, @@ -231,7 +246,8 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( new Date(syncedAt), req.body.integrationName, claims.token, - token + token, + 'IMPORT' ) if (!result) { console.error('failed to update integration', { @@ -241,10 +257,13 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( } } while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages } + + console.log('done') } catch (err) { console.error('import pages from integration failed', err) return res.status(500).send(err) } finally { + console.log('closing write stream') writeStream?.end() } diff --git a/packages/integration-handler/src/integrations/index.ts b/packages/integration-handler/src/integrations/index.ts index f3aa66913..65c35a047 100644 --- a/packages/integration-handler/src/integrations/index.ts +++ b/packages/integration-handler/src/integrations/index.ts @@ -30,7 +30,8 @@ export const updateIntegration = async ( syncedAt: Date, name: string, integrationToken: string, - token: string + token: string, + type: string ): Promise => { const requestData = JSON.stringify({ query: ` @@ -48,11 +49,14 @@ export const updateIntegration = async ( } }`, variables: { - id, - syncedAt, - name, - token: integrationToken, - enabled: true, + input: { + id, + syncedAt, + name, + token: integrationToken, + enabled: true, + type, + }, }, }) diff --git a/packages/integration-handler/src/item.ts b/packages/integration-handler/src/item.ts index 6d09b4451..cb535904c 100644 --- a/packages/integration-handler/src/item.ts +++ b/packages/integration-handler/src/item.ts @@ -59,11 +59,8 @@ export const search = async ( node { id slug - labels { - id - } - isArchived - readingProgressPercent + url + updatedAt title image author @@ -77,6 +74,10 @@ export const search = async ( } } } + pageInfo { + hasNextPage + endCursor + } } ... on SearchError { errorCodes From c7953642c8d01b94bd237877746c083d35bb4f87 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 27 Oct 2023 12:21:10 +0800 Subject: [PATCH 07/16] update Dockerfile --- packages/integration-handler/Dockerfile-exporter | 4 +++- packages/integration-handler/Dockerfile-importer | 4 +++- packages/integration-handler/package.json | 1 + 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/integration-handler/Dockerfile-exporter b/packages/integration-handler/Dockerfile-exporter index 60eee9d5f..eb069aec9 100644 --- a/packages/integration-handler/Dockerfile-exporter +++ b/packages/integration-handler/Dockerfile-exporter @@ -1,4 +1,6 @@ -FROM node:18.16-alpine +FROM node:18.16 + +RUN apt-get update && apt-get install -y g++ make python3 # Run everything after as non-privileged user. WORKDIR /app diff --git a/packages/integration-handler/Dockerfile-importer b/packages/integration-handler/Dockerfile-importer index e3a790b8a..ef8abcb15 100644 --- a/packages/integration-handler/Dockerfile-importer +++ b/packages/integration-handler/Dockerfile-importer @@ -1,4 +1,6 @@ -FROM node:18.16-alpine +FROM node:18.16 + +RUN apt-get update && apt-get install -y g++ make python3 # Run everything after as non-privileged user. WORKDIR /app diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json index e1ccc1fcb..1b1077dd2 100644 --- a/packages/integration-handler/package.json +++ b/packages/integration-handler/package.json @@ -19,6 +19,7 @@ "devDependencies": { "@types/chai": "^4.3.4", "@types/jsonwebtoken": "^8.5.0", + "@types/luxon": "^1.25.0", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", "@types/uuid": "^9.0.0", From 1b8a9282b68b8144b95961f4b6fd6ea0da4dfd6d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 27 Oct 2023 12:42:48 +0800 Subject: [PATCH 08/16] fix tests --- packages/api/test/resolvers/integrations.test.ts | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/packages/api/test/resolvers/integrations.test.ts b/packages/api/test/resolvers/integrations.test.ts index bb9e56929..753024ecf 100644 --- a/packages/api/test/resolvers/integrations.test.ts +++ b/packages/api/test/resolvers/integrations.test.ts @@ -266,17 +266,6 @@ describe('Integrations resolvers', () => { expect(res.body.data.setIntegration.integration.enabled).to.be .true }) - - it('creates new cloud task to sync all existing articles and highlights', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token, enabled), - authToken - ) - const integration = await findIntegration({ - id: res.body.data.setIntegration.integration.id, - }, loginUser.id) - expect(integration?.taskName).not.to.be.null - }) }) }) }) From dd1cb188e44140e5d53e379f19735b97ec80a414 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 27 Oct 2023 13:01:23 +0800 Subject: [PATCH 09/16] fix tests --- packages/import-handler/test/util.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/import-handler/test/util.ts b/packages/import-handler/test/util.ts index 54a8b5082..d90aaa92d 100644 --- a/packages/import-handler/test/util.ts +++ b/packages/import-handler/test/util.ts @@ -28,5 +28,6 @@ export const stubImportCtx = async (): Promise => { }, redisClient, taskId: '', + source: 'csv-importer', } } From d84aae1d8e6177f1be2bdfa2d51d64d1115246e6 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 27 Oct 2023 15:46:18 +0800 Subject: [PATCH 10/16] update Dockerfile --- packages/integration-handler/Dockerfile-exporter | 4 +--- packages/integration-handler/Dockerfile-importer | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/integration-handler/Dockerfile-exporter b/packages/integration-handler/Dockerfile-exporter index eb069aec9..60eee9d5f 100644 --- a/packages/integration-handler/Dockerfile-exporter +++ b/packages/integration-handler/Dockerfile-exporter @@ -1,6 +1,4 @@ -FROM node:18.16 - -RUN apt-get update && apt-get install -y g++ make python3 +FROM node:18.16-alpine # Run everything after as non-privileged user. WORKDIR /app diff --git a/packages/integration-handler/Dockerfile-importer b/packages/integration-handler/Dockerfile-importer index ef8abcb15..e3a790b8a 100644 --- a/packages/integration-handler/Dockerfile-importer +++ b/packages/integration-handler/Dockerfile-importer @@ -1,6 +1,4 @@ -FROM node:18.16 - -RUN apt-get update && apt-get install -y g++ make python3 +FROM node:18.16-alpine # Run everything after as non-privileged user. WORKDIR /app From 8b6dbf0604a689f994ea447df173a08ef3615f16 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 30 Oct 2023 11:47:54 +0800 Subject: [PATCH 11/16] filter out archived items by default when importing from pocket --- packages/integration-handler/src/index.ts | 2 ++ .../src/integrations/integration.ts | 1 + .../src/integrations/pocket.ts | 22 +++++++++++++------ 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index d32ef8a08..4d739c8de 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -12,6 +12,7 @@ interface IntegrationRequest { integrationId: string syncAt: number // unix timestamp in milliseconds integrationName: string + includeArchived?: boolean } interface Claims { @@ -229,6 +230,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( token: claims.token, since, offset, + includeArchived: req.body.includeArchived, }) syncedAt = retrieved.since || Date.now() retrievedData = retrieved.data diff --git a/packages/integration-handler/src/integrations/integration.ts b/packages/integration-handler/src/integrations/integration.ts index 78b035db9..d501f88bd 100644 --- a/packages/integration-handler/src/integrations/integration.ts +++ b/packages/integration-handler/src/integrations/integration.ts @@ -16,6 +16,7 @@ export interface RetrieveRequest { since?: number // unix timestamp in milliseconds count?: number offset?: number + includeArchived?: boolean } export abstract class IntegrationClient { diff --git a/packages/integration-handler/src/integrations/pocket.ts b/packages/integration-handler/src/integrations/pocket.ts index fa4003c4d..1a68d4fe4 100644 --- a/packages/integration-handler/src/integrations/pocket.ts +++ b/packages/integration-handler/src/integrations/pocket.ts @@ -95,6 +95,7 @@ export class PocketClient extends IntegrationClient { since = 0, count = 100, offset = 0, + includeArchived = false, }: RetrieveRequest): Promise => { const pocketData = await this.retrievePocketData( token, @@ -112,13 +113,20 @@ export class PocketClient extends IntegrationClient { '1': 'ARCHIVED', '2': 'DELETED', } - const data = pocketItems.map((item) => ({ - url: item.given_url, - labels: item.tags - ? Object.values(item.tags).map((tag) => tag.tag) - : undefined, - state: statusToState[item.status], - })) + const data = pocketItems + .map((item) => ({ + url: item.given_url, + labels: item.tags + ? Object.values(item.tags).map((tag) => tag.tag) + : undefined, + state: statusToState[item.status], + })) + .filter((item) => { + if (item.state === 'DELETED') { + return false + } + return includeArchived || item.state !== 'ARCHIVED' + }) if (pocketData.error) { throw new Error(`Error retrieving pocket data: ${pocketData.error}`) From 761d0574a989169a201de4a6cc57fec3e1be5ddf Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 30 Oct 2023 14:39:22 +0800 Subject: [PATCH 12/16] add state to the integration handler request payload --- packages/integration-handler/src/index.ts | 5 ++-- .../src/integrations/integration.ts | 9 ++++++- .../src/integrations/pocket.ts | 25 +++++++++++++++---- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index 4d739c8de..4b594d1f3 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -6,13 +6,14 @@ import * as jwt from 'jsonwebtoken' import { DateTime } from 'luxon' import { v4 as uuidv4 } from 'uuid' import { getIntegrationClient, updateIntegration } from './integrations' +import { State } from './integrations/integration' import { search } from './item' interface IntegrationRequest { integrationId: string syncAt: number // unix timestamp in milliseconds integrationName: string - includeArchived?: boolean + state?: State } interface Claims { @@ -230,7 +231,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( token: claims.token, since, offset, - includeArchived: req.body.includeArchived, + state: req.body.state, }) syncedAt = retrieved.since || Date.now() retrievedData = retrieved.data diff --git a/packages/integration-handler/src/integrations/integration.ts b/packages/integration-handler/src/integrations/integration.ts index d501f88bd..83a50fe80 100644 --- a/packages/integration-handler/src/integrations/integration.ts +++ b/packages/integration-handler/src/integrations/integration.ts @@ -1,5 +1,12 @@ import { Item } from '../item' +export enum State { + ARCHIVED = 'ARCHIVED', + UNREAD = 'UNREAD', + UNARCHIVED = 'UNARCHIVED', + ALL = 'ALL', +} + export interface RetrievedData { url: string labels?: string[] @@ -16,7 +23,7 @@ export interface RetrieveRequest { since?: number // unix timestamp in milliseconds count?: number offset?: number - includeArchived?: boolean + state?: State } export abstract class IntegrationClient { diff --git a/packages/integration-handler/src/integrations/pocket.ts b/packages/integration-handler/src/integrations/pocket.ts index 1a68d4fe4..a8720cf1d 100644 --- a/packages/integration-handler/src/integrations/pocket.ts +++ b/packages/integration-handler/src/integrations/pocket.ts @@ -3,6 +3,7 @@ import { IntegrationClient, RetrievedResult, RetrieveRequest, + State, } from './integration' interface PocketResponse { @@ -60,7 +61,8 @@ export class PocketClient extends IntegrationClient { accessToken: string, since: number, // unix timestamp in seconds count = 100, - offset = 0 + offset = 0, + state = 'all' ): Promise => { const url = `${this.apiUrl}/get` try { @@ -69,7 +71,7 @@ export class PocketClient extends IntegrationClient { { consumer_key: process.env.POCKET_CONSUMER_KEY, access_token: accessToken, - state: 'all', + state, detailType: 'complete', since, sort: 'oldest', @@ -95,13 +97,25 @@ export class PocketClient extends IntegrationClient { since = 0, count = 100, offset = 0, - includeArchived = false, + state = State.UNARCHIVED, }: RetrieveRequest): Promise => { + let pocketItemState = 'all' + + switch (state) { + case State.ARCHIVED: + pocketItemState = 'archive' + break + case State.UNREAD: + pocketItemState = 'unread' + break + } + const pocketData = await this.retrievePocketData( token, since / 1000, count, - offset + offset, + pocketItemState ) if (!pocketData) { throw new Error('Error retrieving pocket data') @@ -125,7 +139,8 @@ export class PocketClient extends IntegrationClient { if (item.state === 'DELETED') { return false } - return includeArchived || item.state !== 'ARCHIVED' + + return state !== State.UNARCHIVED || item.state !== 'ARCHIVED' }) if (pocketData.error) { From 9be25fa418539ee521443edc2c5847daae398418 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 30 Oct 2023 15:07:52 +0800 Subject: [PATCH 13/16] add import_item_state column to integrations table --- ...4.do.add_import_item_state_to_integration.sql | 16 ++++++++++++++++ ...undo.add_import_item_state_to_integration.sql | 11 +++++++++++ 2 files changed, 27 insertions(+) create mode 100755 packages/db/migrations/0144.do.add_import_item_state_to_integration.sql create mode 100755 packages/db/migrations/0144.undo.add_import_item_state_to_integration.sql diff --git a/packages/db/migrations/0144.do.add_import_item_state_to_integration.sql b/packages/db/migrations/0144.do.add_import_item_state_to_integration.sql new file mode 100755 index 000000000..f7979cd80 --- /dev/null +++ b/packages/db/migrations/0144.do.add_import_item_state_to_integration.sql @@ -0,0 +1,16 @@ +-- Type: DO +-- Name: add_import_item_state_to_integration +-- Description: Add import_item_state column to integration table + +BEGIN; + +CREATE type import_item_state_type AS ENUM ( + 'UNREAD', + 'UNARCHIVED', + 'ARCHIVED', + 'ALL' +); + +ALTER TABLE omnivore.integrations ADD COLUMN import_item_state import_item_state_type; + +COMMIT; diff --git a/packages/db/migrations/0144.undo.add_import_item_state_to_integration.sql b/packages/db/migrations/0144.undo.add_import_item_state_to_integration.sql new file mode 100755 index 000000000..b64ce4f89 --- /dev/null +++ b/packages/db/migrations/0144.undo.add_import_item_state_to_integration.sql @@ -0,0 +1,11 @@ +-- Type: UNDO +-- Name: add_import_item_state_to_integration +-- Description: Add import_item_state column to integration table + +BEGIN; + +ALTER TABLE omnivore.integrations DROP COLUMN IF EXISTS import_item_state; + +DROP TYPE IF EXISTS import_item_state_type; + +COMMIT; From a52869212568f216c7b8c4cc489adb43b717d96b Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 30 Oct 2023 15:12:18 +0800 Subject: [PATCH 14/16] add import_item_state to integration entity --- packages/api/src/entity/integration.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/api/src/entity/integration.ts b/packages/api/src/entity/integration.ts index 8f03d455a..72855e7dc 100644 --- a/packages/api/src/entity/integration.ts +++ b/packages/api/src/entity/integration.ts @@ -14,6 +14,13 @@ export enum IntegrationType { Import = 'IMPORT', } +export enum ImportItemState { + UNREAD = 'UNREAD', + UNARCHIVED = 'UNARCHIVED', + ARCHIVED = 'ARCHIVED', + ALL = 'ALL', +} + @Entity({ name: 'integrations' }) export class Integration { @PrimaryGeneratedColumn('uuid') @@ -49,4 +56,7 @@ export class Integration { @Column('text', { nullable: true }) taskName?: string | null + + @Column('enum', { enum: ImportItemState, nullable: true }) + importItemState?: ImportItemState | null } From f6c0b2ba347a1abf478c2e67e8973b1c4bdf0ba9 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 30 Oct 2023 16:09:10 +0800 Subject: [PATCH 15/16] add import_item_state to set integration api payload --- packages/api/src/entity/integration.ts | 8 ++++---- packages/api/src/generated/graphql.ts | 9 +++++++++ packages/api/src/generated/schema.graphql | 8 ++++++++ packages/api/src/resolvers/integrations/index.ts | 10 +++++++++- packages/api/src/schema.ts | 8 ++++++++ 5 files changed, 38 insertions(+), 5 deletions(-) diff --git a/packages/api/src/entity/integration.ts b/packages/api/src/entity/integration.ts index 72855e7dc..e446f10c3 100644 --- a/packages/api/src/entity/integration.ts +++ b/packages/api/src/entity/integration.ts @@ -15,10 +15,10 @@ export enum IntegrationType { } export enum ImportItemState { - UNREAD = 'UNREAD', - UNARCHIVED = 'UNARCHIVED', - ARCHIVED = 'ARCHIVED', - ALL = 'ALL', + Unread = 'UNREAD', + Unarchived = 'UNARCHIVED', + Archived = 'ARCHIVED', + All = 'ALL', } @Entity({ name: 'integrations' }) diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index e42282815..8041b6beb 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -970,6 +970,13 @@ export type ImportFromIntegrationSuccess = { success: Scalars['Boolean']; }; +export enum ImportItemState { + All = 'ALL', + Archived = 'ARCHIVED', + Unarchived = 'UNARCHIVED', + Unread = 'UNREAD' +} + export type Integration = { __typename?: 'Integration'; createdAt: Scalars['Date']; @@ -2375,6 +2382,7 @@ export enum SetIntegrationErrorCode { export type SetIntegrationInput = { enabled: Scalars['Boolean']; id?: InputMaybe; + importItemState?: InputMaybe; name: Scalars['String']; syncedAt?: InputMaybe; token: Scalars['String']; @@ -3495,6 +3503,7 @@ export type ResolversTypes = { ImportFromIntegrationErrorCode: ImportFromIntegrationErrorCode; ImportFromIntegrationResult: ResolversTypes['ImportFromIntegrationError'] | ResolversTypes['ImportFromIntegrationSuccess']; ImportFromIntegrationSuccess: ResolverTypeWrapper; + ImportItemState: ImportItemState; Int: ResolverTypeWrapper; Integration: ResolverTypeWrapper; IntegrationType: IntegrationType; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index dfb7c1677..335c93409 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -863,6 +863,13 @@ type ImportFromIntegrationSuccess { success: Boolean! } +enum ImportItemState { + ALL + ARCHIVED + UNARCHIVED + UNREAD +} + type Integration { createdAt: Date! enabled: Boolean! @@ -1834,6 +1841,7 @@ enum SetIntegrationErrorCode { input SetIntegrationInput { enabled: Boolean! id: ID + importItemState: ImportItemState name: String! syncedAt: Date token: String! diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index 82a67e6b5..fe9b9c33d 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -1,5 +1,9 @@ import { DeepPartial } from 'typeorm' -import { Integration, IntegrationType } from '../../entity/integration' +import { + ImportItemState, + Integration, + IntegrationType, +} from '../../entity/integration' import { env } from '../../env' import { DeleteIntegrationError, @@ -47,6 +51,10 @@ export const setIntegrationResolver = authorized< id: input.id || undefined, type: input.type || IntegrationType.Export, syncedAt: input.syncedAt ? new Date(input.syncedAt) : undefined, + importItemState: + input.type === IntegrationType.Import + ? input.importItemState ?? ImportItemState.Unarchived // default to unarchived + : undefined, } if (input.id) { // Update diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index d257b5069..c6bb63f9d 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -1972,6 +1972,13 @@ const schema = gql` ALREADY_EXISTS } + enum ImportItemState { + UNREAD + UNARCHIVED + ARCHIVED + ALL + } + input SetIntegrationInput { id: ID name: String! @@ -1979,6 +1986,7 @@ const schema = gql` token: String! enabled: Boolean! syncedAt: Date + importItemState: ImportItemState } union IntegrationsResult = IntegrationsSuccess | IntegrationsError From 8df6fb3ae2c6909895d85fe3307b048461a212e5 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 30 Oct 2023 16:22:09 +0800 Subject: [PATCH 16/16] import unarchived items by default --- packages/api/src/resolvers/integrations/index.ts | 5 +++-- packages/api/src/utils/createTask.ts | 5 ++++- packages/integration-handler/src/index.ts | 4 +++- packages/integration-handler/src/integrations/integration.ts | 2 +- packages/integration-handler/src/integrations/pocket.ts | 2 +- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index fe9b9c33d..dc2f861c1 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -53,7 +53,7 @@ export const setIntegrationResolver = authorized< syncedAt: input.syncedAt ? new Date(input.syncedAt) : undefined, importItemState: input.type === IntegrationType.Import - ? input.importItemState ?? ImportItemState.Unarchived // default to unarchived + ? input.importItemState || ImportItemState.Unarchived // default to unarchived : undefined, } if (input.id) { @@ -242,7 +242,8 @@ export const importFromIntegrationResolver = authorized< integration.id, integration.name, integration.syncedAt?.getTime() || 0, - authToken + authToken, + integration.importItemState || ImportItemState.Unarchived ) // update task name in integration await updateIntegration(integration.id, { taskName }, uid) diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index be71a64a3..643772116 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -6,6 +6,7 @@ import { google } from '@google-cloud/tasks/build/protos/protos' import axios from 'axios' import { nanoid } from 'nanoid' import { DeepPartial } from 'typeorm' +import { ImportItemState } from '../entity/integration' import { Recommendation } from '../entity/recommendation' import { env } from '../env' import { @@ -459,13 +460,15 @@ export const enqueueImportFromIntegration = async ( integrationId: string, integrationName: string, syncAt: number, // unix timestamp in milliseconds - authToken: string + authToken: string, + state: ImportItemState ): Promise => { const { GOOGLE_CLOUD_PROJECT } = process.env const payload = { integrationId, integrationName, syncAt, + state, } const headers = { diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index 4b594d1f3..dabccce5e 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -187,6 +187,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( let offset = 0 let syncedAt = req.body.syncAt const since = syncedAt + const state = req.body.state || State.UNARCHIVED // default to unarchived console.log('importing pages from integration...') // get pages from integration @@ -194,6 +195,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( token: claims.token, since, offset, + state, }) syncedAt = retrieved.since || Date.now() @@ -231,7 +233,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction( token: claims.token, since, offset, - state: req.body.state, + state, }) syncedAt = retrieved.since || Date.now() retrievedData = retrieved.data diff --git a/packages/integration-handler/src/integrations/integration.ts b/packages/integration-handler/src/integrations/integration.ts index 83a50fe80..40fbd38f7 100644 --- a/packages/integration-handler/src/integrations/integration.ts +++ b/packages/integration-handler/src/integrations/integration.ts @@ -23,7 +23,7 @@ export interface RetrieveRequest { since?: number // unix timestamp in milliseconds count?: number offset?: number - state?: State + state: State } export abstract class IntegrationClient { diff --git a/packages/integration-handler/src/integrations/pocket.ts b/packages/integration-handler/src/integrations/pocket.ts index a8720cf1d..e2f911717 100644 --- a/packages/integration-handler/src/integrations/pocket.ts +++ b/packages/integration-handler/src/integrations/pocket.ts @@ -97,7 +97,7 @@ export class PocketClient extends IntegrationClient { since = 0, count = 100, offset = 0, - state = State.UNARCHIVED, + state, }: RetrieveRequest): Promise => { let pocketItemState = 'all'