diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 00c39c1d1..26f0882e4 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -3,10 +3,13 @@ import { DataSource } from 'typeorm' import { v4 as uuid } from 'uuid' import { getBackendQueue, JOB_VERSION } from '../../queue-processor' import { validateUrl } from '../../services/create_page_save_request' -import { RssSubscriptionGroup } from '../../utils/createTask' +import { getJobPriority, RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' import { logger } from '../../utils/logger' +export const REFRESH_ALL_FEEDS_JOB_NAME = 'refresh-all-feeds' +export const REFRESH_FEED_JOB_NAME = 'refresh-feed' + export type RSSRefreshContext = { type: 'all' | 'user-added' refreshID: string @@ -116,10 +119,10 @@ export const queueRSSRefreshAllFeedsJob = async () => { return false } return queue.add( - 'refresh-all-feeds', + REFRESH_ALL_FEEDS_JOB_NAME, {}, { - priority: 100, + priority: getJobPriority(REFRESH_ALL_FEEDS_JOB_NAME), } ) } @@ -129,15 +132,15 @@ type QueuePriority = 'low' | 'high' export const queueRSSRefreshFeedJob = async ( jobid: string, payload: any, - options = { priority: 'high' as QueuePriority } + options = { priority: 'low' as QueuePriority } ): Promise => { const queue = await getBackendQueue() if (!queue) { return undefined } - return queue.add('refresh-feed', payload, { + return queue.add(REFRESH_FEED_JOB_NAME, payload, { jobId: `${jobid}_${JOB_VERSION}`, - priority: options.priority == 'low' ? 10 : 50, + priority: getJobPriority(`${REFRESH_FEED_JOB_NAME}_${options.priority}`), removeOnComplete: true, removeOnFail: true, }) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 218ddc156..8d0a9c7f4 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -11,11 +11,10 @@ import { Worker, } from 'bullmq' import express, { Express } from 'express' -import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' import { env } from './env' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' -import { CALL_WEBHOOK_JOB_NAME, callWebhook } from './jobs/call_webhook' +import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' @@ -34,7 +33,8 @@ import { import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' -import { CustomTypeOrmLogger, logger } from './utils/logger' +import { getJobPriority } from './utils/createTask' +import { logger } from './utils/logger' export const QUEUE_NAME = 'omnivore-backend-queue' export const JOB_VERSION = 'v001' @@ -121,7 +121,7 @@ const setupCronJobs = async () => { SYNC_READ_POSITIONS_JOB_NAME, {}, { - priority: 1, + priority: getJobPriority(SYNC_READ_POSITIONS_JOB_NAME), repeat: { every: 60_000, }, diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index df9e08878..ad502f326 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -17,7 +17,12 @@ import { import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' -import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' +import { + queueRSSRefreshFeedJob, + REFRESH_ALL_FEEDS_JOB_NAME, + REFRESH_FEED_JOB_NAME, +} from '../jobs/rss/refreshAllFeeds' +import { SYNC_READ_POSITIONS_JOB_NAME } from '../jobs/sync_read_positions' import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' import { UpdateHighlightData, @@ -37,6 +42,37 @@ import View = google.cloud.tasks.v2.Task.View // Instantiates a client. const client = new CloudTasksClient() +/** + * we want to prioritized jobs by the expected time to complete + * lower number means higher priority + * priority 1: jobs that are expected to run immediately + * priority 5: jobs that are expected to run in less than 10 seconds + * priority 10: jobs that are expected to run in less than 1 minute + * priority 50: jobs that are expected to run in less than 30 minutes + * priority 100: jobs that are expected to run in less than 1 hour + **/ +export const getJobPriority = (jobName: string): number => { + switch (jobName) { + case UPDATE_LABELS_JOB: + case UPDATE_HIGHLIGHT_JOB: + case SYNC_READ_POSITIONS_JOB_NAME: + return 1 + case TRIGGER_RULE_JOB_NAME: + case CALL_WEBHOOK_JOB_NAME: + return 5 + case BULK_ACTION_JOB_NAME: + case `${REFRESH_FEED_JOB_NAME}_high`: + return 10 + case `${REFRESH_FEED_JOB_NAME}_low`: + return 50 + case REFRESH_ALL_FEEDS_JOB_NAME: + case THUMBNAIL_JOB: + return 100 + default: + return 1 + } +} + const logError = (error: any): void => { if (axios.isAxiosError(error)) { logger.error(error.response) @@ -601,7 +637,7 @@ export const enqueueThumbnailJob = async ( libraryItemId, } return queue.add(THUMBNAIL_JOB, payload, { - priority: 100, + priority: getJobPriority(THUMBNAIL_JOB), attempts: 1, removeOnComplete: true, }) @@ -665,7 +701,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { } return queue.add(TRIGGER_RULE_JOB_NAME, data, { - priority: 5, + priority: getJobPriority(TRIGGER_RULE_JOB_NAME), attempts: 1, }) } @@ -677,7 +713,7 @@ export const enqueueWebhookJob = async (data: CallWebhookJobData) => { } return queue.add(CALL_WEBHOOK_JOB_NAME, data, { - priority: 5, + priority: getJobPriority(CALL_WEBHOOK_JOB_NAME), attempts: 1, }) } @@ -694,7 +730,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { opts: { jobId: `${UPDATE_LABELS_JOB}_${d.libraryItemId}_${JOB_VERSION}`, attempts: 6, - priority: 1, + priority: getJobPriority(UPDATE_LABELS_JOB), removeOnComplete: true, removeOnFail: true, }, @@ -718,7 +754,7 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { return queue.add(UPDATE_HIGHLIGHT_JOB, data, { jobId: `${UPDATE_HIGHLIGHT_JOB}_${data.libraryItemId}_${JOB_VERSION}`, attempts: 6, - priority: 1, + priority: getJobPriority(UPDATE_HIGHLIGHT_JOB), removeOnComplete: true, removeOnFail: true, }) @@ -738,7 +774,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => { try { return queue.add(BULK_ACTION_JOB_NAME, data, { attempts: 1, - priority: 10, + priority: getJobPriority(BULK_ACTION_JOB_NAME), jobId, // deduplication removeOnComplete: true, removeOnFail: true,