From 4340286b8949b15badd2cab1142c8e73ae34cb2d Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 19 Jan 2024 17:38:25 +0800 Subject: [PATCH] add retry on failure in jobs --- packages/api/src/jobs/save_page.ts | 79 ++++++++++++++++------------- packages/api/src/queue-processor.ts | 4 +- packages/content-fetch/src/job.ts | 4 +- 3 files changed, 48 insertions(+), 39 deletions(-) diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 11d2837f7..1475e8e39 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -10,6 +10,7 @@ const IMPORTER_METRICS_COLLECTOR_URL = env.queue.importerMetricsUrl const JWT_SECRET = env.server.jwtSecret const REST_BACKEND_ENDPOINT = `${env.server.internalApiUrl}/api` +const MAX_ATTEMPTS = 1 const REQUEST_TIMEOUT = 30000 // 30 seconds interface Data { @@ -290,7 +291,7 @@ const sendSavePageMutation = async (userId: string, input: unknown) => { const sendImportStatusUpdate = async ( userId: string, taskId: string, - isContentParsed: boolean + isImported?: boolean ) => { try { const auth = await signToken({ uid: userId }, JWT_SECRET) @@ -299,7 +300,7 @@ const sendImportStatusUpdate = async ( IMPORTER_METRICS_COLLECTOR_URL, { taskId, - status: isContentParsed ? 'imported' : 'failed', + status: isImported ? 'imported' : 'failed', }, { headers: { @@ -330,10 +331,12 @@ const getCachedFetchResult = async (url: string) => { throw new Error('fetch result is not valid') } + console.log('fetch result is cached', url) + return fetchResult } -export const savePageJob = async (data: Data) => { +export const savePageJob = async (data: Data, attemptsMade: number) => { const { userId, articleSavingRequestId, @@ -346,7 +349,7 @@ export const savePageJob = async (data: Data) => { publishedAt, taskId, } = data - let isContentParsed = true + let isImported, isSaved try { const url = encodeURI(data.url) @@ -355,6 +358,7 @@ export const savePageJob = async (data: Data) => { const { title, content, contentType, readabilityResult } = await getCachedFetchResult(url) + // for pdf content, we need to upload the pdf if (contentType === 'application/pdf') { const uploadFileId = await uploadPdf(url, userId, articleSavingRequestId) const uploadedPdf = await sendCreateArticleMutation(userId, { @@ -372,42 +376,47 @@ export const savePageJob = async (data: Data) => { if (!uploadedPdf) { throw new Error('error while saving uploaded pdf') } - } else { - const apiResponse = await sendSavePageMutation(userId, { - url, - clientRequestId: articleSavingRequestId, - title, - originalContent: content, - parseResult: readabilityResult, - state, - labels, - rssFeedUrl, - savedAt, - publishedAt, - source, - folder, - }) - if (!apiResponse) { - throw new Error('error while saving page') - } - // if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') { - // console.log('user is deleted', userId) - // return true - // } - // if the readability result is not parsed, the import is failed - if (!readabilityResult) { - isContentParsed = false - } + isSaved = true + isImported = true + return true } + + // for non-pdf content, we need to save the page + const apiResponse = await sendSavePageMutation(userId, { + url, + clientRequestId: articleSavingRequestId, + title, + originalContent: content, + parseResult: readabilityResult, + state, + labels, + rssFeedUrl, + savedAt, + publishedAt, + source, + folder, + }) + if (!apiResponse) { + throw new Error('error while saving page') + } + + if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') { + console.log('user is deleted', userId) + return false + } + + // if the readability result is not parsed, the import is failed + isImported = !!readabilityResult + isSaved = true } catch (e) { console.error('error while saving page', e) - isContentParsed = false - return false + throw e } finally { - // send import status to update the metrics for importer - if (taskId) { - await sendImportStatusUpdate(userId, taskId, isContentParsed) + const isFinalized = isSaved || attemptsMade >= MAX_ATTEMPTS + if (taskId && isFinalized) { + // send import status to update the metrics for importer + await sendImportStatusUpdate(userId, taskId, isImported) } } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 30c8156ce..eaef082a2 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -9,8 +9,8 @@ import { appDataSource } from './data_source' import { env } from './env' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' -import { redisDataSource } from './redis_data_source' import { savePageJob } from './jobs/save_page' +import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -72,7 +72,7 @@ const main = async () => { return await refreshFeed(job.data) } case 'save-page': { - return savePageJob(job.data) + return savePageJob(job.data, job.attemptsMade) } } return true diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index b03ed9469..201911ef7 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -35,10 +35,10 @@ const getPriority = (job: savePageJob): number => { const getAttempts = (job: savePageJob): number => { if (job.isRss || job.isImport) { // we don't want to retry rss or import jobs - return 1 + return 0 } - return 2 + return 1 } const getOpts = (job: savePageJob): BulkJobOptions => {