Merge pull request #3546 from omnivore-app/feature/export-item-job

create a job for exporting item to integrations
This commit is contained in:
Hongbo Wu
2024-02-22 15:46:36 +08:00
committed by GitHub
25 changed files with 403 additions and 119 deletions

View File

@ -0,0 +1,69 @@
import { IntegrationType } from '../../entity/integration'
import { findIntegration } from '../../services/integrations'
import { searchLibraryItems } from '../../services/library_item'
import { findActiveUser } from '../../services/user'
import { enqueueExportItem } from '../../utils/createTask'
import { logger } from '../../utils/logger'
// Payload for the 'export-all-items' job: identifies whose library to
// export and through which integration.
export interface ExportAllItemsJobData {
// id of the user whose library items will be exported
userId: string
// id of the target integration; must be enabled and of type Export
integrationId: string
}
// Queue job name used to register and enqueue this handler
export const EXPORT_ALL_ITEMS_JOB_NAME = 'export-all-items'
/**
 * Job handler: exports a user's entire library to one integration by
 * paging through the library and enqueueing an export-item job per page.
 *
 * Logs and returns early (no throw) when the user or integration is
 * missing, so the queue does not retry an unrecoverable job.
 */
export const exportAllItems = async (jobData: ExportAllItemsJobData) => {
  const { userId, integrationId } = jobData

  const user = await findActiveUser(userId)
  if (!user) {
    logger.error('user not found', {
      userId,
    })
    return
  }

  // only export through an integration that exists, is enabled,
  // and is of the export type
  const integration = await findIntegration(
    {
      id: integrationId,
      enabled: true,
      type: IntegrationType.Export,
    },
    userId
  )
  if (!integration) {
    logger.error('integration not found', {
      userId,
      integrationId,
    })
    return
  }

  // get paginated items from the database; `after` is the offset of the
  // next page and `first` is the page size
  const first = 50
  let after = 0
  for (;;) {
    // use the structured logger (not console.log) for consistency with
    // the rest of this job
    logger.info('searching for items...', {
      userId,
      first,
      after,
    })

    const searchResult = await searchLibraryItems(
      { from: after, size: first },
      userId
    )
    const libraryItems = searchResult.libraryItems
    const size = libraryItems.length
    if (size === 0) {
      // no more items — export complete
      break
    }

    await enqueueExportItem({
      userId,
      libraryItemIds: libraryItems.map((item) => item.id),
      integrationId,
    })

    after += size
  }
}

View File

@ -0,0 +1,83 @@
import { IntegrationType } from '../../entity/integration'
import {
findIntegrations,
getIntegrationClient,
updateIntegration,
} from '../../services/integrations'
import { findLibraryItemsByIds } from '../../services/library_item'
import { logger } from '../../utils/logger'
// Payload for the 'export-item' job.
export interface ExportItemJobData {
// owner of the library items
userId: string
// items to export in this batch
libraryItemIds: string[]
// when set, export only through this integration; when undefined,
// export through all of the user's enabled export integrations
integrationId?: string
}
// Queue job name used to register and enqueue this handler
export const EXPORT_ITEM_JOB_NAME = 'export-item'
/**
 * Job handler: exports the given library items to the user's enabled
 * export integrations (all of them, or just one when integrationId is
 * set), updating each integration's syncedAt on success.
 *
 * A failure against one integration does not abort the others; each
 * per-integration callback resolves to true/false instead of throwing.
 */
