fix importer status not updated if failed to fetch content

This commit is contained in:
Hongbo Wu
2024-08-21 16:53:50 +08:00
parent 2ada017427
commit 6de285432d
4 changed files with 59 additions and 4 deletions

View File

@@ -140,7 +140,7 @@ const sendImportStatusUpdate = async (
Authorization: auth as string,
'Content-Type': 'application/json',
},
timeout: REQUEST_TIMEOUT,
timeout: 5000,
}
)
} catch (e) {
@@ -288,7 +288,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
throw e
} finally {
const lastAttempt = attemptsMade + 1 === MAX_IMPORT_ATTEMPTS
const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS
if (taskId && (isSaved || lastAttempt)) {
logger.info('sending import status update')

View File

@@ -10,14 +10,17 @@
"@google-cloud/storage": "^7.0.1",
"@omnivore/puppeteer-parse": "^1.0.0",
"@omnivore/utils": "1.0.0",
"axios": "^0.27.2",
"bullmq": "^5.1.1",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"express-async-handler": "^1.2.0",
"jsonwebtoken": "^8.5.1",
"posthog-node": "^3.6.3"
},
"devDependencies": {
"@types/express": "^4.17.1",
"@types/jsonwebtoken": "^8.5.0",
"chai": "^4.3.6",
"mocha": "^10.0.0"
},

View File

@@ -1,7 +1,10 @@
import { Storage } from '@google-cloud/storage'
import { fetchContent } from '@omnivore/puppeteer-parse'
import { RedisDataSource } from '@omnivore/utils'
import axios from 'axios'
import 'dotenv/config'
import jwt from 'jsonwebtoken'
import { promisify } from 'util'
import { analytics } from './analytics'
import { queueSavePageJob } from './job'
@@ -66,6 +69,14 @@ const NO_CACHE_URLS = [
'https://deviceandbrowserinfo.com/info_device',
]
const signToken = promisify(jwt.sign)
const IMPORTER_METRICS_COLLECTOR_URL =
process.env.IMPORTER_METRICS_COLLECTOR_URL
const JWT_SECRET = process.env.JWT_SECRET
const MAX_IMPORT_ATTEMPTS = 1
const uploadToBucket = async (filePath: string, data: string) => {
await storage
.bucket(bucketName)
@@ -174,9 +185,43 @@ const incrementContentFetchFailure = async (
}
}
/**
 * Reports the outcome of an import task to the importer metrics collector.
 *
 * Signs a short-lived JWT for the user and POSTs the task status
 * ('imported' on success, 'failed' otherwise). Best-effort: any error —
 * including missing configuration — is logged and swallowed so metrics
 * reporting never breaks the import pipeline.
 *
 * @param userId - id of the user whose import is being reported
 * @param taskId - id of the import task being updated
 * @param isImported - true when the content was imported successfully
 */
const sendImportStatusUpdate = async (
  userId: string,
  taskId: string,
  isImported?: boolean
) => {
  try {
    // Both values come from the environment; without them there is
    // nowhere to report to, so bail out quietly after logging.
    if (!JWT_SECRET || !IMPORTER_METRICS_COLLECTOR_URL) {
      console.error('JWT_SECRET or IMPORTER_METRICS_COLLECTOR_URL is not set')
      return
    }

    console.log('sending import status update')

    // Authenticate the request as the importing user.
    const token = await signToken({ uid: userId }, JWT_SECRET)

    const payload = {
      taskId,
      status: isImported ? 'imported' : 'failed',
    }
    const requestConfig = {
      headers: {
        Authorization: token as string,
        'Content-Type': 'application/json',
      },
      timeout: 5000, // don't let a slow collector stall the worker
    }

    await axios.post(IMPORTER_METRICS_COLLECTOR_URL, payload, requestConfig)
  } catch (e) {
    // Metrics delivery is best-effort; log and continue.
    console.error('Failed to send import status update', e)
  }
}
export const processFetchContentJob = async (
redisDataSource: RedisDataSource,
data: JobData
data: JobData,
attemptsMade: number
) => {
const functionStartTime = Date.now()
@@ -318,5 +363,12 @@ export const processFetchContentJob = async (
},
}
)
const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS
if (logRecord.error && taskId && lastAttempt) {
console.log('sending import status update')
// send failed to import status to update the metrics for importer
await sendImportStatusUpdate(users[0].id, taskId, false)
}
}
}

View File

@@ -35,7 +35,7 @@ export const createWorker = (
queueName,
async (job: Job<JobData>) => {
// process the job
await processFetchContentJob(redisDataSource, job.data)
await processFetchContentJob(redisDataSource, job.data, job.attemptsMade)
},
{
connection: redisDataSource.queueRedisClient,