// packages/api/src/jobs/integration/export_all_items.ts
import { IntegrationType } from '../../entity/integration'
import { findIntegration } from '../../services/integrations'
import { searchLibraryItems } from '../../services/library_item'
import { findActiveUser } from '../../services/user'
import { enqueueExportItem } from '../../utils/createTask'
import { logger } from '../../utils/logger'

/** Payload of the `export-all-items` queue job. */
export interface ExportAllItemsJobData {
  userId: string
  integrationId: string
}

export const EXPORT_ALL_ITEMS_JOB_NAME = 'export-all-items'

/**
 * Pages through the user's library items and enqueues one export-item job
 * per page for the given integration.
 *
 * The job logs an error and returns without enqueueing anything when the
 * active user cannot be found, or when no enabled integration of type
 * `Export` matches `integrationId` for that user.
 *
 * @param jobData - ids of the user and of the export integration to feed
 */
export const exportAllItems = async (
  jobData: ExportAllItemsJobData
): Promise<void> => {
  const { userId, integrationId } = jobData

  const user = await findActiveUser(userId)
  if (!user) {
    logger.error('user not found', {
      userId,
    })
    return
  }

  const integration = await findIntegration(
    {
      id: integrationId,
      enabled: true,
      type: IntegrationType.Export,
    },
    userId
  )
  if (!integration) {
    logger.error('integration not found', {
      userId,
      integrationId,
    })
    return
  }

  // get paginated items from the database
  const first = 50 // page size requested from the search
  let after = 0 // offset of the next page
  for (;;) {
    // use the shared logger (not console) so this message follows the same
    // logging pipeline as every other message emitted by this job
    logger.info('searching for items...', {
      userId,
      first,
      after,
    })

    const searchResult = await searchLibraryItems(
      { from: after, size: first },
      userId
    )
    const libraryItems = searchResult.libraryItems
    const size = libraryItems.length
    if (size === 0) {
      // an empty page means every item has been handed off
      break
    }

    // one export job per page of item ids
    await enqueueExportItem({
      userId,
      libraryItemIds: libraryItems.map((item) => item.id),
      integrationId,
    })

    // advance by the number of items actually returned
    after += size
  }
}
// packages/api/src/jobs/integration/export_item.ts
import { IntegrationType } from '../../entity/integration'
import {
  findIntegrations,
  getIntegrationClient,
  updateIntegration,
} from '../../services/integrations'
import { findLibraryItemsByIds } from '../../services/library_item'
import { logger } from '../../utils/logger'

/**
 * Payload of the `export-item` queue job.
 * `integrationId` is optional; it is passed straight through as the `id`
 * filter of the integration lookup.
 */
export interface ExportItemJobData {
  userId: string
  libraryItemIds: string[]
  integrationId?: string
}

export const EXPORT_ITEM_JOB_NAME = 'export-item'

/**
 * Exports the given library items through the user's enabled export
 * integrations and stamps `syncedAt` on each integration whose export
 * succeeded.
 *
 * Integrations are processed concurrently. A failure in one of them is
 * logged and does not reject the job or affect the others.
 *
 * @param jobData - user id, ids of the items to export and, optionally, the
 *   id used to filter the integration lookup
 */
export const exportItem = async (
  jobData: ExportItemJobData
): Promise<void> => {
  const { libraryItemIds, userId, integrationId } = jobData

  const libraryItems = await findLibraryItemsByIds(libraryItemIds, userId)
  if (libraryItems.length === 0) {
    logger.error('library items not found', {
      userId,
      libraryItemIds,
    })
    return
  }

  const integrations = await findIntegrations(userId, {
    id: integrationId,
    enabled: true,
    type: IntegrationType.Export,
  })
  if (integrations.length === 0) {
    // nothing to export to
    return
  }

  await Promise.all(
    integrations.map(async (integration): Promise<boolean> => {
      const logObject = {
        userId,
        libraryItemIds,
        integrationId: integration.id,
      }
      logger.info('exporting item...', logObject)

      try {
        const client = getIntegrationClient(integration.name)

        const synced = await client.export(integration.token, libraryItems)
        if (!synced) {
          logger.error('failed to export item', logObject)
          return false
        }

        const syncedAt = new Date()
        logger.info('updating integration...', {
          ...logObject,
          syncedAt,
        })

        // update integration syncedAt if successful
        const updated = await updateIntegration(
          integration.id,
          {
            syncedAt,
          },
          userId
        )
        logger.info('integration updated', {
          ...logObject,
          updated,
        })

        return true
      } catch (err) {
        // swallow per-integration failures so Promise.all never rejects and
        // the remaining integrations still run to completion
        logger.error('export with integration failed', err)
        return false
      }
    })
  )
}
b/packages/api/src/pubsub.ts index 2f5abf639..5ca9d2693 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -3,7 +3,12 @@ import express from 'express' import { RuleEventType } from './entity/rule' import { env } from './env' import { ReportType } from './generated/graphql' -import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask' +import { Merge } from './util' +import { + enqueueExportItem, + enqueueTriggerRuleJob, + enqueueWebhookJob, +} from './utils/createTask' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' @@ -11,6 +16,8 @@ const logger = buildLogger('pubsub') const client = new PubSub() +type EntityData = Merge + export const createPubSubClient = (): PubsubClient => { const fieldsToDelete = ['user'] as const @@ -45,21 +52,26 @@ export const createPubSubClient = (): PubsubClient => { }, entityCreated: async ( type: EntityType, - data: T, + data: EntityData, userId: string ): Promise => { + const libraryItemId = data.libraryItemId // queue trigger rule job if (type === EntityType.PAGE) { - const libraryItemId = (data as T & { id: string }).id await enqueueTriggerRuleJob({ userId, ruleEventType: RuleEventType.PageCreated, libraryItemId, }) } + // queue export item job + await enqueueExportItem({ + userId, + libraryItemIds: [libraryItemId], + }) const cleanData = deepDelete( - data as T & Record, + data as EntityData & Record, [...fieldsToDelete] ) @@ -77,21 +89,27 @@ export const createPubSubClient = (): PubsubClient => { }, entityUpdated: async ( type: EntityType, - data: T, + data: EntityData, userId: string ): Promise => { + const libraryItemId = data.libraryItemId + // queue trigger rule job if (type === EntityType.PAGE) { - const libraryItemId = (data as T & { id: string }).id await enqueueTriggerRuleJob({ userId, ruleEventType: RuleEventType.PageUpdated, libraryItemId, }) } + // queue export item job + await enqueueExportItem({ + userId, + libraryItemIds: 
[libraryItemId], + }) const cleanData = deepDelete( - data as T & Record, + data as EntityData & Record, [...fieldsToDelete] ) @@ -146,8 +164,16 @@ export interface PubsubClient { name: string, username: string ) => Promise - entityCreated: (type: EntityType, data: T, userId: string) => Promise - entityUpdated: (type: EntityType, data: T, userId: string) => Promise + entityCreated: ( + type: EntityType, + data: EntityData, + userId: string + ) => Promise + entityUpdated: ( + type: EntityType, + data: EntityData, + userId: string + ) => Promise entityDeleted: (type: EntityType, id: string, userId: string) => Promise reportSubmitted( submitterId: string | undefined, diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 8d0a9c7f4..cbd06787b 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -16,6 +16,14 @@ import { env } from './env' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' +import { + exportAllItems, + EXPORT_ALL_ITEMS_JOB_NAME, +} from './jobs/integration/export_all_items' +import { + exportItem, + EXPORT_ITEM_JOB_NAME, +} from './jobs/integration/export_item' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' @@ -103,6 +111,10 @@ export const createWorker = (connection: ConnectionOptions) => return bulkAction(job.data) case CALL_WEBHOOK_JOB_NAME: return callWebhook(job.data) + case EXPORT_ITEM_JOB_NAME: + return exportItem(job.data) + case EXPORT_ALL_ITEMS_JOB_NAME: + return exportAllItems(job.data) } }, { diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index 0a849c5f0..145c9ec81 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ 
b/packages/api/src/resolvers/integrations/index.ts @@ -34,7 +34,7 @@ import { import { analytics } from '../../utils/analytics' import { deleteTask, - enqueueExportToIntegration, + enqueueExportAllItems, enqueueImportFromIntegration, } from '../../utils/createTask' import { authorized } from '../../utils/gql-utils' @@ -98,17 +98,7 @@ export const setIntegrationResolver = authorized< } // create a task to sync all the pages if new integration or enable integration (export type) - const taskName = await enqueueExportToIntegration( - integration.id, - integration.name, - 0, - authToken - ) - log.info('enqueued task', taskName) - - // update task name in integration - await updateIntegration(integration.id, { taskName }, uid) - integration.taskName = taskName + await enqueueExportAllItems(integration.id, uid) } else if (integrationToSave.taskName) { // delete the task if disable integration and task exists const result = await deleteTask(integrationToSave.taskName) diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index 68db6b846..b91bd4b20 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -5,7 +5,7 @@ import express from 'express' import { Integration, IntegrationType } from '../../entity/integration' import { readPushSubscription } from '../../pubsub' import { getRepository } from '../../repository' -import { enqueueExportToIntegration } from '../../utils/createTask' +import { enqueueExportAllItems } from '../../utils/createTask' import { logger } from '../../utils/logger' import { createIntegrationToken } from '../auth/jwt_helpers' @@ -50,13 +50,7 @@ export function integrationsServiceRouter() { return } - const syncAt = integration.syncedAt?.getTime() || 0 - return enqueueExportToIntegration( - integration.id, - integration.name, - syncAt, - authToken - ) + return enqueueExportAllItems(integration.id, integration.user.id) }) ) } catch (err) { diff 
--git a/packages/api/src/services/highlights.ts b/packages/api/src/services/highlights.ts index ac6b08f0c..5883fb7cb 100644 --- a/packages/api/src/services/highlights.ts +++ b/packages/api/src/services/highlights.ts @@ -139,7 +139,7 @@ export const updateHighlight = async ( const libraryItemId = updatedHighlight.libraryItem.id await pubsub.entityUpdated( EntityType.HIGHLIGHT, - { ...highlight, id: highlightId, pageId: libraryItemId }, + { ...highlight, id: highlightId, pageId: libraryItemId, libraryItemId }, userId ) diff --git a/packages/api/src/services/integrations/integration.ts b/packages/api/src/services/integrations/integration.ts index 6aa51a49f..e3f1edbc8 100644 --- a/packages/api/src/services/integrations/integration.ts +++ b/packages/api/src/services/integrations/integration.ts @@ -1,4 +1,4 @@ -import { LibraryItemState } from '../../entity/library_item' +import { LibraryItem, LibraryItemState } from '../../entity/library_item' export interface RetrievedData { url: string @@ -23,4 +23,6 @@ export interface IntegrationClient { apiUrl: string accessToken(token: string): Promise + + export(token: string, items: LibraryItem[]): Promise } diff --git a/packages/api/src/services/integrations/pocket.ts b/packages/api/src/services/integrations/pocket.ts index 1ea4d6d87..517d3befa 100644 --- a/packages/api/src/services/integrations/pocket.ts +++ b/packages/api/src/services/integrations/pocket.ts @@ -35,4 +35,8 @@ export class PocketClient implements IntegrationClient { return null } } + + export = async (): Promise => { + return Promise.resolve(false) + } } diff --git a/packages/api/src/services/integrations/readwise.ts b/packages/api/src/services/integrations/readwise.ts index 7764294d2..0b0b6be0c 100644 --- a/packages/api/src/services/integrations/readwise.ts +++ b/packages/api/src/services/integrations/readwise.ts @@ -1,7 +1,36 @@ import axios from 'axios' +import { LibraryItem } from '../../entity/library_item' +import { highlightUrl, wait } from 
'../../utils/helpers' import { logger } from '../../utils/logger' import { IntegrationClient } from './integration' +interface ReadwiseHighlight { + // The highlight text, (technically the only field required in a highlight object) + text: string + // The title of the page the highlight is on + title?: string + // The author of the page the highlight is on + author?: string + // The URL of the page image + image_url?: string + // The URL of the page + source_url?: string + // A meaningful unique identifier for your app + source_type?: string + // One of: books, articles, tweets or podcasts + category?: string + // Annotation note attached to the specific highlight + note?: string + // Highlight's location in the source text. Used to order the highlights + location?: number + // One of: page, order or time_offset + location_type?: string + // A datetime representing when the highlight was taken in the ISO 8601 format + highlighted_at?: string + // Unique url of the specific highlight + highlight_url?: string +} + export class ReadwiseClient implements IntegrationClient { name = 'READWISE' apiUrl = 'https://readwise.io/api/v2' @@ -24,4 +53,82 @@ export class ReadwiseClient implements IntegrationClient { return null } } + + export = async (token: string, items: LibraryItem[]): Promise => { + let result = true + + const highlights = items.flatMap(this.itemToReadwiseHighlight) + + // If there are no highlights, we will skip the sync + if (highlights.length > 0) { + result = await this.syncWithReadwise(token, highlights) + } + + return result + } + + itemToReadwiseHighlight = (item: LibraryItem): ReadwiseHighlight[] => { + const category = item.siteName === 'Twitter' ? 
'tweets' : 'articles' + return item.highlights + ?.map((highlight) => { + // filter out highlights that are not of type highlight or have no quote + if (highlight.highlightType !== 'HIGHLIGHT' || !highlight.quote) { + return undefined + } + + return { + text: highlight.quote, + title: item.title, + author: item.author || undefined, + highlight_url: highlightUrl(item.slug, highlight.id), + highlighted_at: new Date(highlight.createdAt).toISOString(), + category, + image_url: item.thumbnail || undefined, + location_type: 'order', + note: highlight.annotation || undefined, + source_type: 'omnivore', + source_url: item.originalUrl, + } + }) + .filter((highlight) => highlight !== undefined) as ReadwiseHighlight[] + } + + syncWithReadwise = async ( + token: string, + highlights: ReadwiseHighlight[], + retryCount = 0 + ): Promise => { + const url = `${this.apiUrl}/highlights` + try { + const response = await axios.post( + url, + { + highlights, + }, + { + headers: { + Authorization: `Token ${token}`, + 'Content-Type': 'application/json', + }, + timeout: 5000, // 5 seconds + } + ) + return response.status === 200 + } catch (error) { + console.error(error) + + if (axios.isAxiosError(error)) { + if (error.response?.status === 429 && retryCount < 3) { + console.log('Readwise API rate limit exceeded, retrying...') + // wait for Retry-After seconds in the header if rate limited + // max retry count is 3 + const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds + await wait(parseInt(retryAfter, 10) * 1000) + return this.syncWithReadwise(token, highlights, retryCount + 1) + } + } + + return false + } + } } diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 1bf770a6d..285338745 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -6,15 +6,18 @@ import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { 
CreateLabelInput, labelRepository } from '../repository/label' import { bulkEnqueueUpdateLabels } from '../utils/createTask' +import { logger } from '../utils/logger' import { findHighlightById } from './highlights' import { findLibraryItemIdsByLabelId } from './library_item' type AddLabelsToLibraryItemEvent = { + libraryItemId: string pageId: string labels: DeepPartial