initial export
This commit is contained in:
69
packages/api/src/jobs/integration/export_all_items.ts
Normal file
69
packages/api/src/jobs/integration/export_all_items.ts
Normal file
@ -0,0 +1,69 @@
|
||||
import { IntegrationType } from '../../entity/integration'
|
||||
import { findIntegration } from '../../services/integrations'
|
||||
import { searchLibraryItems } from '../../services/library_item'
|
||||
import { findActiveUser } from '../../services/user'
|
||||
import { enqueueExportItem } from '../../utils/createTask'
|
||||
import { logger } from '../../utils/logger'
|
||||
|
||||
export interface ExportAllItemsJobData {
|
||||
userId: string
|
||||
integrationId: string
|
||||
}
|
||||
|
||||
export const EXPORT_ALL_ITEMS_JOB_NAME = 'export-all-items'
|
||||
|
||||
export const exportAllItems = async (jobData: ExportAllItemsJobData) => {
|
||||
const { userId, integrationId } = jobData
|
||||
const user = await findActiveUser(userId)
|
||||
if (!user) {
|
||||
logger.error('user not found', {
|
||||
userId,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const integration = await findIntegration(
|
||||
{
|
||||
id: integrationId,
|
||||
enabled: true,
|
||||
type: IntegrationType.Export,
|
||||
},
|
||||
userId
|
||||
)
|
||||
|
||||
if (!integration) {
|
||||
logger.error('integration not found', {
|
||||
userId,
|
||||
integrationId,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// get paginated items from the database
|
||||
const first = 50
|
||||
let after = 0
|
||||
for (;;) {
|
||||
console.log('searching for items...', {
|
||||
userId,
|
||||
first,
|
||||
after,
|
||||
})
|
||||
const searchResult = await searchLibraryItems(
|
||||
{ from: after, size: first },
|
||||
userId
|
||||
)
|
||||
const libraryItems = searchResult.libraryItems
|
||||
const size = libraryItems.length
|
||||
if (size === 0) {
|
||||
break
|
||||
}
|
||||
|
||||
await enqueueExportItem({
|
||||
userId,
|
||||
libraryItemIds: libraryItems.map((item) => item.id),
|
||||
integrationId,
|
||||
})
|
||||
|
||||
after += size
|
||||
}
|
||||
}
|
||||
@ -1,31 +1,33 @@
|
||||
import { IntegrationType } from '../entity/integration'
|
||||
import { IntegrationType } from '../../entity/integration'
|
||||
import {
|
||||
findIntegrations,
|
||||
getIntegrationClient,
|
||||
updateIntegration,
|
||||
} from '../services/integrations'
|
||||
import { findLibraryItemById } from '../services/library_item'
|
||||
import { logger } from '../utils/logger'
|
||||
} from '../../services/integrations'
|
||||
import { findLibraryItemsByIds } from '../../services/library_item'
|
||||
import { logger } from '../../utils/logger'
|
||||
|
||||
export interface ExportItemJobData {
|
||||
userId: string
|
||||
libraryItemId: string
|
||||
libraryItemIds: string[]
|
||||
integrationId?: string
|
||||
}
|
||||
|
||||
export const EXPORT_ITEM_JOB_NAME = 'export-item'
|
||||
|
||||
export const exportItem = async (jobData: ExportItemJobData) => {
|
||||
const { libraryItemId, userId } = jobData
|
||||
const libraryItem = await findLibraryItemById(libraryItemId, userId)
|
||||
if (!libraryItem) {
|
||||
logger.error('library item not found', {
|
||||
const { libraryItemIds, userId, integrationId } = jobData
|
||||
const libraryItems = await findLibraryItemsByIds(libraryItemIds, userId)
|
||||
if (libraryItems.length === 0) {
|
||||
logger.error('library items not found', {
|
||||
userId,
|
||||
libraryItemId,
|
||||
libraryItemIds,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const integrations = await findIntegrations(userId, {
|
||||
id: integrationId,
|
||||
enabled: true,
|
||||
type: IntegrationType.Export,
|
||||
})
|
||||
@ -38,7 +40,7 @@ export const exportItem = async (jobData: ExportItemJobData) => {
|
||||
integrations.map(async (integration) => {
|
||||
const logObject = {
|
||||
userId,
|
||||
libraryItemId,
|
||||
libraryItemIds,
|
||||
integrationId: integration.id,
|
||||
}
|
||||
logger.info('exporting item...', logObject)
|
||||
@ -46,23 +48,23 @@ export const exportItem = async (jobData: ExportItemJobData) => {
|
||||
try {
|
||||
const client = getIntegrationClient(integration.name)
|
||||
|
||||
const synced = await client.export(integration.token, [libraryItem])
|
||||
const synced = await client.export(integration.token, libraryItems)
|
||||
if (!synced) {
|
||||
logger.error('failed to export item', logObject)
|
||||
return Promise.resolve(false)
|
||||
}
|
||||
|
||||
const lastItemUpdatedAt = libraryItem.updatedAt
|
||||
const syncedAt = new Date()
|
||||
logger.info('updating integration...', {
|
||||
...logObject,
|
||||
syncedAt: lastItemUpdatedAt,
|
||||
syncedAt,
|
||||
})
|
||||
|
||||
// update integration syncedAt if successful
|
||||
const updated = await updateIntegration(
|
||||
integration.id,
|
||||
{
|
||||
syncedAt: lastItemUpdatedAt,
|
||||
syncedAt,
|
||||
},
|
||||
userId
|
||||
)
|
||||
@ -64,10 +64,10 @@ export const createPubSubClient = (): PubsubClient => {
|
||||
libraryItemId,
|
||||
})
|
||||
}
|
||||
|
||||
// queue export item job
|
||||
await enqueueExportItem({
|
||||
userId,
|
||||
libraryItemId,
|
||||
libraryItemIds: [libraryItemId],
|
||||
})
|
||||
|
||||
const cleanData = deepDelete(
|
||||
@ -102,10 +102,10 @@ export const createPubSubClient = (): PubsubClient => {
|
||||
libraryItemId,
|
||||
})
|
||||
}
|
||||
|
||||
// queue export item job
|
||||
await enqueueExportItem({
|
||||
userId,
|
||||
libraryItemId,
|
||||
libraryItemIds: [libraryItemId],
|
||||
})
|
||||
|
||||
const cleanData = deepDelete(
|
||||
|
||||
@ -15,8 +15,15 @@ import { appDataSource } from './data_source'
|
||||
import { env } from './env'
|
||||
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
|
||||
import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook'
|
||||
import { exportItem, EXPORT_ITEM_JOB_NAME } from './jobs/export_item'
|
||||
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
|
||||
import {
|
||||
exportAllItems,
|
||||
EXPORT_ALL_ITEMS_JOB_NAME,
|
||||
} from './jobs/integration/export_all_items'
|
||||
import {
|
||||
exportItem,
|
||||
EXPORT_ITEM_JOB_NAME,
|
||||
} from './jobs/integration/export_item'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
@ -106,6 +113,8 @@ export const createWorker = (connection: ConnectionOptions) =>
|
||||
return callWebhook(job.data)
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
return exportItem(job.data)
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
return exportAllItems(job.data)
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@ -34,7 +34,7 @@ import {
|
||||
import { analytics } from '../../utils/analytics'
|
||||
import {
|
||||
deleteTask,
|
||||
enqueueExportToIntegration,
|
||||
enqueueExportAllItems,
|
||||
enqueueImportFromIntegration,
|
||||
} from '../../utils/createTask'
|
||||
import { authorized } from '../../utils/gql-utils'
|
||||
@ -98,17 +98,8 @@ export const setIntegrationResolver = authorized<
|
||||
}
|
||||
|
||||
// create a task to sync all the pages if new integration or enable integration (export type)
|
||||
const taskName = await enqueueExportToIntegration(
|
||||
integration.id,
|
||||
integration.name,
|
||||
0,
|
||||
authToken
|
||||
)
|
||||
log.info('enqueued task', taskName)
|
||||
|
||||
// update task name in integration
|
||||
await updateIntegration(integration.id, { taskName }, uid)
|
||||
integration.taskName = taskName
|
||||
const job = await enqueueExportAllItems(integration.id, uid)
|
||||
log.info('enqueued job', job)
|
||||
} else if (integrationToSave.taskName) {
|
||||
// delete the task if disable integration and task exists
|
||||
const result = await deleteTask(integrationToSave.taskName)
|
||||
|
||||
@ -5,7 +5,7 @@ import express from 'express'
|
||||
import { Integration, IntegrationType } from '../../entity/integration'
|
||||
import { readPushSubscription } from '../../pubsub'
|
||||
import { getRepository } from '../../repository'
|
||||
import { enqueueExportToIntegration } from '../../utils/createTask'
|
||||
import { enqueueExportAllItems } from '../../utils/createTask'
|
||||
import { logger } from '../../utils/logger'
|
||||
import { createIntegrationToken } from '../auth/jwt_helpers'
|
||||
|
||||
@ -51,7 +51,7 @@ export function integrationsServiceRouter() {
|
||||
}
|
||||
|
||||
const syncAt = integration.syncedAt?.getTime() || 0
|
||||
return enqueueExportToIntegration(
|
||||
return enqueueExportAllItems(
|
||||
integration.id,
|
||||
integration.name,
|
||||
syncAt,
|
||||
|
||||
@ -652,6 +652,21 @@ export const searchLibraryItems = async (
|
||||
)
|
||||
}
|
||||
|
||||
export const findLibraryItemsByIds = async (ids: string[], userId: string) => {
|
||||
return authTrx(
|
||||
async (tx) =>
|
||||
tx
|
||||
.createQueryBuilder(LibraryItem, 'library_item')
|
||||
.leftJoinAndSelect('library_item.labels', 'labels')
|
||||
.leftJoinAndSelect('library_item.highlights', 'highlights')
|
||||
.leftJoinAndSelect('highlights.user', 'user')
|
||||
.where('library_item.id IN (:...ids)', { ids })
|
||||
.getMany(),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
export const findLibraryItemById = async (
|
||||
id: string,
|
||||
userId: string
|
||||
|
||||
@ -16,8 +16,12 @@ import {
|
||||
} from '../generated/graphql'
|
||||
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
|
||||
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
|
||||
import { ExportItemJobData, EXPORT_ITEM_JOB_NAME } from '../jobs/export_item'
|
||||
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
|
||||
import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items'
|
||||
import {
|
||||
ExportItemJobData,
|
||||
EXPORT_ITEM_JOB_NAME,
|
||||
} from '../jobs/integration/export_item'
|
||||
import {
|
||||
queueRSSRefreshFeedJob,
|
||||
REFRESH_ALL_FEEDS_JOB_NAME,
|
||||
@ -67,6 +71,7 @@ export const getJobPriority = (jobName: string): number => {
|
||||
return 10
|
||||
case `${REFRESH_FEED_JOB_NAME}_low`:
|
||||
return 50
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
case REFRESH_ALL_FEEDS_JOB_NAME:
|
||||
case THUMBNAIL_JOB:
|
||||
return 100
|
||||
@ -576,55 +581,22 @@ export const enqueueImportFromIntegration = async (
|
||||
return createdTasks[0].name
|
||||
}
|
||||
|
||||
export const enqueueExportToIntegration = async (
|
||||
export const enqueueExportAllItems = async (
|
||||
integrationId: string,
|
||||
integrationName: string,
|
||||
syncAt: number, // unix timestamp in milliseconds
|
||||
authToken: string
|
||||
): Promise<string> => {
|
||||
const { GOOGLE_CLOUD_PROJECT } = process.env
|
||||
userId: string
|
||||
) => {
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
return undefined
|
||||
}
|
||||
const payload = {
|
||||
userId,
|
||||
integrationId,
|
||||
integrationName,
|
||||
syncAt,
|
||||
}
|
||||
|
||||
const headers = {
|
||||
[OmnivoreAuthorizationHeader]: authToken,
|
||||
}
|
||||
// If there is no Google Cloud Project Id exposed, it means that we are in local environment
|
||||
if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) {
|
||||
if (env.queue.integrationExporterUrl) {
|
||||
// Calling the handler function directly.
|
||||
setTimeout(() => {
|
||||
axios
|
||||
.post(env.queue.integrationExporterUrl, payload, {
|
||||
headers,
|
||||
})
|
||||
.catch((error) => {
|
||||
logError(error)
|
||||
})
|
||||
}, 0)
|
||||
}
|
||||
return nanoid()
|
||||
}
|
||||
|
||||
const createdTasks = await createHttpTaskWithToken({
|
||||
project: GOOGLE_CLOUD_PROJECT,
|
||||
payload,
|
||||
taskHandlerUrl: env.queue.integrationExporterUrl,
|
||||
priority: 'low',
|
||||
requestHeaders: headers,
|
||||
return queue.add(EXPORT_ALL_ITEMS_JOB_NAME, payload, {
|
||||
priority: getJobPriority(EXPORT_ALL_ITEMS_JOB_NAME),
|
||||
attempts: 1,
|
||||
})
|
||||
|
||||
if (!createdTasks || !createdTasks[0].name) {
|
||||
logger.error(`Unable to get the name of the task`, {
|
||||
payload,
|
||||
createdTasks,
|
||||
})
|
||||
throw new CreateTaskError(`Unable to get the name of the task`)
|
||||
}
|
||||
return createdTasks[0].name
|
||||
}
|
||||
|
||||
export const enqueueThumbnailJob = async (
|
||||
|
||||
Reference in New Issue
Block a user