From 5edba30e232cae1da41ddc2c9e27eeb200ef45ed Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 26 Oct 2023 18:38:39 +0800 Subject: [PATCH] create an integration handler for syncing with readwise --- packages/api/src/generated/graphql.ts | 1 + packages/api/src/generated/schema.graphql | 1 + .../api/src/resolvers/integrations/index.ts | 1 + packages/api/src/schema.ts | 1 + packages/integration-handler/.eslintignore | 4 + packages/integration-handler/.eslintrc | 6 + .../integration-handler/mocha-config.json | 5 + packages/integration-handler/package.json | 56 ++++++++ packages/integration-handler/src/index.ts | 127 +++++++++++++++++ .../src/integrations/index.ts | 82 +++++++++++ .../src/integrations/integration.ts | 33 +++++ .../src/integrations/pocket.ts | 132 ++++++++++++++++++ .../src/integrations/readwise.ts | 114 +++++++++++++++ packages/integration-handler/src/item.ts | 114 +++++++++++++++ .../test/babel-register.js | 3 + .../integration-handler/test/stub.test.ts | 8 ++ packages/integration-handler/tsconfig.json | 11 ++ 17 files changed, 699 insertions(+) create mode 100644 packages/integration-handler/.eslintignore create mode 100644 packages/integration-handler/.eslintrc create mode 100644 packages/integration-handler/mocha-config.json create mode 100644 packages/integration-handler/package.json create mode 100644 packages/integration-handler/src/index.ts create mode 100644 packages/integration-handler/src/integrations/index.ts create mode 100644 packages/integration-handler/src/integrations/integration.ts create mode 100644 packages/integration-handler/src/integrations/pocket.ts create mode 100644 packages/integration-handler/src/integrations/readwise.ts create mode 100644 packages/integration-handler/src/item.ts create mode 100644 packages/integration-handler/test/babel-register.js create mode 100644 packages/integration-handler/test/stub.test.ts create mode 100644 packages/integration-handler/tsconfig.json diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index fc5e22609..e42282815 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -2376,6 +2376,7 @@ export type SetIntegrationInput = { enabled: Scalars['Boolean']; id?: InputMaybe; name: Scalars['String']; + syncedAt?: InputMaybe; token: Scalars['String']; type?: InputMaybe; }; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index 378c8bce1..dfb7c1677 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -1835,6 +1835,7 @@ input SetIntegrationInput { enabled: Boolean! id: ID name: String! + syncedAt: Date token: String! type: IntegrationType } diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index 8bf1b396a..5f0dce401 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -45,6 +45,7 @@ export const setIntegrationResolver = authorized< user: { id: uid }, id: input.id || undefined, type: input.type || IntegrationType.Export, + syncedAt: input.syncedAt ? new Date(input.syncedAt) : undefined, } if (input.id) { // Update diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index ac13b36b9..d257b5069 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -1978,6 +1978,7 @@ const schema = gql` type: IntegrationType token: String! enabled: Boolean! + syncedAt: Date } union IntegrationsResult = IntegrationsSuccess | IntegrationsError diff --git a/packages/integration-handler/.eslintignore b/packages/integration-handler/.eslintignore new file mode 100644 index 000000000..b741470fc --- /dev/null +++ b/packages/integration-handler/.eslintignore @@ -0,0 +1,4 @@ +node_modules/ +dist/ +readabilityjs/ +src/generated/ diff --git a/packages/integration-handler/.eslintrc b/packages/integration-handler/.eslintrc new file mode 100644 index 000000000..e006282a6 --- /dev/null +++ b/packages/integration-handler/.eslintrc @@ -0,0 +1,6 @@ +{ + "extends": "../../.eslintrc", + "parserOptions": { + "project": "tsconfig.json" + } +} \ No newline at end of file diff --git a/packages/integration-handler/mocha-config.json b/packages/integration-handler/mocha-config.json new file mode 100644 index 000000000..44d1d24c1 --- /dev/null +++ b/packages/integration-handler/mocha-config.json @@ -0,0 +1,5 @@ +{ + "extension": ["ts"], + "spec": "test/**/*.test.ts", + "require": "test/babel-register.js" + } \ No newline at end of file diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json new file mode 100644 index 000000000..c25454038 --- /dev/null +++ b/packages/integration-handler/package.json @@ -0,0 +1,56 @@ +{ + "name": "@omnivore/integration-handler", + "version": "1.0.0", + "description": "", + "main": "build/src/index.js", + "files": [ + "build/src" + ], + "keywords": [], + "license": "Apache-2.0", + "scripts": { + "test": "yarn mocha -r ts-node/register --config mocha-config.json", + "lint": "eslint src --ext ts,js,tsx,jsx", + "compile": "tsc", + "build": "tsc", + "start_exporter": "functions-framework --target=exporter" + }, + "devDependencies": { + "@types/chai": "^4.3.4", + "@types/chai-string": "^1.4.2", + "@types/dompurify": "^2.4.0", + "@types/fs-extra": "^11.0.1", + "@types/glob": "^8.0.1", + "@types/jsonwebtoken": "^8.5.0", + "@types/mocha": "^10.0.1", + "@types/node": "^14.11.2", + "@types/unzip-stream": "^0.3.1", + "@types/urlsafe-base64": "^1.0.28", + "@types/uuid": "^9.0.0", + "copyfiles": "^2.4.1", + "eslint-plugin-prettier": "^4.0.0" + }, + "dependencies": { + "@fast-csv/parse": "^4.3.6", + "@google-cloud/functions-framework": "3.1.2", + "@google-cloud/storage": "^7.0.1", + "@google-cloud/tasks": "^4.0.0", + "@omnivore/readability": "1.0.0", + "@sentry/serverless": "^7.30.0", + "@types/express": "^4.17.13", + "axios": "^1.2.2", + "dompurify": "^2.4.3", + "fs-extra": "^11.1.0", + "glob": "^8.1.0", + "jsonwebtoken": "^8.5.1", + "linkedom": "^0.14.21", + "nodemon": "^2.0.15", + "redis": "^4.3.1", + "unzip-stream": "^0.3.1", + "urlsafe-base64": "^1.0.0", + "uuid": "^9.0.0" + }, + "volta": { + "extends": "../../package.json" + } +} diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts new file mode 100644 index 000000000..349c7c65d --- /dev/null +++ b/packages/integration-handler/src/index.ts @@ -0,0 +1,127 @@ +import * as Sentry from '@sentry/serverless' +import * as jwt from 'jsonwebtoken' +import { getIntegrationClient, updateIntegration } from './integrations' +import { search } from './item' + +interface ExportRequest { + integrationId: string + syncAt: number // unix timestamp in milliseconds + integrationName: string +} + +interface Claims { + uid: string + token: string +} + +Sentry.GCPFunction.init({ + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 0, +}) + +export const wait = (ms: number): Promise => { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} + +function isExportRequest(body: any): body is ExportRequest { + return ( + 'integrationId' in body && 'syncAt' in body && 'integrationName' in body + ) +} + +export const exporter = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + const JWT_SECRET = process.env.JWT_SECRET + const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT + + if (!JWT_SECRET || !REST_BACKEND_ENDPOINT) { + return res.status(500).send('Environment not configured correctly') + } + + const token = (req.query.token || req.headers.authorization) as string + if (!token) { + return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) + } + + let claims: Claims + try { + claims = jwt.verify(token, JWT_SECRET) as Claims + } catch (e) { + console.error(e) + return res.status(401).send('UNAUTHORIZED') + } + + try { + if (!isExportRequest(req.body)) { + console.error('Invalid message') + return res.status(200).send('Bad Request') + } + + const { integrationId, syncAt, integrationName } = req.body + const client = getIntegrationClient(integrationName) + + // get paginated items from the backend + let hasMore = true + let after = '0' + while (hasMore) { + const response = await search( + REST_BACKEND_ENDPOINT, + claims.token, + client.highlightOnly, + new Date(syncAt), + '50', + after + ) + + if (!response) { + console.error('failed to search for items', { + integrationId, + }) + return res.status(400).send('Failed to search') + } + + hasMore = response.data.search.pageInfo.hasNextPage + after = response.data.search.pageInfo.endCursor + const items = response.data.search.edges.map((edge) => edge.node) + if (items.length === 0) { + break + } + + const synced = await client.export(claims.token, items) + if (!synced) { + console.error('failed to export item', { + integrationId, + }) + return res.status(400).send('Failed to sync') + } + + // update integration syncedAt if successful + const updated = await updateIntegration( + REST_BACKEND_ENDPOINT, + integrationId, + items[items.length - 1].updatedAt, + integrationName, + claims.token, + token + ) + + if (!updated) { + console.error('failed to update integration', { + integrationId, + }) + return res.status(400).send('Failed to update integration') + } + + // avoid rate limiting + await wait(500) + } + } catch (err) { + console.error('export with integration failed', err) + return res.status(500).send(err) + } + + res.sendStatus(200) + } +) diff --git a/packages/integration-handler/src/integrations/index.ts b/packages/integration-handler/src/integrations/index.ts new file mode 100644 index 000000000..f3aa66913 --- /dev/null +++ b/packages/integration-handler/src/integrations/index.ts @@ -0,0 +1,82 @@ +import axios from 'axios' +import { IntegrationClient } from './integration' +import { PocketClient } from './pocket' +import { ReadwiseClient } from './readwise' + +interface SetIntegrationResponse { + data: { + setIntegration: { + integration: { + id: string + } + errorCodes: string[] + } + } +} + +const clients: IntegrationClient[] = [new ReadwiseClient(), new PocketClient()] + +export const getIntegrationClient = (name: string): IntegrationClient => { + const client = clients.find((s) => s.name === name) + if (!client) { + throw new Error(`Integration client not found: ${name}`) + } + return client +} + +export const updateIntegration = async ( + apiEndpoint: string, + id: string, + syncedAt: Date, + name: string, + integrationToken: string, + token: string +): Promise => { + const requestData = JSON.stringify({ + query: ` + mutation SetIntegration($input: SetIntegrationInput!) { + setIntegration(input: $input) { + ... on SetIntegrationSuccess { + integration { + id + enabled + } + } + ... on SetIntegrationError { + errorCodes + } + } + }`, + variables: { + id, + syncedAt, + name, + token: integrationToken, + enabled: true, + }, + }) + + try { + const response = await axios.post( + `${apiEndpoint}/graphql`, + requestData, + { + headers: { + Cookie: `auth=${token};`, + 'Content-Type': 'application/json', + 'X-OmnivoreClient': 'integration-handler', + }, + } + ) + + if (response.data.data.setIntegration.errorCodes) { + console.error(response.data.data.setIntegration.errorCodes) + return false + } + + return true + } catch (error) { + console.error(error) + return false + } +} diff --git a/packages/integration-handler/src/integrations/integration.ts b/packages/integration-handler/src/integrations/integration.ts new file mode 100644 index 000000000..78b035db9 --- /dev/null +++ b/packages/integration-handler/src/integrations/integration.ts @@ -0,0 +1,33 @@ +import { Item } from '../item' + +export interface RetrievedData { + url: string + labels?: string[] + state?: string +} +export interface RetrievedResult { + data: RetrievedData[] + hasMore?: boolean + since?: number // unix timestamp in milliseconds +} + +export interface RetrieveRequest { + token: string + since?: number // unix timestamp in milliseconds + count?: number + offset?: number +} + +export abstract class IntegrationClient { + abstract name: string + abstract apiUrl: string + highlightOnly = true + + export = async (token: string, items: Item[]): Promise => { + return Promise.resolve(false) + } + + retrieve = async (req: RetrieveRequest): Promise => { + return Promise.resolve({ data: [] }) + } +} diff --git a/packages/integration-handler/src/integrations/pocket.ts b/packages/integration-handler/src/integrations/pocket.ts new file mode 100644 index 000000000..fa4003c4d --- /dev/null +++ b/packages/integration-handler/src/integrations/pocket.ts @@ -0,0 +1,132 @@ +import axios from 'axios' +import { + IntegrationClient, + RetrievedResult, + RetrieveRequest, +} from './integration' + +interface PocketResponse { + status: number // 1 if success + complete: number // 1 if all items have been returned + list: { + [key: string]: PocketItem + } + since: number // unix timestamp in seconds + search_meta: { + search_type: string + } + error: string +} + +interface PocketItem { + item_id: string + resolved_id: string + given_url: string + resolved_url: string + given_title: string + resolved_title: string + favorite: string + status: string + excerpt: string + word_count: string + tags?: { + [key: string]: Tag + } + authors?: { + [key: string]: Author + } +} + +interface Tag { + item_id: string + tag: string +} + +interface Author { + item_id: string + author_id: string + name: string +} + +export class PocketClient extends IntegrationClient { + name = 'POCKET' + apiUrl = 'https://getpocket.com/v3' + headers = { + 'Content-Type': 'application/json', + 'X-Accept': 'application/json', + } + + retrievePocketData = async ( + accessToken: string, + since: number, // unix timestamp in seconds + count = 100, + offset = 0 + ): Promise => { + const url = `${this.apiUrl}/get` + try { + const response = await axios.post( + url, + { + consumer_key: process.env.POCKET_CONSUMER_KEY, + access_token: accessToken, + state: 'all', + detailType: 'complete', + since, + sort: 'oldest', + count, + offset, + }, + { + headers: this.headers, + timeout: 10000, // 10 seconds + } + ) + + return response.data + } catch (error) { + console.error('error retrievePocketData: ', error) + + return null + } + } + + retrieve = async ({ + token, + since = 0, + count = 100, + offset = 0, + }: RetrieveRequest): Promise => { + const pocketData = await this.retrievePocketData( + token, + since / 1000, + count, + offset + ) + if (!pocketData) { + throw new Error('Error retrieving pocket data') + } + + const pocketItems = Object.values(pocketData.list) + const statusToState: Record = { + '0': 'SUCCEEDED', + '1': 'ARCHIVED', + '2': 'DELETED', + } + const data = pocketItems.map((item) => ({ + url: item.given_url, + labels: item.tags + ? Object.values(item.tags).map((tag) => tag.tag) + : undefined, + state: statusToState[item.status], + })) + + if (pocketData.error) { + throw new Error(`Error retrieving pocket data: ${pocketData.error}`) + } + + return { + data, + since: pocketData.since * 1000, + } + } +} diff --git a/packages/integration-handler/src/integrations/readwise.ts b/packages/integration-handler/src/integrations/readwise.ts new file mode 100644 index 000000000..95bd3503d --- /dev/null +++ b/packages/integration-handler/src/integrations/readwise.ts @@ -0,0 +1,114 @@ +import axios from 'axios' +import { wait } from '..' +import { highlightUrl, Item } from '../item' +import { IntegrationClient } from './integration' + +interface ReadwiseHighlight { + // The highlight text, (technically the only field required in a highlight object) + text: string + // The title of the page the highlight is on + title?: string + // The author of the page the highlight is on + author?: string + // The URL of the page image + image_url?: string + // The URL of the page + source_url?: string + // A meaningful unique identifier for your app + source_type?: string + // One of: books, articles, tweets or podcasts + category?: string + // Annotation note attached to the specific highlight + note?: string + // Highlight's location in the source text. Used to order the highlights + location?: number + // One of: page, order or time_offset + location_type?: string + // A datetime representing when the highlight was taken in the ISO 8601 format + highlighted_at?: string + // Unique url of the specific highlight + highlight_url?: string +} + +export class ReadwiseClient extends IntegrationClient { + name = 'READWISE' + apiUrl = 'https://readwise.io/api/v2' + + export = async (token: string, items: Item[]): Promise => { + let result = true + + const highlights = items.flatMap(this.itemToReadwiseHighlight) + + // If there are no highlights, we will skip the sync + if (highlights.length > 0) { + result = await this.syncWithReadwise(token, highlights) + } + + return result + } + + itemToReadwiseHighlight = (item: Item): ReadwiseHighlight[] => { + const category = item.siteName === 'Twitter' ? 'tweets' : 'articles' + return item.highlights + .map((highlight) => { + // filter out highlights that are not of type highlight or have no quote + if (highlight.type !== 'HIGHLIGHT' || !highlight.quote) { + return undefined + } + + return { + text: highlight.quote, + title: item.title, + author: item.author || undefined, + highlight_url: highlightUrl(item.slug, highlight.id), + highlighted_at: new Date(highlight.createdAt).toISOString(), + category, + image_url: item.image || undefined, + location_type: 'order', + note: highlight.annotation || undefined, + source_type: 'omnivore', + source_url: item.url, + } + }) + .filter((highlight) => highlight !== undefined) as ReadwiseHighlight[] + } + + syncWithReadwise = async ( + token: string, + highlights: ReadwiseHighlight[], + retryCount = 0 + ): Promise => { + const url = `${this.apiUrl}/highlights` + try { + const response = await axios.post( + url, + { + highlights, + }, + { + headers: { + Authorization: `Token ${token}`, + 'Content-Type': 'application/json', + }, + timeout: 5000, // 5 seconds + } + ) + return response.status === 200 + } catch (error) { + console.error(error) + + if (axios.isAxiosError(error)) { + if (error.response?.status === 429 && retryCount < 3) { + console.log('Readwise API rate limit exceeded, retrying...') + // wait for Retry-After seconds in the header if rate limited + // max retry count is 3 + const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds + await wait(parseInt(retryAfter, 10) * 1000) + return this.syncWithReadwise(token, highlights, retryCount + 1) + } + } + + return false + } + } +} diff --git a/packages/integration-handler/src/item.ts b/packages/integration-handler/src/item.ts new file mode 100644 index 000000000..6d09b4451 --- /dev/null +++ b/packages/integration-handler/src/item.ts @@ -0,0 +1,114 @@ +import axios from 'axios' + +interface SearchResponse { + data: { + search: { + edges: Edge[] + pageInfo: { + hasNextPage: boolean + endCursor: string + } + } + } + errors?: { + message: string + }[] +} + +interface Edge { + node: Item +} + +export interface Item { + id: string + title: string + image: string | null + author: string | null + siteName: string | null + highlights: Highlight[] + slug: string + url: string + updatedAt: Date +} + +interface Highlight { + id: string + quote: string + annotation: string | null + type: string + createdAt: string +} + +export const search = async ( + apiEndpoint: string, + token: string, + highlightOnly: boolean, + updatedSince: Date, + first = '50', + after = '0' +): Promise => { + const query = `updated:${updatedSince.toISOString()} ${ + highlightOnly ? 'has:highlights' : '' + } sort:updated-asc` + + const requestData = JSON.stringify({ + query: `query Search($query: String) { + search(query: $query) { + ... on SearchSuccess { + edges { + node { + id + slug + labels { + id + } + isArchived + readingProgressPercent + title + image + author + siteName + highlights { + id + quote + annotation + type + createdAt + } + } + } + } + ... on SearchError { + errorCodes + } + } + }`, + variables: { + query, + first, + after, + }, + }) + + try { + const response = await axios.post( + `${apiEndpoint}/graphql`, + requestData, + { + headers: { + Cookie: `auth=${token};`, + 'Content-Type': 'application/json', + 'X-OmnivoreClient': 'integration-handler', + }, + } + ) + + return response.data + } catch (error) { + console.error(error) + return null + } +} + +export const highlightUrl = (slug: string, highlightId: string): string => + `https://omnivore.app/me/${slug}#${highlightId}` diff --git a/packages/integration-handler/test/babel-register.js b/packages/integration-handler/test/babel-register.js new file mode 100644 index 000000000..9e872798e --- /dev/null +++ b/packages/integration-handler/test/babel-register.js @@ -0,0 +1,3 @@ +const register = require('@babel/register').default; + +register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] }); \ No newline at end of file diff --git a/packages/integration-handler/test/stub.test.ts b/packages/integration-handler/test/stub.test.ts new file mode 100644 index 000000000..24ad25c8f --- /dev/null +++ b/packages/integration-handler/test/stub.test.ts @@ -0,0 +1,8 @@ +import 'mocha' +import { expect } from 'chai' + +describe('stub test', () => { + it('should pass', () => { + expect(true).to.be.true + }) +}) diff --git a/packages/integration-handler/tsconfig.json b/packages/integration-handler/tsconfig.json new file mode 100644 index 000000000..ea2e20c34 --- /dev/null +++ b/packages/integration-handler/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "./../../tsconfig.json", + "ts-node": { "files": true }, + "compilerOptions": { + "outDir": "build", + "rootDir": ".", + // Generate d.ts files + "declaration": true + }, + "include": ["src/**/*", "test/**/*"] +}