From a6f0e2f2d9c22c7c5f180b8044334c67c3fd0b68 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 18 Jan 2024 17:57:04 +0800 Subject: [PATCH] add save page job processor --- .../src/api.ts => api/src/jobs/save_page.ts} | 205 ++++++++++++------ packages/api/src/queue-processor.ts | 17 +- packages/content-fetch/package.json | 5 +- packages/content-fetch/src/request_handler.ts | 71 +----- 4 files changed, 152 insertions(+), 146 deletions(-) rename packages/{content-fetch/src/api.ts => api/src/jobs/save_page.ts} (71%) diff --git a/packages/content-fetch/src/api.ts b/packages/api/src/jobs/save_page.ts similarity index 71% rename from packages/content-fetch/src/api.ts rename to packages/api/src/jobs/save_page.ts index 4066cd9b3..e3a238636 100644 --- a/packages/content-fetch/src/api.ts +++ b/packages/api/src/jobs/save_page.ts @@ -1,3 +1,4 @@ +import { Readability } from '@omnivore/readability' import axios from 'axios' import jwt from 'jsonwebtoken' import { promisify } from 'util' @@ -15,7 +16,58 @@ if (!IMPORTER_METRICS_COLLECTOR_URL || !JWT_SECRET || !REST_BACKEND_ENDPOINT) { const REQUEST_TIMEOUT = 30000 // 30 seconds -export const uploadToSignedUrl = async ( +interface Data { + userId: string + url: string + title: string + content: string + contentType: string + readabilityResult?: Readability.ParseResult + articleSavingRequestId: string + state?: string + labels?: string[] + source: string + folder: string + rssFeedUrl?: string + savedAt?: string + publishedAt?: string + taskId?: string +} + +interface UploadFileResponse { + data: { + uploadFileRequest: { + id: string + uploadSignedUrl: string + uploadFileId: string + createdPageId: string + errorCodes?: string[] + } + } +} + +interface CreateArticleResponse { + data: { + createArticle: { + createdArticle: { + id: string + } + errorCodes: string[] + } + } +} + +interface SavePageResponse { + data: { + savePage: { + url: string + clientRequestId: string + errorCodes?: string[] + } + } +} + +const uploadToSignedUrl = async ( uploadSignedUrl: string, contentType: string, contentObjUrl: string @@ -39,19 +91,7 @@ export const uploadToSignedUrl = async ( } } -interface UploadFileResponse { - data: { - uploadFileRequest: { - id: string - uploadSignedUrl: string - uploadFileId: string - createdPageId: string - errorCodes?: string[] - } - } -} - -export const getUploadIdAndSignedUrl = async ( +const getUploadIdAndSignedUrl = async ( userId: string, url: string, articleSavingRequestId: string @@ -109,18 +149,7 @@ export const getUploadIdAndSignedUrl = async ( } } -interface CreateArticleResponse { - data: { - createArticle: { - createdArticle: { - id: string - } - errorCodes: string[] - } - } -} - -export const uploadPdf = async ( +const uploadPdf = async ( url: string, userId: string, articleSavingRequestId: string @@ -144,10 +173,7 @@ export const uploadPdf = async ( return uploadResult.id } -export const sendCreateArticleMutation = async ( - userId: string, - input: unknown -) => { +const sendCreateArticleMutation = async (userId: string, input: unknown) => { const data = JSON.stringify({ query: `mutation CreateArticle ($input: CreateArticleInput!){ createArticle(input:$input){ @@ -198,17 +224,7 @@ export const sendCreateArticleMutation = async ( } } -interface SavePageResponse { - data: { - savePage: { - url: string - clientRequestId: string - errorCodes?: string[] - } - } -} - -export const sendSavePageMutation = async (userId: string, input: unknown) => { +const sendSavePageMutation = async (userId: string, input: unknown) => { const data = JSON.stringify({ query: `mutation SavePage ($input: SavePageInput!){ savePage(input:$input){ @@ -262,31 +278,10 @@ export const sendSavePageMutation = async (userId: string, input: unknown) => { } } -export const saveUploadedPdf = async ( - userId: string, - url: string, - uploadFileId: string, - articleSavingRequestId: string, - state: string, - labels: string[], - source: string, - folder: string -) => { - return sendCreateArticleMutation(userId, { - url: encodeURI(url), - articleSavingRequestId, - uploadFileId: uploadFileId, - state, - labels, - source, - folder, - }) -} - -export const sendImportStatusUpdate = async ( +const sendImportStatusUpdate = async ( userId: string, taskId: string, - status: string + isContentParsed: boolean ) => { try { const auth = await signToken({ uid: userId }, JWT_SECRET) @@ -295,7 +290,7 @@ export const sendImportStatusUpdate = async ( IMPORTER_METRICS_COLLECTOR_URL, { taskId, - status, + status: isContentParsed ? 'imported' : 'failed', }, { headers: { @@ -309,3 +304,83 @@ export const sendImportStatusUpdate = async ( console.error('error while sending import status update', e) } } + +export const savePageJob = async (data: Data) => { + const { + userId, + title, + content, + readabilityResult, + articleSavingRequestId, + state, + labels, + source, + folder, + rssFeedUrl, + savedAt, + publishedAt, + taskId, + } = data + let isContentParsed = true + + try { + const url = encodeURI(data.url) + + if (data.contentType === 'application/pdf') { + const uploadFileId = await uploadPdf(url, userId, articleSavingRequestId) + const uploadedPdf = await sendCreateArticleMutation(userId, { + url, + articleSavingRequestId, + uploadFileId, + state, + labels, + source, + folder, + rssFeedUrl, + savedAt, + publishedAt, + }) + if (!uploadedPdf) { + throw new Error('error while saving uploaded pdf') + } + } else { + const apiResponse = await sendSavePageMutation(userId, { + url, + clientRequestId: articleSavingRequestId, + title, + originalContent: content, + parseResult: readabilityResult, + state, + labels, + rssFeedUrl, + savedAt, + publishedAt, + source, + folder, + }) + if (!apiResponse) { + throw new Error('error while saving page') + } + // if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') { + // console.log('user is deleted', userId) + // return true + // } + + // if the readability result is not parsed, the import is failed + if (!readabilityResult) { + isContentParsed = false + } + } + } catch (e) { + console.error('error while saving page', e) + isContentParsed = false + return false + } finally { + // send import status to update the metrics for importer + if (taskId) { + await sendImportStatusUpdate(userId, taskId, isContentParsed) + } + } + + return true +} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 7fcc608fb..30c8156ce 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,16 +2,16 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ +import { Job, QueueEvents, Worker } from 'bullmq' import express, { Express } from 'express' -import { appDataSource } from './data_source' -import { getEnv } from './util' -import { redisDataSource } from './redis_data_source' -import { CustomTypeOrmLogger } from './utils/logger' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' -import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' -import { Job, Worker, QueueEvents } from 'bullmq' -import { refreshFeed } from './jobs/rss/refreshFeed' +import { appDataSource } from './data_source' import { env } from './env' +import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' +import { refreshFeed } from './jobs/rss/refreshFeed' +import { redisDataSource } from './redis_data_source' +import { savePageJob } from './jobs/save_page' +import { CustomTypeOrmLogger } from './utils/logger' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -71,6 +71,9 @@ const main = async () => { case 'refresh-feed': { return await refreshFeed(job.data) } + case 'save-page': { + return savePageJob(job.data) + } } return true }, diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 1d4af0339..841f56f02 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -7,16 +7,13 @@ "build/src" ], "dependencies": { - "axios": "^0.27.2", "bullmq": "^5.1.1", "dotenv": "^8.2.0", "express": "^4.17.1", "ioredis": "^5.3.2", - "jsonwebtoken": "^8.5.1", "@google-cloud/functions-framework": "^3.0.0", "@omnivore/puppeteer-parse": "^1.0.0", - "@sentry/serverless": "^7.77.0", - "winston": "^3.3.3" + "@sentry/serverless": "^7.77.0" }, "devDependencies": { "chai": "^4.3.6", diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 748d4366b..9319722f5 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -26,7 +26,6 @@ interface RequestBody { interface LogRecord { url: string - userId?: string articleSavingRequestId: string labels: { source: string @@ -79,7 +78,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const logRecord: LogRecord = { url, - userId, articleSavingRequestId, labels: { source, @@ -98,9 +96,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) - // let importStatus, - // statusCode = 200 - try { const fetchResult = await fetchContent(url, locale, timezone) const finalUrl = fetchResult.finalUrl @@ -126,6 +121,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { rssFeedUrl, savedAt, publishedAt, + taskId, }, isRss: !!rssFeedUrl, isImport: !!taskId, @@ -137,59 +133,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { logRecord.error = 'error while queueing save page job' return res.sendStatus(500) } - - // if (fetchResult.contentType === 'application/pdf') { - // const uploadFileId = await uploadPdf( - // finalUrl, - // userId, - // articleSavingRequestId - // ) - // const uploadedPdf = await sendCreateArticleMutation(userId, { - // url: encodeURI(finalUrl), - // articleSavingRequestId, - // uploadFileId, - // state, - // labels, - // source, - // folder, - // rssFeedUrl, - // savedAt, - // publishedAt, - // }) - // if (!uploadedPdf) { - // statusCode = 500 - // logRecord.error = 'error while saving uploaded pdf' - // } else { - // importStatus = 'imported' - // } - // } else { - // const apiResponse = await sendSavePageMutation(userId, { - // url, - // clientRequestId: articleSavingRequestId, - // title, - // originalContent: content, - // parseResult: readabilityResult, - // state, - // labels, - // rssFeedUrl, - // savedAt, - // publishedAt, - // source, - // folder, - // }) - // if (!apiResponse) { - // logRecord.error = 'error while saving page' - // statusCode = 500 - // } else if ( - // 'error' in apiResponse && - // apiResponse.error === 'UNAUTHORIZED' - // ) { - // console.log('user is deleted, do not retry', logRecord) - // return res.sendStatus(200) - // } else { - // importStatus = readabilityResult ? 'imported' : 'failed' - // } - // } } catch (error) { if (error instanceof Error) { logRecord.error = error.message @@ -201,18 +144,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { } finally { logRecord.totalTime = Date.now() - functionStartTime console.log(`parse-page result`, logRecord) - - // // mark import failed on the last failed retry - // const retryCount = req.headers['x-cloudtasks-taskretrycount'] - // if (retryCount === MAX_RETRY_COUNT) { - // console.log('max retry count reached') - // importStatus = importStatus || 'failed' - // } - // // send import status to update the metrics - // if (taskId && importStatus) { - // await sendImportStatusUpdate(userId, taskId, importStatus) - // } - // res.sendStatus(statusCode) } res.sendStatus(200)