remove timeout

This commit is contained in:
Hongbo Wu
2024-04-23 15:28:28 +08:00
parent de258ae0bf
commit 1b6f4976d3
4 changed files with 68 additions and 73 deletions

View File

@ -63,7 +63,6 @@ import { updatePDFContentJob } from './jobs/update_pdf_content'
import { redisDataSource } from './redis_data_source'
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
import { getJobPriority } from './utils/createTask'
import { timeout } from './utils/helpers'
import { logger } from './utils/logger'
export const QUEUE_NAME = 'omnivore-backend-queue'
@ -129,68 +128,63 @@ export const createWorker = (connection: ConnectionOptions) =>
new Worker(
QUEUE_NAME,
async (job: Job) => {
const executeJob = async () => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getBackendQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
}
return await refreshAllFeeds(appDataSource)
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getBackendQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
}
case 'refresh-feed': {
return await refreshFeed(job.data)
}
case 'save-page': {
return savePageJob(job.data, job.attemptsMade)
}
case 'update-pdf-content': {
return updatePDFContentJob(job.data)
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
case UPDATE_LABELS_JOB:
return updateLabels(job.data)
case UPDATE_HIGHLIGHT_JOB:
return updateHighlight(job.data)
case SYNC_READ_POSITIONS_JOB_NAME:
return syncReadPositionsJob(job.data)
case BULK_ACTION_JOB_NAME:
return bulkAction(job.data)
case CALL_WEBHOOK_JOB_NAME:
return callWebhook(job.data)
case EXPORT_ITEM_JOB_NAME:
return exportItem(job.data)
case AI_SUMMARIZE_JOB_NAME:
return aiSummarize(job.data)
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
return processYouTubeVideo(job.data)
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
return processYouTubeTranscript(job.data)
case EXPORT_ALL_ITEMS_JOB_NAME:
return exportAllItems(job.data)
case SEND_EMAIL_JOB:
return sendEmailJob(job.data)
case CONFIRM_EMAIL_JOB:
return confirmEmailJob(job.data)
case SAVE_ATTACHMENT_JOB:
return saveAttachmentJob(job.data)
case SAVE_NEWSLETTER_JOB:
return saveNewsletterJob(job.data)
case FORWARD_EMAIL_JOB:
return forwardEmailJob(job.data)
case CREATE_DIGEST_JOB:
return createDigestJob(job.data)
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
return await refreshAllFeeds(appDataSource)
}
case 'refresh-feed': {
return await refreshFeed(job.data)
}
case 'save-page': {
return savePageJob(job.data, job.attemptsMade)
}
case 'update-pdf-content': {
return updatePDFContentJob(job.data)
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
case UPDATE_LABELS_JOB:
return updateLabels(job.data)
case UPDATE_HIGHLIGHT_JOB:
return updateHighlight(job.data)
case SYNC_READ_POSITIONS_JOB_NAME:
return syncReadPositionsJob(job.data)
case BULK_ACTION_JOB_NAME:
return bulkAction(job.data)
case CALL_WEBHOOK_JOB_NAME:
return callWebhook(job.data)
case EXPORT_ITEM_JOB_NAME:
return exportItem(job.data)
case AI_SUMMARIZE_JOB_NAME:
return aiSummarize(job.data)
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
return processYouTubeVideo(job.data)
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
return processYouTubeTranscript(job.data)
case EXPORT_ALL_ITEMS_JOB_NAME:
return exportAllItems(job.data)
case SEND_EMAIL_JOB:
return sendEmailJob(job.data)
case CONFIRM_EMAIL_JOB:
return confirmEmailJob(job.data)
case SAVE_ATTACHMENT_JOB:
return saveAttachmentJob(job.data)
case SAVE_NEWSLETTER_JOB:
return saveNewsletterJob(job.data)
case FORWARD_EMAIL_JOB:
return forwardEmailJob(job.data)
case CREATE_DIGEST_JOB:
return createDigestJob(job.data)
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
}
// timeout if the job takes more than 10 minutes to execute
await Promise.race([timeout(1000 * 60 * 10), executeJob()])
},
{
connection,
@ -324,6 +318,10 @@ const main = async () => {
console.log('completed job: ', job.jobId)
})
queueEvents.on('failed', async (job) => {
console.log('failed job: ', job.jobId)
})
workerRedisClient.on('error', (error) => {
console.trace('[queue-processor]: redis worker error', { error })
})