diff --git a/packages/api/Dockerfile b/packages/api/Dockerfile index 889e41b57..6b7c38c7c 100644 --- a/packages/api/Dockerfile +++ b/packages/api/Dockerfile @@ -16,6 +16,7 @@ COPY /packages/api/package.json ./packages/api/package.json COPY /packages/text-to-speech/package.json ./packages/text-to-speech/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/liqe/package.json ./packages/liqe/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile @@ -24,7 +25,9 @@ ADD /packages/api ./packages/api ADD /packages/text-to-speech ./packages/text-to-speech ADD /packages/content-handler ./packages/content-handler ADD /packages/liqe ./packages/liqe +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/text-to-speech-handler build RUN yarn workspace @omnivore/content-handler build RUN yarn workspace @omnivore/liqe build diff --git a/packages/content-fetch/Dockerfile b/packages/content-fetch/Dockerfile index d8ff553f1..b87362ee2 100644 --- a/packages/content-fetch/Dockerfile +++ b/packages/content-fetch/Dockerfile @@ -26,6 +26,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile @@ -33,6 +34,8 @@ ADD /packages/content-fetch ./packages/content-fetch ADD /packages/content-handler ./packages/content-handler ADD /packages/puppeteer-parse ./packages/puppeteer-parse ADD /packages/readabilityjs ./packages/readabilityjs +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/content-handler build RUN yarn workspace @omnivore/puppeteer-parse build RUN yarn workspace @omnivore/content-fetch build diff --git a/packages/content-fetch/Dockerfile-gcf b/packages/content-fetch/Dockerfile-gcf index f0cf9209f..2cbabd931 100644 --- a/packages/content-fetch/Dockerfile-gcf +++ b/packages/content-fetch/Dockerfile-gcf @@ -30,6 +30,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile @@ -37,6 +38,8 @@ ADD /packages/content-handler ./packages/content-handler ADD /packages/puppeteer-parse ./packages/puppeteer-parse ADD /packages/content-fetch ./packages/content-fetch ADD /packages/readabilityjs ./packages/readabilityjs +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/content-handler build RUN yarn workspace @omnivore/puppeteer-parse build RUN yarn workspace @omnivore/content-fetch build diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 8836afa25..28f70ed19 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -10,11 +10,11 @@ "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", - "ioredis": "^5.3.2", "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", + "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0" }, "devDependencies": { diff --git a/packages/content-fetch/src/index.ts b/packages/content-fetch/src/index.ts index d3bc341d1..0b725325b 100644 --- a/packages/content-fetch/src/index.ts +++ b/packages/content-fetch/src/index.ts @@ -1,6 +1,5 @@ import { HttpFunction } from '@google-cloud/functions-framework' import * as Sentry from '@sentry/serverless' -import 'dotenv/config' import { contentFetchRequestHandler } from './request_handler' Sentry.GCPFunction.init({ diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index c6ce525e0..68e1d3cd1 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -1,5 +1,5 @@ +import { RedisDataSource } from '@omnivore/utils' import { BulkJobOptions, Queue } from 'bullmq' -import { redisDataSource } from './redis_data_source' const QUEUE_NAME = 'omnivore-backend-queue' const JOB_NAME = 'save-page' @@ -31,10 +31,6 @@ interface SavePageJob { priority: 'low' | 'high' } -const queue = new Queue(QUEUE_NAME, { - connection: redisDataSource.queueRedisClient, -}) - const getPriority = (job: SavePageJob): number => { // we want to prioritized jobs by the expected time to complete // lower number means higher priority @@ -72,7 +68,10 @@ const getOpts = (job: SavePageJob): BulkJobOptions => { } } -export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => { +export const queueSavePageJob = async ( + redisDataSource: RedisDataSource, + savePageJobs: SavePageJob[] +) => { const jobs = savePageJobs.map((job) => ({ name: JOB_NAME, data: job.data, @@ -80,5 +79,9 @@ export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => { })) console.log('queue save page jobs:', JSON.stringify(jobs, null, 2)) + const queue = new Queue(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, + }) + return queue.addBulk(jobs) } diff --git a/packages/content-fetch/src/redis_data_source.ts b/packages/content-fetch/src/redis_data_source.ts deleted file mode 100644 index aa985348a..000000000 --- a/packages/content-fetch/src/redis_data_source.ts +++ /dev/null @@ -1,91 +0,0 @@ -import Redis, { RedisOptions } from 'ioredis' - -type RedisClientType = 'cache' | 'mq' -type RedisDataSourceOption = { - url?: string - cert?: string -} -export type RedisDataSourceOptions = { - [key in RedisClientType]: RedisDataSourceOption -} - -export class RedisDataSource { - options: RedisDataSourceOptions - - cacheClient: Redis - queueRedisClient: Redis - - constructor(options: RedisDataSourceOptions) { - this.options = options - - const cacheClient = createIORedisClient('cache', this.options) - if (!cacheClient) throw 'Error initializing cache redis client' - - this.cacheClient = cacheClient - this.queueRedisClient = - createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache - } - - async shutdown(): Promise { - try { - await this.queueRedisClient?.quit() - await this.cacheClient?.quit() - } catch (err) { - console.error('error while shutting down redis', err) - } - } -} - -const createIORedisClient = ( - name: RedisClientType, - options: RedisDataSourceOptions -): Redis | undefined => { - const option = options[name] - const redisURL = option.url - if (!redisURL) { - console.log(`no redisURL supplied: ${name}`) - return undefined - } - - const redisCert = option.cert - const tls = - redisURL.startsWith('rediss://') && redisCert - ? { - ca: redisCert, - rejectUnauthorized: false, - } - : undefined - - const redisOptions: RedisOptions = { - tls, - name, - connectTimeout: 10000, - maxRetriesPerRequest: null, - offlineQueue: false, - } - return new Redis(redisURL, redisOptions) -} - -export const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, -}) - -// eslint-disable-next-line @typescript-eslint/no-misused-promises -process.on('SIGINT', async () => { - console.log('SIGINT signal received.') - - try { - await redisDataSource.shutdown() - } catch (error) { - console.error('error while shutting down redis', error) - } - - process.exit(0) -}) diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index afb7295f4..b1aa2aa1f 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,9 +1,10 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' +import { RedisDataSource } from '@omnivore/utils' +import 'dotenv/config' import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' -import { redisDataSource } from './redis_data_source' interface UserConfig { id: string @@ -92,6 +93,7 @@ const isFetchResult = (obj: unknown): obj is FetchResult => { } export const cacheFetchResult = async ( + redisDataSource: RedisDataSource, key: string, fetchResult: FetchResult ) => { @@ -102,6 +104,7 @@ export const cacheFetchResult = async ( } const getCachedFetchResult = async ( + redisDataSource: RedisDataSource, key: string ): Promise => { const result = await redisDataSource.cacheClient.get(key) @@ -171,9 +174,21 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + try { const key = cacheKey(url, locale, timezone) - let fetchResult = await getCachedFetchResult(key) + let fetchResult = await getCachedFetchResult(redisDataSource, key) if (!fetchResult) { console.log( 'fetch result not found in cache, fetching content now...', @@ -184,7 +199,11 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log('content has been fetched') if (fetchResult.content) { - const cacheResult = await cacheFetchResult(key, fetchResult) + const cacheResult = await cacheFetchResult( + redisDataSource, + key, + fetchResult + ) console.log('cache result', cacheResult) } } @@ -219,7 +238,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { priority, })) - const jobs = await queueSavePageJob(savePageJobs) + const jobs = await queueSavePageJob(redisDataSource, savePageJobs) console.log('save-page jobs queued', jobs.length) } catch (error) { if (error instanceof Error) { @@ -246,6 +265,8 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { }, } ) + + await redisDataSource.shutdown() } res.sendStatus(200) diff --git a/packages/content-handler/package.json b/packages/content-handler/package.json index 832b2b4b4..e1f4604ee 100644 --- a/packages/content-handler/package.json +++ b/packages/content-handler/package.json @@ -30,9 +30,9 @@ "nock": "^13.2.9" }, "dependencies": { + "@omnivore/utils": "1.0.0", "addressparser": "^1.0.1", "axios": "^0.27.2", - "ioredis": "^5.3.2", "linkedom": "^0.14.16", "lodash": "^4.17.21", "luxon": "^3.0.4", diff --git a/packages/content-handler/src/redis.ts b/packages/content-handler/src/redis.ts deleted file mode 100644 index 5bc90decc..000000000 --- a/packages/content-handler/src/redis.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { Redis } from 'ioredis' - -export const createRedisClient = (url?: string, cert?: string) => { - return new Redis(url || 'redis://localhost:6379', { - connectTimeout: 10000, // 10 seconds - tls: cert - ? { - cert: cert.replace(/\\n/g, '\n'), // replace \n with new line - rejectUnauthorized: false, // for self-signed certs - } - : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, - }) -} diff --git a/packages/content-handler/src/websites/nitter-handler.ts b/packages/content-handler/src/websites/nitter-handler.ts deleted file mode 100644 index f8d927d6f..000000000 --- a/packages/content-handler/src/websites/nitter-handler.ts +++ /dev/null @@ -1,424 +0,0 @@ -import axios from 'axios' -import Redis from 'ioredis' -import { parseHTML } from 'linkedom' -import _, { truncate } from 'lodash' -import { DateTime } from 'luxon' -import { ContentHandler, PreHandleResult } from '../content-handler' -import { createRedisClient } from '../redis' - -interface Tweet { - url: string - author: { - username: string - name: string - profileImageUrl: string - } - text: string - entities: { - urls: { - url: string - displayUrl: string - }[] - } - attachments: { - type: string - url: string - previewUrl: string - }[] - createdAt: string -} - -export class NitterHandler extends ContentHandler { - // matches twitter.com and nitter.net urls - URL_MATCH = - /((x\.com)|(twitter\.com)|(nitter\.net))\/(?:#!\/)?(\w+)\/status(?:es)?\/(\d+)(?:\/.*)?/ - INSTANCES = [ - { value: 'https://nitter.moomoo.me', score: 0 }, - { value: 'https://nitter.net', score: 1 }, // the official instance - { value: 'https://nitter.lacontrevoie.fr', score: 2 }, - { value: 'https://nitter.kavin.rocks', score: 3 }, - { value: 'https://notabird.site', score: 4 }, - { value: 'https://singapore.unofficialbird.com', score: 5 }, - { value: 'https://nitter.fly.dev', score: 6 }, - ] - REDIS_KEY = 'nitter-instances' - - private instance: string - - constructor() { - super() - this.name = 'Nitter' - this.instance = '' - } - - async getInstances(redisClient: Redis) { - // get instances by score in ascending order - const instances = await redisClient.zrange( - this.REDIS_KEY, - '-inf', - '+inf', - 'BYSCORE' - ) - console.debug('instances', instances) - - // if no instance is found, save the default instances - if (instances.length === 0) { - // only add if the key does not exist - const result = await redisClient.zadd( - this.REDIS_KEY, - 'NX', - ...this.INSTANCES.map((i) => [i.score, i.value]).flat() - ) - console.debug('add instances', result) - - // expire the key after 1 day - const exp = await redisClient.expire(this.REDIS_KEY, 60 * 60 * 24) - console.debug('instances expire in 1 day', exp) - - return this.INSTANCES.map((i) => i.value) - } - - return instances - } - - async incrementInstanceScore( - redisClient: Redis, - instance: string, - score = 1 - ) { - await redisClient.zincrby(this.REDIS_KEY, score, instance) - } - - async getTweets(username: string, tweetId: string) { - function authorParser(header: Element) { - const profileImageUrl = - header.querySelector('.tweet-avatar img')?.getAttribute('src') ?? '' - const name = - header.querySelector('.fullname')?.getAttribute('title') ?? '' - const username = - header.querySelector('.username')?.getAttribute('title') ?? '' - - return { - profileImageUrl, - name, - username: username.replace('@', ''), // remove @ from username - } - } - - function dateParser(date: Element) { - const validDateTime = - date.getAttribute('title')?.replace(' · ', ' ') ?? Date.now() - - return new Date(validDateTime).toISOString() - } - - function urlParser(date: Element) { - return date.getAttribute('href') ?? '' - } - - function attachmentParser(attachments: Element | null) { - if (!attachments) return [] - - const photos = Array.from(attachments.querySelectorAll('img')).map( - (i) => ({ - url: i.getAttribute('src') ?? '', - type: 'photo', - previewUrl: i.getAttribute('src') ?? '', - }) - ) - const videos = Array.from(attachments.querySelectorAll('video')).map( - (i) => ({ - url: i.getAttribute('data-url') ?? '', - type: 'video', - previewUrl: i.getAttribute('poster') ?? '', - }) - ) - - return [...photos, ...videos] - } - - function parseTweet(tweet: Element): Tweet | null { - const header = tweet.querySelector('.tweet-header') - if (!header) { - console.error('no header found', tweet) - return null - } - const author = authorParser(header) - - const body = tweet.querySelector('.tweet-body') - if (!body) { - console.error('no body found', tweet) - return null - } - - const tweetDateElement = body.querySelector('.tweet-date a') - if (!tweetDateElement) { - console.error('no tweet date found', tweet) - return null - } - const createdAt = dateParser(tweetDateElement) - const url = urlParser(tweetDateElement) - - const content = body.querySelector('.tweet-content') - if (!content) { - console.error('no content found', tweet) - return null - } - const text = content.textContent ?? '' - const urls = Array.from(content.querySelectorAll('a')).map((a) => ({ - url: a.getAttribute('href') ?? '', - displayUrl: a.textContent ?? '', - })) - - const attachments = attachmentParser(body.querySelector('.attachments')) - - return { - author, - createdAt, - text, - url, - entities: { - urls, - }, - attachments, - } - } - - const redisClient = createRedisClient( - process.env.REDIS_URL, - process.env.REDIS_CERT - ) - - try { - const tweets: Tweet[] = [] - const option = { - timeout: 20000, // 20 seconds - } - let html = '' - // get instances from redis - const instances = await this.getInstances(redisClient) - for (const instance of instances) { - try { - const url = `${instance}/${username}/status/${tweetId}` - const startTime = Date.now() - const response = await axios.get(url, option) - const latency = Math.floor(Date.now() - startTime) - console.debug('latency', latency) - - html = response.data as string - this.instance = instance - - await this.incrementInstanceScore(redisClient, instance, latency) - break - } catch (error) { - await this.incrementInstanceScore( - redisClient, - instance, - option.timeout - ) - - if (axios.isAxiosError(error)) { - console.info(`Error getting tweets from ${instance}`, error.message) - } else { - console.info(`Error getting tweets from ${instance}`, error) - } - } - } - if (!this.instance || !html) { - console.error('no instance or html found') - return [] - } - - const document = parseHTML(html).document - - // get the main thread including tweets and threads - const mainThread = document.querySelector('.main-thread') - if (!mainThread) { - console.error('no main thread found') - return [] - } - const timelineItems = Array.from( - mainThread.querySelectorAll('.timeline-item') - ) - if (timelineItems.length === 0) { - console.error('no timeline items found') - return [] - } - for (let i = 0; i < timelineItems.length; i++) { - const item = timelineItems[i] - const classList = item.classList - // skip unavailable tweets and earlier replies - if ( - classList.contains('unavailable') || - classList.contains('earlier-replies') - ) { - console.info('skip unavailable tweets and earlier replies') - continue - } - // if there are more replies, get them - if (classList.contains('more-replies')) { - const newUrl = item.querySelector('a')?.getAttribute('href') - if (!newUrl) { - console.error('no new url', newUrl) - break - } - - let html = '' - try { - // go to new url and wait for it to load - const response = await axios.get( - `${this.instance}${newUrl}`, - option - ) - - html = response.data as string - } catch (error) { - console.error('Error getting tweets', error) - break - } - - const document = parseHTML(html).document - const nextThread = document.querySelector('.main-thread .after-tweet') - if (!nextThread) { - console.error('no next thread found') - break - } - - // get the new timeline items and add them to the list - const newTimelineItems = Array.from( - nextThread.querySelectorAll('.timeline-item') - ) - - timelineItems.push(...newTimelineItems) - continue - } - - const tweet = parseTweet(item) - // filter out replies - if ( - tweet && - tweet.author.username.toLowerCase() === username.toLowerCase() - ) { - tweets.push(tweet) - } - } - - return tweets - } catch (error) { - console.error('Error getting tweets', error) - - return [] - } finally { - await redisClient?.quit() - } - } - - parseTweetUrl = (url: string) => { - const match = url.match(this.URL_MATCH) - return { - domain: match?.[1]?.replace('x', 'twitter'), - username: match?.[4], - tweetId: match?.[5], - } - } - - titleForTweet = (author: { name: string }, text: string) => { - return `${author.name} on Twitter: ${truncate(text.replace(/http\S+/, ''), { - length: 100, - })}` - } - - formatTimestamp = (timestamp: string) => { - return DateTime.fromJSDate(new Date(timestamp)).toLocaleString( - DateTime.DATETIME_FULL - ) - } - - shouldPreHandle(url: string): boolean { - return this.URL_MATCH.test(url.toString()) - } - - async preHandle(url: string): Promise { - const { tweetId, username, domain } = this.parseTweetUrl(url) - if (!tweetId || !username || !domain) { - throw new Error('could not parse tweet url') - } - const tweets = await this.getTweets(username, tweetId) - if (tweets.length === 0) { - throw new Error('could not get tweets') - } - - const tweet = tweets[0] - const author = tweet.author - // escape html entities in title - const title = this.titleForTweet(author, tweet.text) - const escapedTitle = _.escape(title) - const authorImage = `${this.instance}${author.profileImageUrl.replace( - '_normal', - '_400x400' - )}` - const description = _.escape(tweet.text) || escapedTitle - const imageDomain = - domain.toLowerCase() === 'twitter.com' - ? 'https://pbs.twimg.com' - : 'https://nitter.net/pic' - - let tweetsContent = '' - for (const tweet of tweets) { - let text = tweet.text - for (const urlObj of tweet.entities.urls) { - text = text.replace( - urlObj.displayUrl, - `${urlObj.displayUrl}` - ) - } - - const includesHtml = tweet.attachments - .map( - (attachment) => - ` - - - - ` - ) - .join('\n') - - tweetsContent += `

${text}

${includesHtml}` - } - - const tweetUrl = ` - — ${ - author.username - } ${this.formatTimestamp(tweet.createdAt)}` - - const content = ` - - - - - - - - - - - - - -
- ${tweetsContent} - ${tweetUrl} -
- - ` - - return { content, url, title } - } -} diff --git a/packages/import-handler/Dockerfile b/packages/import-handler/Dockerfile index 9fc10597d..1b0dda2b5 100644 --- a/packages/import-handler/Dockerfile +++ b/packages/import-handler/Dockerfile @@ -14,11 +14,14 @@ COPY .eslintrc . COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/import-handler/package.json ./packages/import-handler/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile +ADD /packages/utils ./packages/utils ADD /packages/import-handler ./packages/import-handler ADD /packages/readabilityjs ./packages/readabilityjs +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/import-handler build # After building, fetch the production dependencies diff --git a/packages/import-handler/Dockerfile-collector b/packages/import-handler/Dockerfile-collector index 8a30021ec..0abe00d6a 100644 --- a/packages/import-handler/Dockerfile-collector +++ b/packages/import-handler/Dockerfile-collector @@ -14,11 +14,14 @@ COPY .eslintrc . COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/import-handler/package.json ./packages/import-handler/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile ADD /packages/import-handler ./packages/import-handler ADD /packages/readabilityjs ./packages/readabilityjs +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/import-handler build # After building, fetch the production dependencies diff --git a/packages/import-handler/mocha-config.json b/packages/import-handler/mocha-config.json index 5a2631f42..8e24eb08b 100644 --- a/packages/import-handler/mocha-config.json +++ b/packages/import-handler/mocha-config.json @@ -1,4 +1,5 @@ { "extension": ["ts"], - "spec": "test/**/*.test.ts" + "spec": "test/**/*.test.ts", + "timeout": 10000 } diff --git a/packages/import-handler/package.json b/packages/import-handler/package.json index f62fae3f8..7641a598b 100644 --- a/packages/import-handler/package.json +++ b/packages/import-handler/package.json @@ -38,9 +38,11 @@ "@google-cloud/storage": "^7.0.1", "@google-cloud/tasks": "^4.0.0", "@omnivore/readability": "1.0.0", + "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", "@types/express": "^4.17.13", "axios": "^1.2.2", + "dotenv": "^16.0.1", "dompurify": "^2.4.3", "fs-extra": "^11.1.0", "glob": "^8.1.0", diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index d95751b47..de3379a26 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -1,7 +1,9 @@ import { Storage } from '@google-cloud/storage' import { Readability } from '@omnivore/readability' +import { RedisDataSource } from '@omnivore/utils' import * as Sentry from '@sentry/serverless' import axios from 'axios' +import 'dotenv/config' import Redis from 'ioredis' import * as jwt from 'jsonwebtoken' import { Stream } from 'node:stream' @@ -11,7 +13,6 @@ import { v4 as uuid } from 'uuid' import { importCsv } from './csv' import { importMatterArchive } from './matterHistory' import { ImportStatus, updateMetrics } from './metrics' -import { createRedisClient } from './redis' import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task' export enum ArticleSavingRequestStatus { @@ -363,19 +364,26 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction( const pubSubMessage = req.body.message.data as string const obj = getStorageEvent(pubSubMessage) if (obj) { - // create redis client - const redisClient = createRedisClient( - process.env.REDIS_URL, - process.env.REDIS_CERT - ) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + try { - await handleEvent(obj, redisClient) + await handleEvent(obj, redisDataSource.cacheClient) } catch (err) { console.log('error handling event', { err, obj }) throw err } finally { // close redis client - await redisClient.quit() + await redisDataSource.shutdown() } } } else { @@ -413,14 +421,32 @@ export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('Bad Request') } - const redisClient = createRedisClient( - process.env.REDIS_URL, - process.env.REDIS_CERT - ) - // update metrics - await updateMetrics(redisClient, userId, req.body.taskId, req.body.status) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) - await redisClient.quit() + try { + // update metrics + await updateMetrics( + redisDataSource.cacheClient, + userId, + req.body.taskId, + req.body.status + ) + } catch (error) { + console.error('Error updating metrics', error) + return res.status(500).send('Error updating metrics') + } finally { + await redisDataSource.shutdown() + } res.send('ok') } diff --git a/packages/import-handler/src/metrics.ts b/packages/import-handler/src/metrics.ts index b197356a0..41e4c734c 100644 --- a/packages/import-handler/src/metrics.ts +++ b/packages/import-handler/src/metrics.ts @@ -1,3 +1,4 @@ +import { RedisDataSource } from '@omnivore/utils' import Redis from 'ioredis' import { sendImportCompletedEmail } from '.' diff --git a/packages/import-handler/src/redis.ts b/packages/import-handler/src/redis.ts index ad765dcee..4419e6096 100644 --- a/packages/import-handler/src/redis.ts +++ b/packages/import-handler/src/redis.ts @@ -1,37 +1,4 @@ -import { Redis, Result } from 'ioredis' - -export const createRedisClient = (url?: string, cert?: string) => { - return new Redis(url || 'redis://localhost:6379', { - connectTimeout: 10000, // 10 seconds - tls: cert - ? { - cert: cert.replace(/\\n/g, '\n'), // replace \n with new line - rejectUnauthorized: false, // for self-signed certs - } - : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, - }) -} +import { Result } from 'ioredis' // Add declarations declare module 'ioredis' { diff --git a/packages/import-handler/test/csv/csv.test.ts b/packages/import-handler/test/csv/csv.test.ts index 69487483d..582444cab 100644 --- a/packages/import-handler/test/csv/csv.test.ts +++ b/packages/import-handler/test/csv/csv.test.ts @@ -1,3 +1,4 @@ +import { RedisDataSource } from '@omnivore/utils' import * as chai from 'chai' import { expect } from 'chai' import chaiString from 'chai-string' @@ -11,13 +12,25 @@ chai.use(chaiString) describe('Test csv importer', () => { let stub: ImportContext + let redisDataSource: RedisDataSource beforeEach(() => { - stub = stubImportCtx() + redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + stub = stubImportCtx(redisDataSource.cacheClient) }) afterEach(async () => { - await stub.redisClient.quit() + await redisDataSource.shutdown() }) describe('Load a simple CSV file', () => { diff --git a/packages/import-handler/test/matter/matter_importer.test.ts b/packages/import-handler/test/matter/matter_importer.test.ts index 9fd777c37..3d6a84c00 100644 --- a/packages/import-handler/test/matter/matter_importer.test.ts +++ b/packages/import-handler/test/matter/matter_importer.test.ts @@ -1,4 +1,5 @@ import { Readability } from '@omnivore/readability' +import { RedisDataSource } from '@omnivore/utils' import * as chai from 'chai' import { expect } from 'chai' import chaiString from 'chai-string' @@ -13,50 +14,70 @@ import { stubImportCtx } from '../util' chai.use(chaiString) -describe('Load a simple _matter_history file', () => { - it('should find the URL of each row', async () => { - const urls: URL[] = [] - const stream = fs.createReadStream('./test/matter/data/_matter_history.csv') - const stub = stubImportCtx() - stub.urlHandler = (ctx: ImportContext, url): Promise => { - urls.push(url) - return Promise.resolve() - } +describe('matter importer', () => { + let stub: ImportContext + let redisDataSource: RedisDataSource - await importMatterHistoryCsv(stub, stream) - expect(stub.countFailed).to.equal(0) - expect(stub.countImported).to.equal(1) - expect(urls).to.eql([ - new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'), - ]) + beforeEach(() => { + redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) - await stub.redisClient.quit() - }) -}) - -describe('Load archive file', () => { - it('should find the URL of each row', async () => { - const urls: URL[] = [] - const stream = fs.createReadStream('./test/matter/data/Archive.zip') - const stub = stubImportCtx() - stub.contentHandler = ( - ctx: ImportContext, - url: URL, - title: string, - originalContent: string, - parseResult: Readability.ParseResult - ): Promise => { - urls.push(url) - return Promise.resolve() - } - - await importMatterArchive(stub, stream) - expect(stub.countFailed).to.equal(0) - expect(stub.countImported).to.equal(1) - expect(urls).to.eql([ - new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'), - ]) - - await stub.redisClient.quit() + stub = stubImportCtx(redisDataSource.cacheClient) + }) + + afterEach(async () => { + await redisDataSource.shutdown() + }) + + describe('Load a simple _matter_history file', () => { + it('should find the URL of each row', async () => { + const urls: URL[] = [] + const stream = fs.createReadStream( + './test/matter/data/_matter_history.csv' + ) + stub.urlHandler = (ctx: ImportContext, url): Promise => { + urls.push(url) + return Promise.resolve() + } + + await importMatterHistoryCsv(stub, stream) + expect(stub.countFailed).to.equal(0) + expect(stub.countImported).to.equal(1) + expect(urls).to.eql([ + new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'), + ]) + }) + }) + + describe('Load archive file', () => { + it('should find the URL of each row', async () => { + const urls: URL[] = [] + const stream = fs.createReadStream('./test/matter/data/Archive.zip') + stub.contentHandler = ( + ctx: ImportContext, + url: URL, + title: string, + originalContent: string, + parseResult: Readability.ParseResult + ): Promise => { + urls.push(url) + return Promise.resolve() + } + + await importMatterArchive(stub, stream) + expect(stub.countFailed).to.equal(0) + expect(stub.countImported).to.equal(1) + expect(urls).to.eql([ + new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'), + ]) + }) }) }) diff --git a/packages/import-handler/test/util.ts b/packages/import-handler/test/util.ts index 04f7f7117..14550783d 100644 --- a/packages/import-handler/test/util.ts +++ b/packages/import-handler/test/util.ts @@ -1,10 +1,8 @@ import { Readability } from '@omnivore/readability' +import Redis from 'ioredis' import { ArticleSavingRequestStatus, ImportContext } from '../src' -import { createRedisClient } from '../src/redis' - -export const stubImportCtx = (): ImportContext => { - const redisClient = createRedisClient(process.env.REDIS_URL) +export const stubImportCtx = (redisClient: Redis): ImportContext => { return { userId: '', countImported: 0, diff --git a/packages/text-to-speech/Dockerfile b/packages/text-to-speech/Dockerfile index 4e526392a..17a501d5c 100644 --- a/packages/text-to-speech/Dockerfile +++ b/packages/text-to-speech/Dockerfile @@ -1,4 +1,4 @@ -FROM node:18.16-alpine +FROM node:18.16 # Run everything after as non-privileged user. WORKDIR /app @@ -10,9 +10,12 @@ COPY .prettierrc . COPY .eslintrc . COPY /packages/text-to-speech/package.json ./packages/text-to-speech/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build ADD /packages/text-to-speech ./packages/text-to-speech RUN yarn workspace @omnivore/text-to-speech-handler build diff --git a/packages/text-to-speech/package.json b/packages/text-to-speech/package.json index b41f502bc..c6cf4f171 100644 --- a/packages/text-to-speech/package.json +++ b/packages/text-to-speech/package.json @@ -36,11 +36,11 @@ "dependencies": { "@google-cloud/functions-framework": "3.1.2", "@google-cloud/storage": "^7.0.1", + "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", "axios": "^0.27.2", "dotenv": "^16.0.1", "html-to-text": "^8.2.1", - "ioredis": "^5.3.2", "jsonwebtoken": "^8.5.1", "linkedom": "^0.14.12", "microsoft-cognitiveservices-speech-sdk": "1.30", diff --git a/packages/text-to-speech/src/index.ts b/packages/text-to-speech/src/index.ts index 08686e8dc..f868a5c62 100644 --- a/packages/text-to-speech/src/index.ts +++ b/packages/text-to-speech/src/index.ts @@ -4,17 +4,16 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ import { File, Storage } from '@google-cloud/storage' +import { RedisDataSource } from '@omnivore/utils' import * as Sentry from '@sentry/serverless' import axios from 'axios' import crypto from 'crypto' import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import -import Redis from 'ioredis' import * as jwt from 'jsonwebtoken' import { AzureTextToSpeech } from './azureTextToSpeech' import { endSsml, htmlToSpeechFile, startSsml } from './htmlToSsml' import { OpenAITextToSpeech } from './openaiTextToSpeech' import { RealisticTextToSpeech } from './realisticTextToSpeech' -import { createRedisClient } from './redis' import { SpeechMark, TextToSpeechInput, @@ -115,10 +114,10 @@ const updateSpeech = async ( } const getCharacterCountFromRedis = async ( - redisClient: Redis, + redisClient: RedisDataSource, uid: string ): Promise => { - const wordCount = await redisClient.get(`tts:charCount:${uid}`) + const wordCount = await redisClient.cacheClient.get(`tts:charCount:${uid}`) return wordCount ? parseInt(wordCount) : 0 } @@ -126,11 +125,11 @@ const getCharacterCountFromRedis = async ( // which will be used to rate limit the request // expires after 1 day const updateCharacterCountInRedis = async ( - redisClient: Redis, + redisClient: RedisDataSource, uid: string, wordCount: number ) => { - await redisClient.set( + await redisClient.cacheClient.set( `tts:charCount:${uid}`, wordCount.toString(), 'EX', @@ -241,11 +240,17 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( return res.status(401).send({ errorCode: 'UNAUTHENTICATED' }) } - // create redis client - const redisClient = createRedisClient( - process.env.REDIS_TTS_URL, - process.env.REDIS_TTS_CERT - ) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) try { const utteranceInput = req.body as UtteranceInput @@ -267,7 +272,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( // validate character count const characterCount = - (await getCharacterCountFromRedis(redisClient, claim.uid)) + + (await getCharacterCountFromRedis(redisDataSource, claim.uid)) + utteranceInput.text.length if (characterCount > MAX_CHARACTER_COUNT) { return res.status(429).send('RATE_LIMITED') @@ -284,7 +289,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( // hash ssml to get the cache key const cacheKey = crypto.createHash('md5').update(ssml).digest('hex') // find audio data in cache - const cacheResult = await redisClient.get(cacheKey) + const cacheResult = await redisDataSource.cacheClient.get(cacheKey) if (cacheResult) { console.log('Cache hit') const { audioDataString, speechMarks }: CacheResult = @@ -352,7 +357,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( const audioDataString = audioData.toString('hex') // save audio data to cache for 72 hours for mainly the newsletters - await redisClient.set( + await redisDataSource.cacheClient.set( cacheKey, JSON.stringify({ audioDataString, speechMarks }), 'EX', @@ -362,7 +367,11 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( console.log('Cache saved') // update character count - await updateCharacterCountInRedis(redisClient, claim.uid, characterCount) + await updateCharacterCountInRedis( + redisDataSource, + claim.uid, + characterCount + ) res.send({ idx: utteranceInput.idx, @@ -373,7 +382,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction( console.error('Text to speech streaming error:', e) return res.status(500).send({ errorCodes: 'SYNTHESIZER_ERROR' }) } finally { - await redisClient.quit() + await redisDataSource.shutdown() console.log('Redis Client Disconnected') } } diff --git a/packages/text-to-speech/src/redis.ts b/packages/text-to-speech/src/redis.ts deleted file mode 100644 index 5bc90decc..000000000 --- a/packages/text-to-speech/src/redis.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { Redis } from 'ioredis' - -export const createRedisClient = (url?: string, cert?: string) => { - return new Redis(url || 'redis://localhost:6379', { - connectTimeout: 10000, // 10 seconds - tls: cert - ? { - cert: cert.replace(/\\n/g, '\n'), // replace \n with new line - rejectUnauthorized: false, // for self-signed certs - } - : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, - }) -} diff --git a/packages/utils/src/redis_data_source.ts b/packages/utils/src/redis_data_source.ts index 3c982e90f..2d20fb3b8 100644 --- a/packages/utils/src/redis_data_source.ts +++ b/packages/utils/src/redis_data_source.ts @@ -28,9 +28,12 @@ export class RedisDataSource { async shutdown(): Promise { try { - await this.queueRedisClient?.quit() await this.cacheClient?.quit() + if (this.queueRedisClient !== this.cacheClient) { + await this.queueRedisClient.quit() + } + console.log('redis shutdown complete') } catch (err) { console.error('error while shutting down redis', err)