diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts
index b1431d74f..bd28840be 100644
--- a/packages/api/src/jobs/save_page.ts
+++ b/packages/api/src/jobs/save_page.ts
@@ -1,8 +1,8 @@
-import { Readability } from '@omnivore/readability'
 import axios from 'axios'
 import jwt from 'jsonwebtoken'
 import { promisify } from 'util'
 import { env } from '../env'
+import { redisDataSource } from '../redis_data_source'

 const signToken = promisify(jwt.sign)

@@ -10,15 +10,12 @@ const IMPORTER_METRICS_COLLECTOR_URL = env.queue.importerMetricsUrl
 const JWT_SECRET = env.server.jwtSecret
 const REST_BACKEND_ENDPOINT = `${env.server.internalApiUrl}/api`

+const MAX_ATTEMPTS = 2
 const REQUEST_TIMEOUT = 30000 // 30 seconds

 interface Data {
   userId: string
   url: string
-  title: string
-  content: string
-  contentType: string
-  readabilityResult?: Readability.ParseResult
   articleSavingRequestId: string
   state?: string
   labels?: string[]
@@ -63,6 +60,23 @@ interface SavePageResponse {
   }
 }

+interface FetchResult {
+  finalUrl: string
+  title: string
+  content?: string
+  contentType?: string
+  readabilityResult?: unknown
+}
+
+const isFetchResult = (obj: unknown): obj is FetchResult => {
+  return (
+    typeof obj === 'object' &&
+    obj !== null &&
+    'finalUrl' in obj &&
+    'title' in obj
+  )
+}
+
 const uploadToSignedUrl = async (
   uploadSignedUrl: string,
   contentType: string,
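Note on the hunk above: `isFetchResult` is a structural type guard over the `unknownown`-free value that `JSON.parse` returns; it checks only the presence of the two required fields, not their types, before the cached payload is trusted. A minimal sketch of how it narrows (the two declarations are copied from the hunk so the snippet stands alone):

```ts
interface FetchResult {
  finalUrl: string
  title: string
  content?: string
}

const isFetchResult = (obj: unknown): obj is FetchResult =>
  typeof obj === 'object' && obj !== null && 'finalUrl' in obj && 'title' in obj

// JSON.parse yields unknown here; the guard narrows it to FetchResult.
const raw: unknown = JSON.parse('{"finalUrl":"https://example.com","title":"Hi"}')
if (isFetchResult(raw)) {
  console.log(raw.finalUrl, raw.title) // typed access, optional fields unchecked
}
```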
@@ -277,7 +291,7 @@ const sendSavePageMutation = async (userId: string, input: unknown) => {

 const sendImportStatusUpdate = async (
   userId: string,
   taskId: string,
-  isContentParsed: boolean
+  isImported?: boolean
 ) => {
   try {
@@ -286,7 +300,7 @@ const sendImportStatusUpdate = async (
       IMPORTER_METRICS_COLLECTOR_URL,
       {
         taskId,
-        status: isContentParsed ? 'imported' : 'failed',
+        status: isImported ? 'imported' : 'failed',
       },
       {
         headers: {
@@ -301,12 +315,30 @@
   }
 }

-export const savePageJob = async (data: Data) => {
+const getCachedFetchResult = async (url: string) => {
+  const key = `fetch-result:${url}`
+  if (!redisDataSource.redisClient) {
+    throw new Error('redis client is not initialized')
+  }
+
+  const result = await redisDataSource.redisClient.get(key)
+  if (!result) {
+    throw new Error('fetch result is not cached')
+  }
+
+  const fetchResult = JSON.parse(result) as unknown
+  if (!isFetchResult(fetchResult)) {
+    throw new Error('fetch result is not valid')
+  }
+
+  console.log('fetch result is cached', url)
+
+  return fetchResult
+}
+
+export const savePageJob = async (data: Data, attemptsMade: number) => {
   const {
     userId,
-    title,
-    content,
-    readabilityResult,
     articleSavingRequestId,
     state,
     labels,
@@ -317,12 +349,17 @@ export const savePageJob = async (data: Data) => {
     publishedAt,
     taskId,
   } = data
-  let isContentParsed = true
+  let isImported, isSaved

   try {
     const url = encodeURI(data.url)

-    if (data.contentType === 'application/pdf') {
+    // get the fetch result from cache
+    const { title, content, contentType, readabilityResult } =
+      await getCachedFetchResult(url)
+
+    // for pdf content, we need to upload the pdf
+    if (contentType === 'application/pdf') {
       const uploadFileId = await uploadPdf(url, userId, articleSavingRequestId)
       const uploadedPdf = await sendCreateArticleMutation(userId, {
         url,
@@ -339,42 +376,56 @@
       if (!uploadedPdf) {
         throw new Error('error while saving uploaded pdf')
       }
-    } else {
-      const apiResponse = await sendSavePageMutation(userId, {
-        url,
-        clientRequestId: articleSavingRequestId,
-        title,
-        originalContent: content,
-        parseResult: readabilityResult,
-        state,
-        labels,
-        rssFeedUrl,
-        savedAt,
-        publishedAt,
-        source,
-        folder,
-      })
-      if (!apiResponse) {
-        throw new Error('error while saving page')
-      }
-      // if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') {
-      //   console.log('user is deleted', userId)
-      //   return true
-      // }
-      // if the readability result is not parsed, the import is failed
-      if (!readabilityResult) {
-        isContentParsed = false
-      }
+      isSaved = true
+      isImported = true
+      return true
     }
+
+    // for non-pdf content, we need to save the page
+    const apiResponse = await sendSavePageMutation(userId, {
+      url,
+      clientRequestId: articleSavingRequestId,
+      title,
+      originalContent: content,
+      parseResult: readabilityResult,
+      state,
+      labels,
+      rssFeedUrl,
+      savedAt,
+      publishedAt,
+      source,
+      folder,
+    })
+    if (!apiResponse) {
+      throw new Error('error while saving page')
+    }
+
+    if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') {
+      console.log('user is deleted', userId)
+      return false
+    }
+
+    // if the readability result is not parsed, the import is failed
+    isImported = !!readabilityResult
+    isSaved = true
   } catch (e) {
-    console.error('error while saving page', e)
-    isContentParsed = false
-    return false
+    if (e instanceof Error) {
+      console.error('error while saving page', e.message)
+    } else {
+      console.error('error while saving page', 'unknown error')
+    }
+
+    throw e
   } finally {
-    // send import status to update the metrics for importer
-    if (taskId) {
-      await sendImportStatusUpdate(userId, taskId, isContentParsed)
+    const lastAttempt = attemptsMade === MAX_ATTEMPTS - 1
+    if (lastAttempt) {
+      console.log('last attempt reached', data.url)
+    }
+
+    if (taskId && (isSaved || lastAttempt)) {
+      // send import status to update the metrics for importer
+      await sendImportStatusUpdate(userId, taskId, isImported)
     }
   }
 }
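The new `finally` block changes when importer metrics fire: a failed attempt stays silent until the retry budget is spent, so a transient failure that succeeds on retry is never reported as 'failed'. A minimal sketch of that rule (the `shouldReportStatus` helper is hypothetical, extracted here only for illustration):

```ts
const MAX_ATTEMPTS = 2

// Mirrors the condition in the finally block above: report only when the
// save succeeded, or when this was the last attempt BullMQ will make.
const shouldReportStatus = (
  hasTaskId: boolean,
  isSaved: boolean,
  attemptsMade: number
): boolean => {
  const lastAttempt = attemptsMade === MAX_ATTEMPTS - 1
  return hasTaskId && (isSaved || lastAttempt)
}

console.log(shouldReportStatus(true, false, 0)) // false: stay silent, a retry is coming
console.log(shouldReportStatus(true, false, 1)) // true: terminal failure, report once
console.log(shouldReportStatus(true, true, 0))  // true: success, report immediately
```

Note that the thrown error still propagates (`throw e`), which is what makes BullMQ schedule the retry in the first place; the old version swallowed it with `return false`.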
diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts
index ce98b3ceb..d835aecc3 100644
--- a/packages/api/src/queue-processor.ts
+++ b/packages/api/src/queue-processor.ts
@@ -9,8 +9,8 @@ import { appDataSource } from './data_source'
 import { env } from './env'
 import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
 import { refreshFeed } from './jobs/rss/refreshFeed'
-import { redisDataSource } from './redis_data_source'
 import { savePageJob } from './jobs/save_page'
+import { redisDataSource } from './redis_data_source'
 import { CustomTypeOrmLogger } from './utils/logger'

 export const QUEUE_NAME = 'omnivore-backend-queue'
@@ -78,7 +78,7 @@
         return await refreshFeed(job.data)
       }
       case 'save-page': {
-        return savePageJob(job.data)
+        return savePageJob(job.data, job.attemptsMade)
       }
     }
     return true
diff --git a/packages/api/src/resolvers/subscriptions/index.ts b/packages/api/src/resolvers/subscriptions/index.ts
index 493c0fb31..ab785b73a 100644
--- a/packages/api/src/resolvers/subscriptions/index.ts
+++ b/packages/api/src/resolvers/subscriptions/index.ts
@@ -193,15 +193,8 @@ export const subscribeResolver = authorized<
       env: env.server.apiEnv,
     },
   })
-
-  // validate rss feed
-  const feed = await parseFeed(input.url)
-  if (!feed) {
-    return {
-      errorCodes: [SubscribeErrorCode.NotFound],
-    }
-  }
-  const feedUrl = feed.url
+  // use user provided url
+  const feedUrl = input.url
   try {
     validateUrl(feedUrl)
   } catch (error) {
@@ -212,6 +205,14 @@
     }
   }

+  // validate rss feed
+  const feed = await parseFeed(feedUrl)
+  if (!feed) {
+    return {
+      errorCodes: [SubscribeErrorCode.NotFound],
+    }
+  }
+
   // find existing subscription
   const existingSubscription = await getRepository(Subscription).findOneBy({
     url: In([feedUrl, input.url]), // check both user provided url and parsed url
diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts
index c883ad4d0..b03ed9469 100644
--- a/packages/content-fetch/src/job.ts
+++ b/packages/content-fetch/src/job.ts
@@ -1,11 +1,10 @@
 import { BulkJobOptions, Queue } from 'bullmq'
-import { redis } from './redis'
+import { redisDataSource } from './redis_data_source'

 const QUEUE_NAME = 'omnivore-backend-queue'
 const JOB_NAME = 'save-page'

 interface savePageJob {
-  url: string
   userId: string
   data: unknown
   isRss: boolean
   isImport: boolean
 }

 const queue = new Queue(QUEUE_NAME, {
-  connection: redis,
+  connection: redisDataSource.queueRedisClient,
 })

 const getPriority = (job: savePageJob): number => {
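The queue wiring above is what ties the two packages together once the payload is slimmed down: content-fetch enqueues by job name, and the api-side worker hands `job.attemptsMade` through to `savePageJob`. A producer/worker sketch under these assumptions (the connection details, payload fields, and backoff policy are illustrative, not taken from the diff; `attempts: 2` would line up with `MAX_ATTEMPTS` in save_page.ts):

```ts
import { Queue, Worker } from 'bullmq'
import IORedis from 'ioredis'

const connection = new IORedis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
  maxRetriesPerRequest: null, // required by BullMQ for blocking commands
})

// Producer (content-fetch side): enqueue the job by name with a retry budget.
const queue = new Queue('omnivore-backend-queue', { connection })

const enqueue = async () => {
  await queue.add(
    'save-page',
    { userId: 'u1', url: 'https://example.com', articleSavingRequestId: 'r1' },
    { attempts: 2 } // retried once on failure
  )
}

// Consumer (api side): job.attemptsMade is forwarded to the handler, which
// treats attemptsMade === MAX_ATTEMPTS - 1 as the final try.
const worker = new Worker(
  'omnivore-backend-queue',
  async (job) => {
    console.log(job.name, 'attempt', job.attemptsMade)
  },
  { connection }
)

void enqueue()
```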
diff --git a/packages/content-fetch/src/redis.ts b/packages/content-fetch/src/redis.ts
deleted file mode 100644
index a32d4c92e..000000000
--- a/packages/content-fetch/src/redis.ts
+++ /dev/null
@@ -1,29 +0,0 @@
-/* eslint-disable @typescript-eslint/no-misused-promises */
-import { Redis } from 'ioredis'
-
-const url = process.env.REDIS_URL
-const cert = process.env.REDIS_CERT?.replace(/\\n/g, '\n') // replace \n with new line
-
-export const redis = new Redis(url || 'redis://localhost:6379', {
-  connectTimeout: 10000, // 10 seconds
-  tls: cert
-    ? {
-        cert,
-        rejectUnauthorized: false, // for self-signed certs
-      }
-    : undefined,
-  maxRetriesPerRequest: null,
-  offlineQueue: false,
-})
-
-redis.on('connect', () => {
-  console.log('Redis connected')
-})
-
-redis.on('error', (err) => {
-  console.error('Redis error', err)
-})
-
-redis.on('close', () => {
-  console.log('Redis closed')
-})
diff --git a/packages/content-fetch/src/redis_data_source.ts b/packages/content-fetch/src/redis_data_source.ts
new file mode 100644
index 000000000..1a95f9d6e
--- /dev/null
+++ b/packages/content-fetch/src/redis_data_source.ts
@@ -0,0 +1,88 @@
+import Redis, { RedisOptions } from 'ioredis'
+
+export type RedisDataSourceOptions = {
+  REDIS_URL?: string
+  REDIS_CERT?: string
+}
+
+export class RedisDataSource {
+  options: RedisDataSourceOptions
+
+  cacheClient: Redis
+  queueRedisClient: Redis
+
+  constructor(options: RedisDataSourceOptions) {
+    this.options = options
+
+    this.cacheClient = createRedisClient('cache', this.options)
+    this.queueRedisClient = createRedisClient('queue', this.options)
+  }
+
+  setOptions(options: RedisDataSourceOptions): void {
+    this.options = options
+  }
+
+  async shutdown(): Promise<void> {
+    try {
+      await this.queueRedisClient?.quit()
+      await this.cacheClient?.quit()
+    } catch (err) {
+      console.error('error while shutting down redis', err)
+    }
+  }
+}
+
+const createRedisClient = (name: string, options: RedisDataSourceOptions) => {
+  const redisURL = options.REDIS_URL
+  const cert = options.REDIS_CERT?.replace(/\\n/g, '\n') // replace \n with new line
+  if (!redisURL) {
+    throw 'Error: no redisURL supplied'
+  }
+
+  const redisOptions: RedisOptions = {
+    name,
+    connectTimeout: 10000, // 10 seconds
+    tls: cert
+      ? {
+          cert,
+          rejectUnauthorized: false, // for self-signed certs
+        }
+      : undefined,
+    maxRetriesPerRequest: null,
+    offlineQueue: false,
+  }
+
+  const redis = new Redis(redisURL, redisOptions)
+
+  redis.on('connect', () => {
+    console.log('Redis connected', name)
+  })
+
+  redis.on('error', (err) => {
+    console.error('Redis error', err, name)
+  })
+
+  redis.on('close', () => {
+    console.log('Redis closed', name)
+  })
+
+  return redis
+}
+
+export const redisDataSource = new RedisDataSource({
+  REDIS_URL: process.env.REDIS_URL,
+  REDIS_CERT: process.env.REDIS_CERT,
+})
+
+// eslint-disable-next-line @typescript-eslint/no-misused-promises
+process.on('SIGINT', async () => {
+  console.log('SIGINT signal received.')
+
+  try {
+    await redisDataSource.shutdown()
+  } catch (error) {
+    console.error('error while shutting down redis', error)
+  }
+
+  process.exit(0)
+})
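A usage note on the new data source: callers pick a client by role rather than sharing one connection, and the process-level SIGINT hook owns teardown. A small consumer sketch (the cache key is illustrative):

```ts
import { redisDataSource } from './redis_data_source'

const main = async () => {
  // Cache traffic goes through cacheClient; BullMQ holds queueRedisClient,
  // whose long blocking reads would otherwise queue up in front of plain GETs
  // on a shared connection.
  const cached = await redisDataSource.cacheClient.get(
    'fetch-result:https://example.com' // illustrative key
  )
  console.log(cached ? 'cache hit' : 'cache miss')

  await redisDataSource.shutdown()
}

void main()
```

Because each client's event handlers close over its `name`, a queue outage and a cache outage also surface as distinct 'Redis error' log lines.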
diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts
index 66acc1a37..9672344d3 100644
--- a/packages/content-fetch/src/request_handler.ts
+++ b/packages/content-fetch/src/request_handler.ts
@@ -1,6 +1,7 @@
 import { fetchContent } from '@omnivore/puppeteer-parse'
 import { RequestHandler } from 'express'
 import { queueSavePageJob } from './job'
+import { redisDataSource } from './redis_data_source'

 interface User {
   id: string
@@ -44,7 +45,21 @@ interface LogRecord {
   totalTime?: number
 }

-// const MAX_RETRY_COUNT = process.env.MAX_RETRY_COUNT || '1'
+interface FetchResult {
+  finalUrl: string
+  title: string
+  content?: string
+  contentType?: string
+  readabilityResult?: unknown
+}
+
+export const cacheFetchResult = async (fetchResult: FetchResult) => {
+  // cache the fetch result for 4 hours
+  const ttl = 4 * 60 * 60
+  const key = `fetch-result:${fetchResult.finalUrl}`
+  const value = JSON.stringify(fetchResult)
+  return redisDataSource.cacheClient.set(key, value, 'EX', ttl, 'NX')
+}

 export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
   const functionStartTime = Date.now()
@@ -99,21 +114,12 @@
   try {
     const fetchResult = await fetchContent(url, locale, timezone)
     const finalUrl = fetchResult.finalUrl
-    const title = fetchResult.title
-    const content = fetchResult.content
-    const contentType = fetchResult.contentType
-    const readabilityResult = fetchResult.readabilityResult as unknown

     const savePageJobs = users.map((user) => ({
-      url: finalUrl,
       userId: user.id,
       data: {
         userId: user.id,
         url: finalUrl,
-        title,
-        content,
-        contentType,
-        readabilityResult,
         articleSavingRequestId,
         state,
         labels,
@@ -128,12 +134,11 @@
       isImport: !!taskId,
     }))

-    const result = await queueSavePageJob(savePageJobs)
-    console.log('queueSavePageJob result', result)
-    if (!result) {
-      logRecord.error = 'error while queueing save page job'
-      return res.sendStatus(500)
-    }
+    const cacheResult = await cacheFetchResult(fetchResult)
+    console.log('cacheFetchResult result', cacheResult)
+
+    const jobs = await queueSavePageJob(savePageJobs)
+    console.log('save-page jobs queued', jobs.length)
   } catch (error) {
     if (error instanceof Error) {
       logRecord.error = error.message
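The handler now decouples fetching from saving: the full fetch result is written to Redis once, and each queued save-page job re-reads it under `fetch-result:<finalUrl>`. The producer key here must match the consumer key in `getCachedFetchResult` exactly, and the 4-hour TTL has to outlive queue latency plus the retry. A standalone round-trip sketch of that contract (URL and payload are illustrative):

```ts
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379')

const roundTrip = async () => {
  const fetchResult = {
    finalUrl: 'https://example.com/article', // illustrative
    title: 'Example',
  }
  const key = `fetch-result:${fetchResult.finalUrl}`

  // EX: expire after 4 hours; NX: only the first writer for a URL wins, so
  // concurrent fetches of the same page do not overwrite each other.
  await redis.set(key, JSON.stringify(fetchResult), 'EX', 4 * 60 * 60, 'NX')

  const raw = await redis.get(key)
  if (raw) {
    const parsed = JSON.parse(raw) as { finalUrl: string; title: string }
    console.log(parsed.title) // 'Example'
  }

  await redis.quit()
}

void roundTrip()
```

Note the behavioral change in the hunk above: the handler no longer returns 500 when enqueueing reports a falsy result; failures now propagate to the catch block and are recorded in `logRecord.error`, while a cache miss on the consumer side surfaces through the job's own retry path.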