export const exportItem = async (jobData: ExportItemJobData) => {
  const { libraryItemIds, userId, integrationId } = jobData

  const libraryItems = await findLibraryItemsByIds(libraryItemIds, userId)
  if (libraryItems.length === 0) {
    logger.error('library items not found', {
      userId,
      libraryItemIds,
    })
    return
  }

  // when integrationId is undefined this matches every enabled
  // export integration of the user
  const integrations = await findIntegrations(userId, {
    id: integrationId,
    enabled: true,
    type: IntegrationType.Export,
  })
  if (integrations.length === 0) {
    return
  }

  await Promise.all(
    integrations.map(async (integration): Promise<boolean> => {
      const logObject = {
        userId,
        libraryItemIds,
        integrationId: integration.id,
      }
      logger.info('exporting item...', logObject)

      try {
        const client = getIntegrationClient(integration.name)

        const synced = await client.export(integration.token, libraryItems)
        if (!synced) {
          logger.error('failed to export item', logObject)
          // async function: return the value directly, no Promise.resolve
          return false
        }

        // update integration syncedAt if successful
        const syncedAt = new Date()
        logger.info('updating integration...', {
          ...logObject,
          syncedAt,
        })

        const updated = await updateIntegration(
          integration.id,
          {
            syncedAt,
          },
          userId
        )
        logger.info('integration updated', {
          ...logObject,
          updated,
        })

        return true
      } catch (err) {
        // include the job context so the failure is attributable
        logger.error('export with integration failed', { ...logObject, err })
        return false
      }
    })
  )
}

View File

