From e4332b74f97b88eda9c702fa34c0f699da679725 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Mon, 16 Oct 2023 20:11:51 +0800 Subject: [PATCH 1/7] Add queue manager service --- packages/queue-manager/Dockerfile | 26 +++ packages/queue-manager/mocha-config.json | 5 + packages/queue-manager/package.json | 31 +++ packages/queue-manager/src/index.ts | 190 ++++++++++++++++++ packages/queue-manager/test/babel-register.js | 3 + packages/queue-manager/test/stub.test.ts | 8 + packages/queue-manager/tsconfig.json | 8 + 7 files changed, 271 insertions(+) create mode 100644 packages/queue-manager/Dockerfile create mode 100644 packages/queue-manager/mocha-config.json create mode 100644 packages/queue-manager/package.json create mode 100644 packages/queue-manager/src/index.ts create mode 100644 packages/queue-manager/test/babel-register.js create mode 100644 packages/queue-manager/test/stub.test.ts create mode 100644 packages/queue-manager/tsconfig.json diff --git a/packages/queue-manager/Dockerfile b/packages/queue-manager/Dockerfile new file mode 100644 index 000000000..fe547f483 --- /dev/null +++ b/packages/queue-manager/Dockerfile @@ -0,0 +1,26 @@ +FROM node:14.18-alpine + +# Run everything after as non-privileged user. +WORKDIR /app + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/queue-manager/package.json ./packages/queue-manager/package.json + +RUN yarn install --pure-lockfile + +ADD /packages/queue-manager ./packages/queue-manager +RUN yarn workspace @omnivore/queue-manager build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/queue-manager/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +CMD ["yarn", "workspace", "@omnivore/queue-manager", "start"] + diff --git a/packages/queue-manager/mocha-config.json b/packages/queue-manager/mocha-config.json new file mode 100644 index 000000000..44d1d24c1 --- /dev/null +++ b/packages/queue-manager/mocha-config.json @@ -0,0 +1,5 @@ +{ + "extension": ["ts"], + "spec": "test/**/*.test.ts", + "require": "test/babel-register.js" + } \ No newline at end of file diff --git a/packages/queue-manager/package.json b/packages/queue-manager/package.json new file mode 100644 index 000000000..69c2630a8 --- /dev/null +++ b/packages/queue-manager/package.json @@ -0,0 +1,31 @@ +{ + "name": "@omnivore/queue-manager", + "version": "1.0.0", + "main": "build/src/index.js", + "files": [ + "build/src" + ], + "license": "Apache-2.0", + "scripts": { + "test": "yarn mocha -r ts-node/register --config mocha-config.json", + "lint": "eslint src --ext ts,js,tsx,jsx", + "compile": "tsc", + "build": "tsc", + "start": "functions-framework --target=queueManager", + "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" + }, + "devDependencies": { + "chai": "^4.3.6", + "eslint-plugin-prettier": "^4.0.0", + "mocha": "^10.0.0" + }, + "dependencies": { + "@google-cloud/functions-framework": "3.1.2", + "@google-cloud/monitoring": "^4.0.0", + "@google-cloud/tasks": "^4.0.0", + "@sentry/serverless": "^6.16.1", + "axios": "^1.4.0", + "dotenv": "^16.0.1", + "jsonwebtoken": "^8.5.1" + } +} \ No newline at end of file diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts new file mode 100644 index 000000000..f2b5791c5 --- /dev/null +++ b/packages/queue-manager/src/index.ts @@ -0,0 +1,190 @@ +import { MetricServiceClient } from '@google-cloud/monitoring' +import { v2beta3 } from '@google-cloud/tasks' +import fetch from 'node-fetch' +import * as dotenv from 'dotenv' + +import * as Sentry from '@sentry/serverless' + +dotenv.config() +Sentry.GCPFunction.init({ + dsn: process.env.SENTRY_DSN, + tracesSampleRate: 0, +}) + +const PROJECT_ID = process.env.PROJECT_ID +const LOCATION = 'us-west2' +const IMPORT_QUEUE_NAME = process.env.IMPORT_QUEUE_NAME +const RSS_QUEUE_NAME = process.env.RSS_FEED_QUEUE_NAME +const QUEUE_NAMES = [IMPORT_QUEUE_NAME, RSS_QUEUE_NAME] +const DISCORD_WEBHOOK_URL = process.env.DISCORD_WEBHOOK_URL +const METRICS_FILTER = `metric.type="appengine.googleapis.com/http/server/response_latencies" metric.labels.response_code="200"` + +if ( + !PROJECT_ID || + !IMPORT_QUEUE_NAME || + !RSS_QUEUE_NAME || + !DISCORD_WEBHOOK_URL +) { + throw new Error('environment not supplied.') +} + +const LATENCY_THRESHOLD = 500 +const RSS_QUEUE_THRESHOLD = 20_000 +const IMPORT_QUEUE_THRESHOLD = 100_000 + +const postToDiscord = async (message: string) => { + console.log('notify message', { message }) + const payload = { + content: message, + } + + try { + const response = await fetch(DISCORD_WEBHOOK_URL, { + method: 'POST', + body: JSON.stringify(payload), + headers: { + 'Content-Type': 'application/json', + }, + }) + + if (!response.ok) { + throw new Error(`Discord response was not ok: ${response.statusText}`) + } + } catch (error) { + console.error('Failed to post message to Discord:', error) + } +} + +const checkShouldPauseQueues = async () => { + const now = Date.now() + const client = new MetricServiceClient() + + // Query for the metrics from the last 5 minutes + const [timeSeries] = await client.listTimeSeries({ + name: client.projectPath(PROJECT_ID), + filter: METRICS_FILTER, + interval: { + startTime: { + seconds: Math.floor(now / 1000 - 5 * 60), + }, + endTime: { + seconds: Math.floor(now / 1000), + }, + }, + aggregation: { + alignmentPeriod: { + seconds: 300, + }, + perSeriesAligner: 'ALIGN_PERCENTILE_95', + }, + }) + + let shouldPauseQueues = false + + for (const ts of timeSeries) { + // We only want to look at the backend service right now + if ( + !ts.resource || + !ts.resource.labels || + !ts.resource.labels['module_id'] || + !ts.resource.labels['module_id'].startsWith('backend') + ) { + continue + } + + if (ts.points && ts.points.length) { + const avgLatency = + ts.points.reduce( + (acc, point) => acc + (point.value?.doubleValue ?? 0), + 0 + ) / ts.points.length + console.log('avgLatency: ', avgLatency) + if (avgLatency > LATENCY_THRESHOLD) { + shouldPauseQueues = true + break + } + } + } + + return shouldPauseQueues +} + +const getQueueTaskCount = async (queueName: string) => { + const cloudTasksClient = new v2beta3.CloudTasksClient() + const queuePath = cloudTasksClient.queuePath(PROJECT_ID, LOCATION, queueName) + const [queue] = await cloudTasksClient.getQueue({ + name: queuePath, + readMask: { paths: ['name', 'stats'] }, + }) + + console.log(' queue.stats', { stats: queue.stats }) + if (Number.isNaN(queue.stats?.tasksCount)) { + return 0 + } + return Number(queue.stats?.tasksCount) +} + +const pauseQueues = async () => { + const cloudTasksClient = new v2beta3.CloudTasksClient() + + await Promise.all([ + cloudTasksClient.pauseQueue({ + name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, RSS_QUEUE_NAME), + }), + cloudTasksClient.pauseQueue({ + name: cloudTasksClient.queuePath(PROJECT_ID, LOCATION, IMPORT_QUEUE_NAME), + }), + ]) +} + +async function checkMetricsAndPauseQueues() { + if ( + !PROJECT_ID || + !IMPORT_QUEUE_NAME || + !RSS_QUEUE_NAME || + !DISCORD_WEBHOOK_URL + ) { + throw new Error('environment not supplied.') + } + + const shouldPauseQueues = await checkShouldPauseQueues() + + if (shouldPauseQueues) { + const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) + const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) + const message = `Both queues have been paused due to API latency threshold exceedance.\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.` + + // Pause the two queues + await pauseQueues() + + // Post to Discord server using webhook + await postToDiscord(message) + } else { + const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) + const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) + + if (rssQueueCount > RSS_QUEUE_THRESHOLD) { + await postToDiscord( + `The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.` + ) + } + + if (importQueueCount > IMPORT_QUEUE_THRESHOLD) { + await postToDiscord( + `The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.` + ) + } + } +} + +export const queueManager = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + try { + checkMetricsAndPauseQueues() + res.send('ok') + } catch (e) { + console.error('Error while parsing RSS feed', e) + res.status(500).send('INTERNAL_SERVER_ERROR') + } + } +) diff --git a/packages/queue-manager/test/babel-register.js b/packages/queue-manager/test/babel-register.js new file mode 100644 index 000000000..a6f65f60a --- /dev/null +++ b/packages/queue-manager/test/babel-register.js @@ -0,0 +1,3 @@ +const register = require('@babel/register').default + +register({ extensions: ['.ts', '.tsx', '.js', '.jsx'] }) diff --git a/packages/queue-manager/test/stub.test.ts b/packages/queue-manager/test/stub.test.ts new file mode 100644 index 000000000..24ad25c8f --- /dev/null +++ b/packages/queue-manager/test/stub.test.ts @@ -0,0 +1,8 @@ +import 'mocha' +import { expect } from 'chai' + +describe('stub test', () => { + it('should pass', () => { + expect(true).to.be.true + }) +}) diff --git a/packages/queue-manager/tsconfig.json b/packages/queue-manager/tsconfig.json new file mode 100644 index 000000000..7ebe093f6 --- /dev/null +++ b/packages/queue-manager/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "./../../tsconfig.json", + "compilerOptions": { + "outDir": "build", + "rootDir": "." + }, + "include": ["src"] +} From 51e2aaa3eae6134d473a72af2c5e801bcacc8807 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Mon, 16 Oct 2023 20:29:03 +0800 Subject: [PATCH 2/7] Add node-fetch dependency --- packages/queue-manager/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/queue-manager/package.json b/packages/queue-manager/package.json index 69c2630a8..7c1319a45 100644 --- a/packages/queue-manager/package.json +++ b/packages/queue-manager/package.json @@ -15,6 +15,7 @@ "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" }, "devDependencies": { + "@types/node-fetch": "^2.6.6", "chai": "^4.3.6", "eslint-plugin-prettier": "^4.0.0", "mocha": "^10.0.0" @@ -28,4 +29,4 @@ "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1" } -} \ No newline at end of file +} From b700c75552f6382537b022f93e071874249f9de2 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Mon, 16 Oct 2023 22:32:41 +0800 Subject: [PATCH 3/7] Use the GCP project id --- packages/queue-manager/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts index f2b5791c5..1e79a304d 100644 --- a/packages/queue-manager/src/index.ts +++ b/packages/queue-manager/src/index.ts @@ -11,7 +11,7 @@ Sentry.GCPFunction.init({ tracesSampleRate: 0, }) -const PROJECT_ID = process.env.PROJECT_ID +const PROJECT_ID = process.env.GCP_PROJECT_ID const LOCATION = 'us-west2' const IMPORT_QUEUE_NAME = process.env.IMPORT_QUEUE_NAME const RSS_QUEUE_NAME = process.env.RSS_FEED_QUEUE_NAME From d141fea9777838299a012ce95eb34cc6dea6c680 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Tue, 17 Oct 2023 14:18:45 +0800 Subject: [PATCH 4/7] Handle cases where fetching queue counts fails --- packages/queue-manager/src/index.ts | 50 +++++++++++++++++------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts index 1e79a304d..190903a96 100644 --- a/packages/queue-manager/src/index.ts +++ b/packages/queue-manager/src/index.ts @@ -150,29 +150,37 @@ async function checkMetricsAndPauseQueues() { const shouldPauseQueues = await checkShouldPauseQueues() if (shouldPauseQueues) { - const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) - const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) - const message = `Both queues have been paused due to API latency threshold exceedance.\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.` - - // Pause the two queues - await pauseQueues() - - // Post to Discord server using webhook - await postToDiscord(message) - } else { - const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) - const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) - - if (rssQueueCount > RSS_QUEUE_THRESHOLD) { - await postToDiscord( - `The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.` - ) + let rssQueueCount: number | string = 'unknown' + let importQueueCount: number | string = 'unknown' + try { + rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) + importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) + } catch (err) { + console.log('error fetching queue counts', err) } - if (importQueueCount > IMPORT_QUEUE_THRESHOLD) { - await postToDiscord( - `The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.` - ) + const message = `Both queues have been paused due to API latency threshold exceedance.\n\t-The RSS queue currently has ${rssQueueCount} tasks.\n\t-The import queue currently has ${importQueueCount} pending tasks.` + + await pauseQueues() + await postToDiscord(message) + } else { + try { + const rssQueueCount = await getQueueTaskCount(RSS_QUEUE_NAME) + const importQueueCount = await getQueueTaskCount(IMPORT_QUEUE_NAME) + + if (rssQueueCount > RSS_QUEUE_THRESHOLD) { + await postToDiscord( + `The RSS queue has exceeded it's threshold, it has ${rssQueueCount} items in it.` + ) + } + + if (importQueueCount > IMPORT_QUEUE_THRESHOLD) { + await postToDiscord( + `The import queue has exceeded it's threshold, it has ${importQueueCount} items in it.` + ) + } + } catch (err) { + console.log('error getting queue counts') } } } From a6d3bb22b11e192d50aa7a87d8cf9e5ade3b4811 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Tue, 17 Oct 2023 14:46:57 +0800 Subject: [PATCH 5/7] Add a check param so we can avoid checks on container startup while deploying --- packages/queue-manager/src/index.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts index 190903a96..4d560b337 100644 --- a/packages/queue-manager/src/index.ts +++ b/packages/queue-manager/src/index.ts @@ -188,7 +188,9 @@ async function checkMetricsAndPauseQueues() { export const queueManager = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { try { - checkMetricsAndPauseQueues() + if (req.query['check']) { + checkMetricsAndPauseQueues() + } res.send('ok') } catch (e) { console.error('Error while parsing RSS feed', e) From 97cb8d3af7fb2694cd422f7002f409be7c316a21 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Tue, 17 Oct 2023 15:53:05 +0800 Subject: [PATCH 6/7] Bump test limit --- packages/queue-manager/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts index 4d560b337..3ddc6a62e 100644 --- a/packages/queue-manager/src/index.ts +++ b/packages/queue-manager/src/index.ts @@ -30,7 +30,7 @@ if ( const LATENCY_THRESHOLD = 500 const RSS_QUEUE_THRESHOLD = 20_000 -const IMPORT_QUEUE_THRESHOLD = 100_000 +const IMPORT_QUEUE_THRESHOLD = 200_000 const postToDiscord = async (message: string) => { console.log('notify message', { message }) From d450bdd8e65f1c23faa9dcd30ea27f2fd2d24a8f Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Tue, 17 Oct 2023 16:03:38 +0800 Subject: [PATCH 7/7] Linting improvements --- packages/queue-manager/.eslintignore | 2 ++ packages/queue-manager/.eslintrc | 6 ++++++ packages/queue-manager/src/index.ts | 2 +- 3 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 packages/queue-manager/.eslintignore create mode 100644 packages/queue-manager/.eslintrc diff --git a/packages/queue-manager/.eslintignore b/packages/queue-manager/.eslintignore new file mode 100644 index 000000000..b38db2f29 --- /dev/null +++ b/packages/queue-manager/.eslintignore @@ -0,0 +1,2 @@ +node_modules/ +build/ diff --git a/packages/queue-manager/.eslintrc b/packages/queue-manager/.eslintrc new file mode 100644 index 000000000..e006282a6 --- /dev/null +++ b/packages/queue-manager/.eslintrc @@ -0,0 +1,6 @@ +{ + "extends": "../../.eslintrc", + "parserOptions": { + "project": "tsconfig.json" + } +} \ No newline at end of file diff --git a/packages/queue-manager/src/index.ts b/packages/queue-manager/src/index.ts index 3ddc6a62e..f26246914 100644 --- a/packages/queue-manager/src/index.ts +++ b/packages/queue-manager/src/index.ts @@ -189,7 +189,7 @@ export const queueManager = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { try { if (req.query['check']) { - checkMetricsAndPauseQueues() + await checkMetricsAndPauseQueues() } res.send('ok') } catch (e) {