use a separate queue for digest job

This commit is contained in:
Hongbo Wu
2024-04-13 14:41:10 +08:00
parent cdb0b7525e
commit 03e497b489
5 changed files with 65 additions and 83 deletions

View File

@ -18,7 +18,6 @@ import { TaskState } from './generated/graphql'
import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize'
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook'
import { CREATE_DIGEST_JOB, processCreateDigestJob } from './jobs/create_digest'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import {
exportAllItems,
@ -66,18 +65,17 @@ import {
} from './jobs/email/inbound_emails'
export const QUEUE_NAME = 'omnivore-backend-queue'
export const LONG_RUNNING_QUEUE_NAME = 'omnivore-backend-long-running-queue'
export const JOB_VERSION = 'v001'
let backendQueue: Queue | undefined
export const getBackendQueue = async (): Promise<Queue | undefined> => {
if (backendQueue) {
await backendQueue.waitUntilReady()
return backendQueue
}
export const getBackendQueue = async (
name = QUEUE_NAME
): Promise<Queue | undefined> => {
if (!redisDataSource.workerRedisClient) {
throw new Error('Can not create queues, redis is not initialized')
}
backendQueue = new Queue(QUEUE_NAME, {
const backendQueue = new Queue(name, {
connection: redisDataSource.workerRedisClient,
defaultJobOptions: {
backoff: {
@ -99,8 +97,8 @@ export const getBackendQueue = async (): Promise<Queue | undefined> => {
export const createJobId = (jobName: string, userId: string) =>
`${jobName}_${userId}_${JOB_VERSION}`
export const getJob = async (jobId: string) => {
const queue = await getBackendQueue()
export const getJob = async (jobId: string, queueName?: string) => {
const queue = await getBackendQueue(queueName)
if (!queue) {
return
}
@ -182,8 +180,6 @@ export const createWorker = (connection: ConnectionOptions) =>
return saveNewsletterJob(job.data)
case FORWARD_EMAIL_JOB:
return forwardEmailJob(job.data)
case CREATE_DIGEST_JOB:
return processCreateDigestJob(job.data)
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
}