diff --git a/packages/content-fetch/.eslintrc b/packages/content-fetch/.eslintrc index 644bb1aec..301be9795 100644 --- a/packages/content-fetch/.eslintrc +++ b/packages/content-fetch/.eslintrc @@ -2,5 +2,13 @@ "extends": "../../.eslintrc", "parserOptions": { "project": "tsconfig.json" + }, + "rules": { + "@typescript-eslint/no-floating-promises": [ + "error", + { + "ignoreIIFE": true + } + ] } } diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 541d6a39f..1d4af0339 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -8,8 +8,10 @@ ], "dependencies": { "axios": "^0.27.2", + "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", + "ioredis": "^5.3.2", "jsonwebtoken": "^8.5.1", "@google-cloud/functions-framework": "^3.0.0", "@omnivore/puppeteer-parse": "^1.0.0", diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts new file mode 100644 index 000000000..7c4f21651 --- /dev/null +++ b/packages/content-fetch/src/job.ts @@ -0,0 +1,35 @@ +import { Queue } from 'bullmq' +import { redis } from './redis' + +const QUEUE_NAME = 'omnivore-content-fetch' + +interface savePageJob { + url: string + userId: string + data: unknown +} + +const createQueue = (): Queue | undefined => { + return new Queue(QUEUE_NAME, { + connection: redis, + }) +} + +export const queueSavePageJob = async (savePageJobs: savePageJob[]) => { + const queue = createQueue() + if (!queue) { + return undefined + } + + const jobs = savePageJobs.map((job) => ({ + name: 'save-page', + data: job.data, + opts: { + jobId: `${job.userId}-${job.url}`, + removeOnComplete: true, + removeOnFail: true, + }, + })) + + return queue.addBulk(jobs) +} diff --git a/packages/content-fetch/src/redis.ts b/packages/content-fetch/src/redis.ts new file mode 100644 index 000000000..900bf958c --- /dev/null +++ b/packages/content-fetch/src/redis.ts @@ -0,0 +1,45 @@ +import { Redis } from 'ioredis' + +const url = 
process.env.REDIS_URL +const cert = process.env.REDIS_CERT + +export const redis = new Redis(url || 'redis://localhost:6379', { + connectTimeout: 10000, // 10 seconds + tls: cert + ? { + cert, + rejectUnauthorized: false, // for self-signed certs + } + : undefined, + reconnectOnError: (err) => { + const targetErrors = [/READONLY/, /ETIMEDOUT/] + + // Only reconnect when the error contains one of the keywords. + // NOTE: a forEach callback's return value is discarded, so the + // previous forEach-based check always returned false here. + const shouldReconnect = targetErrors.some((targetError) => + targetError.test(err.message) + ) + + return shouldReconnect + }, + retryStrategy: (times) => { + if (times > 10) { + // End reconnecting after a specific number of tries and flush all commands with an individual error + return null + } + + // reconnect after + return Math.min(times * 50, 2000) + }, +}) + +// graceful shutdown +process.on('SIGINT', () => { + ;(async () => { + console.log('SIGINT signal received: closing HTTP server') + await redis.quit() + console.log('redis connection closed') + process.exit() + })() +}) diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index a33ec5c47..9959f639b 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,15 +1,15 @@ import { fetchContent } from '@omnivore/puppeteer-parse' import { RequestHandler } from 'express' -import { - sendCreateArticleMutation, - sendImportStatusUpdate, - sendSavePageMutation, - uploadPdf, -} from './api' +import { queueSavePageJob } from './job' + +interface User { + id: string + folder?: string +} interface RequestBody { url: string - userId: string + userId?: string saveRequestId: string state?: string labels?: string[] @@ -21,12 +21,12 @@ interface RequestBody { savedAt?: string publishedAt?: string folder?: string - users?: string[] + users?: User[] } interface LogRecord { url: string - userId: string + userId?: string articleSavingRequestId: string labels: { source: string @@ -40,19 +40,29 @@ 
savedAt?: string publishedAt?: string folder?: string - users?: string[] + users?: User[] error?: string totalTime?: number } -const MAX_RETRY_COUNT = process.env.MAX_RETRY_COUNT || '1' +// const MAX_RETRY_COUNT = process.env.MAX_RETRY_COUNT || '1' export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const functionStartTime = Date.now() const body = req.body + let users = body.users || [] // users is used when saving article for multiple users const userId = body.userId + const folder = body.folder + if (userId) { + users = [ + { + id: userId, + folder: body.folder, + }, + ] // userId is used when saving article for a single user + } const articleSavingRequestId = body.saveRequestId const state = body.state const labels = body.labels @@ -64,8 +74,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const rssFeedUrl = body.rssFeedUrl const savedAt = body.savedAt const publishedAt = body.publishedAt - const folder = body.folder - const users = body ? 
body.users : undefined // users is used when saving article for multiple users const logRecord: LogRecord = { url, @@ -88,90 +96,120 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) - let importStatus, - statusCode = 200 + // let importStatus, + // statusCode = 200 try { const fetchResult = await fetchContent(url, locale, timezone) const finalUrl = fetchResult.finalUrl const title = fetchResult.title const content = fetchResult.content + const contentType = fetchResult.contentType const readabilityResult = fetchResult.readabilityResult as unknown - if (fetchResult.contentType === 'application/pdf') { - const uploadFileId = await uploadPdf( - finalUrl, - userId, - articleSavingRequestId - ) - const uploadedPdf = await sendCreateArticleMutation(userId, { - url: encodeURI(finalUrl), - articleSavingRequestId, - uploadFileId, - state, - labels, - source, - folder, - rssFeedUrl, - savedAt, - publishedAt, - }) - if (!uploadedPdf) { - statusCode = 500 - logRecord.error = 'error while saving uploaded pdf' - } else { - importStatus = 'imported' - } - } else { - const apiResponse = await sendSavePageMutation(userId, { - url, - clientRequestId: articleSavingRequestId, + + const savePageJobs = users.map((user) => ({ + url: finalUrl, + userId: user.id, + data: { + url: finalUrl, title, - originalContent: content, - parseResult: readabilityResult, + content, + contentType, + readabilityResult, + articleSavingRequestId, state, labels, + source, + folder: user.folder, rssFeedUrl, savedAt, publishedAt, - source, - folder, - }) - if (!apiResponse) { - logRecord.error = 'error while saving page' - statusCode = 500 - } else if ( - 'error' in apiResponse && - apiResponse.error === 'UNAUTHORIZED' - ) { - console.log('user is deleted, do not retry', logRecord) - return res.sendStatus(200) - } else { - importStatus = readabilityResult ? 
'imported' : 'failed' - } + }, + })) + + const result = await queueSavePageJob(savePageJobs) + console.log('queueSavePageJob result', result) + if (!result) { + logRecord.error = 'error while queueing save page job' + return res.sendStatus(500) } + + // if (fetchResult.contentType === 'application/pdf') { + // const uploadFileId = await uploadPdf( + // finalUrl, + // userId, + // articleSavingRequestId + // ) + // const uploadedPdf = await sendCreateArticleMutation(userId, { + // url: encodeURI(finalUrl), + // articleSavingRequestId, + // uploadFileId, + // state, + // labels, + // source, + // folder, + // rssFeedUrl, + // savedAt, + // publishedAt, + // }) + // if (!uploadedPdf) { + // statusCode = 500 + // logRecord.error = 'error while saving uploaded pdf' + // } else { + // importStatus = 'imported' + // } + // } else { + // const apiResponse = await sendSavePageMutation(userId, { + // url, + // clientRequestId: articleSavingRequestId, + // title, + // originalContent: content, + // parseResult: readabilityResult, + // state, + // labels, + // rssFeedUrl, + // savedAt, + // publishedAt, + // source, + // folder, + // }) + // if (!apiResponse) { + // logRecord.error = 'error while saving page' + // statusCode = 500 + // } else if ( + // 'error' in apiResponse && + // apiResponse.error === 'UNAUTHORIZED' + // ) { + // console.log('user is deleted, do not retry', logRecord) + // return res.sendStatus(200) + // } else { + // importStatus = readabilityResult ? 
'imported' : 'failed' + // } + // } } catch (error) { - console.error(error) if (error instanceof Error) { logRecord.error = error.message } else { logRecord.error = 'unknown error' } + + return res.sendStatus(500) } finally { logRecord.totalTime = Date.now() - functionStartTime console.log(`parse-page result`, logRecord) - // mark import failed on the last failed retry - const retryCount = req.headers['x-cloudtasks-taskretrycount'] - if (retryCount === MAX_RETRY_COUNT) { - console.log('max retry count reached') - importStatus = importStatus || 'failed' - } - - // send import status to update the metrics - if (taskId && importStatus) { - await sendImportStatusUpdate(userId, taskId, importStatus) - } - - res.sendStatus(statusCode) + // // mark import failed on the last failed retry + // const retryCount = req.headers['x-cloudtasks-taskretrycount'] + // if (retryCount === MAX_RETRY_COUNT) { + // console.log('max retry count reached') + // importStatus = importStatus || 'failed' + // } + // // send import status to update the metrics + // if (taskId && importStatus) { + // await sendImportStatusUpdate(userId, taskId, importStatus) + // } + // res.sendStatus(statusCode) } + + res.sendStatus(200) }