@ -3,7 +3,12 @@ import express from 'express'
import { RuleEventType } from './entity/rule'
import { env } from './env'
import { ReportType } from './generated/graphql'
import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask'
import { Merge } from './util'
import {
enqueueExportItem,
enqueueTriggerRuleJob,
enqueueWebhookJob,
} from './utils/createTask'
import { deepDelete } from './utils/helpers'
import { buildLogger } from './utils/logger'
@ -11,6 +16,8 @@ const logger = buildLogger('pubsub')
const client = new PubSub()
type EntityData<T> = Merge<T, { libraryItemId: string }>
export const createPubSubClient = (): PubsubClient => {
const fieldsToDelete = ['user'] as const
@ -45,21 +52,26 @@ export const createPubSubClient = (): PubsubClient => {
},
entityCreated: async <T>(
type: EntityType,
data: T,
data: EntityData<T>,
userId: string
): Promise<void> => {
const libraryItemId = data.libraryItemId
// queue trigger rule job
if (type === EntityType.PAGE) {
const libraryItemId = (data as T & { id: string }).id
await enqueueTriggerRuleJob({
userId,
ruleEventType: RuleEventType.PageCreated,
libraryItemId,
})
}
// queue export item job
await enqueueExportItem({
userId,
libraryItemIds: [libraryItemId],
})
const cleanData = deepDelete(
data as T & Record<typeof fieldsToDelete[number], unknown>,
data as EntityData<T> & Record<typeof fieldsToDelete[number], unknown>,
[...fieldsToDelete]
)
@ -77,21 +89,27 @@ export const createPubSubClient = (): PubsubClient => {
},
entityUpdated: async <T>(
type: EntityType,
data: T,
data: EntityData<T>,
userId: string
): Promise<void> => {
const libraryItemId = data.libraryItemId
// queue trigger rule job
if (type === EntityType.PAGE) {
const libraryItemId = (data as T & { id: string }).id
await enqueueTriggerRuleJob({
userId,
ruleEventType: RuleEventType.PageUpdated,
libraryItemId,
})
}
// queue export item job
await enqueueExportItem({
userId,
libraryItemIds: [libraryItemId],
})
const cleanData = deepDelete(
data as T & Record<typeof fieldsToDelete[number], unknown>,
data as EntityData<T> & Record<typeof fieldsToDelete[number], unknown>,
[...fieldsToDelete]
)
@ -146,8 +164,16 @@ export interface PubsubClient {
name: string,
username: string
) => Promise<void>
entityCreated: <T>(type: EntityType, data: T, userId: string) => Promise<void>
entityUpdated: <T>(type: EntityType, data: T, userId: string) => Promise<void>
entityCreated: <T>(
type: EntityType,
data: EntityData<T>,
userId: string
) => Promise<void>
entityUpdated: <T>(
type: EntityType,
data: EntityData<T>,
userId: string
) => Promise<void>
entityDeleted: (type: EntityType, id: string, userId: string) => Promise<void>
reportSubmitted(
submitterId: string | undefined,

View File

@ -16,6 +16,14 @@ import { env } from './env'
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import {
exportAllItems,
EXPORT_ALL_ITEMS_JOB_NAME,
} from './jobs/integration/export_all_items'
import {
exportItem,
EXPORT_ITEM_JOB_NAME,
} from './jobs/integration/export_item'
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
import { refreshFeed } from './jobs/rss/refreshFeed'
import { savePageJob } from './jobs/save_page'
@ -103,6 +111,10 @@ export const createWorker = (connection: ConnectionOptions) =>
return bulkAction(job.data)
case CALL_WEBHOOK_JOB_NAME:
return callWebhook(job.data)
case EXPORT_ITEM_JOB_NAME:
return exportItem(job.data)
case EXPORT_ALL_ITEMS_JOB_NAME:
return exportAllItems(job.data)
}
},
{

View File

@ -34,7 +34,7 @@ import {
import { analytics } from '../../utils/analytics'
import {
deleteTask,
enqueueExportToIntegration,
enqueueExportAllItems,
enqueueImportFromIntegration,
} from '../../utils/createTask'
import { authorized } from '../../utils/gql-utils'
@ -98,17 +98,7 @@ export const setIntegrationResolver = authorized<
}
// create a task to sync all the pages if new integration or enable integration (export type)
const taskName = await enqueueExportToIntegration(
integration.id,
integration.name,
0,
authToken
)
log.info('enqueued task', taskName)
// update task name in integration
await updateIntegration(integration.id, { taskName }, uid)
integration.taskName = taskName
await enqueueExportAllItems(integration.id, uid)
} else if (integrationToSave.taskName) {
// delete the task if disable integration and task exists
const result = await deleteTask(integrationToSave.taskName)

View File

@ -5,7 +5,7 @@ import express from 'express'
import { Integration, IntegrationType } from '../../entity/integration'
import { readPushSubscription } from '../../pubsub'
import { getRepository } from '../../repository'
import { enqueueExportToIntegration } from '../../utils/createTask'
import { enqueueExportAllItems } from '../../utils/createTask'
import { logger } from '../../utils/logger'
import { createIntegrationToken } from '../auth/jwt_helpers'
@ -50,13 +50,7 @@ export function integrationsServiceRouter() {
return
}
const syncAt = integration.syncedAt?.getTime() || 0
return enqueueExportToIntegration(
integration.id,
integration.name,
syncAt,
authToken
)
return enqueueExportAllItems(integration.id, integration.user.id)
})
)
} catch (err) {

View File

@ -139,7 +139,7 @@ export const updateHighlight = async (
const libraryItemId = updatedHighlight.libraryItem.id
await pubsub.entityUpdated<UpdateHighlightEvent>(
EntityType.HIGHLIGHT,
{ ...highlight, id: highlightId, pageId: libraryItemId },
{ ...highlight, id: highlightId, pageId: libraryItemId, libraryItemId },
userId
)

View File

@ -1,4 +1,4 @@
import { LibraryItemState } from '../../entity/library_item'
import { LibraryItem, LibraryItemState } from '../../entity/library_item'
export interface RetrievedData {
url: string
@ -23,4 +23,6 @@ export interface IntegrationClient {
apiUrl: string
accessToken(token: string): Promise<string | null>
export(token: string, items: LibraryItem[]): Promise<boolean>
}

View File

@ -35,4 +35,8 @@ export class PocketClient implements IntegrationClient {
return null
}
}
// Pocket does not support exporting items from Omnivore, so this
// IntegrationClient.export implementation always reports failure
// without doing any work. (Async function: return the boolean
// directly — wrapping it in Promise.resolve is redundant.)
export = async (): Promise<boolean> => {
  return false
}
}

View File

@ -1,7 +1,36 @@
import axios from 'axios'
import { LibraryItem } from '../../entity/library_item'
import { highlightUrl, wait } from '../../utils/helpers'
import { logger } from '../../utils/logger'
import { IntegrationClient } from './integration'
// Shape of a highlight object accepted by the Readwise
// POST /api/v2/highlights endpoint. Field names use snake_case to
// match the Readwise API, not local convention.
interface ReadwiseHighlight {
// The highlight text, (technically the only field required in a highlight object)
text: string
// The title of the page the highlight is on
title?: string
// The author of the page the highlight is on
author?: string
// The URL of the page image
image_url?: string
// The URL of the page
source_url?: string
// A meaningful unique identifier for your app
source_type?: string
// One of: books, articles, tweets or podcasts
category?: string
// Annotation note attached to the specific highlight
note?: string
// Highlight's location in the source text. Used to order the highlights
location?: number
// One of: page, order or time_offset
location_type?: string
// A datetime representing when the highlight was taken in the ISO 8601 format
highlighted_at?: string
// Unique url of the specific highlight
highlight_url?: string
}
export class ReadwiseClient implements IntegrationClient {
name = 'READWISE'
apiUrl = 'https://readwise.io/api/v2'
@ -24,4 +53,82 @@ export class ReadwiseClient implements IntegrationClient {
return null
}
}
// Exports all highlights of the given items to Readwise.
// Items are first flattened into Readwise highlight payloads; when
// there is nothing to sync the call is skipped and treated as success.
export = async (token: string, items: LibraryItem[]): Promise<boolean> => {
  const highlights = items.flatMap(this.itemToReadwiseHighlight)
  if (highlights.length === 0) {
    // If there are no highlights, we will skip the sync
    return true
  }
  return this.syncWithReadwise(token, highlights)
}
// Converts one library item into Readwise highlight payloads.
// Only annotations of type 'HIGHLIGHT' that carry a quote are exported.
//
// Bug fix: the original returned `undefined` (hidden by an
// `as ReadwiseHighlight[]` cast) when item.highlights was nullish;
// flatMap callers would then receive a literal `undefined` element.
// Now always returns an array, and uses a type predicate instead of a
// cast so the compiler checks the narrowing.
itemToReadwiseHighlight = (item: LibraryItem): ReadwiseHighlight[] => {
  const category = item.siteName === 'Twitter' ? 'tweets' : 'articles'
  const highlights = item.highlights ?? []
  return highlights
    .map((highlight) => {
      // filter out highlights that are not of type highlight or have no quote
      if (highlight.highlightType !== 'HIGHLIGHT' || !highlight.quote) {
        return undefined
      }
      return {
        text: highlight.quote,
        title: item.title,
        author: item.author || undefined,
        highlight_url: highlightUrl(item.slug, highlight.id),
        highlighted_at: new Date(highlight.createdAt).toISOString(),
        category,
        image_url: item.thumbnail || undefined,
        location_type: 'order',
        note: highlight.annotation || undefined,
        source_type: 'omnivore',
        source_url: item.originalUrl,
      }
    })
    .filter(
      (highlight): highlight is ReadwiseHighlight => highlight !== undefined
    )
}
syncWithReadwise = async (
token: string,
highlights: ReadwiseHighlight[],
retryCount = 0
): Promise<boolean> => {
const url = `${this.apiUrl}/highlights`
try {
const response = await axios.post(
url,
{
highlights,
},
{
headers: {
Authorization: `Token ${token}`,
'Content-Type': 'application/json',
},
timeout: 5000, // 5 seconds
}
)
return response.status === 200
} catch (error) {
console.error(error)
if (axios.isAxiosError(error)) {
if (error.response?.status === 429 && retryCount < 3) {
console.log('Readwise API rate limit exceeded, retrying...')
// wait for Retry-After seconds in the header if rate limited
// max retry count is 3
const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds
await wait(parseInt(retryAfter, 10) * 1000)
return this.syncWithReadwise(token, highlights, retryCount + 1)
}
}
return false
}
}
}

