From 6de285432d156c148fd3e7476c0764ca7682914c Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 16:53:50 +0800 Subject: [PATCH] fix importer status not updated if failed to fetch content --- packages/api/src/jobs/save_page.ts | 4 +- packages/content-fetch/package.json | 3 ++ packages/content-fetch/src/request_handler.ts | 54 ++++++++++++++++++- packages/content-fetch/src/worker.ts | 2 +- 4 files changed, 59 insertions(+), 4 deletions(-) diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 850b36d8b..a721f4643 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -140,7 +140,7 @@ const sendImportStatusUpdate = async ( Authorization: auth as string, 'Content-Type': 'application/json', }, - timeout: REQUEST_TIMEOUT, + timeout: 5000, } ) } catch (e) { @@ -288,7 +288,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { throw e } finally { - const lastAttempt = attemptsMade + 1 === MAX_IMPORT_ATTEMPTS + const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS if (taskId && (isSaved || lastAttempt)) { logger.info('sending import status update') diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index f7fd84160..743a3bca0 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -10,14 +10,17 @@ "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", + "axios": "^0.27.2", "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", "express-async-handler": "^1.2.0", + "jsonwebtoken": "^8.5.1", "posthog-node": "^3.6.3" }, "devDependencies": { "@types/express": "^4.17.1", + "@types/jsonwebtoken": "^8.5.0", "chai": "^4.3.6", "mocha": "^10.0.0" }, diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index a494d8d45..78310bf56 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,7 +1,10 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' import { RedisDataSource } from '@omnivore/utils' +import axios from 'axios' import 'dotenv/config' +import jwt from 'jsonwebtoken' +import { promisify } from 'util' import { analytics } from './analytics' import { queueSavePageJob } from './job' @@ -66,6 +69,14 @@ const NO_CACHE_URLS = [ 'https://deviceandbrowserinfo.com/info_device', ] +const signToken = promisify(jwt.sign) + +const IMPORTER_METRICS_COLLECTOR_URL = + process.env.IMPORTER_METRICS_COLLECTOR_URL +const JWT_SECRET = process.env.JWT_SECRET + +const MAX_IMPORT_ATTEMPTS = 1 + const uploadToBucket = async (filePath: string, data: string) => { await storage .bucket(bucketName) @@ -174,9 +185,43 @@ const incrementContentFetchFailure = async ( } } +const sendImportStatusUpdate = async ( + userId: string, + taskId: string, + isImported?: boolean +) => { + try { + if (!JWT_SECRET || !IMPORTER_METRICS_COLLECTOR_URL) { + console.error('JWT_SECRET or IMPORTER_METRICS_COLLECTOR_URL is not set') + return + } + + console.log('sending import status update') + const auth = await signToken({ uid: userId }, JWT_SECRET) + + await axios.post( + IMPORTER_METRICS_COLLECTOR_URL, + { + taskId, + status: isImported ? 'imported' : 'failed', + }, + { + headers: { + Authorization: auth as string, + 'Content-Type': 'application/json', + }, + timeout: 5000, + } + ) + } catch (e) { + console.error('Failed to send import status update', e) + } +} + export const processFetchContentJob = async ( redisDataSource: RedisDataSource, - data: JobData + data: JobData, + attemptsMade: number ) => { const functionStartTime = Date.now() @@ -318,5 +363,12 @@ export const processFetchContentJob = async ( }, } ) + + const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS + if (logRecord.error && taskId && lastAttempt) { + console.log('sending import status update') + // send failed to import status to update the metrics for importer + await sendImportStatusUpdate(users[0].id, taskId, false) + } } } diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts index 33cb44668..99672497e 100644 --- a/packages/content-fetch/src/worker.ts +++ b/packages/content-fetch/src/worker.ts @@ -35,7 +35,7 @@ export const createWorker = ( queueName, async (job: Job) => { // process the job - await processFetchContentJob(redisDataSource, job.data) + await processFetchContentJob(redisDataSource, job.data, job.attemptsMade) }, { connection: redisDataSource.queueRedisClient,