diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 06faefde2..3bd1d2624 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -2,6 +2,7 @@ import axios from 'axios' import crypto from 'crypto' import { parseHTML } from 'linkedom' import Parser, { Item } from 'rss-parser' +import { v4 as uuid } from 'uuid' import { FetchContentType } from '../../entity/subscription' import { env } from '../../env' import { ArticleSavingRequestStatus } from '../../generated/graphql' @@ -72,13 +73,14 @@ export type RssFeedItem = Item & { link: string } -interface User { +interface UserConfig { id: string folder: FolderType + libraryItemId: string } interface FetchContentTask { - users: Map // userId -> User + users: Map // userId -> User item: RssFeedItem } @@ -280,13 +282,16 @@ const addFetchContentTask = ( ) => { const url = item.link const task = fetchContentTasks.get(url) + const libraryItemId = uuid() + const userConfig = { id: userId, folder, libraryItemId } + if (!task) { fetchContentTasks.set(url, { - users: new Map([[userId, { id: userId, folder }]]), + users: new Map([[userId, userConfig]]), item, }) } else { - task.users.set(userId, { id: userId, folder }) + task.users.set(userId, userConfig) } return true @@ -319,7 +324,7 @@ const createTask = async ( } const fetchContentAndCreateItem = async ( - users: User[], + users: UserConfig[], feedUrl: string, item: RssFeedItem ) => { @@ -327,7 +332,6 @@ const fetchContentAndCreateItem = async ( users, source: 'rss-feeder', url: item.link.trim(), - saveRequestId: '', labels: [{ name: 'RSS' }], rssFeedUrl: feedUrl, savedAt: item.isoDate, diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 02d9a2cfa..bf338a167 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -6,13 +6,18 @@ import { ArticleSavingRequestStatus, CreateLabelInput, } from '../generated/graphql' -import { redisDataSource } from '../redis_data_source' import { userRepository } from '../repository/user' import { saveFile } from '../services/save_file' import { savePage } from '../services/save_page' import { uploadFile } from '../services/upload_file' import { logError, logger } from '../utils/logger' -import { downloadFromUrl, uploadToSignedUrl } from '../utils/uploads' +import { + contentFilePath, + downloadFromBucket, + downloadFromUrl, + isFileExists, + uploadToSignedUrl, +} from '../utils/uploads' const signToken = promisify(jwt.sign) @@ -27,54 +32,19 @@ interface Data { url: string finalUrl: string articleSavingRequestId: string + title: string + contentType: string + savedAt: string state?: string labels?: CreateLabelInput[] source: string folder: string rssFeedUrl?: string - savedAt?: string publishedAt?: string taskId?: string } -interface FetchResult { - finalUrl: string - title?: string - content?: string - contentType?: string -} - -const isFetchResult = (obj: unknown): obj is FetchResult => { - return typeof obj === 'object' && obj !== null && 'finalUrl' in obj -} - -const getCachedFetchResult = async (url: string) => { - const key = `fetch-result:${url}` - if (!redisDataSource.redisClient || !redisDataSource.workerRedisClient) { - throw new Error('redis client is not initialized') - } - - let result = await redisDataSource.redisClient.get(key) - if (!result) { - logger.debug(`fetch result is not cached in cache redis ${url}`) - // fallback to worker redis client if the result is not found - result = await redisDataSource.workerRedisClient.get(key) - if (!result) { - throw new Error('fetch result is not cached') - } - } - - const fetchResult = JSON.parse(result) as unknown - if (!isFetchResult(fetchResult)) { - throw new Error('fetch result is not valid') - } - - logger.info('fetch result is cached', url) - - return fetchResult -} - const uploadPdf = async ( url: string, userId: string, @@ -160,10 +130,11 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { taskId, url, finalUrl, + title, + contentType, + state, } = data - let isImported, - isSaved, - state = data.state + let isImported, isSaved logger.info('savePageJob', { userId, @@ -182,11 +153,6 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { } try { - // get the fetch result from cache - const fetchedResult = await getCachedFetchResult(finalUrl) - const { title, contentType } = fetchedResult - let content = fetchedResult.content - // for pdf content, we need to upload the pdf if (contentType === 'application/pdf') { const uploadResult = await uploadPdf( @@ -219,14 +185,27 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { return true } - if (!content) { - logger.info(`content is not fetched: ${finalUrl}`) - // set the state to failed if we don't have content - content = 'Failed to fetch content' - state = ArticleSavingRequestStatus.Failed + // download the original content + const filePath = contentFilePath( + userId, + articleSavingRequestId, + new Date(savedAt).getTime(), + 'original' + ) + const exists = await isFileExists(filePath) + if (!exists) { + logger.error('Original content file does not exist', { + finalUrl, + filePath, + }) + + throw new Error('Original content file does not exist') } - // for non-pdf content, we need to save the page + const content = (await downloadFromBucket(filePath)).toString() + console.log('Downloaded original content from:', filePath) + + // for non-pdf content, we need to save the content const result = await savePage( { url: finalUrl, @@ -236,10 +215,11 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { state: (state as ArticleSavingRequestStatus) || undefined, labels, rssFeedUrl, - savedAt: savedAt ? new Date(savedAt) : new Date(), + savedAt, publishedAt: publishedAt ? new Date(publishedAt) : null, source, folder, + originalContentUploaded: true, }, user ) diff --git a/packages/api/src/routers/content_router.ts b/packages/api/src/routers/content_router.ts index ee17f432b..b3110d354 100644 --- a/packages/api/src/routers/content_router.ts +++ b/packages/api/src/routers/content_router.ts @@ -6,7 +6,11 @@ import { getClaimsByToken, getTokenByRequest } from '../utils/auth' import { corsConfig } from '../utils/corsConfig' import { enqueueBulkUploadContentJob } from '../utils/createTask' import { logger } from '../utils/logger' -import { generateDownloadSignedUrl, isFileExists } from '../utils/uploads' +import { + contentFilePath, + generateDownloadSignedUrl, + isFileExists, +} from '../utils/uploads' export function contentRouter() { const router = Router() @@ -58,7 +62,7 @@ export function contentRouter() { const userId = claims.uid const libraryItems = await findLibraryItemsByIds(libraryItemIds, userId, { - select: ['id', 'updatedAt'], + select: ['id', 'updatedAt', 'savedAt'], }) if (libraryItems.length === 0) { logger.error('Library items not found') @@ -68,9 +72,14 @@ export function contentRouter() { // generate signed url for each library item const data = await Promise.all( libraryItems.map(async (libraryItem) => { - const filePath = `content/${userId}/${ - libraryItem.id - }.${libraryItem.updatedAt.getTime()}.${format}` + const date = + format === 'original' ? libraryItem.savedAt : libraryItem.updatedAt + const filePath = contentFilePath( + userId, + libraryItem.id, + date.getTime(), + format + ) try { const downloadUrl = await generateDownloadSignedUrl(filePath, { diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 1b503f5af..f62d9d2a4 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -24,7 +24,11 @@ import { Merge, PickTuple } from '../util' import { deepDelete, setRecentlySavedItemInRedis } from '../utils/helpers' import { logger } from '../utils/logger' import { parseSearchQuery } from '../utils/search' -import { downloadFromBucket, uploadToBucket } from '../utils/uploads' +import { + contentFilePath, + downloadFromBucket, + uploadToBucket, +} from '../utils/uploads' import { HighlightEvent } from './highlights' import { addLabelsToLibraryItem, LabelEvent } from './labels' @@ -1015,7 +1019,8 @@ export const createOrUpdateLibraryItem = async ( libraryItem: CreateOrUpdateLibraryItemArgs, userId: string, pubsub = createPubSubClient(), - skipPubSub = false + skipPubSub = false, + originalContentUploaded = false ): Promise => { let originalContent: string | null = null if (libraryItem.originalContent) { @@ -1098,9 +1103,14 @@ export const createOrUpdateLibraryItem = async ( const data = deepDelete(newLibraryItem, columnsToDelete) await pubsub.entityCreated(EntityType.ITEM, data, userId) - // upload original content to GCS - if (originalContent) { - await uploadOriginalContent(userId, newLibraryItem.id, originalContent) + // upload original content to GCS if it's not already uploaded + if (originalContent && !originalContentUploaded) { + await uploadOriginalContent( + userId, + newLibraryItem.id, + newLibraryItem.savedAt, + originalContent + ) logger.info('Uploaded original content to GCS', { id: newLibraryItem.id, }) @@ -1681,16 +1691,14 @@ export const filterItemEvents = ( throw new Error('Unexpected state.') } -const originalContentFilename = (userId: string, libraryItemId: string) => - `original-content/${userId}/${libraryItemId}.html` - export const uploadOriginalContent = async ( userId: string, libraryItemId: string, + savedAt: Date, originalContent: string ) => { await uploadToBucket( - originalContentFilename(userId, libraryItemId), + contentFilePath(userId, libraryItemId, savedAt.getTime(), 'original'), Buffer.from(originalContent), { public: false, @@ -1701,7 +1709,10 @@ export const uploadOriginalContent = async ( export const downloadOriginalContent = async ( userId: string, - libraryItemId: string + libraryItemId: string, + savedAt: Date ) => { - return downloadFromBucket(originalContentFilename(userId, libraryItemId)) + return downloadFromBucket( + contentFilePath(userId, libraryItemId, savedAt.getTime(), 'original') + ) } diff --git a/packages/api/src/services/save_page.ts b/packages/api/src/services/save_page.ts index c5ba13989..977e5d4bd 100644 --- a/packages/api/src/services/save_page.ts +++ b/packages/api/src/services/save_page.ts @@ -68,7 +68,12 @@ const shouldParseInBackend = (input: SavePageInput): boolean => { export type SavePageArgs = Merge< SavePageInput, - { feedContent?: string; previewImage?: string; author?: string } + { + feedContent?: string + previewImage?: string + author?: string + originalContentUploaded?: boolean + } > export const savePage = async ( @@ -145,7 +150,8 @@ export const savePage = async ( itemToSave, user.id, undefined, - isImported + isImported, + input.originalContentUploaded ) clientRequestId = newItem.id diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts index 043a21159..a3bf80635 100644 --- a/packages/api/src/utils/uploads.ts +++ b/packages/api/src/utils/uploads.ts @@ -5,6 +5,7 @@ import axios from 'axios' import { ContentReaderType } from '../entity/library_item' import { env } from '../env' import { PageType } from '../generated/graphql' +import { ContentFormat } from '../jobs/upload_content' import { logger } from './logger' export const contentReaderForLibraryItem = ( @@ -157,13 +158,14 @@ export const isFileExists = async (filePath: string): Promise => { export const downloadFromBucket = async (filePath: string): Promise => { const file = storage.bucket(bucketName).file(filePath) - const [exists] = await file.exists() - if (!exists) { - logger.error(`File not found: ${filePath}`) - throw new Error('File not found') - } - - // Download the file contents as a string + // Download the file contents const [data] = await file.download() return data } + +export const contentFilePath = ( + userId: string, + libraryItemId: string, + timestamp: number, + format: ContentFormat +) => `content/${userId}/${libraryItemId}.${timestamp}.${format}` diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index f2cfed276..8836afa25 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -13,6 +13,7 @@ "ioredis": "^5.3.2", "posthog-node": "^3.6.3", "@google-cloud/functions-framework": "^3.0.0", + "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@sentry/serverless": "^7.77.0" }, diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index 0a3a884ee..de39d4bc0 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -9,6 +9,7 @@ interface SavePageJobData { url: string finalUrl: string articleSavingRequestId: string + state?: string labels?: string[] source: string @@ -17,6 +18,8 @@ interface SavePageJobData { savedAt?: string publishedAt?: string taskId?: string + title?: string + contentType?: string } interface SavePageJob { diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 51db8532d..89ec14f51 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,11 +1,12 @@ +import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' import { RequestHandler } from 'express' import { analytics } from './analytics' import { queueSavePageJob } from './job' -import { redisDataSource } from './redis_data_source' -interface User { +interface UserConfig { id: string + libraryItemId: string folder?: string } @@ -23,7 +24,7 @@ interface RequestBody { savedAt?: string publishedAt?: string folder?: string - users?: User[] + users?: UserConfig[] priority: 'high' | 'low' } @@ -42,24 +43,37 @@ interface LogRecord { savedAt?: string publishedAt?: string folder?: string - users?: User[] + users?: UserConfig[] error?: string totalTime?: number } -interface FetchResult { - finalUrl: string - title?: string - content?: string - contentType?: string +const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH + ? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH }) + : new Storage() +const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files' + +const uploadToBucket = async (filePath: string, data: string) => { + await storage + .bucket(bucketName) + .file(filePath) + .save(data, { public: false, timeout: 30000 }) } -export const cacheFetchResult = async (fetchResult: FetchResult) => { - // cache the fetch result for 24 hours - const ttl = 24 * 60 * 60 - const key = `fetch-result:${fetchResult.finalUrl}` - const value = JSON.stringify(fetchResult) - return redisDataSource.cacheClient.set(key, value, 'EX', ttl, 'NX') +const uploadOriginalContent = async ( + users: UserConfig[], + content: string, + savedTimestamp: number +) => { + await Promise.all( + users.map(async (user) => { + const filePath = `content/${user.id}/${user.libraryItemId}.${savedTimestamp}.original` + + await uploadToBucket(filePath, content) + + console.log(`Original content uploaded to ${filePath}`) + }) + ) } export const contentFetchRequestHandler: RequestHandler = async (req, res) => { @@ -76,6 +90,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { { id: userId, folder: body.folder, + libraryItemId: body.saveRequestId, }, ] } @@ -112,8 +127,12 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) try { + const savedDate = savedAt ? new Date(savedAt) : new Date() const fetchResult = await fetchContent(url, locale, timezone) - const finalUrl = fetchResult.finalUrl + const { title, content, contentType, finalUrl } = fetchResult + if (content) { + await uploadOriginalContent(users, content, savedDate.getTime()) + } const savePageJobs = users.map((user) => ({ userId: user.id, @@ -121,24 +140,23 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { userId: user.id, url, finalUrl, - articleSavingRequestId, + articleSavingRequestId: user.libraryItemId, state, labels, source, folder: user.folder, rssFeedUrl, - savedAt, + savedAt: savedDate.toISOString(), publishedAt, taskId, + title, + contentType, }, isRss: !!rssFeedUrl, isImport: !!taskId, priority, })) - const cacheResult = await cacheFetchResult(fetchResult) - console.log('cacheFetchResult result', cacheResult) - const jobs = await queueSavePageJob(savePageJobs) console.log('save-page jobs queued', jobs.length) } catch (error) {