From e4109a63f0d4c382fee089dbf274fc7d72188e2c Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Wed, 24 Jan 2024 12:15:38 +0800 Subject: [PATCH] Expose custom metrics for the queue processor --- packages/api/src/queue-processor.ts | 30 +++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 7ccd5e272..b35cf7778 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, QueueEvents, Worker, Queue } from 'bullmq' +import { Job, QueueEvents, Worker, Queue, JobType } from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -63,6 +63,24 @@ const main = async () => { // respond healthy to auto-scaler. app.get('/_ah/health', (req, res) => res.sendStatus(200)) + app.get('/metrics', async (_, res) => { + const queue = await getBackendQueue() + if (!queue) { + res.sendStatus(400) + return + } + let output = '' + const metrics: JobType[] = ['active', 'failed', 'completed', 'prioritized'] + const counts = await queue.getJobCounts(...metrics) + console.log('counts: ', counts) + + metrics.forEach((metric, idx) => { + output += `omnivore_backend_queue_${metric}{} ${counts[metric]}\n` + }) + + res.status(200).setHeader('Content-Type', 'text/plain').send(output) + }) + const server = app.listen(port, () => { console.log(`[queue-processor]: started`) }) @@ -78,17 +96,17 @@ const main = async () => { throw '[queue-processor] error redis is not initialized' } - const queue = new Queue(QUEUE_NAME, { - connection: workerRedisClient, - }) + // Init the backend queue now + await getBackendQueue() const worker = new Worker( QUEUE_NAME, async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { - const counts = await queue.getJobCounts('wait') - if (counts.wait > 1000) { + const queue = await getBackendQueue() + const counts = await queue?.getJobCounts('prioritized') + if (counts && counts.wait > 1000) { return } return await refreshAllFeeds(appDataSource)