enqueue content-fetch task to the queue

This commit is contained in:
Hongbo Wu
2024-07-15 21:47:27 +08:00
parent e3eae1c96c
commit 87b4ec503e
8 changed files with 103 additions and 135 deletions

View File

@ -82,7 +82,8 @@ import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_positi
import { getJobPriority } from './utils/createTask'
import { logger } from './utils/logger'
export const QUEUE_NAME = 'omnivore-backend-queue'
export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue'
export const CONTENT_FETCH_QUEUE_NAME = 'omnivore-content-fetch-queue'
export const JOB_VERSION = 'v001'
const jobLatency = new client.Histogram({
@ -94,8 +95,8 @@ const jobLatency = new client.Histogram({
registerMetric(jobLatency)
export const getBackendQueue = async (
name = QUEUE_NAME
export const getQueue = async (
name = BACKEND_QUEUE_NAME
): Promise<Queue | undefined> => {
if (!redisDataSource.workerRedisClient) {
throw new Error('Can not create queues, redis is not initialized')
@ -124,7 +125,7 @@ export const createJobId = (jobName: string, userId: string) =>
`${jobName}_${userId}_${JOB_VERSION}`
export const getJob = async (jobId: string, queueName?: string) => {
const queue = await getBackendQueue(queueName)
const queue = await getQueue(queueName)
if (!queue) {
return
}
@ -152,12 +153,12 @@ export const jobStateToTaskState = (
export const createWorker = (connection: ConnectionOptions) =>
new Worker(
QUEUE_NAME,
BACKEND_QUEUE_NAME,
async (job: Job) => {
const executeJob = async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getBackendQueue()
const queue = await getQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
@ -239,7 +240,7 @@ export const createWorker = (connection: ConnectionOptions) =>
)
const setupCronJobs = async () => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
logger.error('Unable to setup cron jobs. Queue is not available.')
return
@ -278,7 +279,7 @@ const main = async () => {
})
app.get('/metrics', async (_, res) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
res.sendStatus(400)
return
@ -295,7 +296,7 @@ const main = async () => {
jobsTypes.forEach((metric, idx) => {
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n`
output += `omnivore_queue_messages_${metric}{queue="${BACKEND_QUEUE_NAME}"} ${counts[metric]}\n`
})
if (redisDataSource.redisClient) {
@ -311,7 +312,7 @@ const main = async () => {
)
if (cursor != '0') {
output += `# TYPE omnivore_read_position_messages gauge\n`
output += `omnivore_read_position_messages{queue="${QUEUE_NAME}"} ${10_001}\n`
output += `omnivore_read_position_messages{queue="${BACKEND_QUEUE_NAME}"} ${10_001}\n`
} else if (batch) {
output += `# TYPE omnivore_read_position_messages gauge\n`
output += `omnivore_read_position_messages{} ${batch.length}\n`
@ -324,10 +325,10 @@ const main = async () => {
const currentTime = Date.now()
const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${ageInSeconds}\n`
} else {
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${0}\n`
}
const metrics = await getMetrics()
@ -357,7 +358,7 @@ const main = async () => {
await setupCronJobs()
const queueEvents = new QueueEvents(QUEUE_NAME, {
const queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, {
connection: workerRedisClient,
})