From 0fbc6d0a87fe5e73182c39c06da77e5b80656508 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 21 Aug 2024 12:21:49 +0800 Subject: [PATCH] update importer to use content-fetch queue --- packages/content-fetch/src/request_handler.ts | 2 +- packages/import-handler/src/index.ts | 18 +++---- packages/import-handler/src/job.ts | 44 ++++++++++++++++- packages/import-handler/src/task.ts | 48 ------------------- 4 files changed, 52 insertions(+), 60 deletions(-) delete mode 100644 packages/import-handler/src/task.ts diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index a698ac9e6..a494d8d45 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -214,7 +214,7 @@ export const processFetchContentJob = async ( }, state, labelsToAdd: labels, - taskId: taskId, + taskId, locale, timezone, rssFeedUrl, diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 672310954..a9cf4b730 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -10,10 +10,9 @@ import * as path from 'path' import { promisify } from 'util' import { v4 as uuid } from 'uuid' import { importCsv } from './csv' -import { queueEmailJob } from './job' +import { enqueueFetchContentJob, queueEmailJob } from './job' import { importMatterArchive } from './matterHistory' import { ImportStatus, updateMetrics } from './metrics' -import { CONTENT_FETCH_URL, createCloudTask } from './task' export enum ArticleSavingRequestStatus { Failed = 'FAILED', @@ -94,6 +93,7 @@ const shouldHandle = (data: StorageEvent) => { } const importURL = async ( + redisDataSource: RedisDataSource, userId: string, url: URL, source: string, @@ -103,18 +103,17 @@ const importURL = async ( savedAt?: Date, publishedAt?: Date ): Promise => { - return createCloudTask(CONTENT_FETCH_URL, { - userId, - source, + return enqueueFetchContentJob(redisDataSource, { url: url.toString(), - saveRequestId: '', + users: [{ id: userId, libraryItemId: '' }], + source, + taskId, state, labels: labels?.map((l) => { return { name: l } }), - taskId, - savedAt, - publishedAt, + savedAt: savedAt?.toISOString(), + publishedAt: publishedAt?.toISOString(), }) } @@ -194,6 +193,7 @@ const urlHandler = async ( try { // Imports are stored in the format imports//-.csv const result = await importURL( + ctx.redisDataSource, ctx.userId, url, ctx.source, diff --git a/packages/import-handler/src/job.ts b/packages/import-handler/src/job.ts index 0f040b8f3..a47fe1dce 100644 --- a/packages/import-handler/src/job.ts +++ b/packages/import-handler/src/job.ts @@ -1,8 +1,12 @@ import { RedisDataSource } from '@omnivore/utils' import { Queue } from 'bullmq' +import { ArticleSavingRequestStatus } from '.' + +const BACKEND_QUEUE = 'omnivore-backend-queue' +const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' -const QUEUE_NAME = 'omnivore-backend-queue' export const SEND_EMAIL_JOB = 'send-email' +const FETCH_CONTENT_JOB = 'fetch-content' interface SendEmailJobData { userId: string @@ -11,13 +15,49 @@ interface SendEmailJobData { html?: string } +interface FetchContentJobData { + url: string + users: Array<{ + id: string + folder?: string + libraryItemId: string + }> + source: string + taskId: string + state?: ArticleSavingRequestStatus + labels?: Array<{ name: string }> + savedAt?: string + publishedAt?: string +} + export const queueEmailJob = async ( redisDataSource: RedisDataSource, data: SendEmailJobData ) => { - const queue = new Queue(QUEUE_NAME, { + const queue = new Queue(BACKEND_QUEUE, { connection: redisDataSource.queueRedisClient, }) await queue.add(SEND_EMAIL_JOB, data) } + +export const enqueueFetchContentJob = async ( + redisDataSource: RedisDataSource, + data: FetchContentJobData +): Promise => { + const queue = new Queue(CONTENT_FETCH_QUEUE, { + connection: redisDataSource.queueRedisClient, + }) + + const job = await queue.add(FETCH_CONTENT_JOB, data, { + priority: 100, + attempts: 1, + }) + + if (!job || !job.id) { + console.error('Error while enqueuing fetch-content job', data) + throw new Error('Error while enqueuing fetch-content job') + } + + return job.id +} diff --git a/packages/import-handler/src/task.ts b/packages/import-handler/src/task.ts deleted file mode 100644 index bccca7ecb..000000000 --- a/packages/import-handler/src/task.ts +++ /dev/null @@ -1,48 +0,0 @@ -/* eslint-disable @typescript-eslint/restrict-template-expressions */ -import { CloudTasksClient, protos } from '@google-cloud/tasks' - -const cloudTask = new CloudTasksClient() - -export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL - -export const createCloudTask = async ( - taskHandlerUrl: string | undefined, - payload: unknown, - requestHeaders?: Record, - queue = 'omnivore-import-queue' -) => { - const location = process.env.GCP_LOCATION - const project = process.env.GCP_PROJECT_ID - - if (!project || !location || !queue || !taskHandlerUrl) { - throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}` - } - - const serviceAccountEmail = `${project}@appspot.gserviceaccount.com` - - const parent = cloudTask.queuePath(project, location, queue) - const convertedPayload = JSON.stringify(payload) - const body = Buffer.from(convertedPayload).toString('base64') - const task: protos.google.cloud.tasks.v2.ITask = { - httpRequest: { - httpMethod: 'POST', - url: taskHandlerUrl, - headers: { - 'Content-Type': 'application/json', - ...requestHeaders, - }, - body, - ...(serviceAccountEmail - ? { - oidcToken: { - serviceAccountEmail, - }, - } - : null), - }, - } - - return cloudTask.createTask({ parent, task }).then((result) => { - return result[0].name ?? undefined - }) -}