monitor each job latency in prometheus
This commit is contained in:
@ -12,6 +12,7 @@ import {
|
||||
Worker,
|
||||
} from 'bullmq'
|
||||
import express, { Express } from 'express'
|
||||
import client from 'prom-client'
|
||||
import { appDataSource } from './data_source'
|
||||
import { env } from './env'
|
||||
import { TaskState } from './generated/graphql'
|
||||
@ -70,7 +71,7 @@ import {
|
||||
import { updateHome, UPDATE_HOME_JOB } from './jobs/update_home'
|
||||
import { updatePDFContentJob } from './jobs/update_pdf_content'
|
||||
import { uploadContentJob, UPLOAD_CONTENT_JOB } from './jobs/upload_content'
|
||||
import { getMetrics } from './prometheus'
|
||||
import { getMetrics, registerMetric } from './prometheus'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
|
||||
import { getJobPriority } from './utils/createTask'
|
||||
@ -79,6 +80,17 @@ import { logger } from './utils/logger'
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
export const JOB_VERSION = 'v001'
|
||||
|
||||
const jobLatency = new client.Histogram({
|
||||
name: 'omnivore_job_latency',
|
||||
help: 'Latency of jobs in the queue',
|
||||
labelNames: ['job_name'],
|
||||
buckets: [0, 1, 5, 10, 50, 100, 500],
|
||||
})
|
||||
|
||||
jobLatency.observe(10)
|
||||
|
||||
registerMetric(jobLatency)
|
||||
|
||||
export const getBackendQueue = async (
|
||||
name = QUEUE_NAME
|
||||
): Promise<Queue | undefined> => {
|
||||
@ -139,71 +151,77 @@ export const createWorker = (connection: ConnectionOptions) =>
|
||||
new Worker(
|
||||
QUEUE_NAME,
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
const queue = await getBackendQueue()
|
||||
const counts = await queue?.getJobCounts('prioritized')
|
||||
if (counts && counts.wait > 1000) {
|
||||
return
|
||||
const executeJob = async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
const queue = await getBackendQueue()
|
||||
const counts = await queue?.getJobCounts('prioritized')
|
||||
if (counts && counts.wait > 1000) {
|
||||
return
|
||||
}
|
||||
return await refreshAllFeeds(appDataSource)
|
||||
}
|
||||
return await refreshAllFeeds(appDataSource)
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
case 'update-pdf-content': {
|
||||
return updatePDFContentJob(job.data)
|
||||
}
|
||||
case THUMBNAIL_JOB:
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
case UPDATE_LABELS_JOB:
|
||||
return updateLabels(job.data)
|
||||
case UPDATE_HIGHLIGHT_JOB:
|
||||
return updateHighlight(job.data)
|
||||
case SYNC_READ_POSITIONS_JOB_NAME:
|
||||
return syncReadPositionsJob(job.data)
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
return bulkAction(job.data)
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
return callWebhook(job.data)
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
return exportItem(job.data)
|
||||
case AI_SUMMARIZE_JOB_NAME:
|
||||
return aiSummarize(job.data)
|
||||
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
|
||||
return processYouTubeVideo(job.data)
|
||||
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
|
||||
return processYouTubeTranscript(job.data)
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
return exportAllItems(job.data)
|
||||
case SEND_EMAIL_JOB:
|
||||
return sendEmailJob(job.data)
|
||||
case CONFIRM_EMAIL_JOB:
|
||||
return confirmEmailJob(job.data)
|
||||
case SAVE_ATTACHMENT_JOB:
|
||||
return saveAttachmentJob(job.data)
|
||||
case SAVE_NEWSLETTER_JOB:
|
||||
return saveNewsletterJob(job.data)
|
||||
case FORWARD_EMAIL_JOB:
|
||||
return forwardEmailJob(job.data)
|
||||
case CREATE_DIGEST_JOB:
|
||||
return createDigest(job.data)
|
||||
case UPLOAD_CONTENT_JOB:
|
||||
return uploadContentJob(job.data)
|
||||
case UPDATE_HOME_JOB:
|
||||
return updateHome(job.data)
|
||||
case SCORE_LIBRARY_ITEM_JOB:
|
||||
return scoreLibraryItem(job.data)
|
||||
case GENERATE_PREVIEW_CONTENT_JOB:
|
||||
return generatePreviewContent(job.data)
|
||||
default:
|
||||
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
|
||||
}
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
case 'update-pdf-content': {
|
||||
return updatePDFContentJob(job.data)
|
||||
}
|
||||
case THUMBNAIL_JOB:
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
case UPDATE_LABELS_JOB:
|
||||
return updateLabels(job.data)
|
||||
case UPDATE_HIGHLIGHT_JOB:
|
||||
return updateHighlight(job.data)
|
||||
case SYNC_READ_POSITIONS_JOB_NAME:
|
||||
return syncReadPositionsJob(job.data)
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
return bulkAction(job.data)
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
return callWebhook(job.data)
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
return exportItem(job.data)
|
||||
case AI_SUMMARIZE_JOB_NAME:
|
||||
return aiSummarize(job.data)
|
||||
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
|
||||
return processYouTubeVideo(job.data)
|
||||
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
|
||||
return processYouTubeTranscript(job.data)
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
return exportAllItems(job.data)
|
||||
case SEND_EMAIL_JOB:
|
||||
return sendEmailJob(job.data)
|
||||
case CONFIRM_EMAIL_JOB:
|
||||
return confirmEmailJob(job.data)
|
||||
case SAVE_ATTACHMENT_JOB:
|
||||
return saveAttachmentJob(job.data)
|
||||
case SAVE_NEWSLETTER_JOB:
|
||||
return saveNewsletterJob(job.data)
|
||||
case FORWARD_EMAIL_JOB:
|
||||
return forwardEmailJob(job.data)
|
||||
case CREATE_DIGEST_JOB:
|
||||
return createDigest(job.data)
|
||||
case UPLOAD_CONTENT_JOB:
|
||||
return uploadContentJob(job.data)
|
||||
case UPDATE_HOME_JOB:
|
||||
return updateHome(job.data)
|
||||
case SCORE_LIBRARY_ITEM_JOB:
|
||||
return scoreLibraryItem(job.data)
|
||||
case GENERATE_PREVIEW_CONTENT_JOB:
|
||||
return generatePreviewContent(job.data)
|
||||
default:
|
||||
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
|
||||
}
|
||||
|
||||
const end = jobLatency.startTimer({ job_name: job.name })
|
||||
await executeJob(job)
|
||||
end()
|
||||
},
|
||||
{
|
||||
connection,
|
||||
|
||||
Reference in New Issue
Block a user