diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index c9b93f196..43f4f38a3 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -4,6 +4,7 @@ import { QUEUE_NAME } from '../../queue-processor' import { redisDataSource } from '../../redis_data_source' import { RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' +import { validateUrl } from '../../services/create_page_save_request' export const refreshAllFeeds = async (db: DataSource): Promise => { const subscriptionGroups = (await db.createEntityManager().query( @@ -42,7 +43,7 @@ export const refreshAllFeeds = async (db: DataSource): Promise => { } const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => { - const feedURL = group.url + let feedURL = group.url const userList = JSON.stringify(group.userIds.sort()) if (!feedURL) { console.error('no url for feed group', group) @@ -52,6 +53,12 @@ const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => { console.error('no userlist for feed group', group) return } + + try { + feedURL = validateUrl(feedURL).toString() + } catch (err) { + console.log('not refreshing invalid feed url: ', { feedURL }) + } const jobid = `refresh-feed_${stringToHash(feedURL)}_${stringToHash( userList )}` @@ -87,7 +94,13 @@ export const queueRSSRefreshAllFeedsJob = async () => { if (!queue) { return false } - return queue.add('refresh-all-feeds', {}) + return queue.add( + 'refresh-all-feeds', + {}, + { + priority: 100, + } + ) } type QueuePriority = 'low' | 'high' @@ -105,6 +118,6 @@ export const queueRSSRefreshFeedJob = async ( jobId: jobid, removeOnComplete: true, removeOnFail: true, - lifo: options.priority == 'high', + priority: options.priority == 'low' ? 10 : 50, }) } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 30c8156ce..ce98b3ceb 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, QueueEvents, Worker } from 'bullmq' +import { Job, QueueEvents, Worker, Queue } from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -61,11 +61,17 @@ const main = async () => { throw '[queue-processor] error redis is not initialized' } + const queue = new Queue(QUEUE_NAME) + const worker = new Worker( QUEUE_NAME, async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { + const counts = await queue.getJobCounts('wait') + if (counts.wait > 1000) { + return + } return await refreshAllFeeds(appDataSource) } case 'refresh-feed': { @@ -87,7 +93,7 @@ const main = async () => { }) queueEvents.on('added', async (job) => { - console.log('added job: ', job.jobId) + console.log('added job: ', job.jobId, job.name) }) queueEvents.on('removed', async (job) => { diff --git a/packages/api/src/utils/helpers.ts b/packages/api/src/utils/helpers.ts index 0763f4a74..72388667c 100644 --- a/packages/api/src/utils/helpers.ts +++ b/packages/api/src/utils/helpers.ts @@ -26,7 +26,6 @@ import { SearchItem, } from '../generated/graphql' import { createPubSubClient } from '../pubsub' -import { redisDataSource } from '../redis_data_source' import { Claims, WithDataSourcesContext } from '../resolvers/types' import { validateUrl } from '../services/create_page_save_request' import { updateLibraryItem } from '../services/library_item'