Files
omnivore/packages/queue-manager/src/index.ts
2023-10-18 09:11:11 +08:00

197 lines
5.4 KiB
TypeScript

import { MetricServiceClient } from '@google-cloud/monitoring'
import { v2beta3 } from '@google-cloud/tasks'
import fetch from 'node-fetch'
import * as dotenv from 'dotenv'
import * as Sentry from '@sentry/serverless'
// Load variables from a local .env file (if present) into process.env before
// any of the process.env reads below.
dotenv.config()
// Initialise Sentry error reporting for this function. The DSN comes from the
// environment; tracesSampleRate is 0, so no transactions are sampled for tracing.
Sentry.GCPFunction.init({
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 0,
})
// GCP project used for both the monitoring query and the Cloud Tasks queue paths.
const PROJECT_ID = process.env.GCP_PROJECT_ID
// Location passed to `queuePath` when resolving queue names.
const LOCATION = 'us-west2'
// Short names of the two queues this function watches and pauses.
const IMPORT_QUEUE_NAME = process.env.IMPORT_QUEUE_NAME
const RSS_QUEUE_NAME = process.env.RSS_FEED_QUEUE_NAME
// NOTE(review): QUEUE_NAMES is not referenced in the visible part of this file.
const QUEUE_NAMES = [IMPORT_QUEUE_NAME, RSS_QUEUE_NAME]
// Webhook that receives the pause / threshold notifications.
const DISCORD_WEBHOOK_URL = process.env.DISCORD_WEBHOOK_URL
// Monitoring filter: App Engine HTTP server response latencies, restricted to
// responses whose response_code label is "200".
const METRICS_FILTER = `metric.type="appengine.googleapis.com/http/server/response_latencies" metric.labels.response_code="200"`
// Fail at module load when any required variable is missing or empty.
if (
!PROJECT_ID ||
!IMPORT_QUEUE_NAME ||
!RSS_QUEUE_NAME ||
!DISCORD_WEBHOOK_URL
) {
throw new Error('environment not supplied.')
}
// Averaged latency above which checkShouldPauseQueues reports that the queues
// should be paused. Presumably milliseconds — TODO confirm against the metric's unit.
const LATENCY_THRESHOLD = 500
// Task counts above which checkMetricsAndPauseQueues posts a Discord alert
// for the RSS queue and the import queue respectively.
const RSS_QUEUE_THRESHOLD = 20_000
const IMPORT_QUEUE_THRESHOLD = 250_000
/**
 * Sends `message` to the Discord webhook as a JSON `{ content }` payload.
 *
 * Best-effort: a rejected request or a non-ok HTTP response is logged with
 * `console.error` and never propagated to the caller.
 *
 * @param message - text to post as the Discord message content
 */
const postToDiscord = async (message: string) => {
  console.log('notify message', { message })

  try {
    const body = JSON.stringify({ content: message })
    const res = await fetch(DISCORD_WEBHOOK_URL, {
      method: 'POST',
      body,
      headers: { 'Content-Type': 'application/json' },
    })

    if (res.ok) {
      return
    }
    // Route non-2xx replies through the same logging path as network errors.
    throw new Error(`Discord response was not ok: ${res.statusText}`)
  } catch (error) {
    console.error('Failed to post message to Discord:', error)
  }
}
/**
 * Decides whether the queues should be paused, based on recent latency.
 *
 * Lists the time series matching METRICS_FILTER for the last five minutes,
 * aligned over a 300 second period with ALIGN_PERCENTILE_95, and considers only
 * series whose `module_id` resource label starts with "backend".
 *
 * @returns `{ shouldPauseQueues: true, avgLatency }` for the first such series
 *   whose mean point value exceeds LATENCY_THRESHOLD, otherwise
 *   `{ shouldPauseQueues: false, avgLatency: 0 }`.
 */
const checkShouldPauseQueues = async () => {
  const now = Date.now()
  const client = new MetricServiceClient()

  // Five-minute window ending now, expressed in whole seconds.
  const windowStart = { seconds: Math.floor(now / 1000 - 5 * 60) }
  const windowEnd = { seconds: Math.floor(now / 1000) }

  const [timeSeries] = await client.listTimeSeries({
    name: client.projectPath(PROJECT_ID),
    filter: METRICS_FILTER,
    interval: { startTime: windowStart, endTime: windowEnd },
    aggregation: {
      alignmentPeriod: { seconds: 300 },
      perSeriesAligner: 'ALIGN_PERCENTILE_95',
    },
  })

  for (const series of timeSeries) {
    // Skip everything that is not a backend module.
    const moduleId = series.resource?.labels?.['module_id']
    if (!moduleId?.startsWith('backend')) {
      continue
    }

    const points = series.points ?? []
    if (points.length === 0) {
      continue
    }

    // Mean of the aligned values; points without a double value count as 0.
    let total = 0
    for (const point of points) {
      total += point.value?.doubleValue ?? 0
    }
    const avgLatency = total / points.length

    if (avgLatency > LATENCY_THRESHOLD) {
      return { shouldPauseQueues: true, avgLatency }
    }
  }

  return { shouldPauseQueues: false, avgLatency: 0 }
}
/**
 * Returns the number of tasks reported for a Cloud Tasks queue.
 *
 * @param queueName - short queue name, resolved against PROJECT_ID and LOCATION
 * @returns the queue's `stats.tasksCount` as a number, or 0 when the value is
 *   missing or cannot be converted to a number
 */
