add save page job processor
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import axios from 'axios'
|
||||
import jwt from 'jsonwebtoken'
|
||||
import { promisify } from 'util'
|
||||
@ -15,7 +16,58 @@ if (!IMPORTER_METRICS_COLLECTOR_URL || !JWT_SECRET || !REST_BACKEND_ENDPOINT) {
|
||||
|
||||
const REQUEST_TIMEOUT = 30000 // 30 seconds
|
||||
|
||||
export const uploadToSignedUrl = async (
|
||||
interface Data {
|
||||
userId: string
|
||||
url: string
|
||||
title: string
|
||||
content: string
|
||||
contentType: string
|
||||
readabilityResult?: Readability.ParseResult
|
||||
articleSavingRequestId: string
|
||||
state?: string
|
||||
labels?: string[]
|
||||
source: string
|
||||
folder: string
|
||||
rssFeedUrl?: string
|
||||
savedAt?: string
|
||||
publishedAt?: string
|
||||
taskId?: string
|
||||
}
|
||||
|
||||
interface UploadFileResponse {
|
||||
data: {
|
||||
uploadFileRequest: {
|
||||
id: string
|
||||
uploadSignedUrl: string
|
||||
uploadFileId: string
|
||||
createdPageId: string
|
||||
errorCodes?: string[]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface CreateArticleResponse {
|
||||
data: {
|
||||
createArticle: {
|
||||
createdArticle: {
|
||||
id: string
|
||||
}
|
||||
errorCodes: string[]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface SavePageResponse {
|
||||
data: {
|
||||
savePage: {
|
||||
url: string
|
||||
clientRequestId: string
|
||||
errorCodes?: string[]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const uploadToSignedUrl = async (
|
||||
uploadSignedUrl: string,
|
||||
contentType: string,
|
||||
contentObjUrl: string
|
||||
@ -39,19 +91,7 @@ export const uploadToSignedUrl = async (
|
||||
}
|
||||
}
|
||||
|
||||
interface UploadFileResponse {
|
||||
data: {
|
||||
uploadFileRequest: {
|
||||
id: string
|
||||
uploadSignedUrl: string
|
||||
uploadFileId: string
|
||||
createdPageId: string
|
||||
errorCodes?: string[]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const getUploadIdAndSignedUrl = async (
|
||||
const getUploadIdAndSignedUrl = async (
|
||||
userId: string,
|
||||
url: string,
|
||||
articleSavingRequestId: string
|
||||
@ -109,18 +149,7 @@ export const getUploadIdAndSignedUrl = async (
|
||||
}
|
||||
}
|
||||
|
||||
interface CreateArticleResponse {
|
||||
data: {
|
||||
createArticle: {
|
||||
createdArticle: {
|
||||
id: string
|
||||
}
|
||||
errorCodes: string[]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const uploadPdf = async (
|
||||
const uploadPdf = async (
|
||||
url: string,
|
||||
userId: string,
|
||||
articleSavingRequestId: string
|
||||
@ -144,10 +173,7 @@ export const uploadPdf = async (
|
||||
return uploadResult.id
|
||||
}
|
||||
|
||||
export const sendCreateArticleMutation = async (
|
||||
userId: string,
|
||||
input: unknown
|
||||
) => {
|
||||
const sendCreateArticleMutation = async (userId: string, input: unknown) => {
|
||||
const data = JSON.stringify({
|
||||
query: `mutation CreateArticle ($input: CreateArticleInput!){
|
||||
createArticle(input:$input){
|
||||
@ -198,17 +224,7 @@ export const sendCreateArticleMutation = async (
|
||||
}
|
||||
}
|
||||
|
||||
interface SavePageResponse {
|
||||
data: {
|
||||
savePage: {
|
||||
url: string
|
||||
clientRequestId: string
|
||||
errorCodes?: string[]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const sendSavePageMutation = async (userId: string, input: unknown) => {
|
||||
const sendSavePageMutation = async (userId: string, input: unknown) => {
|
||||
const data = JSON.stringify({
|
||||
query: `mutation SavePage ($input: SavePageInput!){
|
||||
savePage(input:$input){
|
||||
@ -262,31 +278,10 @@ export const sendSavePageMutation = async (userId: string, input: unknown) => {
|
||||
}
|
||||
}
|
||||
|
||||
export const saveUploadedPdf = async (
|
||||
userId: string,
|
||||
url: string,
|
||||
uploadFileId: string,
|
||||
articleSavingRequestId: string,
|
||||
state: string,
|
||||
labels: string[],
|
||||
source: string,
|
||||
folder: string
|
||||
) => {
|
||||
return sendCreateArticleMutation(userId, {
|
||||
url: encodeURI(url),
|
||||
articleSavingRequestId,
|
||||
uploadFileId: uploadFileId,
|
||||
state,
|
||||
labels,
|
||||
source,
|
||||
folder,
|
||||
})
|
||||
}
|
||||
|
||||
export const sendImportStatusUpdate = async (
|
||||
const sendImportStatusUpdate = async (
|
||||
userId: string,
|
||||
taskId: string,
|
||||
status: string
|
||||
isContentParsed: boolean
|
||||
) => {
|
||||
try {
|
||||
const auth = await signToken({ uid: userId }, JWT_SECRET)
|
||||
@ -295,7 +290,7 @@ export const sendImportStatusUpdate = async (
|
||||
IMPORTER_METRICS_COLLECTOR_URL,
|
||||
{
|
||||
taskId,
|
||||
status,
|
||||
status: isContentParsed ? 'imported' : 'failed',
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
@ -309,3 +304,83 @@ export const sendImportStatusUpdate = async (
|
||||
console.error('error while sending import status update', e)
|
||||
}
|
||||
}
|
||||
|
||||
export const savePageJob = async (data: Data) => {
|
||||
const {
|
||||
userId,
|
||||
title,
|
||||
content,
|
||||
readabilityResult,
|
||||
articleSavingRequestId,
|
||||
state,
|
||||
labels,
|
||||
source,
|
||||
folder,
|
||||
rssFeedUrl,
|
||||
savedAt,
|
||||
publishedAt,
|
||||
taskId,
|
||||
} = data
|
||||
let isContentParsed = true
|
||||
|
||||
try {
|
||||
const url = encodeURI(data.url)
|
||||
|
||||
if (data.contentType === 'application/pdf') {
|
||||
const uploadFileId = await uploadPdf(url, userId, articleSavingRequestId)
|
||||
const uploadedPdf = await sendCreateArticleMutation(userId, {
|
||||
url,
|
||||
articleSavingRequestId,
|
||||
uploadFileId,
|
||||
state,
|
||||
labels,
|
||||
source,
|
||||
folder,
|
||||
rssFeedUrl,
|
||||
savedAt,
|
||||
publishedAt,
|
||||
})
|
||||
if (!uploadedPdf) {
|
||||
throw new Error('error while saving uploaded pdf')
|
||||
}
|
||||
} else {
|
||||
const apiResponse = await sendSavePageMutation(userId, {
|
||||
url,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent: content,
|
||||
parseResult: readabilityResult,
|
||||
state,
|
||||
labels,
|
||||
rssFeedUrl,
|
||||
savedAt,
|
||||
publishedAt,
|
||||
source,
|
||||
folder,
|
||||
})
|
||||
if (!apiResponse) {
|
||||
throw new Error('error while saving page')
|
||||
}
|
||||
// if ('error' in apiResponse && apiResponse.error === 'UNAUTHORIZED') {
|
||||
// console.log('user is deleted', userId)
|
||||
// return true
|
||||
// }
|
||||
|
||||
// if the readability result is not parsed, the import is failed
|
||||
if (!readabilityResult) {
|
||||
isContentParsed = false
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('error while saving page', e)
|
||||
isContentParsed = false
|
||||
return false
|
||||
} finally {
|
||||
// send import status to update the metrics for importer
|
||||
if (taskId) {
|
||||
await sendImportStatusUpdate(userId, taskId, isContentParsed)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
@ -2,16 +2,16 @@
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
/* eslint-disable @typescript-eslint/require-await */
|
||||
/* eslint-disable @typescript-eslint/no-misused-promises */
|
||||
import { Job, QueueEvents, Worker } from 'bullmq'
|
||||
import express, { Express } from 'express'
|
||||
import { appDataSource } from './data_source'
|
||||
import { getEnv } from './util'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { Job, Worker, QueueEvents } from 'bullmq'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { appDataSource } from './data_source'
|
||||
import { env } from './env'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
|
||||
@ -71,6 +71,9 @@ const main = async () => {
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data)
|
||||
}
|
||||
}
|
||||
return true
|
||||
},
|
||||
|
||||
@ -7,16 +7,13 @@
|
||||
"build/src"
|
||||
],
|
||||
"dependencies": {
|
||||
"axios": "^0.27.2",
|
||||
"bullmq": "^5.1.1",
|
||||
"dotenv": "^8.2.0",
|
||||
"express": "^4.17.1",
|
||||
"ioredis": "^5.3.2",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"@google-cloud/functions-framework": "^3.0.0",
|
||||
"@omnivore/puppeteer-parse": "^1.0.0",
|
||||
"@sentry/serverless": "^7.77.0",
|
||||
"winston": "^3.3.3"
|
||||
"@sentry/serverless": "^7.77.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"chai": "^4.3.6",
|
||||
|
||||
@ -26,7 +26,6 @@ interface RequestBody {
|
||||
|
||||
interface LogRecord {
|
||||
url: string
|
||||
userId?: string
|
||||
articleSavingRequestId: string
|
||||
labels: {
|
||||
source: string
|
||||
@ -79,7 +78,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
|
||||
const logRecord: LogRecord = {
|
||||
url,
|
||||
userId,
|
||||
articleSavingRequestId,
|
||||
labels: {
|
||||
source,
|
||||
@ -98,9 +96,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
|
||||
console.log(`Article parsing request`, logRecord)
|
||||
|
||||
// let importStatus,
|
||||
// statusCode = 200
|
||||
|
||||
try {
|
||||
const fetchResult = await fetchContent(url, locale, timezone)
|
||||
const finalUrl = fetchResult.finalUrl
|
||||
@ -126,6 +121,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
rssFeedUrl,
|
||||
savedAt,
|
||||
publishedAt,
|
||||
taskId,
|
||||
},
|
||||
isRss: !!rssFeedUrl,
|
||||
isImport: !!taskId,
|
||||
@ -137,59 +133,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
logRecord.error = 'error while queueing save page job'
|
||||
return res.sendStatus(500)
|
||||
}
|
||||
|
||||
// if (fetchResult.contentType === 'application/pdf') {
|
||||
// const uploadFileId = await uploadPdf(
|
||||
// finalUrl,
|
||||
// userId,
|
||||
// articleSavingRequestId
|
||||
// )
|
||||
// const uploadedPdf = await sendCreateArticleMutation(userId, {
|
||||
// url: encodeURI(finalUrl),
|
||||
// articleSavingRequestId,
|
||||
// uploadFileId,
|
||||
// state,
|
||||
// labels,
|
||||
// source,
|
||||
// folder,
|
||||
// rssFeedUrl,
|
||||
// savedAt,
|
||||
// publishedAt,
|
||||
// })
|
||||
// if (!uploadedPdf) {
|
||||
// statusCode = 500
|
||||
// logRecord.error = 'error while saving uploaded pdf'
|
||||
// } else {
|
||||
// importStatus = 'imported'
|
||||
// }
|
||||
// } else {
|
||||
// const apiResponse = await sendSavePageMutation(userId, {
|
||||
// url,
|
||||
// clientRequestId: articleSavingRequestId,
|
||||
// title,
|
||||
// originalContent: content,
|
||||
// parseResult: readabilityResult,
|
||||
// state,
|
||||
// labels,
|
||||
// rssFeedUrl,
|
||||
// savedAt,
|
||||
// publishedAt,
|
||||
// source,
|
||||
// folder,
|
||||
// })
|
||||
// if (!apiResponse) {
|
||||
// logRecord.error = 'error while saving page'
|
||||
// statusCode = 500
|
||||
// } else if (
|
||||
// 'error' in apiResponse &&
|
||||
// apiResponse.error === 'UNAUTHORIZED'
|
||||
// ) {
|
||||
// console.log('user is deleted, do not retry', logRecord)
|
||||
// return res.sendStatus(200)
|
||||
// } else {
|
||||
// importStatus = readabilityResult ? 'imported' : 'failed'
|
||||
// }
|
||||
// }
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
logRecord.error = error.message
|
||||
@ -201,18 +144,6 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
|
||||
} finally {
|
||||
logRecord.totalTime = Date.now() - functionStartTime
|
||||
console.log(`parse-page result`, logRecord)
|
||||
|
||||
// // mark import failed on the last failed retry
|
||||
// const retryCount = req.headers['x-cloudtasks-taskretrycount']
|
||||
// if (retryCount === MAX_RETRY_COUNT) {
|
||||
// console.log('max retry count reached')
|
||||
// importStatus = importStatus || 'failed'
|
||||
// }
|
||||
// // send import status to update the metrics
|
||||
// if (taskId && importStatus) {
|
||||
// await sendImportStatusUpdate(userId, taskId, importStatus)
|
||||
// }
|
||||
// res.sendStatus(statusCode)
|
||||
}
|
||||
|
||||
res.sendStatus(200)
|
||||
|
||||
Reference in New Issue
Block a user