create an integration handler for syncing with readwise

This commit is contained in:
Hongbo Wu
2023-10-26 18:38:39 +08:00
parent 47f67c237d
commit 5edba30e23
17 changed files with 699 additions and 0 deletions

View File

@ -2376,6 +2376,7 @@ export type SetIntegrationInput = {
enabled: Scalars['Boolean'];
id?: InputMaybe<Scalars['ID']>;
name: Scalars['String'];
syncedAt?: InputMaybe<Scalars['Date']>;
token: Scalars['String'];
type?: InputMaybe<IntegrationType>;
};

View File

@ -1835,6 +1835,7 @@ input SetIntegrationInput {
enabled: Boolean!
id: ID
name: String!
syncedAt: Date
token: String!
type: IntegrationType
}

View File

@ -45,6 +45,7 @@ export const setIntegrationResolver = authorized<
user: { id: uid },
id: input.id || undefined,
type: input.type || IntegrationType.Export,
syncedAt: input.syncedAt ? new Date(input.syncedAt) : undefined,
}
if (input.id) {
// Update

View File

@ -1978,6 +1978,7 @@ const schema = gql`
type: IntegrationType
token: String!
enabled: Boolean!
syncedAt: Date
}
union IntegrationsResult = IntegrationsSuccess | IntegrationsError

View File

@ -0,0 +1,4 @@
node_modules/
dist/
readabilityjs/
src/generated/

View File

@ -0,0 +1,6 @@
{
"extends": "../../.eslintrc",
"parserOptions": {
"project": "tsconfig.json"
}
}

View File

@ -0,0 +1,5 @@
{
"extension": ["ts"],
"spec": "test/**/*.test.ts",
"require": "test/babel-register.js"
}

View File

@ -0,0 +1,56 @@
{
"name": "@omnivore/integration-handler",
"version": "1.0.0",
"description": "",
"main": "build/src/index.js",
"files": [
"build/src"
],
"keywords": [],
"license": "Apache-2.0",
"scripts": {
"test": "yarn mocha -r ts-node/register --config mocha-config.json",
"lint": "eslint src --ext ts,js,tsx,jsx",
"compile": "tsc",
"build": "tsc",
"start_exporter": "functions-framework --target=exporter"
},
"devDependencies": {
"@types/chai": "^4.3.4",
"@types/chai-string": "^1.4.2",
"@types/dompurify": "^2.4.0",
"@types/fs-extra": "^11.0.1",
"@types/glob": "^8.0.1",
"@types/jsonwebtoken": "^8.5.0",
"@types/mocha": "^10.0.1",
"@types/node": "^14.11.2",
"@types/unzip-stream": "^0.3.1",
"@types/urlsafe-base64": "^1.0.28",
"@types/uuid": "^9.0.0",
"copyfiles": "^2.4.1",
"eslint-plugin-prettier": "^4.0.0"
},
"dependencies": {
"@fast-csv/parse": "^4.3.6",
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/storage": "^7.0.1",
"@google-cloud/tasks": "^4.0.0",
"@omnivore/readability": "1.0.0",
"@sentry/serverless": "^7.30.0",
"@types/express": "^4.17.13",
"axios": "^1.2.2",
"dompurify": "^2.4.3",
"fs-extra": "^11.1.0",
"glob": "^8.1.0",
"jsonwebtoken": "^8.5.1",
"linkedom": "^0.14.21",
"nodemon": "^2.0.15",
"redis": "^4.3.1",
"unzip-stream": "^0.3.1",
"urlsafe-base64": "^1.0.0",
"uuid": "^9.0.0"
},
"volta": {
"extends": "../../package.json"
}
}

View File

@ -0,0 +1,127 @@
import * as Sentry from '@sentry/serverless'
import * as jwt from 'jsonwebtoken'
import { getIntegrationClient, updateIntegration } from './integrations'
import { search } from './item'
interface ExportRequest {
integrationId: string
syncAt: number // unix timestamp in milliseconds
integrationName: string
}
interface Claims {
uid: string
token: string
}
Sentry.GCPFunction.init({
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 0,
})
export const wait = (ms: number): Promise<void> => {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}
function isExportRequest(body: any): body is ExportRequest {
return (
'integrationId' in body && 'syncAt' in body && 'integrationName' in body
)
}
export const exporter = Sentry.GCPFunction.wrapHttpFunction(
async (req, res) => {
const JWT_SECRET = process.env.JWT_SECRET
const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT
if (!JWT_SECRET || !REST_BACKEND_ENDPOINT) {
return res.status(500).send('Environment not configured correctly')
}
const token = (req.query.token || req.headers.authorization) as string
if (!token) {
return res.status(401).send({ errorCode: 'INVALID_TOKEN' })
}
let claims: Claims
try {
claims = jwt.verify(token, JWT_SECRET) as Claims
} catch (e) {
console.error(e)
return res.status(401).send('UNAUTHORIZED')
}
try {
if (!isExportRequest(req.body)) {
console.error('Invalid message')
return res.status(200).send('Bad Request')
}
const { integrationId, syncAt, integrationName } = req.body
const client = getIntegrationClient(integrationName)
// get paginated items from the backend
let hasMore = true
let after = '0'
while (hasMore) {
const response = await search(
REST_BACKEND_ENDPOINT,
claims.token,
client.highlightOnly,
new Date(syncAt),
'50',
after
)
if (!response) {
console.error('failed to search for items', {
integrationId,
})
return res.status(400).send('Failed to search')
}
hasMore = response.data.search.pageInfo.hasNextPage
after = response.data.search.pageInfo.endCursor
const items = response.data.search.edges.map((edge) => edge.node)
if (items.length === 0) {
break
}
const synced = await client.export(claims.token, items)
if (!synced) {
console.error('failed to export item', {
integrationId,
})
return res.status(400).send('Failed to sync')
}
// update integration syncedAt if successful
const updated = await updateIntegration(
REST_BACKEND_ENDPOINT,
integrationId,
items[items.length - 1].updatedAt,
integrationName,
claims.token,
token
)
if (!updated) {
console.error('failed to update integration', {
integrationId,
})
return res.status(400).send('Failed to update integration')
}
// avoid rate limiting
await wait(500)
}
} catch (err) {
console.error('export with integration failed', err)
return res.status(500).send(err)
}
res.sendStatus(200)
}
)

View File

@ -0,0 +1,82 @@
import axios from 'axios'
import { IntegrationClient } from './integration'
import { PocketClient } from './pocket'
import { ReadwiseClient } from './readwise'
interface SetIntegrationResponse {
data: {
setIntegration: {
integration: {
id: string
}
errorCodes: string[]
}
}
}
const clients: IntegrationClient[] = [new ReadwiseClient(), new PocketClient()]
export const getIntegrationClient = (name: string): IntegrationClient => {
const client = clients.find((s) => s.name === name)
if (!client) {
throw new Error(`Integration client not found: ${name}`)
}
return client
}
export const updateIntegration = async (
apiEndpoint: string,
id: string,
syncedAt: Date,
name: string,
integrationToken: string,
token: string
): Promise<boolean> => {
const requestData = JSON.stringify({
query: `
mutation SetIntegration($input: SetIntegrationInput!) {
setIntegration(input: $input) {
... on SetIntegrationSuccess {
integration {
id
enabled
}
}
... on SetIntegrationError {
errorCodes
}
}
}`,
variables: {
id,
syncedAt,
name,
token: integrationToken,
enabled: true,
},
})
try {
const response = await axios.post<SetIntegrationResponse>(
`${apiEndpoint}/graphql`,
requestData,
{
headers: {
Cookie: `auth=${token};`,
'Content-Type': 'application/json',
'X-OmnivoreClient': 'integration-handler',
},
}
)
if (response.data.data.setIntegration.errorCodes) {
console.error(response.data.data.setIntegration.errorCodes)
return false
}
return true
} catch (error) {
console.error(error)
return false
}
}

View File

@ -0,0 +1,33 @@
import { Item } from '../item'
export interface RetrievedData {
url: string
labels?: string[]
state?: string
}
export interface RetrievedResult {
data: RetrievedData[]
hasMore?: boolean
since?: number // unix timestamp in milliseconds
}
export interface RetrieveRequest {
token: string
since?: number // unix timestamp in milliseconds
count?: number
offset?: number
}
export abstract class IntegrationClient {
abstract name: string
abstract apiUrl: string
highlightOnly = true
export = async (token: string, items: Item[]): Promise<boolean> => {
return Promise.resolve(false)
}
retrieve = async (req: RetrieveRequest): Promise<RetrievedResult> => {
return Promise.resolve({ data: [] })
}
}

View File

@ -0,0 +1,132 @@
import axios from 'axios'
import {
IntegrationClient,
RetrievedResult,
RetrieveRequest,
} from './integration'
interface PocketResponse {
status: number // 1 if success
complete: number // 1 if all items have been returned
list: {
[key: string]: PocketItem
}
since: number // unix timestamp in seconds
search_meta: {
search_type: string
}
error: string
}
interface PocketItem {
item_id: string
resolved_id: string
given_url: string
resolved_url: string
given_title: string
resolved_title: string
favorite: string
status: string
excerpt: string
word_count: string
tags?: {
[key: string]: Tag
}
authors?: {
[key: string]: Author
}
}
interface Tag {
item_id: string
tag: string
}
interface Author {
item_id: string
author_id: string
name: string
}
export class PocketClient extends IntegrationClient {
name = 'POCKET'
apiUrl = 'https://getpocket.com/v3'
headers = {
'Content-Type': 'application/json',
'X-Accept': 'application/json',
}
retrievePocketData = async (
accessToken: string,
since: number, // unix timestamp in seconds
count = 100,
offset = 0
): Promise<PocketResponse | null> => {
const url = `${this.apiUrl}/get`
try {
const response = await axios.post<PocketResponse>(
url,
{
consumer_key: process.env.POCKET_CONSUMER_KEY,
access_token: accessToken,
state: 'all',
detailType: 'complete',
since,
sort: 'oldest',
count,
offset,
},
{
headers: this.headers,
timeout: 10000, // 10 seconds
}
)
return response.data
} catch (error) {
console.error('error retrievePocketData: ', error)
return null
}
}
retrieve = async ({
token,
since = 0,
count = 100,
offset = 0,
}: RetrieveRequest): Promise<RetrievedResult> => {
const pocketData = await this.retrievePocketData(
token,
since / 1000,
count,
offset
)
if (!pocketData) {
throw new Error('Error retrieving pocket data')
}
const pocketItems = Object.values(pocketData.list)
const statusToState: Record<string, string> = {
'0': 'SUCCEEDED',
'1': 'ARCHIVED',
'2': 'DELETED',
}
const data = pocketItems.map((item) => ({
url: item.given_url,
labels: item.tags
? Object.values(item.tags).map((tag) => tag.tag)
: undefined,
state: statusToState[item.status],
}))
if (pocketData.error) {
throw new Error(`Error retrieving pocket data: ${pocketData.error}`)
}
return {
data,
since: pocketData.since * 1000,
}
}
}

View File

@ -0,0 +1,114 @@
import axios from 'axios'
import { wait } from '..'
import { highlightUrl, Item } from '../item'
import { IntegrationClient } from './integration'
interface ReadwiseHighlight {
// The highlight text, (technically the only field required in a highlight object)
text: string
// The title of the page the highlight is on
title?: string
// The author of the page the highlight is on
author?: string
// The URL of the page image
image_url?: string
// The URL of the page
source_url?: string
// A meaningful unique identifier for your app
source_type?: string
// One of: books, articles, tweets or podcasts
category?: string
// Annotation note attached to the specific highlight
note?: string
// Highlight's location in the source text. Used to order the highlights
location?: number
// One of: page, order or time_offset
location_type?: string
// A datetime representing when the highlight was taken in the ISO 8601 format
highlighted_at?: string
// Unique url of the specific highlight
highlight_url?: string
}
export class ReadwiseClient extends IntegrationClient {
name = 'READWISE'
apiUrl = 'https://readwise.io/api/v2'
export = async (token: string, items: Item[]): Promise<boolean> => {
let result = true
const highlights = items.flatMap(this.itemToReadwiseHighlight)
// If there are no highlights, we will skip the sync
if (highlights.length > 0) {
result = await this.syncWithReadwise(token, highlights)
}
return result
}
itemToReadwiseHighlight = (item: Item): ReadwiseHighlight[] => {
const category = item.siteName === 'Twitter' ? 'tweets' : 'articles'
return item.highlights
.map((highlight) => {
// filter out highlights that are not of type highlight or have no quote
if (highlight.type !== 'HIGHLIGHT' || !highlight.quote) {
return undefined
}
return {
text: highlight.quote,
title: item.title,
author: item.author || undefined,
highlight_url: highlightUrl(item.slug, highlight.id),
highlighted_at: new Date(highlight.createdAt).toISOString(),
category,
image_url: item.image || undefined,
location_type: 'order',
note: highlight.annotation || undefined,
source_type: 'omnivore',
source_url: item.url,
}
})
.filter((highlight) => highlight !== undefined) as ReadwiseHighlight[]
}
syncWithReadwise = async (
token: string,
highlights: ReadwiseHighlight[],
retryCount = 0
): Promise<boolean> => {
const url = `${this.apiUrl}/highlights`
try {
const response = await axios.post(
url,
{
highlights,
},
{
headers: {
Authorization: `Token ${token}`,
'Content-Type': 'application/json',
},
timeout: 5000, // 5 seconds
}
)
return response.status === 200
} catch (error) {
console.error(error)
if (axios.isAxiosError(error)) {
if (error.response?.status === 429 && retryCount < 3) {
console.log('Readwise API rate limit exceeded, retrying...')
// wait for Retry-After seconds in the header if rate limited
// max retry count is 3
const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds
await wait(parseInt(retryAfter, 10) * 1000)
return this.syncWithReadwise(token, highlights, retryCount + 1)
}
}
return false
}
}
}

View File

@ -0,0 +1,114 @@
import axios from 'axios'
interface SearchResponse {
data: {
search: {
edges: Edge[]
pageInfo: {
hasNextPage: boolean
endCursor: string
}
}
}
errors?: {
message: string
}[]
}
interface Edge {
node: Item
}
export interface Item {
id: string
title: string
image: string | null
author: string | null
siteName: string | null
highlights: Highlight[]
slug: string
url: string
updatedAt: Date
}
interface Highlight {
id: string
quote: string
annotation: string | null
type: string
createdAt: string
}
export const search = async (
apiEndpoint: string,
token: string,
highlightOnly: boolean,
updatedSince: Date,
first = '50',
after = '0'
): Promise<SearchResponse | null> => {
const query = `updated:${updatedSince.toISOString()} ${
highlightOnly ? 'has:highlights' : ''
} sort:updated-asc`
const requestData = JSON.stringify({
query: `query Search($query: String) {
search(query: $query) {
... on SearchSuccess {
edges {
node {
id
slug
labels {
id
}
isArchived
readingProgressPercent
title
image
author
siteName
highlights {
id
quote
annotation
type
createdAt
}
}
}
}
... on SearchError {
errorCodes
}
}
}`,
variables: {
query,
first,
after,
},
})
try {
const response = await axios.post<SearchResponse>(
`${apiEndpoint}/graphql`,
requestData,
{
headers: {
Cookie: `auth=${token};`,
'Content-Type': 'application/json',
'X-OmnivoreClient': 'integration-handler',
},
}
)
return response.data
} catch (error) {
console.error(error)
return null
}
}
export const highlightUrl = (slug: string, highlightId: string): string =>
`https://omnivore.app/me/${slug}#${highlightId}`

View File

@ -0,0 +1,3 @@
const register = require('@babel/register').default;
register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] });

View File

@ -0,0 +1,8 @@
import 'mocha'
import { expect } from 'chai'
describe('stub test', () => {
it('should pass', () => {
expect(true).to.be.true
})
})

View File

@ -0,0 +1,11 @@
{
"extends": "./../../tsconfig.json",
"ts-node": { "files": true },
"compilerOptions": {
"outDir": "build",
"rootDir": ".",
// Generate d.ts files
"declaration": true
},
"include": ["src/**/*", "test/**/*"]
}