Merge pull request #3393 from omnivore-app/fix/job-priority

Fix refresh feed job priority
This commit is contained in:
Jackson Harper
2024-01-19 17:28:09 +08:00
committed by GitHub
3 changed files with 24 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import { QUEUE_NAME } from '../../queue-processor'
import { redisDataSource } from '../../redis_data_source'
import { RssSubscriptionGroup } from '../../utils/createTask'
import { stringToHash } from '../../utils/helpers'
import { validateUrl } from '../../services/create_page_save_request'
export const refreshAllFeeds = async (db: DataSource): Promise<boolean> => {
const subscriptionGroups = (await db.createEntityManager().query(
@ -42,7 +43,7 @@ export const refreshAllFeeds = async (db: DataSource): Promise<boolean> => {
}
const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => {
const feedURL = group.url
let feedURL = group.url
const userList = JSON.stringify(group.userIds.sort())
if (!feedURL) {
console.error('no url for feed group', group)
@ -52,6 +53,12 @@ const updateSubscriptionGroup = async (group: RssSubscriptionGroup) => {
console.error('no userlist for feed group', group)
return
}
try {
feedURL = validateUrl(feedURL).toString()
} catch (err) {
console.log('not refreshing invalid feed url: ', { feedURL })
}
const jobid = `refresh-feed_${stringToHash(feedURL)}_${stringToHash(
userList
)}`
@ -87,7 +94,13 @@ export const queueRSSRefreshAllFeedsJob = async () => {
if (!queue) {
return false
}
return queue.add('refresh-all-feeds', {})
return queue.add(
'refresh-all-feeds',
{},
{
priority: 100,
}
)
}
type QueuePriority = 'low' | 'high'
@ -105,6 +118,6 @@ export const queueRSSRefreshFeedJob = async (
jobId: jobid,
removeOnComplete: true,
removeOnFail: true,
lifo: options.priority == 'high',
priority: options.priority == 'low' ? 10 : 50,
})
}

View File

@ -2,7 +2,7 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/require-await */
/* eslint-disable @typescript-eslint/no-misused-promises */
import { Job, QueueEvents, Worker } from 'bullmq'
import { Job, QueueEvents, Worker, Queue } from 'bullmq'
import express, { Express } from 'express'
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
import { appDataSource } from './data_source'
@ -61,11 +61,17 @@ const main = async () => {
throw '[queue-processor] error redis is not initialized'
}
const queue = new Queue(QUEUE_NAME)
const worker = new Worker(
QUEUE_NAME,
async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const counts = await queue.getJobCounts('wait')
if (counts.wait > 1000) {
return
}
return await refreshAllFeeds(appDataSource)
}
case 'refresh-feed': {
@ -87,7 +93,7 @@ const main = async () => {
})
queueEvents.on('added', async (job) => {
console.log('added job: ', job.jobId)
console.log('added job: ', job.jobId, job.name)
})
queueEvents.on('removed', async (job) => {

View File

@ -26,7 +26,6 @@ import {
SearchItem,
} from '../generated/graphql'
import { createPubSubClient } from '../pubsub'
import { redisDataSource } from '../redis_data_source'
import { Claims, WithDataSourcesContext } from '../resolvers/types'
import { validateUrl } from '../services/create_page_save_request'
import { updateLibraryItem } from '../services/library_item'