diff --git a/packages/content-fetch/Dockerfile b/packages/content-fetch/Dockerfile index d8ff553f1..b87362ee2 100644 --- a/packages/content-fetch/Dockerfile +++ b/packages/content-fetch/Dockerfile @@ -26,6 +26,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile @@ -33,6 +34,8 @@ ADD /packages/content-fetch ./packages/content-fetch ADD /packages/content-handler ./packages/content-handler ADD /packages/puppeteer-parse ./packages/puppeteer-parse ADD /packages/readabilityjs ./packages/readabilityjs +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/content-handler build RUN yarn workspace @omnivore/puppeteer-parse build RUN yarn workspace @omnivore/content-fetch build diff --git a/packages/content-fetch/Dockerfile-gcf b/packages/content-fetch/Dockerfile-gcf index f0cf9209f..2cbabd931 100644 --- a/packages/content-fetch/Dockerfile-gcf +++ b/packages/content-fetch/Dockerfile-gcf @@ -30,6 +30,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile @@ -37,6 +38,8 @@ ADD /packages/content-handler ./packages/content-handler ADD /packages/puppeteer-parse ./packages/puppeteer-parse ADD /packages/content-fetch ./packages/content-fetch ADD /packages/readabilityjs ./packages/readabilityjs +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/content-handler build RUN yarn workspace @omnivore/puppeteer-parse build RUN yarn workspace @omnivore/content-fetch build diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 8836afa25..3bd83e706 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -10,11 +10,11 @@ "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", - "ioredis": "^5.3.2", "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", + "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0" }, "devDependencies": { @@ -27,7 +27,8 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer" + "start_gcf": "functions-framework --port=9090 --target=puppeteer", + "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-fetch/src/index.ts b/packages/content-fetch/src/index.ts index d3bc341d1..0b725325b 100644 --- a/packages/content-fetch/src/index.ts +++ b/packages/content-fetch/src/index.ts @@ -1,6 +1,5 @@ import { HttpFunction } from '@google-cloud/functions-framework' import * as Sentry from '@sentry/serverless' -import 'dotenv/config' import { contentFetchRequestHandler } from './request_handler' Sentry.GCPFunction.init({ diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index c6ce525e0..68e1d3cd1 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -1,5 +1,5 @@ +import { RedisDataSource } from '@omnivore/utils' import { BulkJobOptions, Queue } from 'bullmq' -import { redisDataSource } from './redis_data_source' const QUEUE_NAME = 'omnivore-backend-queue' const JOB_NAME = 'save-page' @@ -31,10 +31,6 @@ interface SavePageJob { priority: 'low' | 'high' } -const queue = new Queue(QUEUE_NAME, { - connection: redisDataSource.queueRedisClient, -}) - const getPriority = (job: SavePageJob): number => { // we want to prioritized jobs by the expected time to complete // lower number means higher priority @@ -72,7 +68,10 @@ const getOpts = (job: SavePageJob): BulkJobOptions => { } } -export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => { +export const queueSavePageJob = async ( + redisDataSource: RedisDataSource, + savePageJobs: SavePageJob[] +) => { const jobs = savePageJobs.map((job) => ({ name: JOB_NAME, data: job.data, @@ -80,5 +79,9 @@ export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => { })) console.log('queue save page jobs:', JSON.stringify(jobs, null, 2)) + const queue = new Queue(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, + }) + return queue.addBulk(jobs) } diff --git a/packages/content-fetch/src/redis_data_source.ts b/packages/content-fetch/src/redis_data_source.ts deleted file mode 100644 index aa985348a..000000000 --- a/packages/content-fetch/src/redis_data_source.ts +++ /dev/null @@ -1,91 +0,0 @@ -import Redis, { RedisOptions } from 'ioredis' - -type RedisClientType = 'cache' | 'mq' -type RedisDataSourceOption = { - url?: string - cert?: string -} -export type RedisDataSourceOptions = { - [key in RedisClientType]: RedisDataSourceOption -} - -export class RedisDataSource { - options: RedisDataSourceOptions - - cacheClient: Redis - queueRedisClient: Redis - - constructor(options: RedisDataSourceOptions) { - this.options = options - - const cacheClient = createIORedisClient('cache', this.options) - if (!cacheClient) throw 'Error initializing cache redis client' - - this.cacheClient = cacheClient - this.queueRedisClient = - createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache - } - - async shutdown(): Promise { - try { - await this.queueRedisClient?.quit() - await this.cacheClient?.quit() - } catch (err) { - console.error('error while shutting down redis', err) - } - } -} - -const createIORedisClient = ( - name: RedisClientType, - options: RedisDataSourceOptions -): Redis | undefined => { - const option = options[name] - const redisURL = option.url - if (!redisURL) { - console.log(`no redisURL supplied: ${name}`) - return undefined - } - - const redisCert = option.cert - const tls = - redisURL.startsWith('rediss://') && redisCert - ? { - ca: redisCert, - rejectUnauthorized: false, - } - : undefined - - const redisOptions: RedisOptions = { - tls, - name, - connectTimeout: 10000, - maxRetriesPerRequest: null, - offlineQueue: false, - } - return new Redis(redisURL, redisOptions) -} - -export const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, -}) - -// eslint-disable-next-line @typescript-eslint/no-misused-promises -process.on('SIGINT', async () => { - console.log('SIGINT signal received.') - - try { - await redisDataSource.shutdown() - } catch (error) { - console.error('error while shutting down redis', error) - } - - process.exit(0) -}) diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index afb7295f4..b1aa2aa1f 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,9 +1,10 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' +import { RedisDataSource } from '@omnivore/utils' +import 'dotenv/config' import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' -import { redisDataSource } from './redis_data_source' interface UserConfig { id: string @@ -92,6 +93,7 @@ const isFetchResult = (obj: unknown): obj is FetchResult => { } export const cacheFetchResult = async ( + redisDataSource: RedisDataSource, key: string, fetchResult: FetchResult ) => { @@ -102,6 +104,7 @@ export const cacheFetchResult = async ( } const getCachedFetchResult = async ( + redisDataSource: RedisDataSource, key: string ): Promise => { const result = await redisDataSource.cacheClient.get(key) @@ -171,9 +174,21 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + try { const key = cacheKey(url, locale, timezone) - let fetchResult = await getCachedFetchResult(key) + let fetchResult = await getCachedFetchResult(redisDataSource, key) if (!fetchResult) { console.log( 'fetch result not found in cache, fetching content now...', @@ -184,7 +199,11 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log('content has been fetched') if (fetchResult.content) { - const cacheResult = await cacheFetchResult(key, fetchResult) + const cacheResult = await cacheFetchResult( + redisDataSource, + key, + fetchResult + ) console.log('cache result', cacheResult) } } @@ -219,7 +238,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { priority, })) - const jobs = await queueSavePageJob(savePageJobs) + const jobs = await queueSavePageJob(redisDataSource, savePageJobs) console.log('save-page jobs queued', jobs.length) } catch (error) { if (error instanceof Error) { @@ -246,6 +265,8 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { }, } ) + + await redisDataSource.shutdown() } res.sendStatus(200) diff --git a/packages/text-to-speech/package.json b/packages/text-to-speech/package.json index c1db2650a..c6cf4f171 100644 --- a/packages/text-to-speech/package.json +++ b/packages/text-to-speech/package.json @@ -41,7 +41,6 @@ "axios": "^0.27.2", "dotenv": "^16.0.1", "html-to-text": "^8.2.1", - "ioredis": "^5.3.2", "jsonwebtoken": "^8.5.1", "linkedom": "^0.14.12", "microsoft-cognitiveservices-speech-sdk": "1.30", diff --git a/packages/text-to-speech/src/index.ts b/packages/text-to-speech/src/index.ts index d8c6f8b75..f868a5c62 100644 --- a/packages/text-to-speech/src/index.ts +++ b/packages/text-to-speech/src/index.ts @@ -382,7 +382,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( console.error('Text to speech streaming error:', e) return res.status(500).send({ errorCodes: 'SYNTHESIZER_ERROR' }) } finally { - await redisDataSource.cacheClient.quit() + await redisDataSource.shutdown() console.log('Redis Client Disconnected') } }