create batch save page jobs after fetching content

This commit is contained in:
Hongbo Wu
2024-01-18 15:45:14 +08:00
parent 8a3b50f07f
commit 0015a946b2
5 changed files with 202 additions and 74 deletions

View File

@ -2,5 +2,13 @@
"extends": "../../.eslintrc",
"parserOptions": {
"project": "tsconfig.json"
},
"rules": {
"@typescript-eslint/no-floating-promises": [
"error",
{
"ignoreIIFE": true
}
]
}
}

View File

@ -8,8 +8,10 @@
],
"dependencies": {
"axios": "^0.27.2",
"bullmq": "^5.1.1",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"ioredis": "^5.3.2",
"jsonwebtoken": "^8.5.1",
"@google-cloud/functions-framework": "^3.0.0",
"@omnivore/puppeteer-parse": "^1.0.0",

View File

@ -0,0 +1,35 @@
import { Queue } from 'bullmq'
import { redis } from './redis'
// Name of the BullMQ queue that the API side consumes save-page jobs from.
const QUEUE_NAME = 'omnivore-content-fetch'

// Payload for one save-page job: the fetched page's URL, the user it is
// saved for, and the opaque job data forwarded to the queue consumer.
// NOTE(review): interface names are conventionally PascalCase (SavePageJob);
// kept as-is because the sibling function's signature references this name.
interface savePageJob {
  url: string
  userId: string
  data: unknown
}
// Cached Queue instance — BullMQ queues (and their Redis connections) are
// meant to be long-lived and reused, not constructed per request. The
// original created a fresh Queue on every call and never closed it, churning
// resources; we lazily create one and hand it back on subsequent calls.
let cachedQueue: Queue | undefined

/**
 * Returns the shared BullMQ queue for content-fetch save-page jobs,
 * creating it on first use. Return type stays `Queue | undefined` to
 * keep the original signature for callers that guard against undefined.
 */
const createQueue = (): Queue | undefined => {
  if (!cachedQueue) {
    cachedQueue = new Queue(QUEUE_NAME, {
      connection: redis,
    })
  }
  return cachedQueue
}
/**
 * Enqueues a batch of save-page jobs onto the content-fetch queue.
 * Each job is keyed by `userId-url` so duplicate saves for the same user
 * and page collapse into one job; jobs are removed once settled.
 * Returns the added jobs, or undefined when no queue is available.
 */
export const queueSavePageJob = async (savePageJobs: savePageJob[]) => {
  const queue = createQueue()
  if (!queue) {
    return undefined
  }
  return queue.addBulk(
    savePageJobs.map(({ userId, url, data }) => ({
      name: 'save-page',
      data,
      opts: {
        jobId: `${userId}-${url}`, // dedupe key per user+page
        removeOnComplete: true,
        removeOnFail: true,
      },
    }))
  )
}

View File

@ -0,0 +1,45 @@
import { Redis } from 'ioredis'
// Connection settings come from the environment; REDIS_CERT switches on TLS.
const url = process.env.REDIS_URL
const cert = process.env.REDIS_CERT

/**
 * Shared ioredis client for the content-fetch service.
 * Falls back to a local Redis when REDIS_URL is unset.
 */
export const redis = new Redis(url || 'redis://localhost:6379', {
  connectTimeout: 10000, // 10 seconds
  tls: cert
    ? {
        cert,
        rejectUnauthorized: false, // for self-signed certs
      }
    : undefined,
  reconnectOnError: (err) => {
    const targetErrors = [/READONLY/, /ETIMEDOUT/]
    // BUG FIX: the original iterated with forEach, whose callback return
    // value is discarded — reconnectOnError therefore ALWAYS returned
    // false and never reconnected. `some` propagates the match correctly:
    // reconnect only when the error message contains a target keyword.
    return targetErrors.some((targetError) => targetError.test(err.message))
  },
  retryStrategy: (times) => {
    if (times > 10) {
      // Stop reconnecting after 10 attempts and flush all pending
      // commands with an individual error.
      return null
    }
    // Linear backoff capped at 2 seconds.
    return Math.min(times * 50, 2000)
  },
})
// graceful shutdown
process.on('SIGINT', () => {
  // Drain the Redis connection before exiting; `void` marks the
  // fire-and-forget async call as intentional.
  const shutdown = async (): Promise<never> => {
    console.log('SIGINT signal received: closing HTTP server')
    await redis.quit()
    console.log('redis connection closed')
    process.exit()
  }
  void shutdown()
})

View File

@ -1,15 +1,15 @@
import { fetchContent } from '@omnivore/puppeteer-parse'
import { RequestHandler } from 'express'
import {
sendCreateArticleMutation,
sendImportStatusUpdate,
sendSavePageMutation,
uploadPdf,
} from './api'
import { queueSavePageJob } from './job'
// A recipient of a saved page. `folder` is the per-user destination folder
// for the saved article; optional because not every save specifies one.
interface User {
  id: string
  folder?: string
}
interface RequestBody {
url: string
userId: string
userId?: string
saveRequestId: string
state?: string
labels?: string[]
@ -21,12 +21,12 @@ interface RequestBody {
savedAt?: string
publishedAt?: string
folder?: string
users?: string[]
users?: User[]
}
interface LogRecord {
url: string
userId: string
userId?: string
articleSavingRequestId: string
labels: {
source: string
@ -40,19 +40,29 @@ interface LogRecord {
savedAt?: string
publishedAt?: string
folder?: string
users?: string[]
users?: User[]
error?: string
totalTime?: number
}
const MAX_RETRY_COUNT = process.env.MAX_RETRY_COUNT || '1'
// const MAX_RETRY_COUNT = process.env.MAX_RETRY_COUNT || '1'
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
const functionStartTime = Date.now()
const body = <RequestBody>req.body
let users = body.users || [] // users is used when saving article for multiple users
const userId = body.userId
const folder = body.folder
if (userId) {
users = [
{
id: userId,
folder: body.folder,
},
] // userId is used when saving article for a single user
}
const articleSavingRequestId = body.saveRequestId
const state = body.state
const labels = body.labels
@ -64,8 +74,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
const rssFeedUrl = body.rssFeedUrl
const savedAt = body.savedAt
const publishedAt = body.publishedAt
const folder = body.folder
const users = body ? body.users : undefined // users is used when saving article for multiple users
const logRecord: LogRecord = {
url,
@ -88,90 +96,120 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
console.log(`Article parsing request`, logRecord)
let importStatus,
statusCode = 200
// let importStatus,
// statusCode = 200
try {
const fetchResult = await fetchContent(url, locale, timezone)
const finalUrl = fetchResult.finalUrl
const title = fetchResult.title
const content = fetchResult.content
const contentType = fetchResult.contentType
const readabilityResult = fetchResult.readabilityResult as unknown
if (fetchResult.contentType === 'application/pdf') {
const uploadFileId = await uploadPdf(
finalUrl,
userId,
articleSavingRequestId
)
const uploadedPdf = await sendCreateArticleMutation(userId, {
url: encodeURI(finalUrl),
articleSavingRequestId,
uploadFileId,
state,
labels,
source,
folder,
rssFeedUrl,
savedAt,
publishedAt,
})
if (!uploadedPdf) {
statusCode = 500
logRecord.error = 'error while saving uploaded pdf'
} else {
importStatus = 'imported'
}
} else {
const apiResponse = await sendSavePageMutation(userId, {
url,
clientRequestId: articleSavingRequestId,
const savePageJobs = users.map((user) => ({
url: finalUrl,
userId: user.id,
data: {
url: finalUrl,
title,
originalContent: content,
parseResult: readabilityResult,
content,
contentType,
readabilityResult,
articleSavingRequestId,
state,
labels,
source,
folder: user.folder,
rssFeedUrl,
savedAt,
publishedAt,
source,
folder,
})
if (!apiResponse) {
logRecord.error = 'error while saving page'
statusCode = 500
} else if (
'error' in apiResponse &&
apiResponse.error === 'UNAUTHORIZED'
) {
console.log('user is deleted, do not retry', logRecord)
return res.sendStatus(200)
} else {
importStatus = readabilityResult ? 'imported' : 'failed'
}
},
}))
const result = await queueSavePageJob(savePageJobs)
console.log('queueSavePageJob result', result)
if (!result) {
logRecord.error = 'error while queueing save page job'
return res.sendStatus(500)
}
// if (fetchResult.contentType === 'application/pdf') {
// const uploadFileId = await uploadPdf(
// finalUrl,
// userId,
// articleSavingRequestId
// )
// const uploadedPdf = await sendCreateArticleMutation(userId, {
// url: encodeURI(finalUrl),
// articleSavingRequestId,
// uploadFileId,
// state,
// labels,
// source,
// folder,
// rssFeedUrl,
// savedAt,
// publishedAt,
// })
// if (!uploadedPdf) {
// statusCode = 500
// logRecord.error = 'error while saving uploaded pdf'
// } else {
// importStatus = 'imported'
// }
// } else {
// const apiResponse = await sendSavePageMutation(userId, {
// url,
// clientRequestId: articleSavingRequestId,
// title,
// originalContent: content,
// parseResult: readabilityResult,
// state,
// labels,
// rssFeedUrl,
// savedAt,
// publishedAt,
// source,
// folder,
// })
// if (!apiResponse) {
// logRecord.error = 'error while saving page'
// statusCode = 500
// } else if (
// 'error' in apiResponse &&
// apiResponse.error === 'UNAUTHORIZED'
// ) {
// console.log('user is deleted, do not retry', logRecord)
// return res.sendStatus(200)
// } else {
// importStatus = readabilityResult ? 'imported' : 'failed'
// }
// }
} catch (error) {
console.error(error)
if (error instanceof Error) {
logRecord.error = error.message
} else {
logRecord.error = 'unknown error'
}
return res.sendStatus(500)
} finally {
logRecord.totalTime = Date.now() - functionStartTime
console.log(`parse-page result`, logRecord)
// mark import failed on the last failed retry
const retryCount = req.headers['x-cloudtasks-taskretrycount']
if (retryCount === MAX_RETRY_COUNT) {
console.log('max retry count reached')
importStatus = importStatus || 'failed'
}
// send import status to update the metrics
if (taskId && importStatus) {
await sendImportStatusUpdate(userId, taskId, importStatus)
}
res.sendStatus(statusCode)
// // mark import failed on the last failed retry
// const retryCount = req.headers['x-cloudtasks-taskretrycount']
// if (retryCount === MAX_RETRY_COUNT) {
// console.log('max retry count reached')
// importStatus = importStatus || 'failed'
// }
// // send import status to update the metrics
// if (taskId && importStatus) {
// await sendImportStatusUpdate(userId, taskId, importStatus)
// }
// res.sendStatus(statusCode)
}
res.sendStatus(200)
}