add retry on failure in jobs
This commit is contained in:
@ -10,6 +10,7 @@ const IMPORTER_METRICS_COLLECTOR_URL = env.queue.importerMetricsUrl
|
||||
const JWT_SECRET = env.server.jwtSecret
|
||||
const REST_BACKEND_ENDPOINT = `${env.server.internalApiUrl}/api`
|
||||
|
||||
const MAX_ATTEMPTS = 1
|
||||
const REQUEST_TIMEOUT = 30000 // 30 seconds
|
||||
|
||||
interface Data {
|
||||
@ -290,7 +291,7 @@ const sendSavePageMutation = async (userId: string, input: unknown) => {
|
||||
const sendImportStatusUpdate = async (
|
||||
userId: string,
|
||||
taskId: string,
|
||||
isContentParsed: boolean
|
||||
isImported?: boolean
|
||||
) => {
|
||||
try {
|
||||
const auth = await signToken({ uid: userId }, JWT_SECRET)
|
||||
@ -299,7 +300,7 @@ const sendImportStatusUpdate = async (
|
||||
IMPORTER_METRICS_COLLECTOR_URL,
|
||||
{
|
||||
taskId,
|
||||
status: isContentParsed ? 'imported' : 'failed',
|
||||
status: isImported ? 'imported' : 'failed',
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
@ -330,10 +331,12 @@ const getCachedFetchResult = async (url: string) => {
|
||||
throw new Error('fetch result is not valid')
|
||||
}
|
||||
|
||||
console.log('fetch result is cached', url)
|
||||
|
||||
return fetchResult
|
||||
}
|
||||
|
||||
export const savePageJob = async (data: Data) => {
|
||||
export const savePageJob = async (data: Data, attemptsMade: number) => {
|
||||
const {
|
||||
userId,
|
||||
articleSavingRequestId,
|
||||
@ -346,7 +349,7 @@ export const savePageJob = async (data: Data) => {
|
||||
publishedAt,
|
||||
taskId,
|
||||
} = data
|
||||
let isContentParsed = true
|
||||
let isImported, isSaved
|
||||
|
||||
try {
|
||||
const url = encodeURI(data.url)
|
||||
@ -355,6 +358,7 @@ export const savePageJob = async (data: Data) => {
|
||||
const { title, content, contentType, readabilityResult } =
|
||||
await getCachedFetchResult(url)
|
||||
|
||||
// for pdf content, we need to upload the pdf
|
||||
if (contentType === 'application/pdf') {
|
||||
const uploadFileId = await uploadPdf(url, userId, articleSavingRequestId)
|
||||
const uploadedPdf = await sendCreateArticleMutation(userId, {
|
||||
@ -372,42 +376,47 @@ export const savePageJob = async (data: Data) => {
|
||||
if (!uploadedPdf) {
|
||||
throw new Error('error while saving uploaded pdf')
|
||||
}
|
||||
} else {
|
||||
const apiResponse = await sendSavePageMutation(userId, {
|
||||
url,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent: content,
|
||||
parseResult: readabilityResult,
|
||||
state,
|
||||
labels,
|
||||
rssFeedUrl,
|
||||
savedAt,
|
||||
publishedAt,
|
||||
source,
|
||||
folder,
|
||||
})
|
||||
if (!apiResponse) {
|
||||
throw new Error('error while saving page')
|
||||
}
|
||||
// if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') {
|
||||
// console.log('user is deleted', userId)
|
||||
// return true
|
||||
// }
|
||||
|
||||
// if the readability result is not parsed, the import is failed
|
||||
if (!readabilityResult) {
|
||||
isContentParsed = false
|
||||
}
|
||||
isSaved = true
|
||||
isImported = true
|
||||
return true
|
||||
}
|
||||
|
||||
// for non-pdf content, we need to save the page
|
||||
const apiResponse = await sendSavePageMutation(userId, {
|
||||
url,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent: content,
|
||||
parseResult: readabilityResult,
|
||||
state,
|
||||
labels,
|
||||
rssFeedUrl,
|
||||
savedAt,
|
||||
publishedAt,
|
||||
source,
|
||||
folder,
|
||||
})
|
||||
if (!apiResponse) {
|
||||
throw new Error('error while saving page')
|
||||
}
|
||||
|
||||
if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') {
|
||||
console.log('user is deleted', userId)
|
||||
return false
|
||||
}
|
||||
|
||||
// if the readability result is not parsed, the import is failed
|
||||
isImported = !!readabilityResult
|
||||
isSaved = true
|
||||
} catch (e) {
|
||||
console.error('error while saving page', e)
|
||||
isContentParsed = false
|
||||
return false
|
||||
throw e
|
||||
} finally {
|
||||
// send import status to update the metrics for importer
|
||||
if (taskId) {
|
||||
await sendImportStatusUpdate(userId, taskId, isContentParsed)
|
||||
const isFinalized = isSaved || attemptsMade >= MAX_ATTEMPTS
|
||||
if (taskId && isFinalized) {
|
||||
// send import status to update the metrics for importer
|
||||
await sendImportStatusUpdate(userId, taskId, isImported)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -9,8 +9,8 @@ import { appDataSource } from './data_source'
|
||||
import { env } from './env'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
@ -72,7 +72,7 @@ const main = async () => {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data)
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
@ -35,10 +35,10 @@ const getPriority = (job: savePageJob): number => {
|
||||
const getAttempts = (job: savePageJob): number => {
|
||||
if (job.isRss || job.isImport) {
|
||||
// we don't want to retry rss or import jobs
|
||||
return 1
|
||||
return 0
|
||||
}
|
||||
|
||||
return 2
|
||||
return 1
|
||||
}
|
||||
|
||||
const getOpts = (job: savePageJob): BulkJobOptions => {
|
||||
|
||||
Reference in New Issue
Block a user