Merge pull request #2945 from omnivore-app/feat/api-queue-manager
Add queue manager service
This commit is contained in:
2
packages/queue-manager/.eslintignore
Normal file
2
packages/queue-manager/.eslintignore
Normal file
@ -0,0 +1,2 @@
|
||||
node_modules/
|
||||
build/
|
||||
6
packages/queue-manager/.eslintrc
Normal file
6
packages/queue-manager/.eslintrc
Normal file
@ -0,0 +1,6 @@
|
||||
{
|
||||
"extends": "../../.eslintrc",
|
||||
"parserOptions": {
|
||||
"project": "tsconfig.json"
|
||||
}
|
||||
}
|
||||
26
packages/queue-manager/Dockerfile
Normal file
26
packages/queue-manager/Dockerfile
Normal file
@ -0,0 +1,26 @@
|
||||
FROM node:14.18-alpine
|
||||
|
||||
# Run everything after as non-privileged user.
|
||||
WORKDIR /app
|
||||
|
||||
COPY package.json .
|
||||
COPY yarn.lock .
|
||||
COPY tsconfig.json .
|
||||
COPY .eslintrc .
|
||||
|
||||
COPY /packages/queue-manager/package.json ./packages/queue-manager/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
ADD /packages/queue-manager ./packages/queue-manager
|
||||
RUN yarn workspace @omnivore/queue-manager build
|
||||
|
||||
# After building, fetch the production dependencies
|
||||
RUN rm -rf /app/packages/queue-manager/node_modules
|
||||
RUN rm -rf /app/node_modules
|
||||
RUN yarn install --pure-lockfile --production
|
||||
|
||||
EXPOSE 8080
|
||||
|
||||
CMD ["yarn", "workspace", "@omnivore/queue-manager", "start"]
|
||||
|
||||
5
packages/queue-manager/mocha-config.json
Normal file
5
packages/queue-manager/mocha-config.json
Normal file
@ -0,0 +1,5 @@
|
||||
{
|
||||
"extension": ["ts"],
|
||||
"spec": "test/**/*.test.ts",
|
||||
"require": "test/babel-register.js"
|
||||
}
|
||||
32
packages/queue-manager/package.json
Normal file
32
packages/queue-manager/package.json
Normal file
@ -0,0 +1,32 @@
|
||||
{
|
||||
"name": "@omnivore/queue-manager",
|
||||
"version": "1.0.0",
|
||||
"main": "build/src/index.js",
|
||||
"files": [
|
||||
"build/src"
|
||||
],
|
||||
"license": "Apache-2.0",
|
||||
"scripts": {
|
||||
"test": "yarn mocha -r ts-node/register --config mocha-config.json",
|
||||
"lint": "eslint src --ext ts,js,tsx,jsx",
|
||||
"compile": "tsc",
|
||||
"build": "tsc",
|
||||
"start": "functions-framework --target=queueManager",
|
||||
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node-fetch": "^2.6.6",
|
||||
"chai": "^4.3.6",
|
||||
"eslint-plugin-prettier": "^4.0.0",
|
||||
"mocha": "^10.0.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@google-cloud/functions-framework": "3.1.2",
|
||||
"@google-cloud/monitoring": "^4.0.0",
|
||||
"@google-cloud/tasks": "^4.0.0",
|
||||
"@sentry/serverless": "^6.16.1",
|
||||
"axios": "^1.4.0",
|
||||
"dotenv": "^16.0.1",
|
||||
"jsonwebtoken": "^8.5.1"
|
||||
}
|
||||
}
|
||||
200
packages/queue-manager/src/index.ts
Normal file
200
packages/queue-manager/src/index.ts
Normal file
@ -0,0 +1,200 @@
|
||||
import { MetricServiceClient } from '@google-cloud/monitoring'
|
||||
import { v2beta3 } from '@google-cloud/tasks'
|
||||
import fetch from 'node-fetch'
|
||||
import * as dotenv from 'dotenv'
|
||||
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
|
||||
dotenv.config()
|
||||
Sentry.GCPFunction.init({
|
||||
dsn: process.env.SENTRY_DSN,
|
||||
tracesSampleRate: 0,
|
||||
})
|
||||
|
||||
const PROJECT_ID = process.env.GCP_PROJECT_ID
|
||||
const LOCATION = 'us-west2'
|
||||
const IMPORT_QUEUE_NAME = process.env.IMPORT_QUEUE_NAME
|
||||
const RSS_QUEUE_NAME = process.env.RSS_FEED_QUEUE_NAME
|
||||
const QUEUE_NAMES = [IMPORT_QUEUE_NAME, RSS_QUEUE_NAME]
|
||||
const DISCORD_WEBHOOK_URL = process.env.DISCORD_WEBHOOK_URL
|
||||
const METRICS_FILTER = `metric.type="appengine.googleapis.com/http/server/response_latencies" metric.labels.response_code="200"`
|
||||
|
||||
if (
|
||||
!PROJECT_ID ||
|
||||
!IMPORT_QUEUE_NAME ||
|
||||
!RSS_QUEUE_NAME ||
|
||||
!DISCORD_WEBHOOK_URL
|
||||
) {
|
||||
throw new Error('environment not supplied.')
|
||||
}
|
||||
|
||||
const LATENCY_THRESHOLD = 500
|
||||
const RSS_QUEUE_THRESHOLD = 20_000
|
||||
const IMPORT_QUEUE_THRESHOLD = 200_000
|
||||
|
||||
const postToDiscord = async (message: string) => {
|
||||
console.log('notify message', { message })
|
||||
const payload = {
|
||||
content: message,
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(DISCORD_WEBHOOK_URL, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify(payload),
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Discord response was not ok: ${response.statusText}`)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Failed to post message to Discord:', error)
|
||||
}
|
||||
}
|
||||
|
||||
const checkShouldPauseQueues = async () => {
|
||||
const now = Date.now()
|
||||
const client = new MetricServiceClient()
|
||||
|
||||
// Query for the metrics from the last 5 minutes
|
||||
const [timeSeries] = await client.listTimeSeries({
|
||||
name: client.projectPath(PROJECT_ID),
|
||||
filter: METRICS_FILTER,
|
||||
interval: {
|
||||
startTime: {
|
||||
seconds: Math.floor(now / 1000 - 5 * 60),
|
||||
},
|
||||
endTime: {
|
||||
seconds: Math.floor(now / 1000),
|
||||
},
|
||||
},
|
||||
aggregation: {
|
||||
alignmentPeriod: {
|
||||
seconds: 300,
|
||||
},
|
||||
perSeriesAligner: 'ALIGN_PERCENTILE_95',
|
||||
},
|
||||
})
|
||||
|
||||
let shouldPauseQueues = false
|
||||
|
||||
for (const ts of timeSeries) {
|
||||
// We only want to look at the backend service right now
|
||||
if (
|
||||
!ts.resource ||
|
||||
!ts.resource.labels ||
|
||||
!ts.resource.labels['module_id'] ||
|
||||
!ts.resource.labels['module_id'].startsWith('backend')
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
if (ts.points && ts.points.length) {
|
||||
const avgLatency =
|
||||
ts.points.reduce(
|
||||
(acc, point) => acc + (point.value?.doubleValue ?? 0),
|
||||
0
|
||||
) / ts.points.length
|
||||
console.log('avgLatency: ', avgLatency)
|
||||
if (avgLatency > LATENCY_THRESHOLD) {
|
||||
shouldPauseQueues = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return shouldPauseQueues
|
||||
}
|
||||
|
||||
const getQueueTaskCount = async (queueName: string) => {
|
||||
const cloudTasksClient = new v2beta3.CloudTasksClient()
|
||||
const queuePath = cloudTasksClient.queuePath(PROJECT_ID, LOCATION, queueName)
|
||||
const [queue] = await cloudTasksClient.getQueue({
|
||||
name: queuePath,
|
||||
readMask: { paths: ['name', 'stats'] },
|
||||
})
|
||||
|
||||
console.log(' queue.stats', { stats: queue.stats })
|
||||
if (Number.isNaN(queue.stats?.tasksCount)) {
|
||||
return 0
|
||||
}
|
||||
return Number(queue.stats?.tasksCount)
|
||||
}
|
||||
|
||||
const pauseQueues = async () => {
|
||||
const cloudTasksClient = new v2beta3.CloudTasksClient()
|
||||
|
||||
await Promise.all([
|
||||
cloudTasksClient.pauseQueue({
|
||||
name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, RSS_QUEUE_NAME),
|
||||
}),
|
||||
cloudTasksClient.pauseQueue({
|
||||
name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, IMPORT_QUEUE_NAME),
|
||||
}),
|
||||
])
|
||||
}
|
||||
|
||||
async function checkMetricsAndPauseQueues() {
|
||||
if (
|
||||
!PROJECT_ID ||
|
||||
!IMPORT_QUEUE_NAME ||
|
||||
!RSS_QUEUE_NAME ||
|
||||
!DISCORD_WEBHOOK_URL
|
||||
) {
|
||||
throw new Error('environment not supplied.')
|
||||
}
|
||||
|
||||
const shouldPauseQueues = await checkShouldPauseQueues()
|
||||
|
||||
if (shouldPauseQueues) {
|
||||
let rssQueueCount: number | string = 'unknown'
|
||||
let importQueueCount: number | string = 'unknown'
|
||||
try {
|
||||
rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME)
|
||||
importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME)
|
||||
} catch (err) {
|
||||
console.log('error fetching queue counts', err)
|
||||
}
|
||||
|
||||
const message = `Both queues have been paused due to API latency threshold exceedance.\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.`
|
||||
|
||||
await pauseQueues()
|
||||
await postToDiscord(message)
|
||||
} else {
|
||||
try {
|
||||
const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME)
|
||||
const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME)
|
||||
|
||||
if (rssQueueCount > RSS_QUEUE_THRESHOLD) {
|
||||
await postToDiscord(
|
||||
`The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.`
|
||||
)
|
||||
}
|
||||
|
||||
if (importQueueCount > IMPORT_QUEUE_THRESHOLD) {
|
||||
await postToDiscord(
|
||||
`The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
console.log('error getting queue counts')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const queueManager = Sentry.GCPFunction.wrapHttpFunction(
|
||||
async (req, res) => {
|
||||
try {
|
||||
if (req.query['check']) {
|
||||
await checkMetricsAndPauseQueues()
|
||||
}
|
||||
res.send('ok')
|
||||
} catch (e) {
|
||||
console.error('Error while parsing RSS feed', e)
|
||||
res.status(500).send('INTERNAL_SERVER_ERROR')
|
||||
}
|
||||
}
|
||||
)
|
||||
3
packages/queue-manager/test/babel-register.js
Normal file
3
packages/queue-manager/test/babel-register.js
Normal file
@ -0,0 +1,3 @@
|
||||
const register = require('@babel/register').default
|
||||
|
||||
register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] })
|
||||
8
packages/queue-manager/test/stub.test.ts
Normal file
8
packages/queue-manager/test/stub.test.ts
Normal file
@ -0,0 +1,8 @@
|
||||
import 'mocha'
|
||||
import { expect } from 'chai'
|
||||
|
||||
describe('stub test', () => {
|
||||
it('should pass', () => {
|
||||
expect(true).to.be.true
|
||||
})
|
||||
})
|
||||
8
packages/queue-manager/tsconfig.json
Normal file
8
packages/queue-manager/tsconfig.json
Normal file
@ -0,0 +1,8 @@
|
||||
{
|
||||
"extends": "./../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "build",
|
||||
"rootDir": "."
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
Reference in New Issue
Block a user