From cce5f2463d0144e7ca38b8d124798d112e5bb16d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 8 May 2024 11:37:51 +0800 Subject: [PATCH] still use redis for cache --- packages/api/src/jobs/rss/refreshFeed.ts | 2 +- packages/api/src/jobs/save_page.ts | 97 ++++++++++++++++--- packages/api/src/resolvers/article/index.ts | 1 + packages/api/src/services/library_item.ts | 43 ++++---- packages/api/src/services/recommendation.ts | 1 + packages/api/src/services/save_email.ts | 1 + packages/api/src/services/save_page.ts | 4 + packages/content-fetch/package.json | 1 - packages/content-fetch/src/request_handler.ts | 49 +++------- 9 files changed, 136 insertions(+), 63 deletions(-) diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 4b7995d5c..06faefde2 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -381,7 +381,7 @@ const createItemWithFeedContent = async ( rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, - originalContent: '', + originalContent: feedContent || '', source: 'rss-feeder', state: ArticleSavingRequestStatus.ContentNotFetched, clientRequestId: '', diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 07869306d..24d77ae8e 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -6,8 +6,8 @@ import { ArticleSavingRequestStatus, CreateLabelInput, } from '../generated/graphql' +import { redisDataSource } from '../redis_data_source' import { userRepository } from '../repository/user' -import { downloadOriginalContent } from '../services/library_item' import { saveFile } from '../services/save_file' import { savePage } from '../services/save_page' import { uploadFile } from '../services/upload_file' @@ -27,8 +27,6 @@ interface Data { url: string finalUrl: string articleSavingRequestId: string - title: string - contentType: string state?: string labels?: 
CreateLabelInput[] @@ -38,7 +36,76 @@ interface Data { savedAt?: string publishedAt?: string taskId?: string - urlHash?: string +} + +interface FetchResult { + finalUrl: string + title?: string + content?: string + contentType?: string +} + +const isFetchResult = (obj: unknown): obj is FetchResult => { + return typeof obj === 'object' && obj !== null && 'finalUrl' in obj +} + +const uploadToSignedUrl = async ( + uploadSignedUrl: string, + contentType: string, + contentObjUrl: string +) => { + const maxContentLength = 10 * 1024 * 1024 // 10MB + + logger.info('downloading content', { + contentObjUrl, + }) + + // download the content as stream and max 10MB + const response = await axios.get(contentObjUrl, { + responseType: 'stream', + maxContentLength, + timeout: REQUEST_TIMEOUT, + }) + + logger.info('uploading to signed url', { + uploadSignedUrl, + contentType, + }) + + // upload the stream to the signed url + await axios.put(uploadSignedUrl, response.data, { + headers: { + 'Content-Type': contentType, + }, + maxBodyLength: maxContentLength, + timeout: REQUEST_TIMEOUT, + }) +} + +const getCachedFetchResult = async (url: string) => { + const key = `fetch-result:${url}` + if (!redisDataSource.redisClient || !redisDataSource.workerRedisClient) { + throw new Error('redis client is not initialized') + } + + let result = await redisDataSource.redisClient.get(key) + if (!result) { + logger.debug(`fetch result is not cached in cache redis ${url}`) + // fallback to worker redis client if the result is not found + result = await redisDataSource.workerRedisClient.get(key) + if (!result) { + throw new Error('fetch result is not cached') + } + } + + const fetchResult = JSON.parse(result) as unknown + if (!isFetchResult(fetchResult)) { + throw new Error('fetch result is not valid') + } + + logger.info('fetch result is cached', url) + + return fetchResult } const uploadPdf = async ( @@ -126,11 +193,10 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { 
taskId, url, finalUrl, - title, - contentType, - state, } = data - let isImported, isSaved + let isImported, + isSaved, + state = data.state logger.info('savePageJob', { userId, @@ -149,6 +215,11 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { } try { + // get the fetch result from cache + const fetchedResult = await getCachedFetchResult(finalUrl) + const { title, contentType } = fetchedResult + let content = fetchedResult.content + // for pdf content, we need to upload the pdf if (contentType === 'application/pdf') { const uploadResult = await uploadPdf( @@ -181,8 +252,12 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { return true } - // download content from the bucket - const originalContent = (await downloadOriginalContent(finalUrl)).toString() + if (!content) { + logger.info(`content is not fetched: ${finalUrl}`) + // set the state to failed if we don't have content + content = 'Failed to fetch content' + state = ArticleSavingRequestStatus.Failed + } // for non-pdf content, we need to save the page const result = await savePage( @@ -190,7 +265,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { url: finalUrl, clientRequestId: articleSavingRequestId, title, - originalContent, + originalContent: content, state: (state as ArticleSavingRequestStatus) || undefined, labels, rssFeedUrl, diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index f1da74eef..026cb7988 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -302,6 +302,7 @@ export const createArticleResolver = authorized< userId: uid, slug, croppedPathname, + originalHtml: domContent, itemType, preparedDocument, uploadFileHash, diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 0ee80ecee..1b3420c22 100644 --- a/packages/api/src/services/library_item.ts +++ 
b/packages/api/src/services/library_item.ts @@ -21,11 +21,7 @@ import { redisDataSource } from '../redis_data_source' import { authTrx, getColumns, queryBuilderToRawSql } from '../repository' import { libraryItemRepository } from '../repository/library_item' import { Merge, PickTuple } from '../util' -import { - deepDelete, - setRecentlySavedItemInRedis, - stringToHash, -} from '../utils/helpers' +import { deepDelete, setRecentlySavedItemInRedis } from '../utils/helpers' import { logger } from '../utils/logger' import { parseSearchQuery } from '../utils/search' import { downloadFileFromBucket, uploadToBucket } from '../utils/uploads' @@ -1021,13 +1017,13 @@ export const createOrUpdateLibraryItem = async ( pubsub = createPubSubClient(), skipPubSub = false ): Promise => { - // if (libraryItem.originalContent && !urlHash) { - // // upload original content to GCS - // await uploadContent(libraryItem.originalUrl, libraryItem.originalContent) + let originalContent: string | null = null + if (libraryItem.originalContent) { + originalContent = libraryItem.originalContent - // // remove original content - // delete libraryItem.originalContent - // } + // remove original content from the item + delete libraryItem.originalContent + } const newLibraryItem = await authTrx( async (tx) => { @@ -1102,6 +1098,14 @@ export const createOrUpdateLibraryItem = async ( const data = deepDelete(newLibraryItem, columnsToDelete) await pubsub.entityCreated(EntityType.ITEM, data, userId) + // upload original content to GCS + if (originalContent) { + await uploadOriginalContent(userId, newLibraryItem.id, originalContent) + logger.info('Uploaded original content to GCS', { + id: newLibraryItem.id, + }) + } + return newLibraryItem } @@ -1677,22 +1681,27 @@ export const filterItemEvents = ( throw new Error('Unexpected state.') } -const originalContentFilename = (originalUrl: string) => - `originalContent/${stringToHash(originalUrl)}` +const originalContentFilename = (userId: string, libraryItemId: 
string) => + `original-content/${userId}/${libraryItemId}.html` export const uploadOriginalContent = async ( - originalUrl: string, + userId: string, + libraryItemId: string, originalContent: string ) => { await uploadToBucket( - originalContentFilename(originalUrl), + originalContentFilename(userId, libraryItemId), Buffer.from(originalContent), { public: false, + contentType: 'text/html', } ) } -export const downloadOriginalContent = async (originalUrl: string) => { - return downloadFileFromBucket(originalContentFilename(originalUrl)) +export const downloadOriginalContent = async ( + userId: string, + libraryItemId: string +) => { + return downloadFileFromBucket(originalContentFilename(userId, libraryItemId)) } diff --git a/packages/api/src/services/recommendation.ts b/packages/api/src/services/recommendation.ts index 07d189383..9e58224f4 100644 --- a/packages/api/src/services/recommendation.ts +++ b/packages/api/src/services/recommendation.ts @@ -47,6 +47,7 @@ export const addRecommendation = async ( author: item.author, description: item.description, originalUrl: item.originalUrl, + originalContent: item.originalContent, contentReader: item.contentReader, directionality: item.directionality, itemLanguage: item.itemLanguage, diff --git a/packages/api/src/services/save_email.ts b/packages/api/src/services/save_email.ts index 020237662..699db6919 100644 --- a/packages/api/src/services/save_email.ts +++ b/packages/api/src/services/save_email.ts @@ -91,6 +91,7 @@ export const saveEmail = async ( user: { id: input.userId }, slug, readableContent: content, + originalContent: input.originalContent, description: metadata?.description || parseResult.parsedContent?.excerpt, title: input.title, author: input.author, diff --git a/packages/api/src/services/save_page.ts b/packages/api/src/services/save_page.ts index 5dab1fe1b..c5ba13989 100644 --- a/packages/api/src/services/save_page.ts +++ b/packages/api/src/services/save_page.ts @@ -124,6 +124,7 @@ export const savePage = 
async ( croppedPathname, parsedContent: parseResult.parsedContent, itemType: parseResult.pageType, + originalHtml: parseResult.domContent, canonicalUrl: parseResult.canonicalUrl, savedAt: input.savedAt ? new Date(input.savedAt) : new Date(), publishedAt: input.publishedAt ? new Date(input.publishedAt) : undefined, @@ -196,6 +197,7 @@ export const savePage = async ( export const parsedContentToLibraryItem = ({ url, userId, + originalHtml, itemId, parsedContent, slug, @@ -222,6 +224,7 @@ export const parsedContentToLibraryItem = ({ croppedPathname: string itemType: string parsedContent: Readability.ParseResult | null + originalHtml?: string | null itemId?: string | null title?: string | null preparedDocument?: PreparedDocumentInput | null @@ -243,6 +246,7 @@ export const parsedContentToLibraryItem = ({ id: itemId || undefined, slug, user: { id: userId }, + originalContent: originalHtml, readableContent: parsedContent?.content || '', description: parsedContent?.excerpt, title: diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 8836afa25..f2cfed276 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -13,7 +13,6 @@ "ioredis": "^5.3.2", "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", - "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@sentry/serverless": "^7.77.0" }, diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index c88f688cb..51db8532d 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,9 +1,8 @@ -import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' -import crypto from 'crypto' import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' +import { redisDataSource } from './redis_data_source' interface 
User { id: string } @@ -48,27 +47,20 @@ interface LogRecord { totalTime?: number } -const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH - ? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH }) - : new Storage() -const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files' - -export const uploadToBucket = async (filename: string, data: string) => { - const file = storage.bucket(bucketName).file(`originalContent/${filename}`) - - // check if the file already exists - const [exists] = await file.exists() - - if (exists) { - console.log('file already exists', filename) - return - } - - await file.save(data, { public: false, timeout: 30000 }) +interface FetchResult { + finalUrl: string + title?: string + content?: string + contentType?: string } -const hash = (content: string) => - crypto.createHash('md5').update(content).digest('hex') +export const cacheFetchResult = async (fetchResult: FetchResult) => { + // cache the fetch result for 24 hours + const ttl = 24 * 60 * 60 + const key = `fetch-result:${fetchResult.finalUrl}` + const value = JSON.stringify(fetchResult) + return redisDataSource.cacheClient.set(key, value, 'EX', ttl, 'NX') +} export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const functionStartTime = Date.now() @@ -122,15 +114,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { try { const fetchResult = await fetchContent(url, locale, timezone) const finalUrl = fetchResult.finalUrl - let urlHash: string | undefined - - const content = fetchResult.content - if (content) { - // hash final url to use as key - urlHash = hash(finalUrl) - await uploadToBucket(urlHash, content) - console.log('content uploaded to bucket', urlHash) - } const savePageJobs = users.map((user) => ({ userId: user.id, @@ -147,15 +130,15 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { savedAt, publishedAt, taskId, - title: fetchResult.title, - contentType: 
fetchResult.contentType, - urlHash, }, isRss: !!rssFeedUrl, isImport: !!taskId, priority, })) + const cacheResult = await cacheFetchResult(fetchResult) + console.log('cacheFetchResult result', cacheResult) + const jobs = await queueSavePageJob(savePageJobs) console.log('save-page jobs queued', jobs.length) } catch (error) {