monitor home feed job latency in prometheus

This commit is contained in:
Hongbo Wu
2024-06-12 13:01:36 +08:00
parent aa11fd6591
commit f052ab55a0
3 changed files with 54 additions and 29 deletions

View File

@ -1,7 +1,9 @@
import client from 'prom-client'
import { LibraryItem } from '../entity/library_item'
import { PublicItem } from '../entity/public_item'
import { Subscription } from '../entity/subscription'
import { User } from '../entity/user'
import { registerMetric } from '../prometheus'
import { redisDataSource } from '../redis_data_source'
import { findUnseenPublicItems } from '../services/home'
import { searchLibraryItems } from '../services/library_item'
@ -434,6 +436,18 @@ const mixHomeItems = (
return sections
}
// use prometheus to monitor the latency of each step
const latency = new client.Histogram({
name: 'update_home_latency',
help: 'Latency of update home job',
labelNames: ['step'],
buckets: [0.1, 0.5, 1, 2, 5, 10],
})
registerMetric(latency)
latency.observe(10)
export const updateHome = async (data: UpdateHomeJobData) => {
const { userId, cursor } = data
logger.info('Updating home for user', data)
@ -447,22 +461,19 @@ export const updateHome = async (data: UpdateHomeJobData) => {
logger.info(`Updating home for user ${userId}`)
logger.profile('justAdded')
let end = latency.startTimer({ step: 'justAdded' })
const justAddedCandidates = await getJustAddedCandidates(userId)
logger.profile('justAdded', {
level: 'info',
message: `Found ${justAddedCandidates.length} just added candidates`,
})
end()
logger.profile('selecting')
logger.info(`Found ${justAddedCandidates.length} just added candidates`)
end = latency.startTimer({ step: 'select' })
const candidates = await selectCandidates(
user,
justAddedCandidates.map((c) => c.id)
)
logger.profile('selecting', {
level: 'info',
message: `Found ${candidates.length} candidates`,
})
end()
logger.info(`Found ${candidates.length} candidates`)
if (!justAddedCandidates.length && !candidates.length) {
logger.info('No candidates found')
@ -471,26 +482,21 @@ export const updateHome = async (data: UpdateHomeJobData) => {
// TODO: integrity check on candidates
logger.profile('ranking')
end = latency.startTimer({ step: 'ranking' })
const rankedCandidates = await rankCandidates(userId, candidates)
logger.profile('ranking', {
level: 'info',
message: `Ranked ${rankedCandidates.length} candidates`,
})
end()
logger.profile('mixing')
logger.info(`Ranked ${rankedCandidates.length} candidates`)
end = latency.startTimer({ step: 'mixing' })
const sections = mixHomeItems(justAddedCandidates, rankedCandidates)
logger.profile('mixing', {
level: 'info',
message: `Created ${sections.length} sections`,
})
end()
logger.profile('saving')
logger.info(`Mixed ${sections.length} sections`)
end = latency.startTimer({ step: 'saving' })
await appendSectionsToHome(userId, sections, cursor)
logger.profile('saving', {
level: 'info',
message: 'Sections appended to home',
})
end()
logger.info('Home updated for user', { userId })
} catch (error) {

View File

@ -0,0 +1,8 @@
import client, { Metric } from 'prom-client'
const registry = new client.Registry()
export const registerMetric = (metric: Metric) =>
registry.registerMetric(metric)
export const getMetrics = async () => registry.metrics()

View File

@ -70,6 +70,7 @@ import {
import { updateHome, UPDATE_HOME_JOB } from './jobs/update_home'
import { updatePDFContentJob } from './jobs/update_pdf_content'
import { uploadContentJob, UPLOAD_CONTENT_JOB } from './jobs/upload_content'
import { getMetrics } from './prometheus'
import { redisDataSource } from './redis_data_source'
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
import { getJobPriority } from './utils/createTask'
@ -258,10 +259,15 @@ const main = async () => {
}
let output = ''
const metrics: JobType[] = ['active', 'failed', 'completed', 'prioritized']
const counts = await queue.getJobCounts(...metrics)
const jobsTypes: JobType[] = [
'active',
'failed',
'completed',
'prioritized',
]
const counts = await queue.getJobCounts(...jobsTypes)
metrics.forEach((metric, idx) => {
jobsTypes.forEach((metric, idx) => {
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n`
})
@ -298,7 +304,12 @@ const main = async () => {
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n`
}
res.status(200).setHeader('Content-Type', 'text/plain').send(output)
const metrics = await getMetrics()
res
.status(200)
.setHeader('Content-Type', 'text/plain')
.send(output + metrics)
})
const server = app.listen(port, () => {