timeout if job takes more than 10 minutes
This commit is contained in:
@ -19,6 +19,17 @@ import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize'
|
||||
import { createDigestJob, CREATE_DIGEST_JOB } from './jobs/ai/create_digest'
|
||||
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
|
||||
import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook'
|
||||
import {
|
||||
confirmEmailJob,
|
||||
CONFIRM_EMAIL_JOB,
|
||||
forwardEmailJob,
|
||||
FORWARD_EMAIL_JOB,
|
||||
saveAttachmentJob,
|
||||
saveNewsletterJob,
|
||||
SAVE_ATTACHMENT_JOB,
|
||||
SAVE_NEWSLETTER_JOB,
|
||||
} from './jobs/email/inbound_emails'
|
||||
import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email'
|
||||
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
|
||||
import {
|
||||
exportAllItems,
|
||||
@ -37,7 +48,6 @@ import {
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email'
|
||||
import {
|
||||
syncReadPositionsJob,
|
||||
SYNC_READ_POSITIONS_JOB_NAME,
|
||||
@ -53,17 +63,8 @@ import { updatePDFContentJob } from './jobs/update_pdf_content'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
|
||||
import { getJobPriority } from './utils/createTask'
|
||||
import { timeout } from './utils/helpers'
|
||||
import { logger } from './utils/logger'
|
||||
import {
|
||||
confirmEmailJob,
|
||||
CONFIRM_EMAIL_JOB,
|
||||
forwardEmailJob,
|
||||
FORWARD_EMAIL_JOB,
|
||||
saveAttachmentJob,
|
||||
saveNewsletterJob,
|
||||
SAVE_ATTACHMENT_JOB,
|
||||
SAVE_NEWSLETTER_JOB,
|
||||
} from './jobs/email/inbound_emails'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
export const JOB_VERSION = 'v001'
|
||||
@ -128,66 +129,73 @@ export const createWorker = (connection: ConnectionOptions) =>
|
||||
new Worker(
|
||||
QUEUE_NAME,
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
const queue = await getBackendQueue()
|
||||
const counts = await queue?.getJobCounts('prioritized')
|
||||
if (counts && counts.wait > 1000) {
|
||||
return
|
||||
const executeJob = async () => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
const queue = await getBackendQueue()
|
||||
const counts = await queue?.getJobCounts('prioritized')
|
||||
if (counts && counts.wait > 1000) {
|
||||
return
|
||||
}
|
||||
return await refreshAllFeeds(appDataSource)
|
||||
}
|
||||
return await refreshAllFeeds(appDataSource)
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
case 'update-pdf-content': {
|
||||
return updatePDFContentJob(job.data)
|
||||
}
|
||||
case THUMBNAIL_JOB:
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
case UPDATE_LABELS_JOB:
|
||||
return updateLabels(job.data)
|
||||
case UPDATE_HIGHLIGHT_JOB:
|
||||
return updateHighlight(job.data)
|
||||
case SYNC_READ_POSITIONS_JOB_NAME:
|
||||
return syncReadPositionsJob(job.data)
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
return bulkAction(job.data)
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
return callWebhook(job.data)
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
return exportItem(job.data)
|
||||
case AI_SUMMARIZE_JOB_NAME:
|
||||
return aiSummarize(job.data)
|
||||
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
|
||||
return processYouTubeVideo(job.data)
|
||||
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
|
||||
return processYouTubeTranscript(job.data)
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
return exportAllItems(job.data)
|
||||
case SEND_EMAIL_JOB:
|
||||
return sendEmailJob(job.data)
|
||||
case CONFIRM_EMAIL_JOB:
|
||||
return confirmEmailJob(job.data)
|
||||
case SAVE_ATTACHMENT_JOB:
|
||||
return saveAttachmentJob(job.data)
|
||||
case SAVE_NEWSLETTER_JOB:
|
||||
return saveNewsletterJob(job.data)
|
||||
case FORWARD_EMAIL_JOB:
|
||||
return forwardEmailJob(job.data)
|
||||
case CREATE_DIGEST_JOB:
|
||||
return createDigestJob(job.data)
|
||||
default:
|
||||
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
|
||||
}
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
case 'update-pdf-content': {
|
||||
return updatePDFContentJob(job.data)
|
||||
}
|
||||
case THUMBNAIL_JOB:
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
case UPDATE_LABELS_JOB:
|
||||
return updateLabels(job.data)
|
||||
case UPDATE_HIGHLIGHT_JOB:
|
||||
return updateHighlight(job.data)
|
||||
case SYNC_READ_POSITIONS_JOB_NAME:
|
||||
return syncReadPositionsJob(job.data)
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
return bulkAction(job.data)
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
return callWebhook(job.data)
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
return exportItem(job.data)
|
||||
case AI_SUMMARIZE_JOB_NAME:
|
||||
return aiSummarize(job.data)
|
||||
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
|
||||
return processYouTubeVideo(job.data)
|
||||
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
|
||||
return processYouTubeTranscript(job.data)
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
return exportAllItems(job.data)
|
||||
case SEND_EMAIL_JOB:
|
||||
return sendEmailJob(job.data)
|
||||
case CONFIRM_EMAIL_JOB:
|
||||
return confirmEmailJob(job.data)
|
||||
case SAVE_ATTACHMENT_JOB:
|
||||
return saveAttachmentJob(job.data)
|
||||
case SAVE_NEWSLETTER_JOB:
|
||||
return saveNewsletterJob(job.data)
|
||||
case FORWARD_EMAIL_JOB:
|
||||
return forwardEmailJob(job.data)
|
||||
case CREATE_DIGEST_JOB:
|
||||
return createDigestJob(job.data)
|
||||
default:
|
||||
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
|
||||
}
|
||||
|
||||
// timeout if the job takes more than 10 minutes to execute
|
||||
await Promise.race([timeout(1000 * 60 * 10), executeJob()])
|
||||
},
|
||||
{
|
||||
connection,
|
||||
autorun: true, // start processing jobs immediately
|
||||
lockDuration: 60_000, // 1 minute
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@ -1117,6 +1117,13 @@ export const batchUpdateLibraryItems = async (
|
||||
labelIds?: string[] | null,
|
||||
args?: unknown
|
||||
) => {
|
||||
if (!searchArgs.query) {
|
||||
throw new Error('Search query is required')
|
||||
}
|
||||
|
||||
const searchQuery = parseSearchQuery(searchArgs.query)
|
||||
const parameters: ObjectLiteral[] = []
|
||||
const queryString = buildQueryString(searchQuery, parameters)
|
||||
interface FolderArguments {
|
||||
folder: string
|
||||
}
|
||||
@ -1140,19 +1147,17 @@ export const batchUpdateLibraryItems = async (
|
||||
const getLibraryItemIds = async (
|
||||
userId: string,
|
||||
em: EntityManager
|
||||
): Promise<{ id: string }[]> => {
|
||||
): Promise<string[]> => {
|
||||
const queryBuilder = getQueryBuilder(userId, em)
|
||||
return queryBuilder.select('library_item.id', 'id').getRawMany()
|
||||
}
|
||||
const libraryItems = await queryBuilder
|
||||
.select('library_item.id', 'id')
|
||||
.take(searchArgs.size)
|
||||
.skip(searchArgs.from)
|
||||
.getRawMany<{ id: string }>()
|
||||
|
||||
if (!searchArgs.query) {
|
||||
throw new Error('Search query is required')
|
||||
return libraryItems.map((item) => item.id)
|
||||
}
|
||||
|
||||
const searchQuery = parseSearchQuery(searchArgs.query)
|
||||
const parameters: ObjectLiteral[] = []
|
||||
const queryString = buildQueryString(searchQuery, parameters)
|
||||
|
||||
const now = new Date().toISOString()
|
||||
// build the script
|
||||
let values: Record<string, string | number> = {}
|
||||
@ -1174,27 +1179,27 @@ export const batchUpdateLibraryItems = async (
|
||||
throw new Error('Labels are required for this action')
|
||||
}
|
||||
|
||||
const libraryItems = await authTrx(
|
||||
const libraryItemIds = await authTrx(
|
||||
async (tx) => getLibraryItemIds(userId, tx),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
// add labels to library items
|
||||
for (const libraryItem of libraryItems) {
|
||||
await addLabelsToLibraryItem(labelIds, libraryItem.id, userId)
|
||||
for (const libraryItemId of libraryItemIds) {
|
||||
await addLabelsToLibraryItem(labelIds, libraryItemId, userId)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
case BulkActionType.MarkAsRead: {
|
||||
const libraryItems = await authTrx(
|
||||
const libraryItemIds = await authTrx(
|
||||
async (tx) => getLibraryItemIds(userId, tx),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
// update reading progress for library items
|
||||
for (const libraryItem of libraryItems) {
|
||||
await markItemAsRead(libraryItem.id, userId)
|
||||
for (const libraryItemId of libraryItemIds) {
|
||||
await markItemAsRead(libraryItemId, userId)
|
||||
}
|
||||
|
||||
return
|
||||
@ -1215,12 +1220,10 @@ export const batchUpdateLibraryItems = async (
|
||||
}
|
||||
|
||||
await authTrx(
|
||||
async (tx) =>
|
||||
getQueryBuilder(userId, tx)
|
||||
.take(searchArgs.size)
|
||||
.update(LibraryItem)
|
||||
.set(values)
|
||||
.execute(),
|
||||
async (tx) => {
|
||||
const libraryItemIds = await getLibraryItemIds(userId, tx)
|
||||
await tx.getRepository(LibraryItem).update(libraryItemIds, values)
|
||||
},
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
@ -316,6 +316,12 @@ export const wait = (ms: number): Promise<void> => {
|
||||
})
|
||||
}
|
||||
|
||||
export const timeout = (ms: number): Promise<void> => {
|
||||
return new Promise((_resolve, reject) => {
|
||||
setTimeout(() => reject(new Error('timeout')), ms)
|
||||
})
|
||||
}
|
||||
|
||||
export const wordsCount = (text: string, isHtml = true): number => {
|
||||
try {
|
||||
return wordsCounter(text, { isHtml }).wordsCount
|
||||
|
||||
Reference in New Issue
Block a user