diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 89e554b9e..6047e5a52 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -6,13 +6,13 @@ import { ArticleSavingRequestStatus, CreateLabelInput, } from '../generated/graphql' -import { redisDataSource } from '../redis_data_source' import { userRepository } from '../repository/user' import { saveFile } from '../services/save_file' import { savePage } from '../services/save_page' import { uploadFile } from '../services/upload_file' import { logError, logger } from '../utils/logger' import { downloadFromUrl, uploadToSignedUrl } from '../utils/uploads' +import { downloadStringFromBucket } from '../utils/uploads' const signToken = promisify(jwt.sign) @@ -27,6 +27,9 @@ interface Data { url: string finalUrl: string articleSavingRequestId: string + title: string + contentType: string + state?: string labels?: CreateLabelInput[] source: string @@ -35,6 +38,7 @@ interface Data { savedAt?: string publishedAt?: string taskId?: string + contentHash?: string } interface FetchResult { @@ -120,32 +124,6 @@ const sendImportStatusUpdate = async ( } } -const getCachedFetchResult = async (url: string) => { - const key = `fetch-result:${url}` - if (!redisDataSource.redisClient || !redisDataSource.workerRedisClient) { - throw new Error('redis client is not initialized') - } - - let result = await redisDataSource.redisClient.get(key) - if (!result) { - logger.debug(`fetch result is not cached in cache redis ${url}`) - // fallback to worker redis client if the result is not found - result = await redisDataSource.workerRedisClient.get(key) - if (!result) { - throw new Error('fetch result is not cached') - } - } - - const fetchResult = JSON.parse(result) as unknown - if (!isFetchResult(fetchResult)) { - throw new Error('fetch result is not valid') - } - - logger.info('fetch result is cached', url) - - return fetchResult -} - export const savePageJob = async (data: 
Data, attemptsMade: number) => { const { userId, @@ -159,6 +137,9 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { taskId, url, finalUrl, + title, + contentType, + contentHash, } = data let isImported, isSaved, @@ -171,11 +152,6 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { finalUrl, }) - // get the fetch result from cache - const fetchedResult = await getCachedFetchResult(finalUrl) - const { title, contentType } = fetchedResult - let content = fetchedResult.content - const user = await userRepository.findById(userId) if (!user) { logger.error('Unable to save job, user can not be found.', { @@ -218,11 +194,24 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { return true } - if (!content) { - logger.info(`content is not fetched: ${finalUrl}`) + let originalContent + if (!contentHash) { + logger.info(`content is not uploaded: ${finalUrl}`) // set the state to failed if we don't have content - content = 'Failed to fetch content' + originalContent = 'Failed to fetch content' state = ArticleSavingRequestStatus.Failed + } else { + // download content from the bucket + const downloaded = await downloadStringFromBucket( + `originalContent/${contentHash}` + ) + if (!downloaded) { + logger.error('error while downloading content from bucket') + originalContent = 'Failed to fetch content' + state = ArticleSavingRequestStatus.Failed + } else { + originalContent = downloaded + } } // for non-pdf content, we need to save the page @@ -231,7 +220,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { url: finalUrl, clientRequestId: articleSavingRequestId, title, - originalContent: content, + originalContent, state: state ? 
(state as ArticleSavingRequestStatus) : undefined, labels: labels, rssFeedUrl, diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts index a60240c44..2209a9139 100644 --- a/packages/api/src/utils/uploads.ts +++ b/packages/api/src/utils/uploads.ts @@ -153,3 +153,24 @@ export const isFileExists = async (filePath: string): Promise<boolean> => { const [exists] = await storage.bucket(bucketName).file(filePath).exists() return exists } + +export const downloadStringFromBucket = async ( + filePath: string +): Promise<string | null> => { + try { + const bucket = storage.bucket(bucketName) + + const [exists] = await bucket.file(filePath).exists() + if (!exists) { + logger.error(`File not found: ${filePath}`) + return null + } + + // Download the file contents as a string + const [data] = await bucket.file(filePath).download() + return data.toString() + } catch (error) { + logger.info('Error downloading file:', error) + return null + } +} diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index f2cfed276..8836afa25 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -13,6 +13,7 @@ "ioredis": "^5.3.2", "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", + "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@sentry/serverless": "^7.77.0" }, diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 51db8532d..51283e497 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,8 +1,9 @@ +import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' +import crypto from 'crypto' import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' -import { redisDataSource } from './redis_data_source' interface User { id: string @@ -47,20 +48,20 @@
interface LogRecord { totalTime?: number } -interface FetchResult { - finalUrl: string - title?: string - content?: string - contentType?: string +const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH + ? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH }) + : new Storage() +const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files' + +export const uploadToBucket = async (filename: string, data: string) => { + await storage + .bucket(bucketName) + .file(`originalContent/${filename}`) + .save(data, { public: false, timeout: 30000 }) } -export const cacheFetchResult = async (fetchResult: FetchResult) => { - // cache the fetch result for 24 hours - const ttl = 24 * 60 * 60 - const key = `fetch-result:${fetchResult.finalUrl}` - const value = JSON.stringify(fetchResult) - return redisDataSource.cacheClient.set(key, value, 'EX', ttl, 'NX') -} +const hash = (content: string) => + crypto.createHash('md5').update(content).digest('hex') export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const functionStartTime = Date.now() @@ -114,6 +115,15 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { try { const fetchResult = await fetchContent(url, locale, timezone) const finalUrl = fetchResult.finalUrl + let contentHash: string | undefined + + const content = fetchResult.content + if (content) { + // hash content to use as key + contentHash = hash(content) + await uploadToBucket(contentHash, content) + console.log('content uploaded to bucket', contentHash) + } const savePageJobs = users.map((user) => ({ userId: user.id, @@ -130,15 +140,15 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { savedAt, publishedAt, taskId, + title: fetchResult.title, + contentType: fetchResult.contentType, + contentHash, }, isRss: !!rssFeedUrl, isImport: !!taskId, priority, })) - const cacheResult = await cacheFetchResult(fetchResult) - console.log('cacheFetchResult result',
cacheResult) - const jobs = await queueSavePageJob(savePageJobs) console.log('save-page jobs queued', jobs.length) } catch (error) {