View File

@ -6,15 +6,18 @@ import { createPubSubClient, EntityType, PubsubClient } from '../pubsub'
import { authTrx } from '../repository'
import { CreateLabelInput, labelRepository } from '../repository/label'
import { bulkEnqueueUpdateLabels } from '../utils/createTask'
import { logger } from '../utils/logger'
import { findHighlightById } from './highlights'
import { findLibraryItemIdsByLabelId } from './library_item'
// Pubsub payload published when labels are saved on a library item.
// libraryItemId is required by the pubsub EntityData contract;
// pageId carries the same id for legacy consumers.
type AddLabelsToLibraryItemEvent = {
libraryItemId: string
pageId: string
labels: DeepPartial<Label>[]
source?: LabelSource
}
// Pubsub payload published when labels are saved on a highlight.
type AddLabelsToHighlightEvent = {
libraryItemId: string
highlightId: string
labels: DeepPartial<Label>[]
}
@ -145,7 +148,7 @@ export const saveLabelsInLibraryItem = async (
// create pubsub event
await pubsub.entityCreated<AddLabelsToLibraryItemEvent>(
EntityType.LABEL,
{ pageId: libraryItemId, labels, source },
{ pageId: libraryItemId, labels, source, libraryItemId },
userId
)
}
@ -205,20 +208,22 @@ export const saveLabelsInHighlight = async (
)
})
const highlight = await findHighlightById(highlightId, userId)
if (!highlight) {
logger.error('Highlight not found', { highlightId, userId })
return
}
const libraryItemId = highlight.libraryItemId
// create pubsub event
await pubsub.entityCreated<AddLabelsToHighlightEvent>(
EntityType.LABEL,
{ highlightId, labels },
{ highlightId, labels, libraryItemId },
userId
)
const highlight = await findHighlightById(highlightId, userId)
if (highlight) {
// update labels in library item
await bulkEnqueueUpdateLabels([
{ libraryItemId: highlight.libraryItemId, userId },
])
}
// update labels in library item
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
}
export const findLabelsByIds = async (

View File

@ -652,6 +652,21 @@ export const searchLibraryItems = async (
)
}
// Loads the given library items (with labels, highlights, and each
// highlight's user) scoped to the given user via authTrx.
//
// Robustness fix: return [] for an empty ids list up front — a
// parameterized `IN (:...ids)` clause with an empty array expands to
// invalid SQL and throws at runtime.
export const findLibraryItemsByIds = async (ids: string[], userId: string) => {
  if (ids.length === 0) {
    return []
  }
  return authTrx(
    async (tx) =>
      tx
        .createQueryBuilder(LibraryItem, 'library_item')
        .leftJoinAndSelect('library_item.labels', 'labels')
        .leftJoinAndSelect('library_item.highlights', 'highlights')
        .leftJoinAndSelect('highlights.user', 'user')
        .where('library_item.id IN (:...ids)', { ids })
        .getMany(),
    undefined,
    userId
  )
}
export const findLibraryItemById = async (
id: string,
userId: string
@ -775,6 +790,7 @@ export const updateLibraryItem = async (
{
...libraryItem,
id,
libraryItemId: id,
// don't send original content and readable content
originalContent: undefined,
readableContent: undefined,
@ -935,6 +951,7 @@ export const createOrUpdateLibraryItem = async (
EntityType.PAGE,
{
...newLibraryItem,
libraryItemId: newLibraryItem.id,
// don't send original content and readable content
originalContent: undefined,
readableContent: undefined,

View File

@ -17,6 +17,11 @@ import {
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items'
import {
ExportItemJobData,
EXPORT_ITEM_JOB_NAME,
} from '../jobs/integration/export_item'
import {
queueRSSRefreshFeedJob,
REFRESH_ALL_FEEDS_JOB_NAME,
@ -59,12 +64,14 @@ export const getJobPriority = (jobName: string): number => {
return 1
case TRIGGER_RULE_JOB_NAME:
case CALL_WEBHOOK_JOB_NAME:
case EXPORT_ITEM_JOB_NAME:
return 5
case BULK_ACTION_JOB_NAME:
case `${REFRESH_FEED_JOB_NAME}_high`:
return 10
case `${REFRESH_FEED_JOB_NAME}_low`:
return 50
case EXPORT_ALL_ITEMS_JOB_NAME:
case REFRESH_ALL_FEEDS_JOB_NAME:
case THUMBNAIL_JOB:
return 100
@ -574,55 +581,22 @@ export const enqueueImportFromIntegration = async (
return createdTasks[0].name
}
export const enqueueExportToIntegration = async (
export const enqueueExportAllItems = async (
integrationId: string,
integrationName: string,
syncAt: number, // unix timestamp in milliseconds
authToken: string
): Promise<string> => {
const { GOOGLE_CLOUD_PROJECT } = process.env
userId: string
) => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
}
const payload = {
userId,
integrationId,
integrationName,
syncAt,
}
const headers = {
[OmnivoreAuthorizationHeader]: authToken,
}
// If there is no Google Cloud Project Id exposed, it means that we are in local environment
if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) {
if (env.queue.integrationExporterUrl) {
// Calling the handler function directly.
setTimeout(() => {
axios
.post(env.queue.integrationExporterUrl, payload, {
headers,
})
.catch((error) => {
logError(error)
})
}, 0)
}
return nanoid()
}
const createdTasks = await createHttpTaskWithToken({
project: GOOGLE_CLOUD_PROJECT,
payload,
taskHandlerUrl: env.queue.integrationExporterUrl,
priority: 'low',
requestHeaders: headers,
return queue.add(EXPORT_ALL_ITEMS_JOB_NAME, payload, {
priority: getJobPriority(EXPORT_ALL_ITEMS_JOB_NAME),
attempts: 1,
})
if (!createdTasks || !createdTasks[0].name) {
logger.error(`Unable to get the name of the task`, {
payload,
createdTasks,
})
throw new CreateTaskError(`Unable to get the name of the task`)
}
return createdTasks[0].name
}
export const enqueueThumbnailJob = async (
@ -785,4 +759,16 @@ export const enqueueBulkAction = async (data: BulkActionData) => {
}
}
// Enqueues an export-item job on the backend queue.
// Resolves to undefined when the queue is unavailable; otherwise
// resolves to the created job. The job is not retried on failure
// (attempts: 1).
export const enqueueExportItem = async (jobData: ExportItemJobData) => {
  const queue = await getBackendQueue()
  if (!queue) {
    return undefined
  }
  const jobOptions = {
    attempts: 1,
    priority: getJobPriority(EXPORT_ITEM_JOB_NAME),
  }
  return queue.add(EXPORT_ITEM_JOB_NAME, jobData, jobOptions)
}
export default createHttpTaskWithToken

View File

@ -10,6 +10,7 @@ import { Highlight as HighlightData } from '../entity/highlight'
import { LibraryItem, LibraryItemState } from '../entity/library_item'
import { Recommendation as RecommendationData } from '../entity/recommendation'
import { RegistrationType, User } from '../entity/user'
import { env } from '../env'
import {
Article,
ArticleSavingRequest,
@ -400,3 +401,6 @@ export const setRecentlySavedItemInRedis = async (
})
}
}
// Builds the public web URL that deep-links to a specific highlight
// on the reader page for the given item slug (the highlight id is the
// URL fragment).
export const highlightUrl = (slug: string, highlightId: string): string => {
  const base = `${env.client.url}/me/${slug}`
  return `${base}#${highlightId}`
}

View File

@ -15,7 +15,7 @@ export const mochaGlobalTeardown = async () => {
console.log('redis connection closed')
if (redisDataSource.workerRedisClient) {
stopWorker()
await stopWorker()
console.log('worker closed')
}
}

View File

@ -26,7 +26,7 @@ describe('Sanitize Directive', () => {
})
describe('Update user with a bio that is too long', () => {
let bio = ''.padStart(500, '*')
const bio = ''.padStart(500, '*')
let query: string
beforeEach(() => {

View File

@ -130,17 +130,6 @@ describe('Integrations resolvers', () => {
)
expect(res.body.data.setIntegration.integration.enabled).to.be.true
})
it('creates new cloud task to sync all existing articles and highlights', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName, token),
authToken
)
const integration = await findIntegration({
id: res.body.data.setIntegration.integration.id,
}, loginUser.id)
expect(integration?.taskName).not.to.be.null
})
})
})

View File

@ -30,7 +30,7 @@ describe('Emails Router', () => {
user = await createTestUser('fakeUser')
newsletterEmail = await createNewsletterEmail(user.id)
token = process.env.PUBSUB_VERIFICATION_TOKEN!
token = process.env.PUBSUB_VERIFICATION_TOKEN || ''
receivedEmail = await saveReceivedEmail(
from,
newsletterEmail.address,
@ -52,7 +52,7 @@ describe('Emails Router', () => {
describe('forward', () => {
const html = '<html>test html</html>'
beforeEach(async () => {
beforeEach(() => {
sinon.replace(
sendNotification,
'sendMulticastPushNotifications',

View File

@ -1,5 +1,5 @@
import { request } from '../util'
import 'mocha'
import { request } from '../util'
describe('Upload Router', () => {
const token = process.env.PUBSUB_VERIFICATION_TOKEN || ''

View File

@ -1,5 +1,5 @@
import 'mocha'
import { expect } from 'chai'
import 'mocha'
import { validateUrl } from '../../src/services/create_page_save_request'
describe('validateUrl', () => {

View File

@ -1,12 +1,12 @@
import 'mocha'
import { expect } from 'chai'
import 'mocha'
import { getHighlightLocation } from '../../src/services/highlights'
describe('getHighlightLocation', () => {
let patch: string
let location: number
before(async () => {
before(() => {
location = 109
patch = `@@ -${location + 1},16 +${location + 1},36 @@
. We're
@ -18,7 +18,7 @@ describe('getHighlightLocation', () => {
coming`
})
it('returns highlight location from patch', async () => {
it('returns highlight location from patch', () => {
const result = getHighlightLocation(patch)
expect(result).to.eql(location)
})

View File

@ -20,7 +20,7 @@ export const stopApolloServer = async () => {
await apollo.stop()
}
export const startWorker = async (connection: ConnectionOptions) => {
export const startWorker = (connection: ConnectionOptions) => {
worker = createWorker(connection)
queueEvents = new QueueEvents(QUEUE_NAME, {
connection,
@ -28,8 +28,8 @@ export const startWorker = async (connection: ConnectionOptions) => {
}
export const stopWorker = async () => {
queueEvents.close()
worker.close()
await queueEvents.close()
await worker.close()
}
export const waitUntilJobsDone = async (jobs: Job[]) => {

View File

@ -1,5 +1,5 @@
import 'mocha'
import { expect } from 'chai'
import 'mocha'
import { validatedDate } from '../../src/utils/helpers'
describe('validatedDate', () => {

View File

@ -1,7 +1,11 @@
import 'mocha'
import * as chai from 'chai'
import { expect } from 'chai'
import chaiAsPromised from 'chai-as-promised'
import fs from 'fs'
import 'mocha'
import nock from 'nock'
import { User } from '../../src/entity/user'
import { deleteUser } from '../../src/services/user'
import {
getTitleFromEmailSubject,
isProbablyArticle,
@ -9,11 +13,7 @@ import {
parsePageMetadata,
parsePreparedContent,
} from '../../src/utils/parser'
import nock from 'nock'
import chaiAsPromised from 'chai-as-promised'
import { User } from '../../src/entity/user'
import { createTestUser } from '../db'
import { deleteUser } from '../../src/services/user'
chai.use(chaiAsPromised)

View File

@ -6,10 +6,6 @@
"compilerOptions": {
"outDir": "dist"
},
"include": [
"src",
"test",
"../integration-handler/test/integrations.test.ts"
],
"exclude": ["./src/generated", "./test"]
"include": ["src", "test"],
"exclude": ["./src/generated"]
}