use @omnivore/utils in content-fetch
This commit is contained in:
@ -26,6 +26,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/content-handler/package.json ./packages/content-handler/package.json
|
||||
COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
@ -33,6 +34,8 @@ ADD /packages/content-fetch ./packages/content-fetch
|
||||
ADD /packages/content-handler ./packages/content-handler
|
||||
ADD /packages/puppeteer-parse ./packages/puppeteer-parse
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
ADD /packages/utils ./packages/utils
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/content-handler build
|
||||
RUN yarn workspace @omnivore/puppeteer-parse build
|
||||
RUN yarn workspace @omnivore/content-fetch build
|
||||
|
||||
@ -30,6 +30,7 @@ COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/content-handler/package.json ./packages/content-handler/package.json
|
||||
COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
|
||||
COPY /packages/utils/package.json ./packages/utils/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
@ -37,6 +38,8 @@ ADD /packages/content-handler ./packages/content-handler
|
||||
ADD /packages/puppeteer-parse ./packages/puppeteer-parse
|
||||
ADD /packages/content-fetch ./packages/content-fetch
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
ADD /packages/utils ./packages/utils
|
||||
RUN yarn workspace @omnivore/utils build
|
||||
RUN yarn workspace @omnivore/content-handler build
|
||||
RUN yarn workspace @omnivore/puppeteer-parse build
|
||||
RUN yarn workspace @omnivore/content-fetch build
|
||||
|
||||
@ -10,11 +10,11 @@
|
||||
"bullmq": "^5.1.1",
|
||||
"dotenv": "^8.2.0",
|
||||
"express": "^4.17.1",
|
||||
"ioredis": "^5.3.2",
|
||||
"posthog-node": "^3.6.3",
|
||||
"@google-cloud/functions-framework": "^3.0.0",
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@omnivore/puppeteer-parse": "^1.0.0",
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"@sentry/serverless": "^7.77.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
@ -27,7 +27,8 @@
|
||||
"lint": "eslint src --ext ts,js,tsx,jsx",
|
||||
"build": "tsc",
|
||||
"start": "node build/src/app.js",
|
||||
"start_gcf": "functions-framework --port=9090 --target=puppeteer"
|
||||
"start_gcf": "functions-framework --port=9090 --target=puppeteer",
|
||||
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
|
||||
},
|
||||
"volta": {
|
||||
"extends": "../../package.json"
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import { HttpFunction } from '@google-cloud/functions-framework'
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import 'dotenv/config'
|
||||
import { contentFetchRequestHandler } from './request_handler'
|
||||
|
||||
Sentry.GCPFunction.init({
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import { BulkJobOptions, Queue } from 'bullmq'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
|
||||
const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
const JOB_NAME = 'save-page'
|
||||
@ -31,10 +31,6 @@ interface SavePageJob {
|
||||
priority: 'low' | 'high'
|
||||
}
|
||||
|
||||
const queue = new Queue(QUEUE_NAME, {
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
})
|
||||
|
||||
const getPriority = (job: SavePageJob): number => {
|
||||
// we want to prioritize jobs by the expected time to complete
|
||||
// lower number means higher priority
|
||||
@ -72,7 +68,10 @@ const getOpts = (job: SavePageJob): BulkJobOptions => {
|
||||
}
|
||||
}
|
||||
|
||||
export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => {
|
||||
export const queueSavePageJob = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
savePageJobs: SavePageJob[]
|
||||
) => {
|
||||
const jobs = savePageJobs.map((job) => ({
|
||||
name: JOB_NAME,
|
||||
data: job.data,
|
||||
@ -80,5 +79,9 @@ export const queueSavePageJob = async (savePageJobs: SavePageJob[]) => {
|
||||
}))
|
||||
console.log('queue save page jobs:', JSON.stringify(jobs, null, 2))
|
||||
|
||||
const queue = new Queue(QUEUE_NAME, {
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
})
|
||||
|
||||
return queue.addBulk(jobs)
|
||||
}
|
||||
|
||||
@ -1,91 +0,0 @@
|
||||
import Redis, { RedisOptions } from 'ioredis'
|
||||
|
||||
/** Logical role of a Redis connection: the cache or the message queue. */
type RedisClientType = 'cache' | 'mq'

/** Connection settings for a single Redis client; both fields are optional. */
type RedisDataSourceOption = {
  /** Connection URL passed to the Redis client. */
  url?: string
  /** CA certificate used when the connection is made over TLS. */
  cert?: string
}

/**
 * Settings for every Redis client role, keyed by role name.
 *
 * Expressed with the built-in `Record` utility type rather than a hand-written
 * mapped type; the resulting shape is identical.
 */
export type RedisDataSourceOptions = Record<
  RedisClientType,
  RedisDataSourceOption
>
|
||||
|
||||
/**
 * Owns the Redis connections used by the service: one for caching and one for
 * the message queue.
 *
 * When no message-queue URL is configured, the queue client is the very same
 * instance as the cache client.
 */
export class RedisDataSource {
  options: RedisDataSourceOptions

  cacheClient: Redis
  queueRedisClient: Redis

  /**
   * Creates the cache client and the queue client from the given options.
   *
   * @param options connection settings for the `cache` and `mq` roles
   * @throws Error when the cache client cannot be created (no cache URL)
   */
  constructor(options: RedisDataSourceOptions) {
    this.options = options

    const cacheClient = createIORedisClient('cache', this.options)
    if (!cacheClient) {
      // Throw a real Error (not a bare string) so callers get a stack trace
      // and `instanceof Error` checks work.
      throw new Error('Error initializing cache redis client')
    }

    this.cacheClient = cacheClient
    // if mq is not defined, use cache
    this.queueRedisClient =
      createIORedisClient('mq', this.options) ?? cacheClient
  }

  /**
   * Closes every distinct Redis connection held by this data source.
   *
   * The queue client may be the same object as the cache client, so the
   * clients are de-duplicated first to avoid calling `quit()` twice on one
   * connection. A failure on one client is logged and does not prevent the
   * other from being closed. This method never rejects.
   */
  async shutdown(): Promise<void> {
    const clients = new Set<Redis>([this.queueRedisClient, this.cacheClient])

    // The async wrapper turns a synchronous throw from quit() into a rejection
    // so that allSettled observes it.
    const results = await Promise.allSettled(
      Array.from(clients, async (client) => {
        await client?.quit()
      })
    )

    for (const result of results) {
      if (result.status === 'rejected') {
        console.error('error while shutting down redis', result.reason)
      }
    }
  }
}
|
||||
|
||||
/**
 * Builds an ioredis client for the given role.
 *
 * @param name which role's settings to read from `options`; also passed to
 *   the client as its `name` option
 * @param options connection settings for all roles
 * @returns a new client, or `undefined` (after logging) when the role has no
 *   URL configured
 */
const createIORedisClient = (
  name: RedisClientType,
  options: RedisDataSourceOptions
): Redis | undefined => {
  const { url, cert } = options[name]

  // Guard: without a URL there is nothing to connect to.
  if (!url) {
    console.log(`no redisURL supplied: ${name}`)
    return undefined
  }

  // Custom TLS settings are supplied only for rediss:// URLs that come with a
  // certificate; otherwise `tls` stays undefined.
  // NOTE(review): rejectUnauthorized is false, which per Node's TLS options
  // skips server certificate verification — confirm this is intentional.
  let tls: RedisOptions['tls'] = undefined
  if (url.startsWith('rediss://') && cert) {
    tls = {
      ca: cert,
      rejectUnauthorized: false,
    }
  }

  const redisOptions: RedisOptions = {
    tls,
    name,
    connectTimeout: 10000,
    maxRetriesPerRequest: null,
    offlineQueue: false,
  }

  return new Redis(url, redisOptions)
}
|
||||
|
||||
/**
 * Module-level Redis data source, configured from the REDIS_URL / REDIS_CERT
 * (cache) and MQ_REDIS_URL / MQ_REDIS_CERT (message queue) environment
 * variables.
 */
export const redisDataSource = new RedisDataSource({
  cache: { url: process.env.REDIS_URL, cert: process.env.REDIS_CERT },
  mq: { url: process.env.MQ_REDIS_URL, cert: process.env.MQ_REDIS_CERT },
})

// On SIGINT: log, close the Redis connections, then exit with status 0.
// A shutdown failure is logged and does not prevent the exit.
process.on('SIGINT', () => {
  console.log('SIGINT signal received.')

  void redisDataSource
    .shutdown()
    .catch((error: unknown) => {
      console.error('error while shutting down redis', error)
    })
    .then(() => {
      process.exit(0)
    })
})
|
||||
@ -1,9 +1,10 @@
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { fetchContent } from '@omnivore/puppeteer-parse'
|
||||
import { RedisDataSource } from '@omnivore/utils'
|
||||
import 'dotenv/config'
|
||||
import { RequestHandler } from 'express'
|
||||
import { analytics } from './analytics'
|
||||
import { queueSavePageJob } from './job'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
|
||||
interface UserConfig {
|
||||
id: string
|
||||
@ -92,6 +93,7 @@ const isFetchResult = (obj: unknown): obj is FetchResult => {
|
||||
}
|
||||
|
||||
export const cacheFetchResult = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
key: string,
|
||||
fetchResult: FetchResult
|
||||
) => {
|
||||
@ -102,6 +104,7 @@ export const cacheFetchResult = async (
|
||||
}
|
||||
|
||||
const getCachedFetchResult = async (
|
||||
redisDataSource: RedisDataSource,
|
||||
key: string
|
||||
): Promise<FetchResult | undefined> => {
|
||||
const result = await redisDataSource.cacheClient.get(key)
|
||||
@ -171,9 +174,21 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
|
||||
console.log(`Article parsing request`, logRecord)
|
||||
|
||||
// create redis source
|
||||
const redisDataSource = new RedisDataSource({
|
||||
cache: {
|
||||
url: process.env.REDIS_URL,
|
||||
cert: process.env.REDIS_CERT,
|
||||
},
|
||||
mq: {
|
||||
url: process.env.MQ_REDIS_URL,
|
||||
cert: process.env.MQ_REDIS_CERT,
|
||||
},
|
||||
})
|
||||
|
||||
try {
|
||||
const key = cacheKey(url, locale, timezone)
|
||||
let fetchResult = await getCachedFetchResult(key)
|
||||
let fetchResult = await getCachedFetchResult(redisDataSource, key)
|
||||
if (!fetchResult) {
|
||||
console.log(
|
||||
'fetch result not found in cache, fetching content now...',
|
||||
@ -184,7 +199,11 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
console.log('content has been fetched')
|
||||
|
||||
if (fetchResult.content) {
|
||||
const cacheResult = await cacheFetchResult(key, fetchResult)
|
||||
const cacheResult = await cacheFetchResult(
|
||||
redisDataSource,
|
||||
key,
|
||||
fetchResult
|
||||
)
|
||||
console.log('cache result', cacheResult)
|
||||
}
|
||||
}
|
||||
@ -219,7 +238,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
priority,
|
||||
}))
|
||||
|
||||
const jobs = await queueSavePageJob(savePageJobs)
|
||||
const jobs = await queueSavePageJob(redisDataSource, savePageJobs)
|
||||
console.log('save-page jobs queued', jobs.length)
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
@ -246,6 +265,8 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
await redisDataSource.shutdown()
|
||||
}
|
||||
|
||||
res.sendStatus(200)
|
||||
|
||||
@ -41,7 +41,6 @@
|
||||
"axios": "^0.27.2",
|
||||
"dotenv": "^16.0.1",
|
||||
"html-to-text": "^8.2.1",
|
||||
"ioredis": "^5.3.2",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"linkedom": "^0.14.12",
|
||||
"microsoft-cognitiveservices-speech-sdk": "1.30",
|
||||
|
||||
@ -382,7 +382,7 @@ export const textToSpeechStreamingHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
console.error('Text to speech streaming error:', e)
|
||||
return res.status(500).send({ errorCodes: 'SYNTHESIZER_ERROR' })
|
||||
} finally {
|
||||
await redisDataSource.cacheClient.quit()
|
||||
await redisDataSource.shutdown()
|
||||
console.log('Redis Client Disconnected')
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user