Still use Redis for the cache
This commit is contained in:
@ -381,7 +381,7 @@ const createItemWithFeedContent = async (
|
||||
rssFeedUrl: feedUrl,
|
||||
savedAt: item.isoDate,
|
||||
publishedAt: item.isoDate,
|
||||
originalContent: '',
|
||||
originalContent: feedContent || '',
|
||||
source: 'rss-feeder',
|
||||
state: ArticleSavingRequestStatus.ContentNotFetched,
|
||||
clientRequestId: '',
|
||||
|
||||
@ -6,8 +6,8 @@ import {
|
||||
ArticleSavingRequestStatus,
|
||||
CreateLabelInput,
|
||||
} from '../generated/graphql'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { userRepository } from '../repository/user'
|
||||
import { downloadOriginalContent } from '../services/library_item'
|
||||
import { saveFile } from '../services/save_file'
|
||||
import { savePage } from '../services/save_page'
|
||||
import { uploadFile } from '../services/upload_file'
|
||||
@ -27,8 +27,6 @@ interface Data {
|
||||
url: string
|
||||
finalUrl: string
|
||||
articleSavingRequestId: string
|
||||
title: string
|
||||
contentType: string
|
||||
|
||||
state?: string
|
||||
labels?: CreateLabelInput[]
|
||||
@ -38,7 +36,76 @@ interface Data {
|
||||
savedAt?: string
|
||||
publishedAt?: string
|
||||
taskId?: string
|
||||
urlHash?: string
|
||||
}
|
||||
|
||||
/**
 * Shape of a content-fetch result as read back from the cache.
 * Only `finalUrl` is mandatory; every other field may be absent.
 */
interface FetchResult {
  finalUrl: string
  title?: string
  content?: string
  contentType?: string
}

/**
 * Type guard for values of unknown origin (e.g. the output of `JSON.parse`).
 *
 * A value is accepted only when it is a non-null object whose `finalUrl`
 * is a string. Checking the key's presence alone is not enough: an entry
 * such as `{ "finalUrl": null }` would be narrowed to `FetchResult` even
 * though the interface promises a string.
 *
 * @param obj - value to inspect
 * @returns `true` when `obj` can safely be treated as a `FetchResult`
 */
const isFetchResult = (obj: unknown): obj is FetchResult => {
  if (typeof obj !== 'object' || obj === null) {
    return false
  }

  // the optional fields are deliberately not validated so that entries
  // accepted before this check was tightened are still accepted
  return typeof (obj as { finalUrl?: unknown }).finalUrl === 'string'
}
|
||||
|
||||
/**
 * Copies the object at `contentObjUrl` to `uploadSignedUrl` by piping the
 * download response stream straight into a PUT request.
 *
 * @param uploadSignedUrl - destination URL that receives the PUT
 * @param contentType - value sent as the `Content-Type` header of the PUT
 * @param contentObjUrl - URL the content is downloaded from
 */
const uploadToSignedUrl = async (
  uploadSignedUrl: string,
  contentType: string,
  contentObjUrl: string
) => {
  // 10MB, handed to both requests as their size limit
  const sizeLimitBytes = 10 * 1024 * 1024

  logger.info('downloading content', { contentObjUrl })

  // ask for a stream so the body is not buffered in memory
  const download = await axios.get(contentObjUrl, {
    responseType: 'stream',
    maxContentLength: sizeLimitBytes,
    timeout: REQUEST_TIMEOUT,
  })

  logger.info('uploading to signed url', { uploadSignedUrl, contentType })

  // the downloaded stream becomes the request body of the upload
  const uploadOptions = {
    headers: { 'Content-Type': contentType },
    maxBodyLength: sizeLimitBytes,
    timeout: REQUEST_TIMEOUT,
  }
  await axios.put(uploadSignedUrl, download.data, uploadOptions)
}
|
||||
|
||||
/**
 * Reads the cached fetch result for `url` from Redis.
 *
 * The entry is looked up under `fetch-result:<url>`, first with
 * `redisDataSource.redisClient` and, on a miss, with
 * `redisDataSource.workerRedisClient`.
 *
 * @param url - URL whose fetch result should be loaded
 * @returns the parsed and validated fetch result
 * @throws Error 'redis client is not initialized' when either client is missing
 * @throws Error 'fetch result is not cached' when neither client has the key
 * @throws Error 'fetch result is not valid' when the stored value is not JSON
 *   or does not pass `isFetchResult`
 */
const getCachedFetchResult = async (url: string) => {
  const key = `fetch-result:${url}`
  if (!redisDataSource.redisClient || !redisDataSource.workerRedisClient) {
    throw new Error('redis client is not initialized')
  }

  let result = await redisDataSource.redisClient.get(key)
  if (!result) {
    logger.debug(`fetch result is not cached in cache redis ${url}`)
    // fallback to worker redis client if the result is not found
    result = await redisDataSource.workerRedisClient.get(key)
    if (!result) {
      throw new Error('fetch result is not cached')
    }
  }

  // a corrupt entry must surface as this function's own validation error,
  // not as a bare SyntaxError from JSON.parse
  let fetchResult: unknown
  try {
    fetchResult = JSON.parse(result)
  } catch {
    throw new Error('fetch result is not valid')
  }
  if (!isFetchResult(fetchResult)) {
    throw new Error('fetch result is not valid')
  }

  // pass the url as structured metadata, like the other logger calls here
  logger.info('fetch result is cached', { url })

  return fetchResult
}
|
||||
|
||||
const uploadPdf = async (
|
||||
@ -126,11 +193,10 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
taskId,
|
||||
url,
|
||||
finalUrl,
|
||||
title,
|
||||
contentType,
|
||||
state,
|
||||
} = data
|
||||
let isImported, isSaved
|
||||
let isImported,
|
||||
isSaved,
|
||||
state = data.state
|
||||
|
||||
logger.info('savePageJob', {
|
||||
userId,
|
||||
@ -149,6 +215,11 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
}
|
||||
|
||||
try {
|
||||
// get the fetch result from cache
|
||||
const fetchedResult = await getCachedFetchResult(finalUrl)
|
||||
const { title, contentType } = fetchedResult
|
||||
let content = fetchedResult.content
|
||||
|
||||
// for pdf content, we need to upload the pdf
|
||||
if (contentType === 'application/pdf') {
|
||||
const uploadResult = await uploadPdf(
|
||||
@ -181,8 +252,12 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
return true
|
||||
}
|
||||
|
||||
// download content from the bucket
|
||||
const originalContent = (await downloadOriginalContent(finalUrl)).toString()
|
||||
if (!content) {
|
||||
logger.info(`content is not fetched: ${finalUrl}`)
|
||||
// set the state to failed if we don't have content
|
||||
content = 'Failed to fetch content'
|
||||
state = ArticleSavingRequestStatus.Failed
|
||||
}
|
||||
|
||||
// for non-pdf content, we need to save the page
|
||||
const result = await savePage(
|
||||
@ -190,7 +265,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
url: finalUrl,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent,
|
||||
originalContent: content,
|
||||
state: (state as ArticleSavingRequestStatus) || undefined,
|
||||
labels,
|
||||
rssFeedUrl,
|
||||
|
||||
@ -302,6 +302,7 @@ export const createArticleResolver = authorized<
|
||||
userId: uid,
|
||||
slug,
|
||||
croppedPathname,
|
||||
originalHtml: domContent,
|
||||
itemType,
|
||||
preparedDocument,
|
||||
uploadFileHash,
|
||||
|
||||
@ -21,11 +21,7 @@ import { redisDataSource } from '../redis_data_source'
|
||||
import { authTrx, getColumns, queryBuilderToRawSql } from '../repository'
|
||||
import { libraryItemRepository } from '../repository/library_item'
|
||||
import { Merge, PickTuple } from '../util'
|
||||
import {
|
||||
deepDelete,
|
||||
setRecentlySavedItemInRedis,
|
||||
stringToHash,
|
||||
} from '../utils/helpers'
|
||||
import { deepDelete, setRecentlySavedItemInRedis } from '../utils/helpers'
|
||||
import { logger } from '../utils/logger'
|
||||
import { parseSearchQuery } from '../utils/search'
|
||||
import { downloadFileFromBucket, uploadToBucket } from '../utils/uploads'
|
||||
@ -1021,13 +1017,13 @@ export const createOrUpdateLibraryItem = async (
|
||||
pubsub = createPubSubClient(),
|
||||
skipPubSub = false
|
||||
): Promise<LibraryItem> => {
|
||||
// if (libraryItem.originalContent && !urlHash) {
|
||||
// // upload original content to GCS
|
||||
// await uploadContent(libraryItem.originalUrl, libraryItem.originalContent)
|
||||
let originalContent: string | null = null
|
||||
if (libraryItem.originalContent) {
|
||||
originalContent = libraryItem.originalContent
|
||||
|
||||
// // remove original content
|
||||
// delete libraryItem.originalContent
|
||||
// }
|
||||
// remove original content from the item
|
||||
delete libraryItem.originalContent
|
||||
}
|
||||
|
||||
const newLibraryItem = await authTrx(
|
||||
async (tx) => {
|
||||
@ -1102,6 +1098,14 @@ export const createOrUpdateLibraryItem = async (
|
||||
const data = deepDelete(newLibraryItem, columnsToDelete)
|
||||
await pubsub.entityCreated<ItemEvent>(EntityType.ITEM, data, userId)
|
||||
|
||||
// upload original content to GCS
|
||||
if (originalContent) {
|
||||
await uploadOriginalContent(userId, newLibraryItem.id, originalContent)
|
||||
logger.info('Uploaded original content to GCS', {
|
||||
id: newLibraryItem.id,
|
||||
})
|
||||
}
|
||||
|
||||
return newLibraryItem
|
||||
}
|
||||
|
||||
@ -1677,22 +1681,27 @@ export const filterItemEvents = (
|
||||
throw new Error('Unexpected state.')
|
||||
}
|
||||
|
||||
const originalContentFilename = (originalUrl: string) =>
|
||||
`originalContent/${stringToHash(originalUrl)}`
|
||||
const originalContentFilename = (userId: string, libraryItemId: string) =>
|
||||
`original-content/${userId}/${libraryItemId}.html`
|
||||
|
||||
export const uploadOriginalContent = async (
|
||||
originalUrl: string,
|
||||
userId: string,
|
||||
libraryItemId: string,
|
||||
originalContent: string
|
||||
) => {
|
||||
await uploadToBucket(
|
||||
originalContentFilename(originalUrl),
|
||||
originalContentFilename(userId, libraryItemId),
|
||||
Buffer.from(originalContent),
|
||||
{
|
||||
public: false,
|
||||
contentType: 'text/html',
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
export const downloadOriginalContent = async (originalUrl: string) => {
|
||||
return downloadFileFromBucket(originalContentFilename(originalUrl))
|
||||
export const downloadOriginalContent = async (
|
||||
userId: string,
|
||||
libraryItemId: string
|
||||
) => {
|
||||
return downloadFileFromBucket(originalContentFilename(userId, libraryItemId))
|
||||
}
|
||||
|
||||
@ -47,6 +47,7 @@ export const addRecommendation = async (
|
||||
author: item.author,
|
||||
description: item.description,
|
||||
originalUrl: item.originalUrl,
|
||||
originalContent: item.originalContent,
|
||||
contentReader: item.contentReader,
|
||||
directionality: item.directionality,
|
||||
itemLanguage: item.itemLanguage,
|
||||
|
||||
@ -91,6 +91,7 @@ export const saveEmail = async (
|
||||
user: { id: input.userId },
|
||||
slug,
|
||||
readableContent: content,
|
||||
originalContent: input.originalContent,
|
||||
description: metadata?.description || parseResult.parsedContent?.excerpt,
|
||||
title: input.title,
|
||||
author: input.author,
|
||||
|
||||
@ -124,6 +124,7 @@ export const savePage = async (
|
||||
croppedPathname,
|
||||
parsedContent: parseResult.parsedContent,
|
||||
itemType: parseResult.pageType,
|
||||
originalHtml: parseResult.domContent,
|
||||
canonicalUrl: parseResult.canonicalUrl,
|
||||
savedAt: input.savedAt ? new Date(input.savedAt) : new Date(),
|
||||
publishedAt: input.publishedAt ? new Date(input.publishedAt) : undefined,
|
||||
@ -196,6 +197,7 @@ export const savePage = async (
|
||||
export const parsedContentToLibraryItem = ({
|
||||
url,
|
||||
userId,
|
||||
originalHtml,
|
||||
itemId,
|
||||
parsedContent,
|
||||
slug,
|
||||
@ -222,6 +224,7 @@ export const parsedContentToLibraryItem = ({
|
||||
croppedPathname: string
|
||||
itemType: string
|
||||
parsedContent: Readability.ParseResult | null
|
||||
originalHtml?: string | null
|
||||
itemId?: string | null
|
||||
title?: string | null
|
||||
preparedDocument?: PreparedDocumentInput | null
|
||||
@ -243,6 +246,7 @@ export const parsedContentToLibraryItem = ({
|
||||
id: itemId || undefined,
|
||||
slug,
|
||||
user: { id: userId },
|
||||
originalContent: originalHtml,
|
||||
readableContent: parsedContent?.content || '',
|
||||
description: parsedContent?.excerpt,
|
||||
title:
|
||||
|
||||
@ -13,7 +13,6 @@
|
||||
"ioredis": "^5.3.2",
|
||||
"posthog-node": "^3.6.3",
|
||||
"@google-cloud/functions-framework": "^3.0.0",
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@omnivore/puppeteer-parse": "^1.0.0",
|
||||
"@sentry/serverless": "^7.77.0"
|
||||
},
|
||||
|
||||
@ -1,9 +1,8 @@
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { fetchContent } from '@omnivore/puppeteer-parse'
|
||||
import crypto from 'crypto'
|
||||
import { RequestHandler } from 'express'
|
||||
import { analytics } from './analytics'
|
||||
import { queueSavePageJob } from './job'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
|
||||
interface User {
|
||||
id: string
|
||||
@ -48,27 +47,20 @@ interface LogRecord {
|
||||
totalTime?: number
|
||||
}
|
||||
|
||||
const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH
|
||||
? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH })
|
||||
: new Storage()
|
||||
const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files'
|
||||
|
||||
export const uploadToBucket = async (filename: string, data: string) => {
|
||||
const file = storage.bucket(bucketName).file(`originalContent/${filename}`)
|
||||
|
||||
// check if the file already exists
|
||||
const [exists] = await file.exists()
|
||||
|
||||
if (exists) {
|
||||
console.log('file already exists', filename)
|
||||
return
|
||||
}
|
||||
|
||||
await file.save(data, { public: false, timeout: 30000 })
|
||||
/**
 * Result of fetching a page, in the form that is serialized into the cache.
 */
interface FetchResult {
  /** URL the result is cached under (used to build the cache key). */
  finalUrl: string
  /** Page title, when one is available. */
  title?: string
  /** Fetched content, when any is available. */
  content?: string
  /** Content type of the fetched resource, when known. */
  contentType?: string
}
|
||||
|
||||
/**
 * Computes the MD5 digest of `content`.
 *
 * @param content - text to digest
 * @returns the digest as a hex string
 */
const hash = (content: string) => {
  const md5 = crypto.createHash('md5')
  md5.update(content)
  return md5.digest('hex')
}
|
||||
/**
 * Writes a fetch result to the cache Redis as JSON.
 *
 * The key is `fetch-result:<finalUrl>`. It is written with a 24 hour expiry
 * and the `NX` flag, so an entry that already exists is left untouched.
 *
 * @param fetchResult - result to serialize and store
 * @returns whatever the client's `set` call resolves to
 */
export const cacheFetchResult = async (fetchResult: FetchResult) => {
  // 24 hours, in seconds
  const ONE_DAY_IN_SECONDS = 24 * 60 * 60
  const { finalUrl } = fetchResult

  return redisDataSource.cacheClient.set(
    `fetch-result:${finalUrl}`,
    JSON.stringify(fetchResult),
    'EX',
    ONE_DAY_IN_SECONDS,
    'NX'
  )
}
|
||||
|
||||
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
const functionStartTime = Date.now()
|
||||
@ -122,15 +114,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
try {
|
||||
const fetchResult = await fetchContent(url, locale, timezone)
|
||||
const finalUrl = fetchResult.finalUrl
|
||||
let urlHash: string | undefined
|
||||
|
||||
const content = fetchResult.content
|
||||
if (content) {
|
||||
// hash final url to use as key
|
||||
urlHash = hash(finalUrl)
|
||||
await uploadToBucket(urlHash, content)
|
||||
console.log('content uploaded to bucket', urlHash)
|
||||
}
|
||||
|
||||
const savePageJobs = users.map((user) => ({
|
||||
userId: user.id,
|
||||
@ -147,15 +130,15 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
savedAt,
|
||||
publishedAt,
|
||||
taskId,
|
||||
title: fetchResult.title,
|
||||
contentType: fetchResult.contentType,
|
||||
urlHash,
|
||||
},
|
||||
isRss: !!rssFeedUrl,
|
||||
isImport: !!taskId,
|
||||
priority,
|
||||
}))
|
||||
|
||||
const cacheResult = await cacheFetchResult(fetchResult)
|
||||
console.log('cacheFetchResult result', cacheResult)
|
||||
|
||||
const jobs = await queueSavePageJob(savePageJobs)
|
||||
console.log('save-page jobs queued', jobs.length)
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user