This commit is contained in:
Hongbo Wu
2024-05-14 20:18:18 +08:00
parent dbd7b7932f
commit 9dee510be1
9 changed files with 141 additions and 107 deletions

View File

@ -2,6 +2,7 @@ import axios from 'axios'
import crypto from 'crypto'
import { parseHTML } from 'linkedom'
import Parser, { Item } from 'rss-parser'
import { v4 as uuid } from 'uuid'
import { FetchContentType } from '../../entity/subscription'
import { env } from '../../env'
import { ArticleSavingRequestStatus } from '../../generated/graphql'
@ -72,13 +73,14 @@ export type RssFeedItem = Item & {
link: string
}
interface User {
interface UserConfig {
id: string
folder: FolderType
libraryItemId: string
}
interface FetchContentTask {
users: Map<string, User> // userId -> User
users: Map<string, UserConfig> // userId -> User
item: RssFeedItem
}
@ -280,13 +282,16 @@ const addFetchContentTask = (
) => {
const url = item.link
const task = fetchContentTasks.get(url)
const libraryItemId = uuid()
const userConfig = { id: userId, folder, libraryItemId }
if (!task) {
fetchContentTasks.set(url, {
users: new Map([[userId, { id: userId, folder }]]),
users: new Map([[userId, userConfig]]),
item,
})
} else {
task.users.set(userId, { id: userId, folder })
task.users.set(userId, userConfig)
}
return true
@ -319,7 +324,7 @@ const createTask = async (
}
const fetchContentAndCreateItem = async (
users: User[],
users: UserConfig[],
feedUrl: string,
item: RssFeedItem
) => {
@ -327,7 +332,6 @@ const fetchContentAndCreateItem = async (
users,
source: 'rss-feeder',
url: item.link.trim(),
saveRequestId: '',
labels: [{ name: 'RSS' }],
rssFeedUrl: feedUrl,
savedAt: item.isoDate,

View File

@ -6,13 +6,18 @@ import {
ArticleSavingRequestStatus,
CreateLabelInput,
} from '../generated/graphql'
import { redisDataSource } from '../redis_data_source'
import { userRepository } from '../repository/user'
import { saveFile } from '../services/save_file'
import { savePage } from '../services/save_page'
import { uploadFile } from '../services/upload_file'
import { logError, logger } from '../utils/logger'
import { downloadFromUrl, uploadToSignedUrl } from '../utils/uploads'
import {
contentFilePath,
downloadFromBucket,
downloadFromUrl,
isFileExists,
uploadToSignedUrl,
} from '../utils/uploads'
const signToken = promisify(jwt.sign)
@ -27,54 +32,19 @@ interface Data {
url: string
finalUrl: string
articleSavingRequestId: string
title: string
contentType: string
savedAt: string
state?: string
labels?: CreateLabelInput[]
source: string
folder: string
rssFeedUrl?: string
savedAt?: string
publishedAt?: string
taskId?: string
}
interface FetchResult {
finalUrl: string
title?: string
content?: string
contentType?: string
}
const isFetchResult = (obj: unknown): obj is FetchResult => {
return typeof obj === 'object' && obj !== null && 'finalUrl' in obj
}
const getCachedFetchResult = async (url: string) => {
const key = `fetch-result:${url}`
if (!redisDataSource.redisClient || !redisDataSource.workerRedisClient) {
throw new Error('redis client is not initialized')
}
let result = await redisDataSource.redisClient.get(key)
if (!result) {
logger.debug(`fetch result is not cached in cache redis ${url}`)
// fallback to worker redis client if the result is not found
result = await redisDataSource.workerRedisClient.get(key)
if (!result) {
throw new Error('fetch result is not cached')
}
}
const fetchResult = JSON.parse(result) as unknown
if (!isFetchResult(fetchResult)) {
throw new Error('fetch result is not valid')
}
logger.info('fetch result is cached', url)
return fetchResult
}
const uploadPdf = async (
url: string,
userId: string,
@ -160,10 +130,11 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
taskId,
url,
finalUrl,
title,
contentType,
state,
} = data
let isImported,
isSaved,
state = data.state
let isImported, isSaved
logger.info('savePageJob', {
userId,
@ -182,11 +153,6 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
}
try {
// get the fetch result from cache
const fetchedResult = await getCachedFetchResult(finalUrl)
const { title, contentType } = fetchedResult
let content = fetchedResult.content
// for pdf content, we need to upload the pdf
if (contentType === 'application/pdf') {
const uploadResult = await uploadPdf(
@ -219,14 +185,27 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
return true
}
if (!content) {
logger.info(`content is not fetched: ${finalUrl}`)
// set the state to failed if we don't have content
content = 'Failed to fetch content'
state = ArticleSavingRequestStatus.Failed
// download the original content
const filePath = contentFilePath(
userId,
articleSavingRequestId,
new Date(savedAt).getTime(),
'original'
)
const exists = await isFileExists(filePath)
if (!exists) {
logger.error('Original content file does not exist', {
finalUrl,
filePath,
})
throw new Error('Original content file does not exist')
}
// for non-pdf content, we need to save the page
const content = (await downloadFromBucket(filePath)).toString()
console.log('Downloaded original content from:', filePath)
// for non-pdf content, we need to save the content
const result = await savePage(
{
url: finalUrl,
@ -236,10 +215,11 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
state: (state as ArticleSavingRequestStatus) || undefined,
labels,
rssFeedUrl,
savedAt: savedAt ? new Date(savedAt) : new Date(),
savedAt,
publishedAt: publishedAt ? new Date(publishedAt) : null,
source,
folder,
originalContentUploaded: true,
},
user
)

View File

@ -6,7 +6,11 @@ import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { enqueueBulkUploadContentJob } from '../utils/createTask'
import { logger } from '../utils/logger'
import { generateDownloadSignedUrl, isFileExists } from '../utils/uploads'
import {
contentFilePath,
generateDownloadSignedUrl,
isFileExists,
} from '../utils/uploads'
export function contentRouter() {
const router = Router()
@ -58,7 +62,7 @@ export function contentRouter() {
const userId = claims.uid
const libraryItems = await findLibraryItemsByIds(libraryItemIds, userId, {
select: ['id', 'updatedAt'],
select: ['id', 'updatedAt', 'savedAt'],
})
if (libraryItems.length === 0) {
logger.error('Library items not found')
@ -68,9 +72,14 @@ export function contentRouter() {
// generate signed url for each library item
const data = await Promise.all(
libraryItems.map(async (libraryItem) => {
const filePath = `content/${userId}/${
libraryItem.id
}.${libraryItem.updatedAt.getTime()}.${format}`
const date =
format === 'original' ? libraryItem.savedAt : libraryItem.updatedAt
const filePath = contentFilePath(
userId,
libraryItem.id,
date.getTime(),
format
)
try {
const downloadUrl = await generateDownloadSignedUrl(filePath, {

View File

@ -24,7 +24,11 @@ import { Merge, PickTuple } from '../util'
import { deepDelete, setRecentlySavedItemInRedis } from '../utils/helpers'
import { logger } from '../utils/logger'
import { parseSearchQuery } from '../utils/search'
import { downloadFromBucket, uploadToBucket } from '../utils/uploads'
import {
contentFilePath,
downloadFromBucket,
uploadToBucket,
} from '../utils/uploads'
import { HighlightEvent } from './highlights'
import { addLabelsToLibraryItem, LabelEvent } from './labels'
@ -1015,7 +1019,8 @@ export const createOrUpdateLibraryItem = async (
libraryItem: CreateOrUpdateLibraryItemArgs,
userId: string,
pubsub = createPubSubClient(),
skipPubSub = false
skipPubSub = false,
originalContentUploaded = false
): Promise<LibraryItem> => {
let originalContent: string | null = null
if (libraryItem.originalContent) {
@ -1098,9 +1103,14 @@ export const createOrUpdateLibraryItem = async (
const data = deepDelete(newLibraryItem, columnsToDelete)
await pubsub.entityCreated<ItemEvent>(EntityType.ITEM, data, userId)
// upload original content to GCS
if (originalContent) {
await uploadOriginalContent(userId, newLibraryItem.id, originalContent)
// upload original content to GCS if it's not already uploaded
if (originalContent && !originalContentUploaded) {
await uploadOriginalContent(
userId,
newLibraryItem.id,
newLibraryItem.savedAt,
originalContent
)
logger.info('Uploaded original content to GCS', {
id: newLibraryItem.id,
})
@ -1681,16 +1691,14 @@ export const filterItemEvents = (
throw new Error('Unexpected state.')
}
const originalContentFilename = (userId: string, libraryItemId: string) =>
`original-content/${userId}/${libraryItemId}.html`
export const uploadOriginalContent = async (
userId: string,
libraryItemId: string,
savedAt: Date,
originalContent: string
) => {
await uploadToBucket(
originalContentFilename(userId, libraryItemId),
contentFilePath(userId, libraryItemId, savedAt.getTime(), 'original'),
Buffer.from(originalContent),
{
public: false,
@ -1701,7 +1709,10 @@ export const uploadOriginalContent = async (
export const downloadOriginalContent = async (
userId: string,
libraryItemId: string
libraryItemId: string,
savedAt: Date
) => {
return downloadFromBucket(originalContentFilename(userId, libraryItemId))
return downloadFromBucket(
contentFilePath(userId, libraryItemId, savedAt.getTime(), 'original')
)
}

View File

@ -68,7 +68,12 @@ const shouldParseInBackend = (input: SavePageInput): boolean => {
export type SavePageArgs = Merge<
SavePageInput,
{ feedContent?: string; previewImage?: string; author?: string }
{
feedContent?: string
previewImage?: string
author?: string
originalContentUploaded?: boolean
}
>
export const savePage = async (
@ -145,7 +150,8 @@ export const savePage = async (
itemToSave,
user.id,
undefined,
isImported
isImported,
input.originalContentUploaded
)
clientRequestId = newItem.id

View File

@ -5,6 +5,7 @@ import axios from 'axios'
import { ContentReaderType } from '../entity/library_item'
import { env } from '../env'
import { PageType } from '../generated/graphql'
import { ContentFormat } from '../jobs/upload_content'
import { logger } from './logger'
export const contentReaderForLibraryItem = (
@ -157,13 +158,14 @@ export const isFileExists = async (filePath: string): Promise<boolean> => {
export const downloadFromBucket = async (filePath: string): Promise<Buffer> => {
const file = storage.bucket(bucketName).file(filePath)
const [exists] = await file.exists()
if (!exists) {
logger.error(`File not found: ${filePath}`)
throw new Error('File not found')
}
// Download the file contents as a string
// Download the file contents
const [data] = await file.download()
return data
}
export const contentFilePath = (
userId: string,
libraryItemId: string,
timestamp: number,
format: ContentFormat
) => `content/${userId}/${libraryItemId}.${timestamp}.${format}`

View File

@ -13,6 +13,7 @@
"ioredis": "^5.3.2",
"posthog-node": "^3.6.3",
"@google-cloud/functions-framework": "^3.0.0",
"@google-cloud/storage": "^7.0.1",
"@omnivore/puppeteer-parse": "^1.0.0",
"@sentry/serverless": "^7.77.0"
},

View File

@ -9,6 +9,7 @@ interface SavePageJobData {
url: string
finalUrl: string
articleSavingRequestId: string
state?: string
labels?: string[]
source: string
@ -17,6 +18,8 @@ interface SavePageJobData {
savedAt?: string
publishedAt?: string
taskId?: string
title?: string
contentType?: string
}
interface SavePageJob {

View File

@ -1,11 +1,12 @@
import { Storage } from '@google-cloud/storage'
import { fetchContent } from '@omnivore/puppeteer-parse'
import { RequestHandler } from 'express'
import { analytics } from './analytics'
import { queueSavePageJob } from './job'
import { redisDataSource } from './redis_data_source'
interface User {
interface UserConfig {
id: string
libraryItemId: string
folder?: string
}
@ -23,7 +24,7 @@ interface RequestBody {
savedAt?: string
publishedAt?: string
folder?: string
users?: User[]
users?: UserConfig[]
priority: 'high' | 'low'
}
@ -42,24 +43,37 @@ interface LogRecord {
savedAt?: string
publishedAt?: string
folder?: string
users?: User[]
users?: UserConfig[]
error?: string
totalTime?: number
}
interface FetchResult {
finalUrl: string
title?: string
content?: string
contentType?: string
const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH
? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH })
: new Storage()
const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files'
const uploadToBucket = async (filePath: string, data: string) => {
await storage
.bucket(bucketName)
.file(filePath)
.save(data, { public: false, timeout: 30000 })
}
export const cacheFetchResult = async (fetchResult: FetchResult) => {
// cache the fetch result for 24 hours
const ttl = 24 * 60 * 60
const key = `fetch-result:${fetchResult.finalUrl}`
const value = JSON.stringify(fetchResult)
return redisDataSource.cacheClient.set(key, value, 'EX', ttl, 'NX')
const uploadOriginalContent = async (
users: UserConfig[],
content: string,
savedTimestamp: number
) => {
await Promise.all(
users.map(async (user) => {
const filePath = `content/${user.id}/${user.libraryItemId}.${savedTimestamp}.original`
await uploadToBucket(filePath, content)
console.log(`Original content uploaded to ${filePath}`)
})
)
}
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
@ -76,6 +90,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
{
id: userId,
folder: body.folder,
libraryItemId: body.saveRequestId,
},
]
}
@ -112,8 +127,12 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
console.log(`Article parsing request`, logRecord)
try {
const savedDate = savedAt ? new Date(savedAt) : new Date()
const fetchResult = await fetchContent(url, locale, timezone)
const finalUrl = fetchResult.finalUrl
const { title, content, contentType, finalUrl } = fetchResult
if (content) {
await uploadOriginalContent(users, content, savedDate.getTime())
}
const savePageJobs = users.map((user) => ({
userId: user.id,
@ -121,24 +140,23 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
userId: user.id,
url,
finalUrl,
articleSavingRequestId,
articleSavingRequestId: user.libraryItemId,
state,
labels,
source,
folder: user.folder,
rssFeedUrl,
savedAt,
savedAt: savedDate.toISOString(),
publishedAt,
taskId,
title,
contentType,
},
isRss: !!rssFeedUrl,
isImport: !!taskId,
priority,
}))
const cacheResult = await cacheFetchResult(fetchResult)
console.log('cacheFetchResult result', cacheResult)
const jobs = await queueSavePageJob(savePageJobs)
console.log('save-page jobs queued', jobs.length)
} catch (error) {