const getQueueTaskCount = async (queueName: string) => {
  const cloudTasksClient = new v2beta3.CloudTasksClient()
  const queuePath = cloudTasksClient.queuePath(PROJECT_ID, LOCATION, queueName)
  const [queue] = await cloudTasksClient.getQueue({
    name: queuePath,
    // Only the name and stats fields are requested.
    readMask: { paths: ['name', 'stats'] },
  })
  console.log(' queue.stats', { stats: queue.stats })

  // Convert before testing: Number.isNaN does not coerce its argument, so
  // checking the raw (possibly undefined or string) value never detects a
  // missing count and would let NaN escape to the callers' comparisons.
  const tasksCount = Number(queue.stats?.tasksCount ?? 0)
  return Number.isNaN(tasksCount) ? 0 : tasksCount
}
/**
 * Pauses the RSS queue and the import queue.
 *
 * Both pause requests are issued before either is awaited; the returned
 * promise rejects if any of them fails.
 */
const pauseQueues = async () => {
  const cloudTasksClient = new v2beta3.CloudTasksClient()

  const pauseRequests = [RSS_QUEUE_NAME, IMPORT_QUEUE_NAME].map((queueName) =>
    cloudTasksClient.pauseQueue({
      name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, queueName),
    })
  )

  await Promise.all(pauseRequests)
}
/**
 * Runs one monitoring pass.
 *
 * - When checkShouldPauseQueues reports that latency is too high, both queues
 *   are paused and a Discord message with the latency and the current queue
 *   sizes is posted. Failing to read the sizes does not prevent the pause;
 *   the message then reports "unknown".
 * - Otherwise the queue sizes are compared against their thresholds and a
 *   Discord alert is posted for each queue that exceeds its threshold.
 *
 * @throws Error when required environment configuration is missing, and
 *   whatever checkShouldPauseQueues or pauseQueues throw.
 */
async function checkMetricsAndPauseQueues() {
  // Repeats the module-level guard inside this function.
  // NOTE(review): presumably also needed so TypeScript narrows the constants
  // to `string` within this hoisted function declaration — confirm.
  if (
    !PROJECT_ID ||
    !IMPORT_QUEUE_NAME ||
    !RSS_QUEUE_NAME ||
    !DISCORD_WEBHOOK_URL
  ) {
    throw new Error('environment not supplied.')
  }

  const { shouldPauseQueues, avgLatency } = await checkShouldPauseQueues()

  if (shouldPauseQueues) {
    let rssQueueCount: number | string = 'unknown'
    let importQueueCount: number | string = 'unknown'
    try {
      rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME)
      importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME)
    } catch (err) {
      // Counts are informational only; keep going so the queues still get paused.
      console.log('error fetching queue counts', err)
    }
    const message = `Both queues have been paused due to API latency threshold exceedance (${avgLatency}).\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.`
    await pauseQueues()
    await postToDiscord(message)
  } else {
    try {
      const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME)
      const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME)
      if (rssQueueCount > RSS_QUEUE_THRESHOLD) {
        await postToDiscord(
          `The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.`
        )
      }
      if (importQueueCount > IMPORT_QUEUE_THRESHOLD) {
        await postToDiscord(
          `The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.`
        )
      }
    } catch (err) {
      // Include the error itself so a failed size check can be diagnosed from the logs.
      console.log('error getting queue counts', err)
    }
  }
}
/**
 * HTTP entry point, wrapped with Sentry's GCP HTTP function wrapper.
 *
 * When the request carries a truthy `check` query parameter, one monitoring
 * pass (checkMetricsAndPauseQueues) is executed. Responds with "ok" on
 * success, or with status 500 and "INTERNAL_SERVER_ERROR" when the pass throws.
 */
export const queueManager = Sentry.GCPFunction.wrapHttpFunction(
  async (req, res) => {
    try {
      if (req.query['check']) {
        await checkMetricsAndPauseQueues()
      }
      res.send('ok')
    } catch (e) {
      // Describe what this handler actually does; the previous text referred to RSS feed parsing.
      console.error('Error while checking metrics and pausing queues', e)
      res.status(500).send('INTERNAL_SERVER_ERROR')
    }
  }
)