Add queue manager service

This commit is contained in:
Jackson Harper
2023-10-16 20:11:51 +08:00
parent c34c547a1d
commit e4332b74f9
7 changed files with 271 additions and 0 deletions

View File

@ -0,0 +1,26 @@
FROM node:14.18-alpine
# Run everything after as non-privileged user.
WORKDIR /app
COPY package.json .
COPY yarn.lock .
COPY tsconfig.json .
COPY .eslintrc .
COPY /packages/queue-manager/package.json ./packages/queue-manager/package.json
RUN yarn install --pure-lockfile
ADD /packages/queue-manager ./packages/queue-manager
RUN yarn workspace @omnivore/queue-manager build
# After building, fetch the production dependencies
RUN rm -rf /app/packages/queue-manager/node_modules
RUN rm -rf /app/node_modules
RUN yarn install --pure-lockfile --production
EXPOSE 8080
CMD ["yarn", "workspace", "@omnivore/queue-manager", "start"]

View File

@ -0,0 +1,5 @@
{
"extension": ["ts"],
"spec": "test/**/*.test.ts",
"require": "test/babel-register.js"
}

View File

@ -0,0 +1,31 @@
{
"name": "@omnivore/queue-manager",
"version": "1.0.0",
"main": "build/src/index.js",
"files": [
"build/src"
],
"license": "Apache-2.0",
"scripts": {
"test": "yarn mocha -r ts-node/register --config mocha-config.json",
"lint": "eslint src --ext ts,js,tsx,jsx",
"compile": "tsc",
"build": "tsc",
"start": "functions-framework --target=queueManager",
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
},
"devDependencies": {
"chai": "^4.3.6",
"eslint-plugin-prettier": "^4.0.0",
"mocha": "^10.0.0"
},
"dependencies": {
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/monitoring": "^4.0.0",
"@google-cloud/tasks": "^4.0.0",
"@sentry/serverless": "^6.16.1",
"axios": "^1.4.0",
"dotenv": "^16.0.1",
"jsonwebtoken": "^8.5.1"
}
}

View File

@ -0,0 +1,190 @@
import { MetricServiceClient } from '@google-cloud/monitoring'
import { v2beta3 } from '@google-cloud/tasks'
import fetch from 'node-fetch'
import * as dotenv from 'dotenv'
import * as Sentry from '@sentry/serverless'
dotenv.config()
Sentry.GCPFunction.init({
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 0,
})
const PROJECT_ID = process.env.PROJECT_ID
const LOCATION = 'us-west2'
const IMPORT_QUEUE_NAME = process.env.IMPORT_QUEUE_NAME
const RSS_QUEUE_NAME = process.env.RSS_FEED_QUEUE_NAME
const QUEUE_NAMES = [IMPORT_QUEUE_NAME, RSS_QUEUE_NAME]
const DISCORD_WEBHOOK_URL = process.env.DISCORD_WEBHOOK_URL
const METRICS_FILTER = `metric.type="appengine.googleapis.com/http/server/response_latencies" metric.labels.response_code="200"`
if (
!PROJECT_ID ||
!IMPORT_QUEUE_NAME ||
!RSS_QUEUE_NAME ||
!DISCORD_WEBHOOK_URL
) {
throw new Error('environment not supplied.')
}
const LATENCY_THRESHOLD = 500
const RSS_QUEUE_THRESHOLD = 20_000
const IMPORT_QUEUE_THRESHOLD = 100_000
const postToDiscord = async (message: string) => {
console.log('notify message', { message })
const payload = {
content: message,
}
try {
const response = await fetch(DISCORD_WEBHOOK_URL, {
method: 'POST',
body: JSON.stringify(payload),
headers: {
'Content-Type': 'application/json',
},
})
if (!response.ok) {
throw new Error(`Discord response was not ok: ${response.statusText}`)
}
} catch (error) {
console.error('Failed to post message to Discord:', error)
}
}
const checkShouldPauseQueues = async () => {
const now = Date.now()
const client = new MetricServiceClient()
// Query for the metrics from the last 5 minutes
const [timeSeries] = await client.listTimeSeries({
name: client.projectPath(PROJECT_ID),
filter: METRICS_FILTER,
interval: {
startTime: {
seconds: Math.floor(now / 1000 - 5 * 60),
},
endTime: {
seconds: Math.floor(now / 1000),
},
},
aggregation: {
alignmentPeriod: {
seconds: 300,
},
perSeriesAligner: 'ALIGN_PERCENTILE_95',
},
})
let shouldPauseQueues = false
for (const ts of timeSeries) {
// We only want to look at the backend service right now
if (
!ts.resource ||
!ts.resource.labels ||
!ts.resource.labels['module_id'] ||
!ts.resource.labels['module_id'].startsWith('backend')
) {
continue
}
if (ts.points && ts.points.length) {
const avgLatency =
ts.points.reduce(
(acc, point) => acc + (point.value?.doubleValue ?? 0),
0
) / ts.points.length
console.log('avgLatency: ', avgLatency)
if (avgLatency > LATENCY_THRESHOLD) {
shouldPauseQueues = true
break
}
}
}
return shouldPauseQueues
}
const getQueueTaskCount = async (queueName: string) => {
const cloudTasksClient = new v2beta3.CloudTasksClient()
const queuePath = cloudTasksClient.queuePath(PROJECT_ID, LOCATION, queueName)
const [queue] = await cloudTasksClient.getQueue({
name: queuePath,
readMask: { paths: ['name', 'stats'] },
})
console.log(' queue.stats', { stats: queue.stats })
if (Number.isNaN(queue.stats?.tasksCount)) {
return 0
}
return Number(queue.stats?.tasksCount)
}
const pauseQueues = async () => {
const cloudTasksClient = new v2beta3.CloudTasksClient()
await Promise.all([
cloudTasksClient.pauseQueue({
name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, RSS_QUEUE_NAME),
}),
cloudTasksClient.pauseQueue({
name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, IMPORT_QUEUE_NAME),
}),
])
}
async function checkMetricsAndPauseQueues() {
if (
!PROJECT_ID ||
!IMPORT_QUEUE_NAME ||
!RSS_QUEUE_NAME ||
!DISCORD_WEBHOOK_URL
) {
throw new Error('environment not supplied.')
}
const shouldPauseQueues = await checkShouldPauseQueues()
if (shouldPauseQueues) {
const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME)
const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME)
const message = `Both queues have been paused due to API latency threshold exceedance.\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.`
// Pause the two queues
await pauseQueues()
// Post to Discord server using webhook
await postToDiscord(message)
} else {
const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME)
const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME)
if (rssQueueCount > RSS_QUEUE_THRESHOLD) {
await postToDiscord(
`The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.`
)
}
if (importQueueCount > IMPORT_QUEUE_THRESHOLD) {
await postToDiscord(
`The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.`
)
}
}
}
export const queueManager = Sentry.GCPFunction.wrapHttpFunction(
async (req, res) => {
try {
checkMetricsAndPauseQueues()
res.send('ok')
} catch (e) {
console.error('Error while parsing RSS feed', e)
res.status(500).send('INTERNAL_SERVER_ERROR')
}
}
)

View File

@ -0,0 +1,3 @@
const register = require('@babel/register').default
register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] })

View File

@ -0,0 +1,8 @@
import 'mocha'
import { expect } from 'chai'
describe('stub test', () => {
it('should pass', () => {
expect(true).to.be.true
})
})

View File

@ -0,0 +1,8 @@
{
"extends": "./../../tsconfig.json",
"compilerOptions": {
"outDir": "build",
"rootDir": "."
},
"include": ["src"]
}