Merge pull request #4140 from omnivore-app/use-omnivore-utils
use omnivore utils
This commit is contained in:
@ -16,6 +16,7 @@ COPY /packages/api/package.json ./packages/api/package.json
|
||||
COPY /packages/text-to-speech/package.json ./packages/text-to-speech/package.json
|
||||
COPY /packages/content-handler/package.json ./packages/content-handler/package.json
|
||||
COPY /packages/liqe/package.json ./packages/liqe/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
@ -24,7 +25,9 @@ ADD /packages/api ./packages/api
|
||||
ADD /packages/text-to-speech ./packages/text-to-speech
|
||||
ADD /packages/content-handler ./packages/content-handler
|
||||
ADD /packages/liqe ./packages/liqe
|
||||
ADD /packages/utils ./packages/utils
|
||||
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/text-to-speech-handler build
|
||||
RUN yarn workspace @omnivore/content-handler build
|
||||
RUN yarn workspace @omnivore/liqe build
|
||||
|
||||
@ -26,6 +26,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/content-handler/package.json ./packages/content-handler/package.json
|
||||
COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
@ -33,6 +34,8 @@ ADD /packages/content-fetch ./packages/content-fetch
|
||||
ADD /packages/content-handler ./packages/content-handler
|
||||
ADD /packages/puppeteer-parse ./packages/puppeteer-parse
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
ADD /packages/utils ./packages/utils
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/content-handler build
|
||||
RUN yarn workspace @omnivore/puppeteer-parse build
|
||||
RUN yarn workspace @omnivore/content-fetch build
|
||||
|
||||
@ -30,6 +30,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/content-handler/package.json ./packages/content-handler/package.json
|
||||
COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
@ -37,6 +38,8 @@ ADD /packages/content-handler ./packages/content-handler
|
||||
ADD /packages/puppeteer-parse ./packages/puppeteer-parse
|
||||
ADD /packages/content-fetch ./packages/content-fetch
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
ADD /packages/utils ./packages/utils
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/content-handler build
|
||||
RUN yarn workspace @omnivore/puppeteer-parse build
|
||||
RUN yarn workspace @omnivore/content-fetch build
|
||||
|
||||
@ -10,11 +10,11 @@
|
||||
"bullmq": "^5.1.1",
|
||||
"dotenv": "^8.2.0",
|
||||
"express": "^4.17.1",
|
||||
"ioredis": "^5.3.2",
|
||||
"posthog-node": "^3.6.3",
|
||||
"@google-cloud/functions-framework": "^3.0.0",
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@omnivore/puppeteer-parse": "^1.0.0",
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"@sentry/serverless": "^7.77.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import { HttpFunction } from '@google-cloud/functions-framework'
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import 'dotenv/config'
|
||||
import { contentFetchRequestHandler } from './request_handler'
|
||||
|
||||
Sentry.GCPFunction.init({
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import { BulkJobOptions, Queue } from 'bullmq'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
|
||||
const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
const JOB_NAME = 'save-page'
|
||||
@ -31,10 +31,6 @@ interface SavePageJob {
|
||||
priority: 'low' | 'high'
|
||||
}
|
||||
|
||||
const queue = new Queue(QUEUE_NAME, {
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
})
|
||||
|
||||
const getPriority = (job: SavePageJob): number => {
|
||||
// we want to prioritized jobs by the expected time to complete
|
||||
// lower number means higher priority
|
||||
@ -72,7 +68,10 @@ const getOpts = (job: SavePageJob): BulkJobOptions => {
|
||||
}
|
||||
}
|
||||
|
||||
export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => {
|
||||
export const queueSavePageJob = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
savePageJobs: SavePageJob[]
|
||||
) => {
|
||||
const jobs = savePageJobs.map((job) => ({
|
||||
name: JOB_NAME,
|
||||
data: job.data,
|
||||
@ -80,5 +79,9 @@ export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => {
|
||||
}))
|
||||
console.log('queue save page jobs:', JSON.stringify(jobs, null, 2))
|
||||
|
||||
const queue = new Queue(QUEUE_NAME, {
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
})
|
||||
|
||||
return queue.addBulk(jobs)
|
||||
}
|
||||
|
||||
@ -1,91 +0,0 @@
|
||||
import Redis, { RedisOptions } from 'ioredis'
|
||||
|
||||
type RedisClientType = 'cache' | 'mq'
|
||||
type RedisDataSourceOption = {
|
||||
url?: string
|
||||
cert?: string
|
||||
}
|
||||
export type RedisDataSourceOptions = {
|
||||
[key in RedisClientType]: RedisDataSourceOption
|
||||
}
|
||||
|
||||
export class RedisDataSource {
|
||||
options: RedisDataSourceOptions
|
||||
|
||||
cacheClient: Redis
|
||||
queueRedisClient: Redis
|
||||
|
||||
constructor(options: RedisDataSourceOptions) {
|
||||
this.options = options
|
||||
|
||||
const cacheClient = createIORedisClient('cache', this.options)
|
||||
if (!cacheClient) throw 'Error initializing cache redis client'
|
||||
|
||||
this.cacheClient = cacheClient
|
||||
this.queueRedisClient =
|
||||
createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache
|
||||
}
|
||||
|
||||
async shutdown(): Promise<void> {
|
||||
try {
|
||||
await this.queueRedisClient?.quit()
|
||||
await this.cacheClient?.quit()
|
||||
} catch (err) {
|
||||
console.error('error while shutting down redis', err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const createIORedisClient = (
|
||||
name: RedisClientType,
|
||||
options: RedisDataSourceOptions
|
||||
): Redis | undefined => {
|
||||
const option = options[name]
|
||||
const redisURL = option.url
|
||||
if (!redisURL) {
|
||||
console.log(`no redisURL supplied: ${name}`)
|
||||
return undefined
|
||||
}
|
||||
|
||||
const redisCert = option.cert
|
||||
const tls =
|
||||
redisURL.startsWith('rediss://') && redisCert
|
||||
? {
|
||||
ca: redisCert,
|
||||
rejectUnauthorized: false,
|
||||
}
|
||||
: undefined
|
||||
|
||||
const redisOptions: RedisOptions = {
|
||||
tls,
|
||||
name,
|
||||
connectTimeout: 10000,
|
||||
maxRetriesPerRequest: null,
|
||||
offlineQueue: false,
|
||||
}
|
||||
return new Redis(redisURL, redisOptions)
|
||||
}
|
||||
|
||||
export const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
process.on('SIGINT', async () => {
|
||||
console.log('SIGINT signal received.')
|
||||
|
||||
try {
|
||||
await redisDataSource.shutdown()
|
||||
} catch (error) {
|
||||
console.error('error while shutting down redis', error)
|
||||
}
|
||||
|
||||
process.exit(0)
|
||||
})
|
||||
@ -1,9 +1,10 @@
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { fetchContent } from '@omnivore/puppeteer-parse'
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import 'dotenv/config'
|
||||
import { RequestHandler } from 'express'
|
||||
import { analytics } from './analytics'
|
||||
import { queueSavePageJob } from './job'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
|
||||
interface UserConfig {
|
||||
id: string
|
||||
@ -92,6 +93,7 @@ const isFetchResult = (obj: unknown): obj is FetchResult => {
|
||||
}
|
||||
|
||||
export const cacheFetchResult = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
key: string,
|
||||
fetchResult: FetchResult
|
||||
) => {
|
||||
@ -102,6 +104,7 @@ export const cacheFetchResult = async (
|
||||
}
|
||||
|
||||
const getCachedFetchResult = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
key: string
|
||||
): Promise<FetchResult | undefined> => {
|
||||
const result = await redisDataSource.cacheClient.get(key)
|
||||
@ -171,9 +174,21 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
|
||||
console.log(`Article parsing request`, logRecord)
|
||||
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
try {
|
||||
const key = cacheKey(url, locale, timezone)
|
||||
let fetchResult = await getCachedFetchResult(key)
|
||||
let fetchResult = await getCachedFetchResult(redisDataSource, key)
|
||||
if (!fetchResult) {
|
||||
console.log(
|
||||
'fetch result not found in cache, fetching content now...',
|
||||
@ -184,7 +199,11 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
console.log('content has been fetched')
|
||||
|
||||
if (fetchResult.content) {
|
||||
const cacheResult = await cacheFetchResult(key, fetchResult)
|
||||
const cacheResult = await cacheFetchResult(
|
||||
redisDataSource,
|
||||
key,
|
||||
fetchResult
|
||||
)
|
||||
console.log('cache result', cacheResult)
|
||||
}
|
||||
}
|
||||
@ -219,7 +238,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
priority,
|
||||
}))
|
||||
|
||||
const jobs = await queueSavePageJob(savePageJobs)
|
||||
const jobs = await queueSavePageJob(redisDataSource, savePageJobs)
|
||||
console.log('save-page jobs queued', jobs.length)
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
@ -246,6 +265,8 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
await redisDataSource.shutdown()
|
||||
}
|
||||
|
||||
res.sendStatus(200)
|
||||
|
||||
@ -30,9 +30,9 @@
|
||||
"nock": "^13.2.9"
|
||||
},
|
||||
"dependencies": {
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"addressparser": "^1.0.1",
|
||||
"axios": "^0.27.2",
|
||||
"ioredis": "^5.3.2",
|
||||
"linkedom": "^0.14.16",
|
||||
"lodash": "^4.17.21",
|
||||
"luxon": "^3.0.4",
|
||||
|
||||
@ -1,34 +0,0 @@
|
||||
import { Redis } from 'ioredis'
|
||||
|
||||
export const createRedisClient = (url?: string, cert?: string) => {
|
||||
return new Redis(url || 'redis://localhost:6379', {
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
tls: cert
|
||||
? {
|
||||
cert: cert.replace(/\\n/g, '\n'), // replace \n with new line
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
}
|
||||
: undefined,
|
||||
reconnectOnError: (err) => {
|
||||
const targetErrors = [/READONLY/, /ETIMEDOUT/]
|
||||
|
||||
targetErrors.forEach((targetError) => {
|
||||
if (targetError.test(err.message)) {
|
||||
// Only reconnect when the error contains the keyword
|
||||
return true
|
||||
}
|
||||
})
|
||||
|
||||
return false
|
||||
},
|
||||
retryStrategy: (times) => {
|
||||
if (times > 10) {
|
||||
// End reconnecting after a specific number of tries and flush all commands with a individual error
|
||||
return null
|
||||
}
|
||||
|
||||
// reconnect after
|
||||
return Math.min(times * 50, 2000)
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -1,424 +0,0 @@
|
||||
import axios from 'axios'
|
||||
import Redis from 'ioredis'
|
||||
import { parseHTML } from 'linkedom'
|
||||
import _, { truncate } from 'lodash'
|
||||
import { DateTime } from 'luxon'
|
||||
import { ContentHandler, PreHandleResult } from '../content-handler'
|
||||
import { createRedisClient } from '../redis'
|
||||
|
||||
interface Tweet {
|
||||
url: string
|
||||
author: {
|
||||
username: string
|
||||
name: string
|
||||
profileImageUrl: string
|
||||
}
|
||||
text: string
|
||||
entities: {
|
||||
urls: {
|
||||
url: string
|
||||
displayUrl: string
|
||||
}[]
|
||||
}
|
||||
attachments: {
|
||||
type: string
|
||||
url: string
|
||||
previewUrl: string
|
||||
}[]
|
||||
createdAt: string
|
||||
}
|
||||
|
||||
export class NitterHandler extends ContentHandler {
|
||||
// matches twitter.com and nitter.net urls
|
||||
URL_MATCH =
|
||||
/((x\.com)|(twitter\.com)|(nitter\.net))\/(?:#!\/)?(\w+)\/status(?:es)?\/(\d+)(?:\/.*)?/
|
||||
INSTANCES = [
|
||||
{ value: 'https://nitter.moomoo.me', score: 0 },
|
||||
{ value: 'https://nitter.net', score: 1 }, // the official instance
|
||||
{ value: 'https://nitter.lacontrevoie.fr', score: 2 },
|
||||
{ value: 'https://nitter.kavin.rocks', score: 3 },
|
||||
{ value: 'https://notabird.site', score: 4 },
|
||||
{ value: 'https://singapore.unofficialbird.com', score: 5 },
|
||||
{ value: 'https://nitter.fly.dev', score: 6 },
|
||||
]
|
||||
REDIS_KEY = 'nitter-instances'
|
||||
|
||||
private instance: string
|
||||
|
||||
constructor() {
|
||||
super()
|
||||
this.name = 'Nitter'
|
||||
this.instance = ''
|
||||
}
|
||||
|
||||
async getInstances(redisClient: Redis) {
|
||||
// get instances by score in ascending order
|
||||
const instances = await redisClient.zrange(
|
||||
this.REDIS_KEY,
|
||||
'-inf',
|
||||
'+inf',
|
||||
'BYSCORE'
|
||||
)
|
||||
console.debug('instances', instances)
|
||||
|
||||
// if no instance is found, save the default instances
|
||||
if (instances.length === 0) {
|
||||
// only add if the key does not exist
|
||||
const result = await redisClient.zadd(
|
||||
this.REDIS_KEY,
|
||||
'NX',
|
||||
...this.INSTANCES.map((i) => [i.score, i.value]).flat()
|
||||
)
|
||||
console.debug('add instances', result)
|
||||
|
||||
// expire the key after 1 day
|
||||
const exp = await redisClient.expire(this.REDIS_KEY, 60 * 60 * 24)
|
||||
console.debug('instances expire in 1 day', exp)
|
||||
|
||||
return this.INSTANCES.map((i) => i.value)
|
||||
}
|
||||
|
||||
return instances
|
||||
}
|
||||
|
||||
async incrementInstanceScore(
|
||||
redisClient: Redis,
|
||||
instance: string,
|
||||
score = 1
|
||||
) {
|
||||
await redisClient.zincrby(this.REDIS_KEY, score, instance)
|
||||
}
|
||||
|
||||
async getTweets(username: string, tweetId: string) {
|
||||
function authorParser(header: Element) {
|
||||
const profileImageUrl =
|
||||
header.querySelector('.tweet-avatar img')?.getAttribute('src') ?? ''
|
||||
const name =
|
||||
header.querySelector('.fullname')?.getAttribute('title') ?? ''
|
||||
const username =
|
||||
header.querySelector('.username')?.getAttribute('title') ?? ''
|
||||
|
||||
return {
|
||||
profileImageUrl,
|
||||
name,
|
||||
username: username.replace('@', ''), // remove @ from username
|
||||
}
|
||||
}
|
||||
|
||||
function dateParser(date: Element) {
|
||||
const validDateTime =
|
||||
date.getAttribute('title')?.replace(' · ', ' ') ?? Date.now()
|
||||
|
||||
return new Date(validDateTime).toISOString()
|
||||
}
|
||||
|
||||
function urlParser(date: Element) {
|
||||
return date.getAttribute('href') ?? ''
|
||||
}
|
||||
|
||||
function attachmentParser(attachments: Element | null) {
|
||||
if (!attachments) return []
|
||||
|
||||
const photos = Array.from(attachments.querySelectorAll('img')).map(
|
||||
(i) => ({
|
||||
url: i.getAttribute('src') ?? '',
|
||||
type: 'photo',
|
||||
previewUrl: i.getAttribute('src') ?? '',
|
||||
})
|
||||
)
|
||||
const videos = Array.from(attachments.querySelectorAll('video')).map(
|
||||
(i) => ({
|
||||
url: i.getAttribute('data-url') ?? '',
|
||||
type: 'video',
|
||||
previewUrl: i.getAttribute('poster') ?? '',
|
||||
})
|
||||
)
|
||||
|
||||
return [...photos, ...videos]
|
||||
}
|
||||
|
||||
function parseTweet(tweet: Element): Tweet | null {
|
||||
const header = tweet.querySelector('.tweet-header')
|
||||
if (!header) {
|
||||
console.error('no header found', tweet)
|
||||
return null
|
||||
}
|
||||
const author = authorParser(header)
|
||||
|
||||
const body = tweet.querySelector('.tweet-body')
|
||||
if (!body) {
|
||||
console.error('no body found', tweet)
|
||||
return null
|
||||
}
|
||||
|
||||
const tweetDateElement = body.querySelector('.tweet-date a')
|
||||
if (!tweetDateElement) {
|
||||
console.error('no tweet date found', tweet)
|
||||
return null
|
||||
}
|
||||
const createdAt = dateParser(tweetDateElement)
|
||||
const url = urlParser(tweetDateElement)
|
||||
|
||||
const content = body.querySelector('.tweet-content')
|
||||
if (!content) {
|
||||
console.error('no content found', tweet)
|
||||
return null
|
||||
}
|
||||
const text = content.textContent ?? ''
|
||||
const urls = Array.from(content.querySelectorAll('a')).map((a) => ({
|
||||
url: a.getAttribute('href') ?? '',
|
||||
displayUrl: a.textContent ?? '',
|
||||
}))
|
||||
|
||||
const attachments = attachmentParser(body.querySelector('.attachments'))
|
||||
|
||||
return {
|
||||
author,
|
||||
createdAt,
|
||||
text,
|
||||
url,
|
||||
entities: {
|
||||
urls,
|
||||
},
|
||||
attachments,
|
||||
}
|
||||
}
|
||||
|
||||
const redisClient = createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
|
||||
try {
|
||||
const tweets: Tweet[] = []
|
||||
const option = {
|
||||
timeout: 20000, // 20 seconds
|
||||
}
|
||||
let html = ''
|
||||
// get instances from redis
|
||||
const instances = await this.getInstances(redisClient)
|
||||
for (const instance of instances) {
|
||||
try {
|
||||
const url = `${instance}/${username}/status/${tweetId}`
|
||||
const startTime = Date.now()
|
||||
const response = await axios.get(url, option)
|
||||
const latency = Math.floor(Date.now() - startTime)
|
||||
console.debug('latency', latency)
|
||||
|
||||
html = response.data as string
|
||||
this.instance = instance
|
||||
|
||||
await this.incrementInstanceScore(redisClient, instance, latency)
|
||||
break
|
||||
} catch (error) {
|
||||
await this.incrementInstanceScore(
|
||||
redisClient,
|
||||
instance,
|
||||
option.timeout
|
||||
)
|
||||
|
||||
if (axios.isAxiosError(error)) {
|
||||
console.info(`Error getting tweets from ${instance}`, error.message)
|
||||
} else {
|
||||
console.info(`Error getting tweets from ${instance}`, error)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!this.instance || !html) {
|
||||
console.error('no instance or html found')
|
||||
return []
|
||||
}
|
||||
|
||||
const document = parseHTML(html).document
|
||||
|
||||
// get the main thread including tweets and threads
|
||||
const mainThread = document.querySelector('.main-thread')
|
||||
if (!mainThread) {
|
||||
console.error('no main thread found')
|
||||
return []
|
||||
}
|
||||
const timelineItems = Array.from(
|
||||
mainThread.querySelectorAll('.timeline-item')
|
||||
)
|
||||
if (timelineItems.length === 0) {
|
||||
console.error('no timeline items found')
|
||||
return []
|
||||
}
|
||||
for (let i = 0; i < timelineItems.length; i++) {
|
||||
const item = timelineItems[i]
|
||||
const classList = item.classList
|
||||
// skip unavailable tweets and earlier replies
|
||||
if (
|
||||
classList.contains('unavailable') ||
|
||||
classList.contains('earlier-replies')
|
||||
) {
|
||||
console.info('skip unavailable tweets and earlier replies')
|
||||
continue
|
||||
}
|
||||
// if there are more replies, get them
|
||||
if (classList.contains('more-replies')) {
|
||||
const newUrl = item.querySelector('a')?.getAttribute('href')
|
||||
if (!newUrl) {
|
||||
console.error('no new url', newUrl)
|
||||
break
|
||||
}
|
||||
|
||||
let html = ''
|
||||
try {
|
||||
// go to new url and wait for it to load
|
||||
const response = await axios.get(
|
||||
`${this.instance}${newUrl}`,
|
||||
option
|
||||
)
|
||||
|
||||
html = response.data as string
|
||||
} catch (error) {
|
||||
console.error('Error getting tweets', error)
|
||||
break
|
||||
}
|
||||
|
||||
const document = parseHTML(html).document
|
||||
const nextThread = document.querySelector('.main-thread .after-tweet')
|
||||
if (!nextThread) {
|
||||
console.error('no next thread found')
|
||||
break
|
||||
}
|
||||
|
||||
// get the new timeline items and add them to the list
|
||||
const newTimelineItems = Array.from(
|
||||
nextThread.querySelectorAll('.timeline-item')
|
||||
)
|
||||
|
||||
timelineItems.push(...newTimelineItems)
|
||||
continue
|
||||
}
|
||||
|
||||
const tweet = parseTweet(item)
|
||||
// filter out replies
|
||||
if (
|
||||
tweet &&
|
||||
tweet.author.username.toLowerCase() === username.toLowerCase()
|
||||
) {
|
||||
tweets.push(tweet)
|
||||
}
|
||||
}
|
||||
|
||||
return tweets
|
||||
} catch (error) {
|
||||
console.error('Error getting tweets', error)
|
||||
|
||||
return []
|
||||
} finally {
|
||||
await redisClient?.quit()
|
||||
}
|
||||
}
|
||||
|
||||
parseTweetUrl = (url: string) => {
|
||||
const match = url.match(this.URL_MATCH)
|
||||
return {
|
||||
domain: match?.[1]?.replace('x', 'twitter'),
|
||||
username: match?.[4],
|
||||
tweetId: match?.[5],
|
||||
}
|
||||
}
|
||||
|
||||
titleForTweet = (author: { name: string }, text: string) => {
|
||||
return `${author.name} on Twitter: ${truncate(text.replace(/http\S+/, ''), {
|
||||
length: 100,
|
||||
})}`
|
||||
}
|
||||
|
||||
formatTimestamp = (timestamp: string) => {
|
||||
return DateTime.fromJSDate(new Date(timestamp)).toLocaleString(
|
||||
DateTime.DATETIME_FULL
|
||||
)
|
||||
}
|
||||
|
||||
shouldPreHandle(url: string): boolean {
|
||||
return this.URL_MATCH.test(url.toString())
|
||||
}
|
||||
|
||||
async preHandle(url: string): Promise<PreHandleResult> {
|
||||
const { tweetId, username, domain } = this.parseTweetUrl(url)
|
||||
if (!tweetId || !username || !domain) {
|
||||
throw new Error('could not parse tweet url')
|
||||
}
|
||||
const tweets = await this.getTweets(username, tweetId)
|
||||
if (tweets.length === 0) {
|
||||
throw new Error('could not get tweets')
|
||||
}
|
||||
|
||||
const tweet = tweets[0]
|
||||
const author = tweet.author
|
||||
// escape html entities in title
|
||||
const title = this.titleForTweet(author, tweet.text)
|
||||
const escapedTitle = _.escape(title)
|
||||
const authorImage = `${this.instance}${author.profileImageUrl.replace(
|
||||
'_normal',
|
||||
'_400x400'
|
||||
)}`
|
||||
const description = _.escape(tweet.text) || escapedTitle
|
||||
const imageDomain =
|
||||
domain.toLowerCase() === 'twitter.com'
|
||||
? 'https://pbs.twimg.com'
|
||||
: 'https://nitter.net/pic'
|
||||
|
||||
let tweetsContent = ''
|
||||
for (const tweet of tweets) {
|
||||
let text = tweet.text
|
||||
for (const urlObj of tweet.entities.urls) {
|
||||
text = text.replace(
|
||||
urlObj.displayUrl,
|
||||
`<a href="${urlObj.url}">${urlObj.displayUrl}</a>`
|
||||
)
|
||||
}
|
||||
|
||||
const includesHtml = tweet.attachments
|
||||
.map(
|
||||
(attachment) =>
|
||||
`<a class="media-link" href=${imageDomain}${decodeURIComponent(
|
||||
attachment.url
|
||||
).replace('/pic', '')}>
|
||||
<picture>
|
||||
<img class="tweet-img" src=${imageDomain}${decodeURIComponent(
|
||||
attachment.previewUrl
|
||||
).replace('/pic', '')} />
|
||||
</picture>
|
||||
</a>`
|
||||
)
|
||||
.join('\n')
|
||||
|
||||
tweetsContent += `<p class="_omnivore_tweet_content">${text}</p>${includesHtml}`
|
||||
}
|
||||
|
||||
const tweetUrl = `
|
||||
— <a href="https://${domain}/${author.username}">${
|
||||
author.username
|
||||
}</a> <span itemscope itemtype="https://schema.org/Person" itemprop="author">${
|
||||
author.name
|
||||
}</span> <a href="${url}">${this.formatTimestamp(tweet.createdAt)}</a>`
|
||||
|
||||
const content = `
|
||||
<html>
|
||||
<head>
|
||||
<meta property="og:image" content="${authorImage}" />
|
||||
<meta property="og:image:secure_url" content="${authorImage}" />
|
||||
<meta property="og:title" content="${escapedTitle}" />
|
||||
<meta property="og:description" content="${description}" />
|
||||
<meta property="article:published_time" content="${tweet.createdAt}" />
|
||||
<meta property="og:site_name" content="Twitter" />
|
||||
<meta property="og:type" content="tweet" />
|
||||
<meta property="dc:creator" content="${author.name}" />
|
||||
<meta property="twitter:description" content="${description}" />
|
||||
</head>
|
||||
<body>
|
||||
<div class="_omnivore_twitter">
|
||||
${tweetsContent}
|
||||
${tweetUrl}
|
||||
</div>
|
||||
</body>
|
||||
</html>`
|
||||
|
||||
return { content, url, title }
|
||||
}
|
||||
}
|
||||
@ -14,11 +14,14 @@ COPY .eslintrc .
|
||||
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/import-handler/package.json ./packages/import-handler/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
ADD /packages/utils ./packages/utils
|
||||
ADD /packages/import-handler ./packages/import-handler
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/import-handler build
|
||||
|
||||
# After building, fetch the production dependencies
|
||||
|
||||
@ -14,11 +14,14 @@ COPY .eslintrc .
|
||||
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/import-handler/package.json ./packages/import-handler/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
ADD /packages/import-handler ./packages/import-handler
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
ADD /packages/utils ./packages/utils
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/import-handler build
|
||||
|
||||
# After building, fetch the production dependencies
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
{
|
||||
"extension": ["ts"],
|
||||
"spec": "test/**/*.test.ts"
|
||||
"spec": "test/**/*.test.ts",
|
||||
"timeout": 10000
|
||||
}
|
||||
|
||||
@ -38,9 +38,11 @@
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@google-cloud/tasks": "^4.0.0",
|
||||
"@omnivore/readability": "1.0.0",
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"@sentry/serverless": "^7.77.0",
|
||||
"@types/express": "^4.17.13",
|
||||
"axios": "^1.2.2",
|
||||
"dotenv": "^16.0.1",
|
||||
"dompurify": "^2.4.3",
|
||||
"fs-extra": "^11.1.0",
|
||||
"glob": "^8.1.0",
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import axios from 'axios'
|
||||
import 'dotenv/config'
|
||||
import Redis from 'ioredis'
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import { Stream } from 'node:stream'
|
||||
@ -11,7 +13,6 @@ import { v4 as uuid } from 'uuid'
|
||||
import { importCsv } from './csv'
|
||||
import { importMatterArchive } from './matterHistory'
|
||||
import { ImportStatus, updateMetrics } from './metrics'
|
||||
import { createRedisClient } from './redis'
|
||||
import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task'
|
||||
|
||||
export enum ArticleSavingRequestStatus {
|
||||
@ -363,19 +364,26 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
const pubSubMessage = req.body.message.data as string
|
||||
const obj = getStorageEvent(pubSubMessage)
|
||||
if (obj) {
|
||||
// create redis client
|
||||
const redisClient = createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
try {
|
||||
await handleEvent(obj, redisClient)
|
||||
await handleEvent(obj, redisDataSource.cacheClient)
|
||||
} catch (err) {
|
||||
console.log('error handling event', { err, obj })
|
||||
throw err
|
||||
} finally {
|
||||
// close redis client
|
||||
await redisClient.quit()
|
||||
await redisDataSource.shutdown()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -413,14 +421,32 @@ export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction(
|
||||
return res.status(400).send('Bad Request')
|
||||
}
|
||||
|
||||
const redisClient = createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
// update metrics
|
||||
await updateMetrics(redisClient, userId, req.body.taskId, req.body.status)
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
await redisClient.quit()
|
||||
try {
|
||||
// update metrics
|
||||
await updateMetrics(
|
||||
redisDataSource.cacheClient,
|
||||
userId,
|
||||
req.body.taskId,
|
||||
req.body.status
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error updating metrics', error)
|
||||
return res.status(500).send('Error updating metrics')
|
||||
} finally {
|
||||
await redisDataSource.shutdown()
|
||||
}
|
||||
|
||||
res.send('ok')
|
||||
}
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import Redis from 'ioredis'
|
||||
import { sendImportCompletedEmail } from '.'
|
||||
|
||||
|
||||
@ -1,37 +1,4 @@
|
||||
import { Redis, Result } from 'ioredis'
|
||||
|
||||
export const createRedisClient = (url?: string, cert?: string) => {
|
||||
return new Redis(url || 'redis://localhost:6379', {
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
tls: cert
|
||||
? {
|
||||
cert: cert.replace(/\\n/g, '\n'), // replace \n with new line
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
}
|
||||
: undefined,
|
||||
reconnectOnError: (err) => {
|
||||
const targetErrors = [/READONLY/, /ETIMEDOUT/]
|
||||
|
||||
targetErrors.forEach((targetError) => {
|
||||
if (targetError.test(err.message)) {
|
||||
// Only reconnect when the error contains the keyword
|
||||
return true
|
||||
}
|
||||
})
|
||||
|
||||
return false
|
||||
},
|
||||
retryStrategy: (times) => {
|
||||
if (times > 10) {
|
||||
// End reconnecting after a specific number of tries and flush all commands with a individual error
|
||||
return null
|
||||
}
|
||||
|
||||
// reconnect after
|
||||
return Math.min(times * 50, 2000)
|
||||
},
|
||||
})
|
||||
}
|
||||
import { Result } from 'ioredis'
|
||||
|
||||
// Add declarations
|
||||
declare module 'ioredis' {
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import * as chai from 'chai'
|
||||
import { expect } from 'chai'
|
||||
import chaiString from 'chai-string'
|
||||
@ -11,13 +12,25 @@ chai.use(chaiString)
|
||||
|
||||
describe('Test csv importer', () => {
|
||||
let stub: ImportContext
|
||||
let redisDataSource: RedisDataSource
|
||||
|
||||
beforeEach(() => {
|
||||
stub = stubImportCtx()
|
||||
redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
stub = stubImportCtx(redisDataSource.cacheClient)
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await stub.redisClient.quit()
|
||||
await redisDataSource.shutdown()
|
||||
})
|
||||
|
||||
describe('Load a simple CSV file', () => {
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import * as chai from 'chai'
|
||||
import { expect } from 'chai'
|
||||
import chaiString from 'chai-string'
|
||||
@ -13,50 +14,70 @@ import { stubImportCtx } from '../util'
|
||||
|
||||
chai.use(chaiString)
|
||||
|
||||
describe('Load a simple _matter_history file', () => {
|
||||
it('should find the URL of each row', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/matter/data/_matter_history.csv')
|
||||
const stub = stubImportCtx()
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
}
|
||||
describe('matter importer', () => {
|
||||
let stub: ImportContext
|
||||
let redisDataSource: RedisDataSource
|
||||
|
||||
await importMatterHistoryCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(1)
|
||||
expect(urls).to.eql([
|
||||
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
|
||||
])
|
||||
beforeEach(() => {
|
||||
redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
await stub.redisClient.quit()
|
||||
})
|
||||
})
|
||||
|
||||
describe('Load archive file', () => {
|
||||
it('should find the URL of each row', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/matter/data/Archive.zip')
|
||||
const stub = stubImportCtx()
|
||||
stub.contentHandler = (
|
||||
ctx: ImportContext,
|
||||
url: URL,
|
||||
title: string,
|
||||
originalContent: string,
|
||||
parseResult: Readability.ParseResult
|
||||
): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importMatterArchive(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(1)
|
||||
expect(urls).to.eql([
|
||||
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
|
||||
])
|
||||
|
||||
await stub.redisClient.quit()
|
||||
stub = stubImportCtx(redisDataSource.cacheClient)
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await redisDataSource.shutdown()
|
||||
})
|
||||
|
||||
describe('Load a simple _matter_history file', () => {
|
||||
it('should find the URL of each row', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream(
|
||||
'./test/matter/data/_matter_history.csv'
|
||||
)
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importMatterHistoryCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(1)
|
||||
expect(urls).to.eql([
|
||||
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
describe('Load archive file', () => {
|
||||
it('should find the URL of each row', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/matter/data/Archive.zip')
|
||||
stub.contentHandler = (
|
||||
ctx: ImportContext,
|
||||
url: URL,
|
||||
title: string,
|
||||
originalContent: string,
|
||||
parseResult: Readability.ParseResult
|
||||
): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importMatterArchive(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(1)
|
||||
expect(urls).to.eql([
|
||||
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
|
||||
])
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@ -1,10 +1,8 @@
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import Redis from 'ioredis'
|
||||
import { ArticleSavingRequestStatus, ImportContext } from '../src'
|
||||
import { createRedisClient } from '../src/redis'
|
||||
|
||||
export const stubImportCtx = (): ImportContext => {
|
||||
const redisClient = createRedisClient(process.env.REDIS_URL)
|
||||
|
||||
export const stubImportCtx = (redisClient: Redis): ImportContext => {
|
||||
return {
|
||||
userId: '',
|
||||
countImported: 0,
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
FROM node:18.16-alpine
|
||||
FROM node:18.16
|
||||
|
||||
# Run everything after as non-privileged user.
|
||||
WORKDIR /app
|
||||
@ -10,9 +10,12 @@ COPY .prettierrc .
|
||||
COPY .eslintrc .
|
||||
|
||||
COPY /packages/text-to-speech/package.json ./packages/text-to-speech/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
ADD /packages/utils ./packages/utils
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
ADD /packages/text-to-speech ./packages/text-to-speech
|
||||
RUN yarn workspace @omnivore/text-to-speech-handler build
|
||||
|
||||
|
||||
@ -36,11 +36,11 @@
|
||||
"dependencies": {
|
||||
"@google-cloud/functions-framework": "3.1.2",
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"@sentry/serverless": "^7.77.0",
|
||||
"axios": "^0.27.2",
|
||||
"dotenv": "^16.0.1",
|
||||
"html-to-text": "^8.2.1",
|
||||
"ioredis": "^5.3.2",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"linkedom": "^0.14.12",
|
||||
"microsoft-cognitiveservices-speech-sdk": "1.30",
|
||||
|
||||
@ -4,17 +4,16 @@
|
||||
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||
|
||||
import { File, Storage } from '@google-cloud/storage'
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import axios from 'axios'
|
||||
import crypto from 'crypto'
|
||||
import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
|
||||
import Redis from 'ioredis'
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import { AzureTextToSpeech } from './azureTextToSpeech'
|
||||
import { endSsml, htmlToSpeechFile, startSsml } from './htmlToSsml'
|
||||
import { OpenAITextToSpeech } from './openaiTextToSpeech'
|
||||
import { RealisticTextToSpeech } from './realisticTextToSpeech'
|
||||
import { createRedisClient } from './redis'
|
||||
import {
|
||||
SpeechMark,
|
||||
TextToSpeechInput,
|
||||
@ -115,10 +114,10 @@ const updateSpeech = async (
|
||||
}
|
||||
|
||||
const getCharacterCountFromRedis = async (
|
||||
redisClient: Redis,
|
||||
redisClient: RedisDataSource,
|
||||
uid: string
|
||||
): Promise<number> => {
|
||||
const wordCount = await redisClient.get(`tts:charCount:${uid}`)
|
||||
const wordCount = await redisClient.cacheClient.get(`tts:charCount:${uid}`)
|
||||
return wordCount ? parseInt(wordCount) : 0
|
||||
}
|
||||
|
||||
@ -126,11 +125,11 @@ const getCharacterCountFromRedis = async (
|
||||
// which will be used to rate limit the request
|
||||
// expires after 1 day
|
||||
const updateCharacterCountInRedis = async (
|
||||
redisClient: Redis,
|
||||
redisClient: RedisDataSource,
|
||||
uid: string,
|
||||
wordCount: number
|
||||
) => {
|
||||
await redisClient.set(
|
||||
await redisClient.cacheClient.set(
|
||||
`tts:charCount:${uid}`,
|
||||
wordCount.toString(),
|
||||
'EX',
|
||||
@ -241,11 +240,17 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
return res.status(401).send({ errorCode: 'UNAUTHENTICATED' })
|
||||
}
|
||||
|
||||
// create redis client
|
||||
const redisClient = createRedisClient(
|
||||
process.env.REDIS_TTS_URL,
|
||||
process.env.REDIS_TTS_CERT
|
||||
)
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
try {
|
||||
const utteranceInput = req.body as UtteranceInput
|
||||
@ -267,7 +272,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
|
||||
// validate character count
|
||||
const characterCount =
|
||||
(await getCharacterCountFromRedis(redisClient, claim.uid)) +
|
||||
(await getCharacterCountFromRedis(redisDataSource, claim.uid)) +
|
||||
utteranceInput.text.length
|
||||
if (characterCount > MAX_CHARACTER_COUNT) {
|
||||
return res.status(429).send('RATE_LIMITED')
|
||||
@ -284,7 +289,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
// hash ssml to get the cache key
|
||||
const cacheKey = crypto.createHash('md5').update(ssml).digest('hex')
|
||||
// find audio data in cache
|
||||
const cacheResult = await redisClient.get(cacheKey)
|
||||
const cacheResult = await redisDataSource.cacheClient.get(cacheKey)
|
||||
if (cacheResult) {
|
||||
console.log('Cache hit')
|
||||
const { audioDataString, speechMarks }: CacheResult =
|
||||
@ -352,7 +357,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
|
||||
const audioDataString = audioData.toString('hex')
|
||||
// save audio data to cache for 72 hours for mainly the newsletters
|
||||
await redisClient.set(
|
||||
await redisDataSource.cacheClient.set(
|
||||
cacheKey,
|
||||
JSON.stringify({ audioDataString, speechMarks }),
|
||||
'EX',
|
||||
@ -362,7 +367,11 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
console.log('Cache saved')
|
||||
|
||||
// update character count
|
||||
await updateCharacterCountInRedis(redisClient, claim.uid, characterCount)
|
||||
await updateCharacterCountInRedis(
|
||||
redisDataSource,
|
||||
claim.uid,
|
||||
characterCount
|
||||
)
|
||||
|
||||
res.send({
|
||||
idx: utteranceInput.idx,
|
||||
@ -373,7 +382,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
console.error('Text to speech streaming error:', e)
|
||||
return res.status(500).send({ errorCodes: 'SYNTHESIZER_ERROR' })
|
||||
} finally {
|
||||
await redisClient.quit()
|
||||
await redisDataSource.shutdown()
|
||||
console.log('Redis Client Disconnected')
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,34 +0,0 @@
|
||||
import { Redis } from 'ioredis'
|
||||
|
||||
export const createRedisClient = (url?: string, cert?: string) => {
|
||||
return new Redis(url || 'redis://localhost:6379', {
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
tls: cert
|
||||
? {
|
||||
cert: cert.replace(/\\n/g, '\n'), // replace \n with new line
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
}
|
||||
: undefined,
|
||||
reconnectOnError: (err) => {
|
||||
const targetErrors = [/READONLY/, /ETIMEDOUT/]
|
||||
|
||||
targetErrors.forEach((targetError) => {
|
||||
if (targetError.test(err.message)) {
|
||||
// Only reconnect when the error contains the keyword
|
||||
return true
|
||||
}
|
||||
})
|
||||
|
||||
return false
|
||||
},
|
||||
retryStrategy: (times) => {
|
||||
if (times > 10) {
|
||||
// End reconnecting after a specific number of tries and flush all commands with a individual error
|
||||
return null
|
||||
}
|
||||
|
||||
// reconnect after
|
||||
return Math.min(times * 50, 2000)
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -28,9 +28,12 @@ export class RedisDataSource {
|
||||
|
||||
async shutdown(): Promise<void> {
|
||||
try {
|
||||
await this.queueRedisClient?.quit()
|
||||
await this.cacheClient?.quit()
|
||||
|
||||
if (this.queueRedisClient !== this.cacheClient) {
|
||||
await this.queueRedisClient.quit()
|
||||
}
|
||||
|
||||
console.log('redis shutdown complete')
|
||||
} catch (err) {
|
||||
console.error('error while shutting down redis', err)
|
||||
|
||||
Reference in New Issue
Block a user