diff --git a/packages/api/src/jobs/update_home.ts b/packages/api/src/jobs/update_home.ts index 02f994770..6c8ddad54 100644 --- a/packages/api/src/jobs/update_home.ts +++ b/packages/api/src/jobs/update_home.ts @@ -1,7 +1,9 @@ +import client from 'prom-client' import { LibraryItem } from '../entity/library_item' import { PublicItem } from '../entity/public_item' import { Subscription } from '../entity/subscription' import { User } from '../entity/user' +import { registerMetric } from '../prometheus' import { redisDataSource } from '../redis_data_source' import { findUnseenPublicItems } from '../services/home' import { searchLibraryItems } from '../services/library_item' @@ -434,6 +436,18 @@ const mixHomeItems = ( return sections } +// use prometheus to monitor the latency of each step +const latency = new client.Histogram({ + name: 'update_home_latency', + help: 'Latency of update home job', + labelNames: ['step'], + buckets: [0.1, 0.5, 1, 2, 5, 10], +}) + +registerMetric(latency) + +latency.observe(10) + export const updateHome = async (data: UpdateHomeJobData) => { const { userId, cursor } = data logger.info('Updating home for user', data) @@ -447,22 +461,19 @@ export const updateHome = async (data: UpdateHomeJobData) => { logger.info(`Updating home for user ${userId}`) - logger.profile('justAdded') + let end = latency.startTimer({ step: 'justAdded' }) const justAddedCandidates = await getJustAddedCandidates(userId) - logger.profile('justAdded', { - level: 'info', - message: `Found ${justAddedCandidates.length} just added candidates`, - }) + end() - logger.profile('selecting') + logger.info(`Found ${justAddedCandidates.length} just added candidates`) + + end = latency.startTimer({ step: 'select' }) const candidates = await selectCandidates( user, justAddedCandidates.map((c) => c.id) ) - logger.profile('selecting', { - level: 'info', - message: `Found ${candidates.length} candidates`, - }) + end() + logger.info(`Found ${candidates.length} candidates`) if (!justAddedCandidates.length && !candidates.length) { logger.info('No candidates found') @@ -471,26 +482,21 @@ export const updateHome = async (data: UpdateHomeJobData) => { // TODO: integrity check on candidates - logger.profile('ranking') + end = latency.startTimer({ step: 'ranking' }) const rankedCandidates = await rankCandidates(userId, candidates) - logger.profile('ranking', { - level: 'info', - message: `Ranked ${rankedCandidates.length} candidates`, - }) + end() - logger.profile('mixing') + logger.info(`Ranked ${rankedCandidates.length} candidates`) + + end = latency.startTimer({ step: 'mixing' }) const sections = mixHomeItems(justAddedCandidates, rankedCandidates) - logger.profile('mixing', { - level: 'info', - message: `Created ${sections.length} sections`, - }) + end() - logger.profile('saving') + logger.info(`Mixed ${sections.length} sections`) + + end = latency.startTimer({ step: 'saving' }) await appendSectionsToHome(userId, sections, cursor) - logger.profile('saving', { - level: 'info', - message: 'Sections appended to home', - }) + end() logger.info('Home updated for user', { userId }) } catch (error) { diff --git a/packages/api/src/prometheus.ts b/packages/api/src/prometheus.ts new file mode 100644 index 000000000..ffaa0516c --- /dev/null +++ b/packages/api/src/prometheus.ts @@ -0,0 +1,8 @@ +import client, { Metric } from 'prom-client' + +const registry = new client.Registry() + +export const registerMetric = (metric: Metric) => + registry.registerMetric(metric) + +export const getMetrics = async () => registry.metrics() diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index f77f92a55..42b775845 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -70,6 +70,7 @@ import { import { updateHome, UPDATE_HOME_JOB } from './jobs/update_home' import { updatePDFContentJob } from './jobs/update_pdf_content' import { uploadContentJob, UPLOAD_CONTENT_JOB } from './jobs/upload_content' +import { getMetrics } from './prometheus' import { redisDataSource } from './redis_data_source' import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' import { getJobPriority } from './utils/createTask' @@ -258,10 +259,15 @@ const main = async () => { } let output = '' - const metrics: JobType[] = ['active', 'failed', 'completed', 'prioritized'] - const counts = await queue.getJobCounts(...metrics) + const jobsTypes: JobType[] = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) - metrics.forEach((metric, idx) => { + jobsTypes.forEach((metric, idx) => { output += `# TYPE omnivore_queue_messages_${metric} gauge\n` output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` }) @@ -298,7 +304,12 @@ const main = async () => { output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` } - res.status(200).setHeader('Content-Type', 'text/plain').send(output) + const metrics = await getMetrics() + + res + .status(200) + .setHeader('Content-Type', 'text/plain') + .send(output + metrics) }) const server = app.listen(port, () => {