From 5986bd0a685735d040d0c0381fa24f947a9f3248 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Mon, 15 Jan 2024 17:22:00 +0800 Subject: [PATCH] Change around redis initialization so we can pull in secrets from secret manager async on startup --- packages/api/package.json | 1 + packages/api/src/jobqueue.ts | 20 ---- packages/api/src/jobs/rss/refreshAllFeeds.ts | 6 +- packages/api/src/jobs/rss/refreshFeed.ts | 21 ++-- packages/api/src/queue-processor.ts | 83 +++++++++----- packages/api/src/redis.ts | 41 ------- packages/api/src/redis_data_source.ts | 111 +++++++++++++++++++ packages/api/src/routers/svc/rss_feed.ts | 9 +- packages/api/src/server.ts | 13 +-- packages/api/src/services/save_page.ts | 1 - packages/api/src/util.ts | 2 +- packages/api/src/utils/createTask.ts | 77 +++++++------ packages/api/src/utils/helpers.ts | 10 +- packages/api/test/global-setup.ts | 6 +- packages/api/test/global-teardown.ts | 8 +- 15 files changed, 251 insertions(+), 158 deletions(-) delete mode 100644 packages/api/src/jobqueue.ts delete mode 100644 packages/api/src/redis.ts create mode 100644 packages/api/src/redis_data_source.ts diff --git a/packages/api/package.json b/packages/api/package.json index 7d5420686..f38e23c6d 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -19,6 +19,7 @@ "@google-cloud/monitoring": "^4.0.0", "@google-cloud/opentelemetry-cloud-trace-exporter": "^2.0.0", "@google-cloud/pubsub": "^4.0.0", + "@google-cloud/secret-manager": "^5.0.1", "@google-cloud/storage": "^7.0.1", "@google-cloud/tasks": "^4.0.0", "@graphql-tools/utils": "^9.1.1", diff --git a/packages/api/src/jobqueue.ts b/packages/api/src/jobqueue.ts deleted file mode 100644 index b7539c2b4..000000000 --- a/packages/api/src/jobqueue.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Queue } from 'bullmq' -import { mqRedisClient } from './redis' - -const createRSSRefreshFeedQueue = (): Queue | undefined => { - return new Queue('rssRefreshFeed', { - connection: mqRedisClient, - }) -} - -export const addRefreshFeedJob = async (jobid: string, payload: any) => { - const rssRefreshFeedJobQueue = createRSSRefreshFeedQueue() - if (!rssRefreshFeedJobQueue) { - return false - } - return rssRefreshFeedJobQueue?.add('rssRefreshFeed', payload, { - jobId: jobid, - removeOnComplete: true, - removeOnFail: true, - }) -} diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index 75c4c07a5..d45b1d98e 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -2,7 +2,7 @@ import Redis from 'ioredis' import { DataSource } from 'typeorm' import { stringToHash } from '../../utils/helpers' import { RssSubscriptionGroup } from '../../utils/createTask' -import { Queue } from 'bullmq' +import { Job, Queue } from 'bullmq' import { QUEUE_NAME } from '../../queue-processor' export const refreshAllFeeds = async ( @@ -75,10 +75,10 @@ export const queueRSSRefreshFeedJob = async ( redis: Redis, jobid: string, payload: any -) => { +): Promise => { const queue = createBackendQueue(redis) if (!queue) { - return false + return undefined } return queue.add('refresh-feed', payload, { jobId: jobid, diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 95bfdae35..a0b4603af 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -3,14 +3,12 @@ import crypto from 'crypto' import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import import * as jwt from 'jsonwebtoken' import { parseHTML } from 'linkedom' -import { createClient } from 'redis' import Parser, { Item } from 'rss-parser' import { promisify } from 'util' +import { RedisClientType } from 'redis' import { CONTENT_FETCH_URL, createCloudTask } from './task' type FolderType = 'following' | 'inbox' -// explicitly create the return type of RedisClient -type RedisClient = ReturnType interface RefreshFeedRequest { subscriptionIds: string[] @@ -73,7 +71,7 @@ export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { const feedFetchFailedRedisKey = (feedUrl: string) => `feed-fetch-failure:${feedUrl}` -const isFeedBlocked = async (feedUrl: string, redisClient: RedisClient) => { +const isFeedBlocked = async (feedUrl: string, redisClient: RedisClientType) => { const key = feedFetchFailedRedisKey(feedUrl) try { const result = await redisClient.get(key) @@ -92,7 +90,7 @@ const isFeedBlocked = async (feedUrl: string, redisClient: RedisClient) => { const incrementFeedFailure = async ( feedUrl: string, - redisClient: RedisClient + redisClient: RedisClientType ) => { const key = feedFetchFailedRedisKey(feedUrl) try { @@ -258,7 +256,7 @@ const sendUpdateSubscriptionMutation = async ( } const isItemRecentlySaved = async ( - redisClient: RedisClient, + redisClient: RedisClientType, userId: string, url: string ) => { @@ -273,7 +271,7 @@ const createTask = async ( item: RssFeedItem, fetchContent: boolean, folder: FolderType, - redisClient: RedisClient + redisClient: RedisClientType ) => { const isRecentlySaved = await isItemRecentlySaved( redisClient, @@ -463,7 +461,7 @@ const processSubscription = async ( fetchContent: boolean, folder: FolderType, feed: RssFeed, - redisClient: RedisClient + redisClient: RedisClientType ) => { let lastItemFetchedAt: Date | null = null let lastValidItem: RssFeedItem | null = null @@ -597,7 +595,10 @@ const processSubscription = async ( console.log('Updated subscription', updatedSubscription) } -export const refreshFeed = async (redisClient: RedisClient, request: any) => { +export const refreshFeed = async ( + redisClient: RedisClientType, + request: any +) => { if (isRefreshFeedRequest(request)) { return _refreshFeed(redisClient, request) } @@ -606,7 +607,7 @@ export const refreshFeed = async (redisClient: RedisClient, request: any) => { } export const _refreshFeed = async ( - redisClient: RedisClient, + redisClient: RedisClientType, request: RefreshFeedRequest ) => { try { diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 69acf4575..163e2da36 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -4,40 +4,63 @@ /* eslint-disable @typescript-eslint/no-misused-promises */ import express, { Express } from 'express' import { appDataSource } from './data_source' -import { env } from './env' -import { redisClient, mqRedisClient } from './redis' - -import { Worker, Job, QueueEvents } from 'bullmq' +import { loadEnvFromGCPSecrets } from './gcp-utils' +import { getEnv } from './util' +import { redisDataSource } from './redis_data_source' +import { CustomTypeOrmLogger } from './utils/logger' +import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' +import { Job, Worker, QueueEvents } from 'bullmq' import { refreshFeed } from './jobs/rss/refreshFeed' export const QUEUE_NAME = 'omnivore-backend-queue' const main = async () => { - console.log('calling queue-processor start') + console.log('[queue-processor]: starting queue processor') + + let env = (await loadEnvFromGCPSecrets()) ?? getEnv() const app: Express = express() const port = process.env.PORT || 3002 - await appDataSource.initialize() + redisDataSource.setOptions({ + REDIS_URL: env.redis.url, + REDIS_CERT: env.redis.cert, + }) + + appDataSource.setOptions({ + type: 'postgres', + host: env.pg.host, + port: env.pg.port, + schema: 'omnivore', + username: env.pg.userName, + password: env.pg.password, + database: env.pg.dbName, + logging: ['query', 'info'], + entities: [__dirname + '/entity/**/*{.js,.ts}'], + subscribers: [__dirname + '/events/**/*{.js,.ts}'], + namingStrategy: new SnakeNamingStrategy(), + logger: new CustomTypeOrmLogger(['query', 'info']), + connectTimeoutMS: 40000, // 40 seconds + maxQueryExecutionTime: 10000, // 10 seconds + }) // respond healthy to auto-scaler. app.get('/_ah/health', (req, res) => res.sendStatus(200)) - // redis is optional - if (env.redis.url) { - redisClient.on('error', (err) => { - console.error('Redis Client Error', err) - }) + const server = app.listen(port, () => { + console.log(`[queue-processor]: started`) + }) - await redisClient.connect() - console.log('Redis Client Connected:', env.redis.url) - } + // This is done after all the setup so it can access the + // environment that was loaded from GCP + await appDataSource.initialize() + await redisDataSource.initialize() - // redis for message queue - if (env.redis.url) { - mqRedisClient?.on('error', (err) => { - console.error('Redis Client Error', err) - }) + const redisClient = redisDataSource.redisClient + const ioRedisClient = redisDataSource.ioRedisClient + + if (!redisClient || !ioRedisClient) { + throw '[queue-processor] error redis is not initialized' } const worker = new Worker( @@ -45,7 +68,7 @@ const main = async () => { async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { - return await refreshAllFeeds(appDataSource, mqRedisClient) + return await refreshAllFeeds(appDataSource, ioRedisClient) } case 'refresh-feed': { return await refreshFeed(redisClient, job.data) @@ -54,12 +77,12 @@ const main = async () => { return true }, { - connection: mqRedisClient, + connection: ioRedisClient, } ) const queueEvents = new QueueEvents(QUEUE_NAME, { - connection: mqRedisClient, + connection: ioRedisClient, }) queueEvents.on('added', async (job) => { @@ -76,18 +99,18 @@ const main = async () => { const gracefulShutdown = async (signal: string) => { console.log(`[queue-processor]: Received ${signal}, closing server...`) - await worker.close() - await redisClient.disconnect() - mqRedisClient.disconnect() - process.exit(0) + server.close(async () => { + console.log( + '[queue-processor]: Server closed. shuting down and exiting process...' + ) + await worker.close() + await redisDataSource.shutdown() + process.exit(0) + }) } process.on('SIGINT', () => gracefulShutdown('SIGINT')) process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) - - app.listen(port, () => { - console.log(`[queue-processor]: started`) - }) } // only call main if the file was called from the CLI and wasn't required from another module diff --git a/packages/api/src/redis.ts b/packages/api/src/redis.ts deleted file mode 100644 index 27c241a06..000000000 --- a/packages/api/src/redis.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { Redis } from 'ioredis' -import { env } from './env' -import { Redis } from 'ioredis' -import { RedisOptions } from 'bullmq' - -const url = env.redis.url -const cert = env.redis.cert - -export const redisClient = url - ? new Redis(url, { - connectTimeout: 10000, // 10 seconds - tls: cert - ? { - cert, - rejectUnauthorized: false, // for self-signed certs - } - : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, - maxRetriesPerRequest: 1, - }) - : null diff --git a/packages/api/src/redis_data_source.ts b/packages/api/src/redis_data_source.ts new file mode 100644 index 000000000..db22107e6 --- /dev/null +++ b/packages/api/src/redis_data_source.ts @@ -0,0 +1,111 @@ +import Redis, { RedisOptions } from 'ioredis' +import { RedisClientType, createClient } from 'redis' +import { env } from './env' + +export type RedisDataSourceOptions = { + REDIS_URL?: string + REDIS_CERT?: string +} + +export class RedisDataSource { + options: RedisDataSourceOptions + isInitialized: Boolean + + redisClient: RedisClientType | undefined = undefined + ioRedisClient: Redis | undefined = undefined + + constructor(options: RedisDataSourceOptions) { + this.options = options + this.isInitialized = false + } + + async initialize(): Promise { + if (this.isInitialized) throw 'Error already initialized' + + this.redisClient = createRedisClient(this.options) + this.ioRedisClient = createIORedisClient(this.options) + this.isInitialized = true + + this.redisClient?.connect() + + return this + } + + setOptions(options: RedisDataSourceOptions): void { + this.options = options + } + + async shutdown(): Promise { + if (this.redisClient && this.redisClient.isOpen) { + await this.redisClient.disconnect() + } + if (this.ioRedisClient && this.ioRedisClient.status == 'ready') { + this.ioRedisClient.quit() + } + } +} + +const createRedisClient = ( + options: RedisDataSourceOptions +): RedisClientType | undefined => { + if (!options.REDIS_URL) { + throw 'Error: no redisURL supplied' + } + return createClient({ + url: options.REDIS_URL, + socket: { + tls: options.REDIS_URL.startsWith('rediss://'), // rediss:// is the protocol for TLS + cert: options.REDIS_CERT, + rejectUnauthorized: false, // for self-signed certs + connectTimeout: 10000, // 10 seconds + reconnectStrategy(retries: number): number | Error { + if (retries > 10) { + return new Error('Retries exhausted') + } + return 1000 + }, + }, + }) +} + +const createIORedisClient = ( + options: RedisDataSourceOptions +): Redis | undefined => { + let redisURL = options.REDIS_URL + if (!redisURL) { + throw 'Error: no redisURL supplied' + } + const redisOptions = (redisURL: string): RedisOptions => { + if (redisURL.startsWith('rediss://') && options.REDIS_CERT) { + return { + tls: { + ca: options.REDIS_CERT, + rejectUnauthorized: false, + }, + connectTimeout: 10000, + maxRetriesPerRequest: null, + retryStrategy: (times: number) => { + return 10 + }, + } + } + return { + connectTimeout: 10000, + maxRetriesPerRequest: null, + retryStrategy: (times: number) => { + console.log('retrying', times) + if (times > 10) { + return null + } + return 10 + }, + } + } + + return new Redis(redisURL, redisOptions(redisURL)) +} + +export const redisDataSource = new RedisDataSource({ + REDIS_URL: env.redis.url, + REDIS_CERT: env.redis.cert, +}) diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts index 2c5be9bd7..51ce8dcf5 100644 --- a/packages/api/src/routers/svc/rss_feed.ts +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -13,7 +13,7 @@ import { } from '../../utils/createTask' import { logger } from '../../utils/logger' import { queueRSSRefreshAllFeedsJob } from '../../jobs/rss/refreshAllFeeds' -import { mqRedisClient } from '../../redis' +import { redisDataSource } from '../../redis_data_source' export function rssFeedRouter() { const router = express.Router() @@ -30,7 +30,12 @@ export function rssFeedRouter() { return res.status(200).send('Expired') } - await queueRSSRefreshAllFeedsJob(mqRedisClient) + if (redisDataSource.ioRedisClient) { + await queueRSSRefreshAllFeedsJob(redisDataSource.ioRedisClient) + } else { + console.log('unable to fetchAll feeds, redis is not configured') + return res.status(500).send('Expired') + } // // get active rss feed subscriptions scheduled for fetch and group by feed url // const subscriptionGroups = (await getRepository(Subscription).query( diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index d93a66a16..3bbc20203 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -15,7 +15,6 @@ import { config, loggers } from 'winston' import { makeApolloServer } from './apollo' import { appDataSource } from './data_source' import { env } from './env' -import { redisClient, mqRedisClient } from './redis' import { articleRouter } from './routers/article_router' import { authRouter } from './routers/auth/auth_router' import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router' @@ -45,6 +44,7 @@ import { } from './utils/auth' import { corsConfig } from './utils/corsConfig' import { buildLogger, buildLoggerTransport } from './utils/logger' +import { redisDataSource } from './redis_data_source' const PORT = process.env.PORT || 4000 @@ -158,16 +158,9 @@ const main = async (): Promise => { // as healthy. await appDataSource.initialize() - // redis is optional - if (redisClient) { - console.log('Redis Client Connected:', env.redis.url) - } - - // redis for message queue + // redis is optional for the API server if (env.redis.url) { - mqRedisClient?.on('error', (err) => { - console.error('Redis Client Error', err) - }) + await redisDataSource.initialize() } const { app, apollo, httpServer } = createApp() diff --git a/packages/api/src/services/save_page.ts b/packages/api/src/services/save_page.ts index 0cded9a52..442dcea93 100644 --- a/packages/api/src/services/save_page.ts +++ b/packages/api/src/services/save_page.ts @@ -13,7 +13,6 @@ import { SavePageInput, SaveResult, } from '../generated/graphql' -import { redisClient } from '../redis' import { authTrx } from '../repository' import { enqueueThumbnailTask } from '../utils/createTask' import { diff --git a/packages/api/src/util.ts b/packages/api/src/util.ts index 7f665fd97..27e26a088 100755 --- a/packages/api/src/util.ts +++ b/packages/api/src/util.ts @@ -4,7 +4,7 @@ import * as dotenv from 'dotenv' import os from 'os' -interface BackendEnv { +export interface BackendEnv { pg: { host: string port: number diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 021ec8291..be17d1991 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -19,8 +19,9 @@ import { generateVerificationToken, OmnivoreAuthorizationHeader } from './auth' import { CreateTaskError } from './errors' import { logger } from './logger' import View = google.cloud.tasks.v2.Task.View -import { addRefreshFeedJob } from '../jobqueue' import { stringToHash } from './helpers' +import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' +import { redisDataSource } from '../redis_data_source' // Instantiates a client. const client = new CloudTasksClient() @@ -655,41 +656,53 @@ export const enqueueRssFeedFetch = async ( subscriptionGroup.url )}_${stringToHash(JSON.stringify(subscriptionGroup.userIds.sort()))}` - await addRefreshFeedJob(jobid, payload) - - // If there is no Google Cloud Project Id exposed, it means that we are in local environment - if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.rssFeedTaskHandlerUrl) { - // Calling the handler function directly. - setTimeout(() => { - axios - .post( - `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`, - payload - ) - .catch((error) => { - logError(error) - }) - }, 0) + if (redisDataSource.ioRedisClient) { + let job = await queueRSSRefreshFeedJob( + redisDataSource.ioRedisClient, + jobid, + payload + ) + if (!job || !job.id) { + throw 'unable to queue rss-refresh-feed-job, job did not enqueue' } - return nanoid() + return job.id + } else { + throw 'unable to queue rss-refresh-feed-job, redis is not configured' } - const createdTasks = await createHttpTaskWithToken({ - project: GOOGLE_CLOUD_PROJECT, - queue: 'omnivore-rss-queue', - payload, - taskHandlerUrl: `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`, - }) + // // If there is no Google Cloud Project Id exposed, it means that we are in local environment + // if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { + // if (env.queue.rssFeedTaskHandlerUrl) { + // // Calling the handler function directly. + // setTimeout(() => { + // axios + // .post( + // `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`, + // payload + // ) + // .catch((error) => { + // logError(error) + // }) + // }, 0) + // } + // return nanoid() + // } - if (!createdTasks || !createdTasks[0].name) { - logger.error(`Unable to get the name of the task`, { - payload, - createdTasks, - }) - throw new CreateTaskError(`Unable to get the name of the task`) - } - return createdTasks[0].name + // const createdTasks = await createHttpTaskWithToken({ + // project: GOOGLE_CLOUD_PROJECT, + // queue: 'omnivore-rss-queue', + // payload, + // taskHandlerUrl: `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`, + // }) + + // if (!createdTasks || !createdTasks[0].name) { + // logger.error(`Unable to get the name of the task`, { + // payload, + // createdTasks, + // }) + // throw new CreateTaskError(`Unable to get the name of the task`) + // } + //return createdTasks[0].name } export default createHttpTaskWithToken diff --git a/packages/api/src/utils/helpers.ts b/packages/api/src/utils/helpers.ts index 58de3d455..2397738fd 100644 --- a/packages/api/src/utils/helpers.ts +++ b/packages/api/src/utils/helpers.ts @@ -26,7 +26,7 @@ import { SearchItem, } from '../generated/graphql' import { createPubSubClient } from '../pubsub' -import { redisClient } from '../redis' +import { redisDataSource } from '../redis_data_source' import { Claims, WithDataSourcesContext } from '../resolvers/types' import { validateUrl } from '../services/create_page_save_request' import { updateLibraryItem } from '../services/library_item' @@ -416,6 +416,14 @@ export const setRecentlySavedItemInRedis = async ( url: string ) => { // save the url in redis for 26 hours so rss-feeder won't try to re-save it + const client = redisDataSource.redisClient + if (!client) { + console.info( + 'not setting recently saved item because redis is not configured' + ) + return + } + // save the url in redis for 8 hours so rss-feeder won't try to re-save it const redisKey = `recent-saved-item:${userId}:${url}` const ttlInSeconds = 60 * 60 * 26 try { diff --git a/packages/api/test/global-setup.ts b/packages/api/test/global-setup.ts index b9ecb507a..dce893a86 100644 --- a/packages/api/test/global-setup.ts +++ b/packages/api/test/global-setup.ts @@ -1,4 +1,5 @@ -import { redisClient } from '../src/redis' +import { env } from '../src/env' +import { redisDataSource } from '../src/redis_data_source' import { createTestConnection } from './db' import { startApolloServer } from './util' @@ -6,7 +7,8 @@ export const mochaGlobalSetup = async () => { await createTestConnection() console.log('db connection created') - if (redisClient) { + if (env.redis.url) { + await redisDataSource.initialize() console.log('redis connection created') } diff --git a/packages/api/test/global-teardown.ts b/packages/api/test/global-teardown.ts index 578dc8d64..552d359d0 100644 --- a/packages/api/test/global-teardown.ts +++ b/packages/api/test/global-teardown.ts @@ -1,5 +1,5 @@ import { appDataSource } from '../src/data_source' -import { redisClient } from '../src/redis' +import { redisDataSource } from '../src/redis_data_source' import { stopApolloServer } from './util' export const mochaGlobalTeardown = async () => { @@ -9,8 +9,6 @@ export const mochaGlobalTeardown = async () => { await appDataSource.destroy() console.log('db connection closed') - if (redisClient) { - await redisClient.quit() - console.log('redis connection closed') - } + await redisDataSource.shutdown() + console.log('redis connection closed') }