From 34db7bb2d8de0ccfbde752b65cd60fa5096a1bf4 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 20 Feb 2024 22:42:06 +0800 Subject: [PATCH 1/6] create a job for exporting item to integrations --- packages/api/src/jobs/export_item.ts | 72 ++++++++++++ .../src/services/integrations/integration.ts | 4 +- .../api/src/services/integrations/pocket.ts | 4 + .../api/src/services/integrations/readwise.ts | 107 ++++++++++++++++++ packages/api/src/utils/helpers.ts | 4 + 5 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 packages/api/src/jobs/export_item.ts diff --git a/packages/api/src/jobs/export_item.ts b/packages/api/src/jobs/export_item.ts new file mode 100644 index 000000000..add1f1061 --- /dev/null +++ b/packages/api/src/jobs/export_item.ts @@ -0,0 +1,72 @@ +import { IntegrationType } from '../entity/integration' +import { LibraryItem } from '../entity/library_item' +import { + findIntegrations, + getIntegrationClient, + updateIntegration, +} from '../services/integrations' +import { logger } from '../utils/logger' + +export interface ExportItemJobData { + userId: string + libraryItem: LibraryItem +} + +export const EXPORT_ITEM_JOB_NAME = 'export-item' + +export const exportItem = async (jobData: ExportItemJobData) => { + const { libraryItem, userId } = jobData + const integrations = await findIntegrations(userId, { + enabled: true, + type: IntegrationType.Export, + }) + + if (integrations.length <= 0) { + return + } + + await Promise.all( + integrations.map(async (integration) => { + const logObject = { + userId, + libraryItemId: libraryItem.id, + integrationId: integration.id, + } + logger.info('exporting item...', logObject) + + try { + const client = getIntegrationClient(integration.name) + + const synced = await client.export(integration.token, [libraryItem]) + if (!synced) { + logger.error('failed to export item', logObject) + return Promise.resolve(false) + } + + const lastItemUpdatedAt = libraryItem.updatedAt + logger.info('updating integration...', { + ...logObject, + syncedAt: lastItemUpdatedAt, + }) + + // update integration syncedAt if successful + const updated = await updateIntegration( + integration.id, + { + syncedAt: lastItemUpdatedAt, + }, + userId + ) + logger.info('integration updated', { + ...logObject, + updated, + }) + + return Promise.resolve(true) + } catch (err) { + logger.error('export with integration failed', err) + return Promise.resolve(false) + } + }) + ) +} diff --git a/packages/api/src/services/integrations/integration.ts b/packages/api/src/services/integrations/integration.ts index 6aa51a49f..e3f1edbc8 100644 --- a/packages/api/src/services/integrations/integration.ts +++ b/packages/api/src/services/integrations/integration.ts @@ -1,4 +1,4 @@ -import { LibraryItemState } from '../../entity/library_item' +import { LibraryItem, LibraryItemState } from '../../entity/library_item' export interface RetrievedData { url: string @@ -23,4 +23,6 @@ export interface IntegrationClient { apiUrl: string accessToken(token: string): Promise + + export(token: string, items: LibraryItem[]): Promise } diff --git a/packages/api/src/services/integrations/pocket.ts b/packages/api/src/services/integrations/pocket.ts index 1ea4d6d87..517d3befa 100644 --- a/packages/api/src/services/integrations/pocket.ts +++ b/packages/api/src/services/integrations/pocket.ts @@ -35,4 +35,8 @@ export class PocketClient implements IntegrationClient { return null } } + + export = async (): Promise => { + return Promise.resolve(false) + } } diff --git a/packages/api/src/services/integrations/readwise.ts b/packages/api/src/services/integrations/readwise.ts index 7764294d2..0b0b6be0c 100644 --- a/packages/api/src/services/integrations/readwise.ts +++ b/packages/api/src/services/integrations/readwise.ts @@ -1,7 +1,36 @@ import axios from 'axios' +import { LibraryItem } from '../../entity/library_item' +import { highlightUrl, wait } from '../../utils/helpers' import { logger } from '../../utils/logger' import { IntegrationClient } from './integration' +interface ReadwiseHighlight { + // The highlight text, (technically the only field required in a highlight object) + text: string + // The title of the page the highlight is on + title?: string + // The author of the page the highlight is on + author?: string + // The URL of the page image + image_url?: string + // The URL of the page + source_url?: string + // A meaningful unique identifier for your app + source_type?: string + // One of: books, articles, tweets or podcasts + category?: string + // Annotation note attached to the specific highlight + note?: string + // Highlight's location in the source text. Used to order the highlights + location?: number + // One of: page, order or time_offset + location_type?: string + // A datetime representing when the highlight was taken in the ISO 8601 format + highlighted_at?: string + // Unique url of the specific highlight + highlight_url?: string +} + export class ReadwiseClient implements IntegrationClient { name = 'READWISE' apiUrl = 'https://readwise.io/api/v2' @@ -24,4 +53,82 @@ export class ReadwiseClient implements IntegrationClient { return null } } + + export = async (token: string, items: LibraryItem[]): Promise => { + let result = true + + const highlights = items.flatMap(this.itemToReadwiseHighlight) + + // If there are no highlights, we will skip the sync + if (highlights.length > 0) { + result = await this.syncWithReadwise(token, highlights) + } + + return result + } + + itemToReadwiseHighlight = (item: LibraryItem): ReadwiseHighlight[] => { + const category = item.siteName === 'Twitter' ? 'tweets' : 'articles' + return item.highlights + ?.map((highlight) => { + // filter out highlights that are not of type highlight or have no quote + if (highlight.highlightType !== 'HIGHLIGHT' || !highlight.quote) { + return undefined + } + + return { + text: highlight.quote, + title: item.title, + author: item.author || undefined, + highlight_url: highlightUrl(item.slug, highlight.id), + highlighted_at: new Date(highlight.createdAt).toISOString(), + category, + image_url: item.thumbnail || undefined, + location_type: 'order', + note: highlight.annotation || undefined, + source_type: 'omnivore', + source_url: item.originalUrl, + } + }) + .filter((highlight) => highlight !== undefined) as ReadwiseHighlight[] + } + + syncWithReadwise = async ( + token: string, + highlights: ReadwiseHighlight[], + retryCount = 0 + ): Promise => { + const url = `${this.apiUrl}/highlights` + try { + const response = await axios.post( + url, + { + highlights, + }, + { + headers: { + Authorization: `Token ${token}`, + 'Content-Type': 'application/json', + }, + timeout: 5000, // 5 seconds + } + ) + return response.status === 200 + } catch (error) { + console.error(error) + + if (axios.isAxiosError(error)) { + if (error.response?.status === 429 && retryCount < 3) { + console.log('Readwise API rate limit exceeded, retrying...') + // wait for Retry-After seconds in the header if rate limited + // max retry count is 3 + const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds + await wait(parseInt(retryAfter, 10) * 1000) + return this.syncWithReadwise(token, highlights, retryCount + 1) + } + } + + return false + } + } } diff --git a/packages/api/src/utils/helpers.ts b/packages/api/src/utils/helpers.ts index c53b1f630..e76a9875a 100644 --- a/packages/api/src/utils/helpers.ts +++ b/packages/api/src/utils/helpers.ts @@ -10,6 +10,7 @@ import { Highlight as HighlightData } from '../entity/highlight' import { LibraryItem, LibraryItemState } from '../entity/library_item' import { Recommendation as RecommendationData } from '../entity/recommendation' import { RegistrationType, User } from '../entity/user' +import { env } from '../env' import { Article, ArticleSavingRequest, @@ -400,3 +401,6 @@ export const setRecentlySavedItemInRedis = async ( }) } } + +export const highlightUrl = (slug: string, highlightId: string): string => + `${env.client.url}/me/${slug}#${highlightId}` From 8b848912c91dbde7e4d7a30c6347b69a49c79b0d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 22 Feb 2024 12:03:58 +0800 Subject: [PATCH 2/6] fix tests --- packages/api/test/global-teardown.ts | 2 +- packages/api/test/gql/sanitize-directive.test.ts | 2 +- packages/api/test/routers/emails.test.ts | 4 ++-- packages/api/test/routers/pages.test.ts | 2 +- .../api/test/services/create_page_save_request.test.ts | 2 +- packages/api/test/services/highlights.test.ts | 6 +++--- packages/api/test/util.ts | 6 +++--- packages/api/test/utils/helper.test.ts | 2 +- packages/api/test/utils/parser.test.ts | 10 +++++----- packages/api/tsconfig.json | 8 ++------ 10 files changed, 20 insertions(+), 24 deletions(-) diff --git a/packages/api/test/global-teardown.ts b/packages/api/test/global-teardown.ts index 45b0760b6..922d5db46 100644 --- a/packages/api/test/global-teardown.ts +++ b/packages/api/test/global-teardown.ts @@ -15,7 +15,7 @@ export const mochaGlobalTeardown = async () => { console.log('redis connection closed') if (redisDataSource.workerRedisClient) { - stopWorker() + await stopWorker() console.log('worker closed') } } diff --git a/packages/api/test/gql/sanitize-directive.test.ts b/packages/api/test/gql/sanitize-directive.test.ts index 13a7c2119..a5f799635 100644 --- a/packages/api/test/gql/sanitize-directive.test.ts +++ b/packages/api/test/gql/sanitize-directive.test.ts @@ -26,7 +26,7 @@ describe('Sanitize Directive', () => { }) describe('Update user with a bio that is too long', () => { - let bio = ''.padStart(500, '*') + const bio = ''.padStart(500, '*') let query: string beforeEach(() => { diff --git a/packages/api/test/routers/emails.test.ts b/packages/api/test/routers/emails.test.ts index 18a181848..316a411ca 100644 --- a/packages/api/test/routers/emails.test.ts +++ b/packages/api/test/routers/emails.test.ts @@ -30,7 +30,7 @@ describe('Emails Router', () => { user = await createTestUser('fakeUser') newsletterEmail = await createNewsletterEmail(user.id) - token = process.env.PUBSUB_VERIFICATION_TOKEN! + token = process.env.PUBSUB_VERIFICATION_TOKEN || '' receivedEmail = await saveReceivedEmail( from, newsletterEmail.address, @@ -52,7 +52,7 @@ describe('Emails Router', () => { describe('forward', () => { const html = 'test html' - beforeEach(async () => { + beforeEach(() => { sinon.replace( sendNotification, 'sendMulticastPushNotifications', diff --git a/packages/api/test/routers/pages.test.ts b/packages/api/test/routers/pages.test.ts index a78da8f87..99e130991 100644 --- a/packages/api/test/routers/pages.test.ts +++ b/packages/api/test/routers/pages.test.ts @@ -1,5 +1,5 @@ -import { request } from '../util' import 'mocha' +import { request } from '../util' describe('Upload Router', () => { const token = process.env.PUBSUB_VERIFICATION_TOKEN || '' diff --git a/packages/api/test/services/create_page_save_request.test.ts b/packages/api/test/services/create_page_save_request.test.ts index 0164e20c4..39d99c265 100644 --- a/packages/api/test/services/create_page_save_request.test.ts +++ b/packages/api/test/services/create_page_save_request.test.ts @@ -1,5 +1,5 @@ -import 'mocha' import { expect } from 'chai' +import 'mocha' import { validateUrl } from '../../src/services/create_page_save_request' describe('validateUrl', () => { diff --git a/packages/api/test/services/highlights.test.ts b/packages/api/test/services/highlights.test.ts index 1a6a7221e..3bbd27bd3 100644 --- a/packages/api/test/services/highlights.test.ts +++ b/packages/api/test/services/highlights.test.ts @@ -1,12 +1,12 @@ -import 'mocha' import { expect } from 'chai' +import 'mocha' import { getHighlightLocation } from '../../src/services/highlights' describe('getHighlightLocation', () => { let patch: string let location: number - before(async () => { + before(() => { location = 109 patch = `@@ -${location + 1},16 +${location + 1},36 @@ . We're @@ -18,7 +18,7 @@ describe('getHighlightLocation', () => { coming` }) - it('returns highlight location from patch', async () => { + it('returns highlight location from patch', () => { const result = getHighlightLocation(patch) expect(result).to.eql(location) }) diff --git a/packages/api/test/util.ts b/packages/api/test/util.ts index 2cae36704..6717fdbe1 100644 --- a/packages/api/test/util.ts +++ b/packages/api/test/util.ts @@ -20,7 +20,7 @@ export const stopApolloServer = async () => { await apollo.stop() } -export const startWorker = async (connection: ConnectionOptions) => { +export const startWorker = (connection: ConnectionOptions) => { worker = createWorker(connection) queueEvents = new QueueEvents(QUEUE_NAME, { connection, @@ -28,8 +28,8 @@ export const startWorker = async (connection: ConnectionOptions) => { } export const stopWorker = async () => { - queueEvents.close() - worker.close() + await queueEvents.close() + await worker.close() } export const waitUntilJobsDone = async (jobs: Job[]) => { diff --git a/packages/api/test/utils/helper.test.ts b/packages/api/test/utils/helper.test.ts index 9cb5796f0..766f20834 100644 --- a/packages/api/test/utils/helper.test.ts +++ b/packages/api/test/utils/helper.test.ts @@ -1,5 +1,5 @@ -import 'mocha' import { expect } from 'chai' +import 'mocha' import { validatedDate } from '../../src/utils/helpers' describe('validatedDate', () => { diff --git a/packages/api/test/utils/parser.test.ts b/packages/api/test/utils/parser.test.ts index 939a1d17c..b981b13bb 100644 --- a/packages/api/test/utils/parser.test.ts +++ b/packages/api/test/utils/parser.test.ts @@ -1,7 +1,11 @@ -import 'mocha' import * as chai from 'chai' import { expect } from 'chai' +import chaiAsPromised from 'chai-as-promised' import fs from 'fs' +import 'mocha' +import nock from 'nock' +import { User } from '../../src/entity/user' +import { deleteUser } from '../../src/services/user' import { getTitleFromEmailSubject, isProbablyArticle, @@ -9,11 +13,7 @@ import { parsePageMetadata, parsePreparedContent, } from '../../src/utils/parser' -import nock from 'nock' -import chaiAsPromised from 'chai-as-promised' -import { User } from '../../src/entity/user' import { createTestUser } from '../db' -import { deleteUser } from '../../src/services/user' chai.use(chaiAsPromised) diff --git a/packages/api/tsconfig.json b/packages/api/tsconfig.json index ffa966215..ee1fb782a 100644 --- a/packages/api/tsconfig.json +++ b/packages/api/tsconfig.json @@ -6,10 +6,6 @@ "compilerOptions": { "outDir": "dist" }, - "include": [ - "src", - "test", - "../integration-handler/test/integrations.test.ts" - ], - "exclude": ["./src/generated", "./test"] + "include": ["src", "test"], + "exclude": ["./src/generated"] } From f94267ee1a9510776e1e2623dd76a7fe9b59e15d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 22 Feb 2024 13:38:51 +0800 Subject: [PATCH 3/6] enqueue export job --- packages/api/src/jobs/export_item.ts | 17 ++++++--- packages/api/src/pubsub.ts | 44 ++++++++++++++++++----- packages/api/src/queue-processor.ts | 3 ++ packages/api/src/services/highlights.ts | 2 +- packages/api/src/services/labels.ts | 23 +++++++----- packages/api/src/services/library_item.ts | 2 ++ packages/api/src/utils/createTask.ts | 14 ++++++++ 7 files changed, 82 insertions(+), 23 deletions(-) diff --git a/packages/api/src/jobs/export_item.ts b/packages/api/src/jobs/export_item.ts index add1f1061..b2e13ea35 100644 --- a/packages/api/src/jobs/export_item.ts +++ b/packages/api/src/jobs/export_item.ts @@ -1,21 +1,30 @@ import { IntegrationType } from '../entity/integration' -import { LibraryItem } from '../entity/library_item' import { findIntegrations, getIntegrationClient, updateIntegration, } from '../services/integrations' +import { findLibraryItemById } from '../services/library_item' import { logger } from '../utils/logger' export interface ExportItemJobData { userId: string - libraryItem: LibraryItem + libraryItemId: string } export const EXPORT_ITEM_JOB_NAME = 'export-item' export const exportItem = async (jobData: ExportItemJobData) => { - const { libraryItem, userId } = jobData + const { libraryItemId, userId } = jobData + const libraryItem = await findLibraryItemById(libraryItemId, userId) + if (!libraryItem) { + logger.error('library item not found', { + userId, + libraryItemId, + }) + return + } + const integrations = await findIntegrations(userId, { enabled: true, type: IntegrationType.Export, @@ -29,7 +38,7 @@ export const exportItem = async (jobData: ExportItemJobData) => { integrations.map(async (integration) => { const logObject = { userId, - libraryItemId: libraryItem.id, + libraryItemId, integrationId: integration.id, } logger.info('exporting item...', logObject) diff --git a/packages/api/src/pubsub.ts b/packages/api/src/pubsub.ts index 2f5abf639..723fb7947 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -3,7 +3,12 @@ import express from 'express' import { RuleEventType } from './entity/rule' import { env } from './env' import { ReportType } from './generated/graphql' -import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask' +import { Merge } from './util' +import { + enqueueExportItem, + enqueueTriggerRuleJob, + enqueueWebhookJob, +} from './utils/createTask' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' @@ -11,6 +16,8 @@ const logger = buildLogger('pubsub') const client = new PubSub() +type EntityData = Merge + export const createPubSubClient = (): PubsubClient => { const fieldsToDelete = ['user'] as const @@ -45,12 +52,12 @@ export const createPubSubClient = (): PubsubClient => { }, entityCreated: async ( type: EntityType, - data: T, + data: EntityData, userId: string ): Promise => { + const libraryItemId = data.libraryItemId // queue trigger rule job if (type === EntityType.PAGE) { - const libraryItemId = (data as T & { id: string }).id await enqueueTriggerRuleJob({ userId, ruleEventType: RuleEventType.PageCreated, @@ -58,8 +65,13 @@ export const createPubSubClient = (): PubsubClient => { }) } + await enqueueExportItem({ + userId, + libraryItemId, + }) + const cleanData = deepDelete( - data as T & Record, + data as EntityData & Record, [...fieldsToDelete] ) @@ -77,12 +89,13 @@ export const createPubSubClient = (): PubsubClient => { }, entityUpdated: async ( type: EntityType, - data: T, + data: EntityData, userId: string ): Promise => { + const libraryItemId = data.libraryItemId + // queue trigger rule job if (type === EntityType.PAGE) { - const libraryItemId = (data as T & { id: string }).id await enqueueTriggerRuleJob({ userId, ruleEventType: RuleEventType.PageUpdated, @@ -90,8 +103,13 @@ export const createPubSubClient = (): PubsubClient => { }) } + await enqueueExportItem({ + userId, + libraryItemId, + }) + const cleanData = deepDelete( - data as T & Record, + data as EntityData & Record, [...fieldsToDelete] ) @@ -146,8 +164,16 @@ export interface PubsubClient { name: string, username: string ) => Promise - entityCreated: (type: EntityType, data: T, userId: string) => Promise - entityUpdated: (type: EntityType, data: T, userId: string) => Promise + entityCreated: ( + type: EntityType, + data: EntityData, + userId: string + ) => Promise + entityUpdated: ( + type: EntityType, + data: EntityData, + userId: string + ) => Promise entityDeleted: (type: EntityType, id: string, userId: string) => Promise reportSubmitted( submitterId: string | undefined, diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 8d0a9c7f4..f1b5908f9 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -15,6 +15,7 @@ import { appDataSource } from './data_source' import { env } from './env' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' +import { exportItem, EXPORT_ITEM_JOB_NAME } from './jobs/export_item' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' @@ -103,6 +104,8 @@ export const createWorker = (connection: ConnectionOptions) => return bulkAction(job.data) case CALL_WEBHOOK_JOB_NAME: return callWebhook(job.data) + case EXPORT_ITEM_JOB_NAME: + return exportItem(job.data) } }, { diff --git a/packages/api/src/services/highlights.ts b/packages/api/src/services/highlights.ts index ac6b08f0c..5883fb7cb 100644 --- a/packages/api/src/services/highlights.ts +++ b/packages/api/src/services/highlights.ts @@ -139,7 +139,7 @@ export const updateHighlight = async ( const libraryItemId = updatedHighlight.libraryItem.id await pubsub.entityUpdated( EntityType.HIGHLIGHT, - { ...highlight, id: highlightId, pageId: libraryItemId }, + { ...highlight, id: highlightId, pageId: libraryItemId, libraryItemId }, userId ) diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 1bf770a6d..285338745 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -6,15 +6,18 @@ import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { CreateLabelInput, labelRepository } from '../repository/label' import { bulkEnqueueUpdateLabels } from '../utils/createTask' +import { logger } from '../utils/logger' import { findHighlightById } from './highlights' import { findLibraryItemIdsByLabelId } from './library_item' type AddLabelsToLibraryItemEvent = { + libraryItemId: string pageId: string labels: DeepPartial