get queue priority from a table

This commit is contained in:
Hongbo Wu
2024-02-21 22:01:04 +08:00
parent 927394e07c
commit ce19218e15
3 changed files with 56 additions and 17 deletions

View File

@ -17,7 +17,12 @@ import {
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
import {
queueRSSRefreshFeedJob,
REFRESH_ALL_FEEDS_JOB_NAME,
REFRESH_FEED_JOB_NAME,
} from '../jobs/rss/refreshAllFeeds'
import { SYNC_READ_POSITIONS_JOB_NAME } from '../jobs/sync_read_positions'
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
import {
UpdateHighlightData,
@ -37,6 +42,37 @@ import View = google.cloud.tasks.v2.Task.View
// Instantiates a client.
const client = new CloudTasksClient()
/**
* we want to prioritized jobs by the expected time to complete
* lower number means higher priority
* priority 1: jobs that are expected to run immediately
* priority 5: jobs that are expected to run in less than 10 seconds
* priority 10: jobs that are expected to run in less than 1 minute
* priority 50: jobs that are expected to run in less than 30 minutes
* priority 100: jobs that are expected to run in less than 1 hour
**/
export const getJobPriority = (jobName: string): number => {
switch (jobName) {
case UPDATE_LABELS_JOB:
case UPDATE_HIGHLIGHT_JOB:
case SYNC_READ_POSITIONS_JOB_NAME:
return 1
case TRIGGER_RULE_JOB_NAME:
case CALL_WEBHOOK_JOB_NAME:
return 5
case BULK_ACTION_JOB_NAME:
case `${REFRESH_FEED_JOB_NAME}_high`:
return 10
case `${REFRESH_FEED_JOB_NAME}_low`:
return 50
case REFRESH_ALL_FEEDS_JOB_NAME:
case THUMBNAIL_JOB:
return 100
default:
return 1
}
}
const logError = (error: any): void => {
if (axios.isAxiosError(error)) {
logger.error(error.response)
@ -601,7 +637,7 @@ export const enqueueThumbnailJob = async (
libraryItemId,
}
return queue.add(THUMBNAIL_JOB, payload, {
priority: 100,
priority: getJobPriority(THUMBNAIL_JOB),
attempts: 1,
removeOnComplete: true,
})
@ -665,7 +701,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
}
return queue.add(TRIGGER_RULE_JOB_NAME, data, {
priority: 5,
priority: getJobPriority(TRIGGER_RULE_JOB_NAME),
attempts: 1,
})
}
@ -677,7 +713,7 @@ export const enqueueWebhookJob = async (data: CallWebhookJobData) => {
}
return queue.add(CALL_WEBHOOK_JOB_NAME, data, {
priority: 5,
priority: getJobPriority(CALL_WEBHOOK_JOB_NAME),
attempts: 1,
})
}
@ -694,7 +730,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
opts: {
jobId: `${UPDATE_LABELS_JOB}_${d.libraryItemId}_${JOB_VERSION}`,
attempts: 6,
priority: 1,
priority: getJobPriority(UPDATE_LABELS_JOB),
removeOnComplete: true,
removeOnFail: true,
},
@ -718,7 +754,7 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
return queue.add(UPDATE_HIGHLIGHT_JOB, data, {
jobId: `${UPDATE_HIGHLIGHT_JOB}_${data.libraryItemId}_${JOB_VERSION}`,
attempts: 6,
priority: 1,
priority: getJobPriority(UPDATE_HIGHLIGHT_JOB),
removeOnComplete: true,
removeOnFail: true,
})
@ -738,7 +774,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => {
try {
return queue.add(BULK_ACTION_JOB_NAME, data, {
attempts: 1,
priority: 10,
priority: getJobPriority(BULK_ACTION_JOB_NAME),
jobId, // deduplication
removeOnComplete: true,
removeOnFail: true,