Merge pull request #4464 from omnivore-app/jacksonh/queue-cleanup

Clean up queue processing
This commit is contained in:
Jackson Harper
2024-10-30 21:19:02 +08:00
committed by GitHub
3 changed files with 50 additions and 46 deletions

View File

@ -127,6 +127,10 @@ const uploadToBucket = async (
export const exportJob = async (jobData: ExportJobData) => {
const { userId, exportId } = jobData
logger.info('starting export job', {
userId,
exportId,
})
try {
const user = await findActiveUser(userId)

View File

@ -281,24 +281,24 @@ export const processYouTubeVideo = async (
updatedLibraryItem.publishedAt = new Date(video.uploadDate)
}
if ('getTranscript' in video && duration > 0 && duration < 1801) {
// If the video has a transcript available, put a placehold in and
// enqueue a job to process the full transcript
const updatedContent = await addTranscriptToReadableContent(
libraryItem.originalUrl,
libraryItem.readableContent,
TRANSCRIPT_PLACEHOLDER_TEXT
)
// if ('getTranscript' in video && duration > 0 && duration < 1801) {
// // If the video has a transcript available, put a placehold in and
// // enqueue a job to process the full transcript
// const updatedContent = await addTranscriptToReadableContent(
// libraryItem.originalUrl,
// libraryItem.readableContent,
// TRANSCRIPT_PLACEHOLDER_TEXT
// )
if (updatedContent) {
updatedLibraryItem.readableContent = updatedContent
}
// if (updatedContent) {
// updatedLibraryItem.readableContent = updatedContent
// }
await enqueueProcessYouTubeTranscript({
videoId,
...jobData,
})
}
// await enqueueProcessYouTubeTranscript({
// videoId,
// ...jobData,
// })
// }
if (updatedLibraryItem !== {}) {
await updateLibraryItem(

View File

@ -159,25 +159,25 @@ export const createWorker = (connection: ConnectionOptions) =>
async (job: Job) => {
const executeJob = async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
}
return await refreshAllFeeds(appDataSource)
}
case 'refresh-feed': {
return await refreshFeed(job.data)
}
// case 'refresh-all-feeds': {
// const queue = await getQueue()
// const counts = await queue?.getJobCounts('prioritized')
// if (counts && counts.wait > 1000) {
// return
// }
// return await refreshAllFeeds(appDataSource)
// }
// case 'refresh-feed': {
// return await refreshFeed(job.data)
// }
case 'save-page': {
return savePageJob(job.data, job.attemptsMade)
}
case 'update-pdf-content': {
return updatePDFContentJob(job.data)
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
// case 'update-pdf-content': {
// return updatePDFContentJob(job.data)
// }
// case THUMBNAIL_JOB:
// return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
case UPDATE_LABELS_JOB:
@ -192,12 +192,12 @@ export const createWorker = (connection: ConnectionOptions) =>
return callWebhook(job.data)
case EXPORT_ITEM_JOB_NAME:
return exportItem(job.data)
case AI_SUMMARIZE_JOB_NAME:
return aiSummarize(job.data)
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
return processYouTubeVideo(job.data)
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
return processYouTubeTranscript(job.data)
// case AI_SUMMARIZE_JOB_NAME:
// return aiSummarize(job.data)
// case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
// return processYouTubeVideo(job.data)
// case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
// return processYouTubeTranscript(job.data)
case EXPORT_ALL_ITEMS_JOB_NAME:
return exportAllItems(job.data)
case SEND_EMAIL_JOB:
@ -210,16 +210,16 @@ export const createWorker = (connection: ConnectionOptions) =>
return saveNewsletterJob(job.data)
case FORWARD_EMAIL_JOB:
return forwardEmailJob(job.data)
case CREATE_DIGEST_JOB:
return createDigest(job.data)
// case CREATE_DIGEST_JOB:
// return createDigest(job.data)
case UPLOAD_CONTENT_JOB:
return uploadContentJob(job.data)
case UPDATE_HOME_JOB:
return updateHome(job.data)
case SCORE_LIBRARY_ITEM_JOB:
return scoreLibraryItem(job.data)
case GENERATE_PREVIEW_CONTENT_JOB:
return generatePreviewContent(job.data)
// case UPDATE_HOME_JOB:
// return updateHome(job.data)
// case SCORE_LIBRARY_ITEM_JOB:
// return scoreLibraryItem(job.data)
// case GENERATE_PREVIEW_CONTENT_JOB:
// return generatePreviewContent(job.data)
case PRUNE_TRASH_JOB:
return pruneTrashJob(job.data)
case EXPIRE_FOLDERS_JOB_NAME: