From 0e590e53a65e2b853a2d4723ece289d77b3e1613 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:12:22 +0800 Subject: [PATCH 1/8] Improve RSS priorities for refreshing feeds --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index c9b93f196..057bfc183 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -105,6 +105,6 @@ export const queueRSSRefreshFeedJob = async ( jobId: jobid, removeOnComplete: true, removeOnFail: true, - lifo: options.priority == 'high', + priority: options.priority == 'low' ? 10 : 100, }) } From 2dbff6a6ab274ab9597ab842b01571343e98c006 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:12:44 +0800 Subject: [PATCH 2/8] Add some more debug output --- packages/api/src/queue-processor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 30c8156ce..470c0fcd8 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -87,7 +87,7 @@ const main = async () => { }) queueEvents.on('added', async (job) => { - console.log('added job: ', job.jobId) + console.log('added job: ', job.jobId, job.name) }) queueEvents.on('removed', async (job) => { From 84375b7c3602a7ca1425e4c969b3ea44bcd73187 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:25:05 +0800 Subject: [PATCH 3/8] Add validation before refreshing feed URLs --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 9 ++++++++- packages/api/src/utils/helpers.ts | 1 - 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 057bfc183..f9eb6ca04 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -4,6 +4,7 @@ import { QUEUE_NAME } from '../../queue-processor' import { redisDataSource } from '../../redis_data_source' import { RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' +import { validateUrl } from '../../services/create_page_save_request' export const refreshAllFeeds = async (db: DataSource): Promise => { const subscriptionGroups = (await db.createEntityManager().query( @@ -42,7 +43,7 @@ export const refreshAllFeeds = async (db: DataSource): Promise => { } const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => { - const feedURL = group.url + var feedURL = group.url const userList = JSON.stringify(group.userIds.sort()) if (!feedURL) { console.error('no url for feed group', group) @@ -52,6 +53,12 @@ const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => { console.error('no userlist for feed group', group) return } + + try { + feedURL = validateUrl(feedURL).toString() + } catch (err) { + console.log('not refreshing invalid feed url: ' { feedURL }) + } const jobid = `refresh-feed_${stringToHash(feedURL)}_${stringToHash( userList )}` diff --git a/packages/api/src/utils/helpers.ts b/packages/api/src/utils/helpers.ts index 0763f4a74..72388667c 100644 --- a/packages/api/src/utils/helpers.ts +++ b/packages/api/src/utils/helpers.ts @@ -26,7 +26,6 @@ import { SearchItem, } from '../generated/graphql' import { createPubSubClient } from '../pubsub' -import { redisDataSource } from '../redis_data_source' import { Claims, WithDataSourcesContext } from '../resolvers/types' import { validateUrl } from '../services/create_page_save_request' import { updateLibraryItem } from '../services/library_item' From 02818d94a22f93de023b63f8624e1314ec677913 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:37:57 +0800 Subject: [PATCH 4/8] Adjust priorities --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index f9eb6ca04..734f03218 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -112,6 +112,6 @@ export const queueRSSRefreshFeedJob = async ( jobId: jobid, removeOnComplete: true, removeOnFail: true, - priority: options.priority == 'low' ? 10 : 100, + priority: options.priority == 'low' ? 10 : 50, }) } From e06ff24ceaf21f743b08a3c276643d489b213587 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:44:20 +0800 Subject: [PATCH 5/8] Fix logging --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 734f03218..7b7f8e365 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -57,7 +57,7 @@ const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => { try { feedURL = validateUrl(feedURL).toString() } catch (err) { - console.log('not refreshing invalid feed url: ' { feedURL }) + console.log('not refreshing invalid feed url: ', { feedURL }) } const jobid = `refresh-feed_${stringToHash(feedURL)}_${stringToHash( userList From d2491d3eb7f8617a5b6d7caec5aba748fcd59344 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:59:15 +0800 Subject: [PATCH 6/8] Reduce refresh-all-feeds priority --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 7b7f8e365..9dbc5c362 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -94,7 +94,13 @@ export const queueRSSRefreshAllFeedsJob = async () => { if (!queue) { return false } - return queue.add('refresh-all-feeds', {}) + return queue.add( + 'refresh-all-feeds', + {}, + { + priority: 100, + } + ) } type QueuePriority = 'low' | 'high' From 8ba1242ffbfe611cce612af87375fcc27ede409e Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 16:59:41 +0800 Subject: [PATCH 7/8] Back off refresh-all-feeds if queue is very deep --- packages/api/src/queue-processor.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 470c0fcd8..ce98b3ceb 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, QueueEvents, Worker } from 'bullmq' +import { Job, QueueEvents, Worker, Queue } from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -61,11 +61,17 @@ const main = async () => { throw '[queue-processor] error redis is not initialized' } + const queue = new Queue(QUEUE_NAME) + const worker = new Worker( QUEUE_NAME, async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { + const counts = await queue.getJobCounts('wait') + if (counts.wait > 1000) { + return + } return await refreshAllFeeds(appDataSource) } case 'refresh-feed': { From defb79d8c8f75c9b85748f89bcecc1048207bac1 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Fri, 19 Jan 2024 17:12:34 +0800 Subject: [PATCH 8/8] Use let instead of var --- packages/api/src/jobs/rss/refreshAllFeeds.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 9dbc5c362..43f4f38a3 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -43,7 +43,7 @@ export const refreshAllFeeds = async (db: DataSource): Promise => { } const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => { - var feedURL = group.url + let feedURL = group.url const userList = JSON.stringify(group.userIds.sort()) if (!feedURL) { console.error('no url for feed group', group)