diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 5f0fc6a21..489fcabfe 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -19,6 +19,17 @@ import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize' import { createDigestJob, CREATE_DIGEST_JOB } from './jobs/ai/create_digest' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' +import { + confirmEmailJob, + CONFIRM_EMAIL_JOB, + forwardEmailJob, + FORWARD_EMAIL_JOB, + saveAttachmentJob, + saveNewsletterJob, + SAVE_ATTACHMENT_JOB, + SAVE_NEWSLETTER_JOB, +} from './jobs/email/inbound_emails' +import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { exportAllItems, @@ -37,7 +48,6 @@ import { import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' -import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email' import { syncReadPositionsJob, SYNC_READ_POSITIONS_JOB_NAME, @@ -53,17 +63,8 @@ import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' import { getJobPriority } from './utils/createTask' +import { timeout } from './utils/helpers' import { logger } from './utils/logger' -import { - confirmEmailJob, - CONFIRM_EMAIL_JOB, - forwardEmailJob, - FORWARD_EMAIL_JOB, - saveAttachmentJob, - saveNewsletterJob, - SAVE_ATTACHMENT_JOB, - SAVE_NEWSLETTER_JOB, -} from './jobs/email/inbound_emails' export const QUEUE_NAME = 'omnivore-backend-queue' export const JOB_VERSION = 'v001' @@ -128,66 +129,73 @@ export const createWorker = (connection: ConnectionOptions) => new Worker( QUEUE_NAME, async (job: Job) => { - switch (job.name) { - case 'refresh-all-feeds': { - const queue = await getBackendQueue() - const counts = await queue?.getJobCounts('prioritized') - if (counts && counts.wait > 1000) { - return + const executeJob = async () => { + switch (job.name) { + case 'refresh-all-feeds': { + const queue = await getBackendQueue() + const counts = await queue?.getJobCounts('prioritized') + if (counts && counts.wait > 1000) { + return + } + return await refreshAllFeeds(appDataSource) } - return await refreshAllFeeds(appDataSource) + case 'refresh-feed': { + return await refreshFeed(job.data) + } + case 'save-page': { + return savePageJob(job.data, job.attemptsMade) + } + case 'update-pdf-content': { + return updatePDFContentJob(job.data) + } + case THUMBNAIL_JOB: + return findThumbnail(job.data) + case TRIGGER_RULE_JOB_NAME: + return triggerRule(job.data) + case UPDATE_LABELS_JOB: + return updateLabels(job.data) + case UPDATE_HIGHLIGHT_JOB: + return updateHighlight(job.data) + case SYNC_READ_POSITIONS_JOB_NAME: + return syncReadPositionsJob(job.data) + case BULK_ACTION_JOB_NAME: + return bulkAction(job.data) + case CALL_WEBHOOK_JOB_NAME: + return callWebhook(job.data) + case EXPORT_ITEM_JOB_NAME: + return exportItem(job.data) + case AI_SUMMARIZE_JOB_NAME: + return aiSummarize(job.data) + case PROCESS_YOUTUBE_VIDEO_JOB_NAME: + return processYouTubeVideo(job.data) + case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: + return processYouTubeTranscript(job.data) + case EXPORT_ALL_ITEMS_JOB_NAME: + return exportAllItems(job.data) + case SEND_EMAIL_JOB: + return sendEmailJob(job.data) + case CONFIRM_EMAIL_JOB: + return confirmEmailJob(job.data) + case SAVE_ATTACHMENT_JOB: + return saveAttachmentJob(job.data) + case SAVE_NEWSLETTER_JOB: + return saveNewsletterJob(job.data) + case FORWARD_EMAIL_JOB: + return forwardEmailJob(job.data) + case CREATE_DIGEST_JOB: + return createDigestJob(job.data) + default: + logger.warning(`[queue-processor] unhandled job: ${job.name}`) } - case 'refresh-feed': { - return await refreshFeed(job.data) - } - case 'save-page': { - return savePageJob(job.data, job.attemptsMade) - } - case 'update-pdf-content': { - return updatePDFContentJob(job.data) - } - case THUMBNAIL_JOB: - return findThumbnail(job.data) - case TRIGGER_RULE_JOB_NAME: - return triggerRule(job.data) - case UPDATE_LABELS_JOB: - return updateLabels(job.data) - case UPDATE_HIGHLIGHT_JOB: - return updateHighlight(job.data) - case SYNC_READ_POSITIONS_JOB_NAME: - return syncReadPositionsJob(job.data) - case BULK_ACTION_JOB_NAME: - return bulkAction(job.data) - case CALL_WEBHOOK_JOB_NAME: - return callWebhook(job.data) - case EXPORT_ITEM_JOB_NAME: - return exportItem(job.data) - case AI_SUMMARIZE_JOB_NAME: - return aiSummarize(job.data) - case PROCESS_YOUTUBE_VIDEO_JOB_NAME: - return processYouTubeVideo(job.data) - case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: - return processYouTubeTranscript(job.data) - case EXPORT_ALL_ITEMS_JOB_NAME: - return exportAllItems(job.data) - case SEND_EMAIL_JOB: - return sendEmailJob(job.data) - case CONFIRM_EMAIL_JOB: - return confirmEmailJob(job.data) - case SAVE_ATTACHMENT_JOB: - return saveAttachmentJob(job.data) - case SAVE_NEWSLETTER_JOB: - return saveNewsletterJob(job.data) - case FORWARD_EMAIL_JOB: - return forwardEmailJob(job.data) - case CREATE_DIGEST_JOB: - return createDigestJob(job.data) - default: - logger.warning(`[queue-processor] unhandled job: ${job.name}`) } + + // timeout if the job takes more than 10 minutes to execute + await Promise.race([timeout(1000 * 60 * 10), executeJob()]) }, { connection, + autorun: true, // start processing jobs immediately + lockDuration: 60_000, // 1 minute } ) diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index fb7f5957c..24b7a77f4 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -1117,6 +1117,13 @@ export const batchUpdateLibraryItems = async ( labelIds?: string[] | null, args?: unknown ) => { + if (!searchArgs.query) { + throw new Error('Search query is required') + } + + const searchQuery = parseSearchQuery(searchArgs.query) + const parameters: ObjectLiteral[] = [] + const queryString = buildQueryString(searchQuery, parameters) interface FolderArguments { folder: string } @@ -1140,19 +1147,17 @@ export const batchUpdateLibraryItems = async ( const getLibraryItemIds = async ( userId: string, em: EntityManager - ): Promise<{ id: string }[]> => { + ): Promise => { const queryBuilder = getQueryBuilder(userId, em) - return queryBuilder.select('library_item.id', 'id').getRawMany() - } + const libraryItems = await queryBuilder + .select('library_item.id', 'id') + .take(searchArgs.size) + .skip(searchArgs.from) + .getRawMany<{ id: string }>() - if (!searchArgs.query) { - throw new Error('Search query is required') + return libraryItems.map((item) => item.id) } - const searchQuery = parseSearchQuery(searchArgs.query) - const parameters: ObjectLiteral[] = [] - const queryString = buildQueryString(searchQuery, parameters) - const now = new Date().toISOString() // build the script let values: Record = {} @@ -1174,27 +1179,27 @@ export const batchUpdateLibraryItems = async ( throw new Error('Labels are required for this action') } - const libraryItems = await authTrx( + const libraryItemIds = await authTrx( async (tx) => getLibraryItemIds(userId, tx), undefined, userId ) // add labels to library items - for (const libraryItem of libraryItems) { - await addLabelsToLibraryItem(labelIds, libraryItem.id, userId) + for (const libraryItemId of libraryItemIds) { + await addLabelsToLibraryItem(labelIds, libraryItemId, userId) } return } case BulkActionType.MarkAsRead: { - const libraryItems = await authTrx( + const libraryItemIds = await authTrx( async (tx) => getLibraryItemIds(userId, tx), undefined, userId ) // update reading progress for library items - for (const libraryItem of libraryItems) { - await markItemAsRead(libraryItem.id, userId) + for (const libraryItemId of libraryItemIds) { + await markItemAsRead(libraryItemId, userId) } return @@ -1215,12 +1220,10 @@ export const batchUpdateLibraryItems = async ( } await authTrx( - async (tx) => - getQueryBuilder(userId, tx) - .take(searchArgs.size) - .update(LibraryItem) - .set(values) - .execute(), + async (tx) => { + const libraryItemIds = await getLibraryItemIds(userId, tx) + await tx.getRepository(LibraryItem).update(libraryItemIds, values) + }, undefined, userId ) diff --git a/packages/api/src/utils/helpers.ts b/packages/api/src/utils/helpers.ts index 785e5d5ef..2f27c0266 100644 --- a/packages/api/src/utils/helpers.ts +++ b/packages/api/src/utils/helpers.ts @@ -316,6 +316,12 @@ export const wait = (ms: number): Promise => { }) } +export const timeout = (ms: number): Promise => { + return new Promise((_resolve, reject) => { + setTimeout(() => reject(new Error('timeout')), ms) + }) +} + export const wordsCount = (text: string, isHtml = true): number => { try { return wordsCounter(text, { isHtml }).wordsCount