diff --git a/packages/queue-manager/Dockerfile b/packages/queue-manager/Dockerfile new file mode 100644 index 000000000..fe547f483 --- /dev/null +++ b/packages/queue-manager/Dockerfile @@ -0,0 +1,26 @@ +FROM node:14.18-alpine + +# Run everything after as non-privileged user. +WORKDIR /app + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/queue-manager/package.json ./packages/queue-manager/package.json + +RUN yarn install --pure-lockfile + +ADD /packages/queue-manager ./packages/queue-manager +RUN yarn workspace @omnivore/queue-manager build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/queue-manager/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +CMD ["yarn", "workspace", "@omnivore/queue-manager", "start"] + diff --git a/packages/queue-manager/mocha-config.json b/packages/queue-manager/mocha-config.json new file mode 100644 index 000000000..44d1d24c1 --- /dev/null +++ b/packages/queue-manager/mocha-config.json @@ -0,0 +1,5 @@ +{ + "extension": ["ts"], + "spec": "test/**/*.test.ts", + "require": "test/babel-register.js" + } \ No newline at end of file diff --git a/packages/queue-manager/package.json b/packages/queue-manager/package.json new file mode 100644 index 000000000..69c2630a8 --- /dev/null +++ b/packages/queue-manager/package.json @@ -0,0 +1,31 @@ +{ + "name": "@omnivore/queue-manager", + "version": "1.0.0", + "main": "build/src/index.js", + "files": [ + "build/src" + ], + "license": "Apache-2.0", + "scripts": { + "test": "yarn mocha -r ts-node/register --config mocha-config.json", + "lint": "eslint src --ext ts,js,tsx,jsx", + "compile": "tsc", + "build": "tsc", + "start": "functions-framework --target=queueManager", + "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" + }, + "devDependencies": { + "chai": "^4.3.6", + "eslint-plugin-prettier": "^4.0.0", + "mocha": "^10.0.0" + }, + "dependencies": { + "@google-cloud/functions-framework": "3.1.2", + "@google-cloud/monitoring": "^4.0.0", + "@google-cloud/tasks": "^4.0.0", + "@sentry/serverless": "^6.16.1", + "axios": "^1.4.0", + "dotenv": "^16.0.1", + "jsonwebtoken": "^8.5.1" + } +} \ No newline at end of file diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts new file mode 100644 index 000000000..f2b5791c5 --- /dev/null +++ b/packages/queue-manager/src/index.ts @@ -0,0 +1,190 @@ +import { MetricServiceClient } from '@google-cloud/monitoring' +import { v2beta3 } from '@google-cloud/tasks' +import fetch from 'node-fetch' +import * as dotenv from 'dotenv' + +import * as Sentry from '@sentry/serverless' + +dotenv.config() +Sentry.GCPFunction.init({ + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 0, +}) + +const PROJECT_ID = process.env.PROJECT_ID +const LOCATION = 'us-west2' +const IMPORT_QUEUE_NAME = process.env.IMPORT_QUEUE_NAME +const RSS_QUEUE_NAME = process.env.RSS_FEED_QUEUE_NAME +const QUEUE_NAMES = [IMPORT_QUEUE_NAME, RSS_QUEUE_NAME] +const DISCORD_WEBHOOK_URL = process.env.DISCORD_WEBHOOK_URL +const METRICS_FILTER = `metric.type="appengine.googleapis.com/http/server/response_latencies" metric.labels.response_code="200"` + +if ( + !PROJECT_ID || + !IMPORT_QUEUE_NAME || + !RSS_QUEUE_NAME || + !DISCORD_WEBHOOK_URL +) { + throw new Error('environment not supplied.') +} + +const LATENCY_THRESHOLD = 500 +const RSS_QUEUE_THRESHOLD = 20_000 +const IMPORT_QUEUE_THRESHOLD = 100_000 + +const postToDiscord = async (message: string) => { + console.log('notify message', { message }) + const payload = { + content: message, + } + + try { + const response = await fetch(DISCORD_WEBHOOK_URL, { + method: 'POST', + body: JSON.stringify(payload), + headers: { + 'Content-Type': 'application/json', + }, + }) + + if (!response.ok) { + throw new Error(`Discord response was not ok: ${response.statusText}`) + } + } catch (error) { + console.error('Failed to post message to Discord:', error) + } +} + +const checkShouldPauseQueues = async () => { + const now = Date.now() + const client = new MetricServiceClient() + + // Query for the metrics from the last 5 minutes + const [timeSeries] = await client.listTimeSeries({ + name: client.projectPath(PROJECT_ID), + filter: METRICS_FILTER, + interval: { + startTime: { + seconds: Math.floor(now / 1000 - 5 * 60), + }, + endTime: { + seconds: Math.floor(now / 1000), + }, + }, + aggregation: { + alignmentPeriod: { + seconds: 300, + }, + perSeriesAligner: 'ALIGN_PERCENTILE_95', + }, + }) + + let shouldPauseQueues = false + + for (const ts of timeSeries) { + // We only want to look at the backend service right now + if ( + !ts.resource || + !ts.resource.labels || + !ts.resource.labels['module_id'] || + !ts.resource.labels['module_id'].startsWith('backend') + ) { + continue + } + + if (ts.points && ts.points.length) { + const avgLatency = + ts.points.reduce( + (acc, point) => acc + (point.value?.doubleValue ?? 0), + 0 + ) / ts.points.length + console.log('avgLatency: ', avgLatency) + if (avgLatency > LATENCY_THRESHOLD) { + shouldPauseQueues = true + break + } + } + } + + return shouldPauseQueues +} + +const getQueueTaskCount = async (queueName: string) => { + const cloudTasksClient = new v2beta3.CloudTasksClient() + const queuePath = cloudTasksClient.queuePath(PROJECT_ID, LOCATION, queueName) + const [queue] = await cloudTasksClient.getQueue({ + name: queuePath, + readMask: { paths: ['name', 'stats'] }, + }) + + console.log(' queue.stats', { stats: queue.stats }) + if (Number.isNaN(queue.stats?.tasksCount)) { + return 0 + } + return Number(queue.stats?.tasksCount) +} + +const pauseQueues = async () => { + const cloudTasksClient = new v2beta3.CloudTasksClient() + + await Promise.all([ + cloudTasksClient.pauseQueue({ + name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, RSS_QUEUE_NAME), + }), + cloudTasksClient.pauseQueue({ + name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, IMPORT_QUEUE_NAME), + }), + ]) +} + +async function checkMetricsAndPauseQueues() { + if ( + !PROJECT_ID || + !IMPORT_QUEUE_NAME || + !RSS_QUEUE_NAME || + !DISCORD_WEBHOOK_URL + ) { + throw new Error('environment not supplied.') + } + + const shouldPauseQueues = await checkShouldPauseQueues() + + if (shouldPauseQueues) { + const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) + const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) + const message = `Both queues have been paused due to API latency threshold exceedance.\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.` + + // Pause the two queues + await pauseQueues() + + // Post to Discord server using webhook + await postToDiscord(message) + } else { + const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) + const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) + + if (rssQueueCount > RSS_QUEUE_THRESHOLD) { + await postToDiscord( + `The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.` + ) + } + + if (importQueueCount > IMPORT_QUEUE_THRESHOLD) { + await postToDiscord( + `The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.` + ) + } + } +} + +export const queueManager = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + try { + checkMetricsAndPauseQueues() + res.send('ok') + } catch (e) { + console.error('Error while parsing RSS feed', e) + res.status(500).send('INTERNAL_SERVER_ERROR') + } + } +) diff --git a/packages/queue-manager/test/babel-register.js b/packages/queue-manager/test/babel-register.js new file mode 100644 index 000000000..a6f65f60a --- /dev/null +++ b/packages/queue-manager/test/babel-register.js @@ -0,0 +1,3 @@ +const register = require('@babel/register').default + +register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] }) diff --git a/packages/queue-manager/test/stub.test.ts b/packages/queue-manager/test/stub.test.ts new file mode 100644 index 000000000..24ad25c8f --- /dev/null +++ b/packages/queue-manager/test/stub.test.ts @@ -0,0 +1,8 @@ +import 'mocha' +import { expect } from 'chai' + +describe('stub test', () => { + it('should pass', () => { + expect(true).to.be.true + }) +}) diff --git a/packages/queue-manager/tsconfig.json b/packages/queue-manager/tsconfig.json new file mode 100644 index 000000000..7ebe093f6 --- /dev/null +++ b/packages/queue-manager/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "./../../tsconfig.json", + "compilerOptions": { + "outDir": "build", + "rootDir": "." + }, + "include": ["src"] +}