From 8ba1242ffbfe611cce612af87375fcc27ede409e Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:59:41 +0800 Subject: [PATCH] Back off refresh-all-feeds if queue is very deep --- packages/api/src/queue-processor.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 470c0fcd8..ce98b3ceb 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, QueueEvents, Worker } from 'bullmq' +import { Job, QueueEvents, Worker, Queue } from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -61,11 +61,17 @@ const main = async () => { throw '[queue-processor] error redis is not initialized' } + const queue = new Queue(QUEUE_NAME) + const worker = new Worker( QUEUE_NAME, async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { + const counts = await queue.getJobCounts('wait') + if (counts.wait > 1000) { + return + } return await refreshAllFeeds(appDataSource) } case 'refresh-feed': {