upload and download original content from GCS
This commit is contained in:
@ -6,13 +6,13 @@ import {
|
||||
ArticleSavingRequestStatus,
|
||||
CreateLabelInput,
|
||||
} from '../generated/graphql'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { userRepository } from '../repository/user'
|
||||
import { saveFile } from '../services/save_file'
|
||||
import { savePage } from '../services/save_page'
|
||||
import { uploadFile } from '../services/upload_file'
|
||||
import { logError, logger } from '../utils/logger'
|
||||
import { downloadFromUrl, uploadToSignedUrl } from '../utils/uploads'
|
||||
import { downloadStringFromBucket } from '../utils/uploads'
|
||||
|
||||
// Wraps jwt.sign with promisify so token signing can be awaited.
// NOTE(review): the imports of `promisify` and `jwt` are outside this view — presumably util.promisify and jsonwebtoken; confirm.
const signToken = promisify(jwt.sign)
|
||||
|
||||
@ -27,6 +27,9 @@ interface Data {
|
||||
url: string
|
||||
finalUrl: string
|
||||
articleSavingRequestId: string
|
||||
title: string
|
||||
contentType: string
|
||||
|
||||
state?: string
|
||||
labels?: CreateLabelInput[]
|
||||
source: string
|
||||
@ -35,6 +38,7 @@ interface Data {
|
||||
savedAt?: string
|
||||
publishedAt?: string
|
||||
taskId?: string
|
||||
contentHash?: string
|
||||
}
|
||||
|
||||
interface FetchResult {
|
||||
@ -120,32 +124,6 @@ const sendImportStatusUpdate = async (
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Loads the fetch result stored under `fetch-result:<url>`.
 *
 * The cache redis client is queried first; when it has no value for the key,
 * the worker redis client is queried as a fallback. The stored JSON is parsed
 * and checked with `isFetchResult` before being returned.
 *
 * @param url - URL whose fetch result should be loaded
 * @returns the parsed and validated fetch result
 * @throws Error when either redis client is missing, when neither client has
 *   a value for the key, or when the stored value fails `isFetchResult`
 */
const getCachedFetchResult = async (url: string) => {
  const { redisClient: cacheRedis, workerRedisClient: workerRedis } =
    redisDataSource
  if (!(cacheRedis && workerRedis)) {
    throw new Error('redis client is not initialized')
  }

  const cacheKey = `fetch-result:${url}`

  let serialized = await cacheRedis.get(cacheKey)
  if (!serialized) {
    logger.debug(`fetch result is not cached in cache redis ${url}`)
    // nothing in the cache redis: try the worker redis before giving up
    serialized = await workerRedis.get(cacheKey)
  }
  if (!serialized) {
    throw new Error('fetch result is not cached')
  }

  // JSON.parse output is untrusted until the type guard accepts it
  const parsed = JSON.parse(serialized) as unknown
  if (!isFetchResult(parsed)) {
    throw new Error('fetch result is not valid')
  }

  logger.info('fetch result is cached', url)

  return parsed
}
|
||||
|
||||
export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
const {
|
||||
userId,
|
||||
@ -159,6 +137,9 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
taskId,
|
||||
url,
|
||||
finalUrl,
|
||||
title,
|
||||
contentType,
|
||||
contentHash,
|
||||
} = data
|
||||
let isImported,
|
||||
isSaved,
|
||||
@ -171,11 +152,6 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
finalUrl,
|
||||
})
|
||||
|
||||
// get the fetch result from cache
|
||||
const fetchedResult = await getCachedFetchResult(finalUrl)
|
||||
const { title, contentType } = fetchedResult
|
||||
let content = fetchedResult.content
|
||||
|
||||
const user = await userRepository.findById(userId)
|
||||
if (!user) {
|
||||
logger.error('Unable to save job, user can not be found.', {
|
||||
@ -218,11 +194,24 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
return true
|
||||
}
|
||||
|
||||
if (!content) {
|
||||
logger.info(`content is not fetched: ${finalUrl}`)
|
||||
let originalContent
|
||||
if (!contentHash) {
|
||||
logger.info(`content is not uploaded: ${finalUrl}`)
|
||||
// set the state to failed if we don't have content
|
||||
content = 'Failed to fetch content'
|
||||
originalContent = 'Failed to fetch content'
|
||||
state = ArticleSavingRequestStatus.Failed
|
||||
} else {
|
||||
// download content from the bucket
|
||||
const downloaded = await downloadStringFromBucket(
|
||||
`originalContent/${contentHash}`
|
||||
)
|
||||
if (!downloaded) {
|
||||
logger.error('error while downloading content from bucket')
|
||||
originalContent = 'Failed to fetch content'
|
||||
state = ArticleSavingRequestStatus.Failed
|
||||
} else {
|
||||
originalContent = downloaded
|
||||
}
|
||||
}
|
||||
|
||||
// for non-pdf content, we need to save the page
|
||||
@ -231,7 +220,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
url: finalUrl,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent: content,
|
||||
originalContent,
|
||||
state: state ? (state as ArticleSavingRequestStatus) : undefined,
|
||||
labels: labels,
|
||||
rssFeedUrl,
|
||||
|
||||
@ -153,3 +153,24 @@ export const isFileExists = async (filePath: string): Promise<boolean> => {
|
||||
const [exists] = await storage.bucket(bucketName).file(filePath).exists()
|
||||
return exists
|
||||
}
|
||||
|
||||
/**
 * Downloads the object at `filePath` from the configured bucket and returns
 * its contents as a string.
 *
 * This function does not throw: a missing object and any error raised while
 * checking or downloading are both logged at error level and reported as
 * `null`.
 *
 * @param filePath - path of the object inside the bucket
 * @returns the object's contents as a string, or `null` when the object does
 *   not exist or the download fails
 */
export const downloadStringFromBucket = async (
  filePath: string
): Promise<string | null> => {
  try {
    // build the file handle once and reuse it for both the check and the download
    const file = storage.bucket(bucketName).file(filePath)

    const [exists] = await file.exists()
    if (!exists) {
      logger.error(`File not found: ${filePath}`)
      return null
    }

    // Download the file contents as a string
    const [data] = await file.download()
    return data.toString()
  } catch (error) {
    // a failed download is an error, not an informational event; include the
    // path so the failing object can be identified from the log line alone
    logger.error(`Error downloading file: ${filePath}`, error)
    return null
  }
}
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
"ioredis": "^5.3.2",
|
||||
"posthog-node": "^3.6.3",
|
||||
"@google-cloud/functions-framework": "^3.0.0",
|
||||
"@google-cloud/storage": "^7.0.1",
|
||||
"@omnivore/puppeteer-parse": "^1.0.0",
|
||||
"@sentry/serverless": "^7.77.0"
|
||||
},
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { fetchContent } from '@omnivore/puppeteer-parse'
|
||||
import crypto from 'crypto'
|
||||
import { RequestHandler } from 'express'
|
||||
import { analytics } from './analytics'
|
||||
import { queueSavePageJob } from './job'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
|
||||
interface User {
|
||||
id: string
|
||||
@ -47,20 +48,20 @@ interface LogRecord {
|
||||
totalTime?: number
|
||||
}
|
||||
|
||||
interface FetchResult {
|
||||
finalUrl: string
|
||||
title?: string
|
||||
content?: string
|
||||
contentType?: string
|
||||
// Path to a service-account key file for uploads. When the variable is unset
// or empty, Storage is constructed with no options.
const gcsKeyFilePath = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH
const storage = gcsKeyFilePath
  ? new Storage({ keyFilename: gcsKeyFilePath })
  : new Storage()

// Target bucket name; falls back to 'omnivore-files' when GCS_UPLOAD_BUCKET is unset or empty.
const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files'
|
||||
|
||||
/**
 * Saves `data` to the configured bucket as the object
 * `originalContent/<filename>`.
 *
 * The object is written with `public: false` and a `timeout` of 30000
 * (presumably milliseconds — confirm against the storage client docs).
 *
 * @param filename - object name placed under the `originalContent/` prefix
 * @param data - string contents to store
 */
export const uploadToBucket = async (filename: string, data: string) => {
  const destination = storage
    .bucket(bucketName)
    .file(`originalContent/${filename}`)

  await destination.save(data, { public: false, timeout: 30000 })
}
|
||||
|
||||
/**
 * Stores the JSON-serialized fetch result in the cache client under
 * `fetch-result:<finalUrl>`.
 *
 * The entry is written with `EX` (a 24-hour expiry, given in seconds) and
 * `NX`, so an existing entry for the same key is not overwritten.
 *
 * @param fetchResult - fetch result to cache; `finalUrl` forms the key
 * @returns whatever the cache client's `set` call resolves to
 */
export const cacheFetchResult = async (fetchResult: FetchResult) => {
  const cacheKey = `fetch-result:${fetchResult.finalUrl}`
  const payload = JSON.stringify(fetchResult)

  // 24 hours expressed in seconds, as required by the EX option
  const oneDayInSeconds = 24 * 60 * 60

  return redisDataSource.cacheClient.set(
    cacheKey,
    payload,
    'EX',
    oneDayInSeconds,
    'NX'
  )
}
|
||||
/**
 * Returns the hex-encoded MD5 digest of `content`.
 *
 * @param content - text to digest
 * @returns the MD5 digest as a hex string
 */
const hash = (content: string) => {
  const md5 = crypto.createHash('md5')
  md5.update(content)
  return md5.digest('hex')
}
|
||||
|
||||
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
const functionStartTime = Date.now()
|
||||
@ -114,6 +115,15 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
try {
|
||||
const fetchResult = await fetchContent(url, locale, timezone)
|
||||
const finalUrl = fetchResult.finalUrl
|
||||
let contentHash: string | undefined
|
||||
|
||||
const content = fetchResult.content
|
||||
if (content) {
|
||||
// hash content to use as key
|
||||
contentHash = hash(content)
|
||||
await uploadToBucket(contentHash, content)
|
||||
console.log('content uploaded to bucket', contentHash)
|
||||
}
|
||||
|
||||
const savePageJobs = users.map((user) => ({
|
||||
userId: user.id,
|
||||
@ -130,15 +140,15 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
savedAt,
|
||||
publishedAt,
|
||||
taskId,
|
||||
title: fetchResult.title,
|
||||
contentType: fetchResult.contentType,
|
||||
contentHash,
|
||||
},
|
||||
isRss: !!rssFeedUrl,
|
||||
isImport: !!taskId,
|
||||
priority,
|
||||
}))
|
||||
|
||||
const cacheResult = await cacheFetchResult(fetchResult)
|
||||
console.log('cacheFetchResult result', cacheResult)
|
||||
|
||||
const jobs = await queueSavePageJob(savePageJobs)
|
||||
console.log('save-page jobs queued', jobs.length)
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user