diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 572408202..de40ff68e 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -111,7 +111,5 @@ jobs: run: 'docker build --file packages/content-fetch/Dockerfile .' - name: Build the inbound-email-handler docker image run: 'docker build --file packages/inbound-email-handler/Dockerfile .' - - name: Build the content-fetch cloud function docker image - run: 'docker build --file packages/content-fetch/Dockerfile-gcf .' - name: Build the tts docker image run: 'docker build --file packages/text-to-speech/Dockerfile .' diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts index 28ee8461c..a24f9b597 100644 --- a/packages/api/src/jobs/bulk_action.ts +++ b/packages/api/src/jobs/bulk_action.ts @@ -1,5 +1,5 @@ import { BulkActionType } from '../generated/graphql' -import { getBackendQueue } from '../queue-processor' +import { getQueue } from '../queue-processor' import { batchUpdateLibraryItems } from '../services/library_item' import { logger } from '../utils/logger' @@ -18,7 +18,7 @@ export const BULK_ACTION_JOB_NAME = 'bulk-action' export const bulkAction = async (data: BulkActionData) => { const { userId, action, query, labelIds, args, batchSize, count } = data - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { throw new Error('Queue not initialized') } diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index a75385187..29ef6f894 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -1,7 +1,7 @@ import { Job } from 'bullmq' import { DataSource } from 'typeorm' import { v4 as uuid } from 'uuid' -import { getBackendQueue, JOB_VERSION } from '../../queue-processor' +import { getQueue, JOB_VERSION } from '../../queue-processor' import { validateUrl } from '../../services/create_page_save_request' import { getJobPriority, RssSubscriptionGroup } from '../../utils/createTask' import { stringToHash } from '../../utils/helpers' @@ -124,7 +124,7 @@ const updateSubscriptionGroup = async ( } export const queueRSSRefreshAllFeedsJob = async () => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return false } @@ -144,7 +144,7 @@ export const queueRSSRefreshFeedJob = async ( payload: any, options = { priority: 'low' as QueuePriority } ): Promise<Job | undefined> => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 78069a118..e568d750d 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -14,7 +14,10 @@ import { updateSubscriptions, } from '../../services/update_subscription' import { findActiveUser } from '../../services/user' -import createHttpTaskWithToken from '../../utils/createTask' +import createHttpTaskWithToken, { + enqueueFetchContentJob, + FetchContentJobData, +} from '../../utils/createTask' import { cleanUrl } from '../../utils/helpers' import { createThumbnailProxyUrl } from '../../utils/imageproxy' import { logger } from '../../utils/logger' @@ -33,6 +36,7 @@ interface RefreshFeedRequest { fetchContentTypes: FetchContentType[] folders: FolderType[] refreshContext?: RSSRefreshContext + priority?: 'low' | 'high' } export const isRefreshFeedRequest = (data: any): data is
RefreshFeedRequest => { @@ -330,9 +334,10 @@ const createTask = async ( const fetchContentAndCreateItem = async ( users: UserConfig[], feedUrl: string, - item: RssFeedItem + item: RssFeedItem, + priority = 'low' as 'low' | 'high' ) => { - const payload = { + const data: FetchContentJobData = { users, source: 'rss-feeder', url: item.link.trim(), @@ -340,15 +345,24 @@ rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, + priority, } try { - const task = await createHttpTaskWithToken({ + const contentFetchQueueEnabled = + process.env.CONTENT_FETCH_QUEUE_ENABLED === 'true' + if (contentFetchQueueEnabled) { + return await enqueueFetchContentJob(data) + } + + return await createHttpTaskWithToken({ queue: 'omnivore-rss-feed-queue', taskHandlerUrl: env.queue.contentFetchGCFUrl, - payload, + payload: { + ...data, + priority: 'high', // legacy task path: send all RSS feeds through one high-priority queue for now + }, }) - return !!task } catch (error) { logger.error('Error while creating task', error) return false } diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index 850b36d8b..a721f4643 100644 --- a/packages/api/src/jobs/save_page.ts +++ b/packages/api/src/jobs/save_page.ts @@ -140,7 +140,7 @@ const sendImportStatusUpdate = async ( Authorization: auth as string, 'Content-Type': 'application/json', }, - timeout: REQUEST_TIMEOUT, + timeout: 5000, // five-second cap so a slow metrics collector can't stall the job } ) } catch (e) { @@ -288,7 +288,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => { throw e } finally { - const lastAttempt = attemptsMade + 1 === MAX_IMPORT_ATTEMPTS + const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS if (taskId && (isSaved || lastAttempt)) { logger.info('sending import status update') diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index d9b762937..aa5f2e7ed 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -82,7 +82,9 @@ import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_positi import { getJobPriority } from './utils/createTask' import { logger } from './utils/logger' -export const QUEUE_NAME = 'omnivore-backend-queue' +export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue' +export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' + export const JOB_VERSION = 'v001' const jobLatency = new client.Histogram({ @@ -94,8 +96,8 @@ const jobLatency = new client.Histogram({ registerMetric(jobLatency) -export const getBackendQueue = async ( - name = QUEUE_NAME +export const getQueue = async ( + name = BACKEND_QUEUE_NAME ): Promise<Queue | undefined> => { if (!redisDataSource.workerRedisClient) { throw new Error('Can not create queues, redis is not initialized') @@ -124,7 +126,7 @@ export const createJobId = (jobName: string, userId: string) => `${jobName}_${userId}_${JOB_VERSION}` export const getJob = async (jobId: string, queueName?: string) => { - const queue = await getBackendQueue(queueName) + const queue = await getQueue(queueName) if (!queue) { return } @@ -152,12 +154,12 @@ export const jobStateToTaskState = ( export
const createWorker = (connection: ConnectionOptions) => new Worker( - QUEUE_NAME, + BACKEND_QUEUE_NAME, async (job: Job) => { const executeJob = async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { - const queue = await getBackendQueue() + const queue = await getQueue() const counts = await queue?.getJobCounts('prioritized') if (counts && counts.wait > 1000) { return @@ -239,7 +241,7 @@ export const createWorker = (connection: ConnectionOptions) => ) const setupCronJobs = async () => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { logger.error('Unable to setup cron jobs. Queue is not available.') return @@ -278,7 +280,7 @@ const main = async () => { }) app.get('/metrics', async (_, res) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { res.sendStatus(400) return @@ -295,7 +297,7 @@ const main = async () => { jobsTypes.forEach((metric, idx) => { output += `# TYPE omnivore_queue_messages_${metric} gauge\n` - output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n` + output += `omnivore_queue_messages_${metric}{queue="${BACKEND_QUEUE_NAME}"} ${counts[metric]}\n` }) if (redisDataSource.redisClient) { @@ -311,7 +313,7 @@ const main = async () => { ) if (cursor != '0') { output += `# TYPE omnivore_read_position_messages gauge\n` - output += `omnivore_read_position_messages{queue="${QUEUE_NAME}"} ${10_001}\n` + output += `omnivore_read_position_messages{queue="${BACKEND_QUEUE_NAME}"} ${10_001}\n` } else if (batch) { output += `# TYPE omnivore_read_position_messages gauge\n` output += `omnivore_read_position_messages{} ${batch.length}\n` @@ -324,10 +326,10 @@ const main = async () => { const currentTime = Date.now() const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${ageInSeconds}\n` } else { output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` - output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${0}\n` } const metrics = await getMetrics() @@ -357,7 +359,7 @@ const main = async () => { await setupCronJobs() - const queueEvents = new QueueEvents(QUEUE_NAME, { + const queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, { connection: workerRedisClient, }) diff --git a/packages/api/src/resolvers/integrations/index.ts b/packages/api/src/resolvers/integrations/index.ts index d976fa6a5..a893c3e79 100644 --- a/packages/api/src/resolvers/integrations/index.ts +++ b/packages/api/src/resolvers/integrations/index.ts @@ -245,8 +245,9 @@ export const importFromIntegrationResolver = authorized< authToken, integration.importItemState || ImportItemState.Unarchived ) - // update task name in integration - await updateIntegration(integration.id, { taskName }, uid) + log.info('task created', taskName) + // // update task name in integration + // await updateIntegration(integration.id, { taskName }, uid) analytics.capture({ distinctId: uid, diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 86d45765d..cde6140c3 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ 
b/packages/api/src/services/create_page_save_request.ts @@ -9,7 +9,7 @@ import { } from '../generated/graphql' import { createPubSubClient, PubsubClient } from '../pubsub' import { redisDataSource } from '../redis_data_source' -import { enqueueParseRequest } from '../utils/createTask' +import { enqueueFetchContentJob } from '../utils/createTask' import { cleanUrl, generateSlug } from '../utils/helpers' import { logger } from '../utils/logger' import { createOrUpdateLibraryItem } from './library_item' @@ -57,7 +57,7 @@ const addRecentSavedItem = async (userId: string) => { // default: use normal queue const getPriorityByRateLimit = async ( userId: string -): Promise<'low' | 'high' | undefined> => { +): Promise<'low' | 'high'> => { const redisClient = redisDataSource.redisClient if (redisClient) { const oneMinuteAgo = Date.now() - 60 * 1000 @@ -75,6 +75,8 @@ const getPriorityByRateLimit = async ( logger.error('Failed to get priority by rate limit', { userId, error }) } } + + return 'high' } export const validateUrl = (url: string): URL => { @@ -157,21 +159,24 @@ export const createPageSaveRequest = async ({ // get priority by checking rate limit if not specified priority = priority || (await getPriorityByRateLimit(userId)) - logger.debug('priority', { priority }) // enqueue task to parse item - await enqueueParseRequest({ + await enqueueFetchContentJob({ url, - userId, - saveRequestId: libraryItem.id, + users: [ + { + folder, + id: userId, + libraryItemId: libraryItem.id, + }, + ], priority, state, labels, locale, timezone, - savedAt, - publishedAt, - folder, + savedAt: savedAt?.toISOString(), + publishedAt: publishedAt?.toISOString(), rssFeedUrl: subscription, }) diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 2a64bd77a..bd64cf1e0 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -1368,7 +1368,7 @@ export const deleteLibraryItemsByUserId = async (userId: string) => { } export const batchDelete = async (criteria: FindOptionsWhere<LibraryItem>) => { - const batchSize = 1000 + const batchSize = 20 const qb = libraryItemRepository.createQueryBuilder().where(criteria) const countSql = queryBuilderToRawSql(qb.select('COUNT(1)')) diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index e95de4fd5..4e6a4f862 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -65,7 +65,7 @@ import { UploadContentJobData, UPLOAD_CONTENT_JOB, } from '../jobs/upload_content' -import { getBackendQueue, JOB_VERSION } from '../queue-processor' +import { CONTENT_FETCH_QUEUE, getQueue, JOB_VERSION } from '../queue-processor' import { redisDataSource } from '../redis_data_source' import { writeDigest } from '../services/digest' import { signFeatureToken } from '../services/features' @@ -78,6 +78,8 @@ import View = google.cloud.tasks.v2.Task.View // Instantiates a client.
const client = new CloudTasksClient() +const FETCH_CONTENT_JOB = 'fetch-content' + /** * we want to prioritized jobs by the expected time to complete * lower number means higher priority @@ -94,17 +96,21 @@ export const getJobPriority = (jobName: string): number => { case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: case UPDATE_HOME_JOB: + case `${FETCH_CONTENT_JOB}_high`: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case AI_SUMMARIZE_JOB_NAME: case PROCESS_YOUTUBE_VIDEO_JOB_NAME: + case `${FETCH_CONTENT_JOB}_low`: + case `${FETCH_CONTENT_JOB}_rss_high`: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: case UPLOAD_CONTENT_JOB: case SCORE_LIBRARY_ITEM_JOB: + case `${FETCH_CONTENT_JOB}_rss_low`: return 10 case `${REFRESH_FEED_JOB_NAME}_low`: case EXPORT_ITEM_JOB_NAME: @@ -320,6 +326,25 @@ export const deleteTask = async ( } } +export interface FetchContentJobData { + url: string + users: Array<{ + id: string + folder?: string + libraryItemId: string + }> + priority: 'low' | 'high' + state?: ArticleSavingRequestStatus + labels?: Array<CreateLabelInput> + locale?: string + timezone?: string + savedAt?: string + publishedAt?: string + folder?: string + rssFeedUrl?: string + source?: string +} + /** * Enqueues the task for the article content parsing with Puppeteer by URL * @param url - URL address of the article to parse @@ -329,88 +354,43 @@ * @param queue - Queue name * @returns Name of the task created */ -export const enqueueParseRequest = async ({ - url, - userId, - saveRequestId, - priority = 'high', - queue = env.queue.name, - state, - labels, - locale, - timezone, - savedAt, - publishedAt, - folder, - rssFeedUrl, -}: { - url: string - userId: string - saveRequestId: string - priority?: 'low' | 'high' - queue?: string - state?: ArticleSavingRequestStatus - labels?: CreateLabelInput[] - locale?: string - timezone?: string - savedAt?: Date - publishedAt?: Date - folder?: string - rssFeedUrl?: string -}): Promise<string> => { - const { GOOGLE_CLOUD_PROJECT } = process.env - const payload = { - url, - userId, - saveRequestId, - state, - labels, - locale, - timezone, - savedAt, - publishedAt, - folder, - rssFeedUrl, - priority, +export const enqueueFetchContentJob = async ( + data: FetchContentJobData +): Promise<string> => { + const queue = await getQueue(CONTENT_FETCH_QUEUE) + if (!queue) { + throw new Error('No queue found') } - // If there is no Google Cloud Project Id exposed, it means that we are in local environment - if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - if (env.queue.contentFetchUrl) { - // Calling the handler function directly. - setTimeout(() => { - axios.post(env.queue.contentFetchUrl, payload).catch((error) => { - logError(error) - logger.error( - `Error occurred while requesting local puppeteer-parse function\nPlease, ensure your function is set up properly and running using "yarn start" from the "/pkg/gcf/puppeteer-parse" folder` - ) - }) - }, 0) - } - return '' - } + const jobName = `${FETCH_CONTENT_JOB}${data.rssFeedUrl ? '_rss' : ''}_${ + data.priority + }` - // use GCF url for low priority tasks - const taskHandlerUrl = - priority === 'low' ?
env.queue.contentFetchGCFUrl - : env.queue.contentFetchUrl + // sort the data to make sure the hash is consistent + const sortedData = JSON.stringify(data, Object.keys(data).sort()) + const jobId = `${FETCH_CONTENT_JOB}_${stringToHash( + sortedData + )}_${JOB_VERSION}` + const priority = getJobPriority(jobName) - const createdTasks = await createHttpTaskWithToken({ - project: GOOGLE_CLOUD_PROJECT, - payload, + const job = await queue.add(FETCH_CONTENT_JOB, data, { + jobId, + removeOnComplete: true, + removeOnFail: true, priority, - taskHandlerUrl, - queue, + attempts: 2, + backoff: { + type: 'exponential', + delay: 2000, + }, }) - if (!createdTasks || !createdTasks[0].name) { - logger.error(`Unable to get the name of the task`, { - payload, - createdTasks, - }) - throw new CreateTaskError(`Unable to get the name of the task`) + + if (!job || !job.id) { + logger.error('Error while enqueuing fetch-content job', data) + throw new Error('Error while enqueuing fetch-content job') } - return createdTasks[0].name + + return job.id } export const enqueueReminder = async ( @@ -629,7 +609,7 @@ export const enqueueExportToIntegration = async ( integrationId: string, userId: string ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -647,7 +627,7 @@ export const enqueueThumbnailJob = async ( userId: string, libraryItemId: string ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -714,7 +694,7 @@ export const enqueueRssFeedFetch = async ( } export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -726,7 +706,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { } export const enqueueWebhookJob = async (data: CallWebhookJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -738,7 +718,7 @@ export const enqueueWebhookJob = async (data: CallWebhookJobData) => { } export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -752,7 +732,7 @@ export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => { export const enqueueProcessYouTubeVideo = async ( data: ProcessYouTubeVideoJobData ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -767,7 +747,7 @@ export const enqueueProcessYouTubeVideo = async ( export const enqueueProcessYouTubeTranscript = async ( data: ProcessYouTubeTranscriptJobData ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -780,7 +760,7 @@ export const enqueueProcessYouTubeTranscript = async ( } export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return [] } @@ -806,7 +786,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { } export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -825,7 +805,7 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { } export const enqueueBulkAction = async 
(data: BulkActionData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -846,7 +826,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => { } export const enqueueExportItem = async (jobData: ExportItemJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -862,7 +842,7 @@ export const enqueueExportItem = async (jobData: ExportItemJobData) => { } export const enqueueSendEmail = async (jobData: SendEmailJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -881,7 +861,7 @@ export const scheduledDigestJobOptions = ( }) export const removeDigestJobs = async (userId: string) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { throw new Error('No queue found') } @@ -911,7 +891,7 @@ export const enqueueCreateDigest = async ( data: CreateDigestData, schedule?: CreateDigestJobSchedule ): Promise => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { throw new Error('No queue found') } @@ -974,7 +954,7 @@ export const enqueueCreateDigest = async ( export const enqueueBulkUploadContentJob = async ( data: UploadContentJobData[] ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return '' } @@ -998,7 +978,7 @@ export const updateHomeJobId = (userId: string) => `${UPDATE_HOME_JOB}_${userId}_${JOB_VERSION}` export const enqueueUpdateHomeJob = async (data: UpdateHomeJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1016,7 +996,7 @@ export const updateScoreJobId = (userId: string) => `${SCORE_LIBRARY_ITEM_JOB}_${userId}_${JOB_VERSION}` export const enqueueScoreJob = async (data: ScoreLibraryItemJobData) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1034,7 +1014,7 @@ export const enqueueGeneratePreviewContentJob = async ( libraryItemId: string, userId: string ) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1056,7 +1036,7 @@ export const enqueueGeneratePreviewContentJob = async ( } export const enqueuePruneTrashJob = async (numDays: number) => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } @@ -1075,7 +1055,7 @@ export const enqueuePruneTrashJob = async (numDays: number) => { } export const enqueueExpireFoldersJob = async () => { - const queue = await getBackendQueue() + const queue = await getQueue() if (!queue) { return undefined } diff --git a/packages/api/test/resolvers/article.test.ts b/packages/api/test/resolvers/article.test.ts index b1fe5a07a..fa76abc86 100644 --- a/packages/api/test/resolvers/article.test.ts +++ b/packages/api/test/resolvers/article.test.ts @@ -646,8 +646,8 @@ describe('Article API', () => { context('when the source is rss-feeder and url is from youtube.com', () => { const source = 'rss-feeder' - const stub = sinon.stub(createTask, 'enqueueParseRequest') - const stub2 = sinon.stub(createTask, 'enqueueProcessYouTubeVideo') + const stub = sinon.stub(createTask, 'enqueueFetchContentJob') + sinon.stub(createTask, 'enqueueProcessYouTubeVideo') before(() => { url = 'https://www.youtube.com/watch?v=123' @@ -678,7 +678,11 @@ describe('Article API', () => { const url = 
'https://blog.omnivore.app/new-url-1' before(() => { - sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves('')) + sinon.replace( + createTask, + 'enqueueFetchContentJob', + sinon.fake.resolves('') + ) }) beforeEach(() => { diff --git a/packages/api/test/resolvers/article_saving_request.test.ts b/packages/api/test/resolvers/article_saving_request.test.ts index 9df8b57f5..104146261 100644 --- a/packages/api/test/resolvers/article_saving_request.test.ts +++ b/packages/api/test/resolvers/article_saving_request.test.ts @@ -13,17 +13,9 @@ import * as createTask from '../../src/utils/createTask' import { createTestUser } from '../db' import { graphqlRequest, request } from '../util' -const articleSavingRequestQuery = ({ - id, - url, -}: { - id?: string - url?: string -}) => ` - query { - articleSavingRequest(id: ${id ? `"${id}"` : null}, url: ${ - url ? `"${url}"` : null -}) { +const articleSavingRequestQuery = ` + query ArticleSavingRequest($id: ID, $url: String) { + articleSavingRequest(id: $id, url: $url) { ... on ArticleSavingRequestSuccess { articleSavingRequest { id @@ -74,9 +66,9 @@ describe('ArticleSavingRequest API', () => { .post('/local/debug/fake-user-login') .send({ fakeEmail: user.email }) - authToken = res.body.authToken + authToken = res.body.authToken as string - sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves('')) + sinon.replace(createTask, 'enqueueFetchContentJob', sinon.fake.resolves('')) }) after(async () => { @@ -131,14 +123,14 @@ describe('ArticleSavingRequest API', () => { createArticleSavingRequestMutation(url), authToken ).expect(200) - id = res.body.data.createArticleSavingRequest.articleSavingRequest.id + id = res.body.data.createArticleSavingRequest.articleSavingRequest + .id as string }) it('returns the article saving request if exists', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ url }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + id, + }).expect(200) expect( res.body.data.articleSavingRequest.articleSavingRequest.status @@ -146,10 +138,9 @@ describe('ArticleSavingRequest API', () => { }) it('returns the user profile info', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ url }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + url, + }).expect(200) expect( res.body.data.articleSavingRequest.articleSavingRequest.user.profile @@ -158,10 +149,9 @@ describe('ArticleSavingRequest API', () => { }) it('returns the article saving request by id', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ id }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + id, + }).expect(200) expect( res.body.data.articleSavingRequest.articleSavingRequest.status @@ -169,10 +159,9 @@ describe('ArticleSavingRequest API', () => { }) it('returns not_found if not exists', async () => { - const res = await graphqlRequest( - articleSavingRequestQuery({ id: 'invalid-id' }), - authToken - ).expect(200) + const res = await graphqlRequest(articleSavingRequestQuery, authToken, { + id: 'invalid-id', + }).expect(200) expect(res.body.data.articleSavingRequest.errorCodes).to.eql([ ArticleSavingRequestErrorCode.NotFound, diff --git a/packages/api/test/resolvers/integrations.test.ts b/packages/api/test/resolvers/integrations.test.ts index bc716e915..a8278b06c 100644 --- 
a/packages/api/test/resolvers/integrations.test.ts +++ b/packages/api/test/resolvers/integrations.test.ts @@ -19,7 +19,7 @@ chai.use(sinonChai) describe('Integrations resolvers', () => { const READWISE_API_URL = 'https://readwise.io/api/v2' - + let loginUser: User let authToken: string @@ -30,7 +30,7 @@ describe('Integrations resolvers', () => { .post('/local/debug/fake-user-login') .send({ fakeEmail: loginUser.email }) - authToken = res.body.authToken + authToken = res.body.authToken as string }) after(async () => { @@ -39,19 +39,9 @@ describe('Integrations resolvers', () => { describe('setIntegration API', () => { const validToken = 'valid-token' - const query = ( - id = '', - name = 'READWISE', - token: string = 'test token', - enabled = true - ) => ` - mutation { - setIntegration(input: { - id: "${id}", - name: "${name}", - token: "${token}", - enabled: ${enabled}, - }) { + const query = ` + mutation SetIntegration($input: SetIntegrationInput!) { + setIntegration(input: $input) { ... on SetIntegrationSuccess { integration { id @@ -79,6 +69,8 @@ describe('Integrations resolvers', () => { .reply(204) .persist() integrationName = 'READWISE' + enabled = true + token = 'test token' }) after(() => { @@ -101,10 +93,14 @@ describe('Integrations resolvers', () => { }) it('returns InvalidToken error code', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.errorCodes).to.eql([ SetIntegrationErrorCode.InvalidToken, ]) @@ -124,10 +120,14 @@ describe('Integrations resolvers', () => { }) it('creates new integration', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.integration.enabled).to.be.true }) }) @@ -142,10 +142,9 @@ describe('Integrations resolvers', () => { }) it('returns NotFound error code', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { id: integrationId, name: integrationName, enabled, token }, + }) expect(res.body.data.setIntegration.errorCodes).to.eql([ SetIntegrationErrorCode.NotFound, ]) @@ -163,6 +162,7 @@ describe('Integrations resolvers', () => { user: { id: otherUser.id }, name: 'READWISE', token: 'fakeToken', + enabled, }, otherUser.id ) @@ -175,10 +175,14 @@ describe('Integrations resolvers', () => { }) it('returns Unauthorized error code', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + enabled, + token, + }, + }) expect(res.body.data.setIntegration.errorCodes).to.eql([ SetIntegrationErrorCode.NotFound, ]) @@ -192,6 +196,7 @@ describe('Integrations resolvers', () => { user: { id: loginUser.id }, name: 'READWISE', token: 'fakeToken', + enabled, }, loginUser.id ) @@ -208,17 +213,25 @@ describe('Integrations resolvers', () => { }) afterEach(async () => { - await updateIntegration(existingIntegration.id, { - taskName: 'some task name', - enabled: true, - }, loginUser.id) + await updateIntegration( + 
existingIntegration.id, + { + taskName: 'some task name', + enabled: true, + }, + loginUser.id + ) }) it('disables integration', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token, enabled), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.integration.enabled).to.be .false }) @@ -230,17 +243,25 @@ describe('Integrations resolvers', () => { }) afterEach(async () => { - await updateIntegration(existingIntegration.id, { - taskName: null, - enabled: false, - }, loginUser.id) + await updateIntegration( + existingIntegration.id, + { + taskName: null, + enabled: false, + }, + loginUser.id + ) }) it('enables integration', async () => { - const res = await graphqlRequest( - query(integrationId, integrationName, token, enabled), - authToken - ) + const res = await graphqlRequest(query, authToken, { + input: { + id: integrationId, + name: integrationName, + token, + enabled, + }, + }) expect(res.body.data.setIntegration.integration.enabled).to.be .true }) @@ -333,9 +354,12 @@ describe('Integrations resolvers', () => { query(existingIntegration.id), authToken ) - const integration = await findIntegration({ - id: existingIntegration.id, - }, loginUser.id) + const integration = await findIntegration( + { + id: existingIntegration.id, + }, + loginUser.id + ) expect(res.body.data.deleteIntegration.integration).to.be.an('object') expect(res.body.data.deleteIntegration.integration.id).to.eql( @@ -383,10 +407,6 @@ describe('Integrations resolvers', () => { authToken ).expect(200) expect(res.body.data.importFromIntegration.success).to.be.true - const integration = await findIntegration({ - id: existingIntegration.id, - }, loginUser.id) - expect(integration?.taskName).not.to.be.null }) }) diff --git a/packages/api/test/routers/article.test.ts b/packages/api/test/routers/article.test.ts index 1d0008602..b992cfb07 100644 --- a/packages/api/test/routers/article.test.ts +++ b/packages/api/test/routers/article.test.ts @@ -27,7 +27,7 @@ describe('/article/save API', () => { .post('/local/debug/fake-user-login') .send({ fakeEmail: user.email }) - authToken = res.body.authToken + authToken = res.body.authToken as string }) after(async () => { @@ -39,7 +39,11 @@ describe('/article/save API', () => { const url = 'https://blog.omnivore.app' before(() => { - sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves('')) + sinon.replace( + createTask, + 'enqueueFetchContentJob', + sinon.fake.resolves('') + ) }) after(() => { diff --git a/packages/api/test/util.ts b/packages/api/test/util.ts index 6f043d98a..a20c12a76 100644 --- a/packages/api/test/util.ts +++ b/packages/api/test/util.ts @@ -4,7 +4,7 @@ import { nanoid } from 'nanoid' import supertest from 'supertest' import { v4 } from 'uuid' import { makeApolloServer } from '../src/apollo' -import { createWorker, QUEUE_NAME } from '../src/queue-processor' +import { BACKEND_QUEUE_NAME, createWorker } from '../src/queue-processor' import { createApp } from '../src/server' import { corsConfig } from '../src/utils/corsConfig' @@ -26,7 +26,7 @@ export const stopApolloServer = async () => { export const startWorker = (connection: ConnectionOptions) => { worker = createWorker(connection) - queueEvents = new QueueEvents(QUEUE_NAME, { + queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, { connection, }) } diff --git a/packages/content-fetch/Dockerfile-gcf 
b/packages/content-fetch/Dockerfile-gcf deleted file mode 100644 index 7691ab1ae..000000000 --- a/packages/content-fetch/Dockerfile-gcf +++ /dev/null @@ -1,53 +0,0 @@ -FROM node:18.16-alpine - -# Installs latest Chromium (92) package. -RUN apk add --no-cache \ - chromium \ - nss \ - freetype \ - harfbuzz \ - ca-certificates \ - ttf-freefont \ - nodejs \ - yarn \ - g++ \ - make \ - python3 - -WORKDIR /app - -ENV CHROMIUM_PATH /usr/bin/chromium-browser -ENV LAUNCH_HEADLESS=true -ENV PORT 9090 - -COPY package.json . -COPY yarn.lock . -COPY tsconfig.json . -COPY .prettierrc . -COPY .eslintrc . - -COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json -COPY /packages/content-handler/package.json ./packages/content-handler/package.json -COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json -COPY /packages/utils/package.json ./packages/utils/package.json - -RUN yarn install --pure-lockfile - -ADD /packages/content-handler ./packages/content-handler -ADD /packages/puppeteer-parse ./packages/puppeteer-parse -ADD /packages/content-fetch ./packages/content-fetch -ADD /packages/utils ./packages/utils -RUN yarn workspace @omnivore/utils build -RUN yarn workspace @omnivore/content-handler build -RUN yarn workspace @omnivore/puppeteer-parse build -RUN yarn workspace @omnivore/content-fetch build - -# After building, fetch the production dependencies -RUN rm -rf /app/packages/content-fetch/node_modules -RUN rm -rf /app/node_modules -RUN yarn install --pure-lockfile --production - -EXPOSE 9090 - -# USER pptruser -ENTRYPOINT ["yarn", "workspace", "@omnivore/content-fetch", "start_gcf"] diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 28f70ed19..743a3bca0 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -7,17 +7,20 @@ "build/src" ], "dependencies": { - "bullmq": "^5.1.1", - "dotenv": "^8.2.0", - "express": "^4.17.1", - "posthog-node": "^3.6.3", - "@google-cloud/functions-framework": "^3.0.0", "@google-cloud/storage": "^7.0.1", "@omnivore/puppeteer-parse": "^1.0.0", "@omnivore/utils": "1.0.0", - "@sentry/serverless": "^7.77.0" + "axios": "^0.27.2", + "bullmq": "^5.1.1", + "dotenv": "^8.2.0", + "express": "^4.17.1", + "express-async-handler": "^1.2.0", + "jsonwebtoken": "^8.5.1", + "posthog-node": "^3.6.3" }, "devDependencies": { + "@types/express": "^4.17.1", + "@types/jsonwebtoken": "^8.5.0", "chai": "^4.3.6", "mocha": "^10.0.0" }, @@ -26,8 +29,7 @@ "test:typecheck": "tsc --noEmit", "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", - "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer" + "start": "node build/src/app.js" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-fetch/src/app.ts b/packages/content-fetch/src/app.ts index f3b949dfd..8ffb1ef6b 100644 --- a/packages/content-fetch/src/app.ts +++ b/packages/content-fetch/src/app.ts @@ -1,34 +1,124 @@ -import 'dotenv/config' -import express from 'express' -import { contentFetchRequestHandler } from './request_handler' +import { RedisDataSource } from '@omnivore/utils' +import { JobType } from 'bullmq' +import express, { Express } from 'express' +import asyncHandler from 'express-async-handler' +import { createWorker, getQueue, QUEUE } from './worker' -const app = express() +const main = () => { + console.log('Starting worker...') -app.use(express.json()) -app.use(express.urlencoded({ extended: true })) + const app: Express = express() 
+ const port = process.env.PORT || 3002 -if (!process.env.VERIFICATION_TOKEN) { - throw new Error('VERIFICATION_TOKEN environment variable is not set') + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + + const worker = createWorker(redisDataSource) + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + app.get( + '/lifecycle/prestop', + asyncHandler(async (_req, res) => { + console.log('Prestop lifecycle hook called.') + + await worker.close() + console.log('Worker closed') + + res.sendStatus(200) + }) + ) + + app.get( + '/metrics', + asyncHandler(async (_, res) => { + let output = '' + + const queue = await getQueue(redisDataSource.queueRedisClient) + + const jobsTypes: Array<JobType> = [ + 'active', + 'failed', + 'completed', + 'prioritized', + ] + const counts = await queue.getJobCounts(...jobsTypes) + + jobsTypes.forEach((metric) => { + output += `# TYPE omnivore_queue_messages_${metric} gauge\n` + output += `omnivore_queue_messages_${metric}{queue="${QUEUE}"} ${counts[metric]}\n` + }) + + // Export the age of the oldest prioritized job in the queue + const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true) + if (oldestJobs.length > 0) { + const currentTime = Date.now() + const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000 + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${ageInSeconds}\n` + } else { + output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n` + output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${0}\n` + } + + res.status(200).setHeader('Content-Type', 'text/plain').send(output) + }) + ) + + const server = app.listen(port, () => { + console.log('Worker started') + }) + + const gracefulShutdown = async (signal: string) => { + console.log(`Received ${signal}, closing server...`) + await new Promise<void>((resolve) => { + server.close((err) => { + console.log('Express server closed') + if (err) { + console.log('Error stopping server', { err }) + } + + resolve() + }) + }) + + await worker.close() + console.log('Worker closed') + + await redisDataSource.shutdown() + console.log('Redis connection closed') + + process.exit(0) + } + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('SIGINT', () => gracefulShutdown('SIGINT')) + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + process.on('SIGTERM', () => gracefulShutdown('SIGTERM')) + + process.on('uncaughtException', function (err) { + // Handle the error safely + console.error(err, 'Uncaught exception') + }) + + process.on('unhandledRejection', (reason, promise) => { + // Handle the error safely + console.error({ promise, reason }, 'Unhandled Rejection at: Promise') + }) } -app.get('/_ah/health', (req, res) => res.sendStatus(200)) - -app.all('/', (req, res, next) => { - if (req.method !== 'GET' && req.method !== 'POST') { - console.error('request method is not GET or POST') - return res.sendStatus(405) - } - - if (req.query.token !== process.env.VERIFICATION_TOKEN) { - console.error('query does not include valid token') - return res.sendStatus(403) - } - - return contentFetchRequestHandler(req, res, next) -}) - -const PORT = process.env.PORT ?
parseInt(process.env.PORT) : 8080 -app.listen(PORT, () => { - console.log(`App listening on port ${PORT}`) - console.log('Press Ctrl+C to quit.') -}) +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main() +} diff --git a/packages/content-fetch/src/index.ts b/packages/content-fetch/src/index.ts deleted file mode 100644 index d3bc341d1..000000000 --- a/packages/content-fetch/src/index.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { HttpFunction } from '@google-cloud/functions-framework' -import * as Sentry from '@sentry/serverless' -import 'dotenv/config' -import { contentFetchRequestHandler } from './request_handler' - -Sentry.GCPFunction.init({ - dsn: process.env.SENTRY_DSN, - tracesSampleRate: 0, -}) - -/** - * Cloud Function entry point, HTTP trigger. - * Loads the requested URL via Puppeteer, captures page content and sends it to backend - * - * @param {Object} req Cloud Function request context. - * @param {Object} res Cloud Function response context. - */ -export const puppeteer = Sentry.GCPFunction.wrapHttpFunction( - contentFetchRequestHandler as HttpFunction -) - -/** - * Cloud Function entry point, HTTP trigger. - * Loads the requested URL via Puppeteer and captures a screenshot of the provided element - * - * @param {Object} req Cloud Function request context. - * Inlcudes: - * * url - URL address of the page to open - * @param {Object} res Cloud Function response context. - */ -// exports.preview = Sentry.GCPFunction.wrapHttpFunction(preview); diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index ae282bfb7..e3465b922 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -38,23 +38,24 @@ const getPriority = (job: SavePageJob): number => { // priority 5: jobs that are expected to finish in less than 10 second // priority 10: jobs that are expected to finish in less than 10 minutes // priority 100: jobs that are expected to finish in less than 1 hour - if (job.isRss) { - return 10 - } if (job.isImport) { return 100 } - return job.priority === 'low' ? 10 : 1 + if (job.isRss) { + return job.priority === 'low' ? 10 : 5 + } + + return job.priority === 'low' ? 5 : 1 } const getAttempts = (job: SavePageJob): number => { - if (job.isRss || job.isImport) { - // we don't want to retry rss or import jobs + if (job.isImport) { + // we don't want to retry import jobs return 1 } - return 3 + return job.isRss ? 
2 : 3 } const getOpts = (job: SavePageJob): BulkJobOptions => { diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 6a427434d..e5ff9bc97 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -1,8 +1,10 @@ import { Storage } from '@google-cloud/storage' import { fetchContent } from '@omnivore/puppeteer-parse' import { RedisDataSource } from '@omnivore/utils' +import axios from 'axios' import 'dotenv/config' -import { RequestHandler } from 'express' +import jwt from 'jsonwebtoken' +import { promisify } from 'util' import { analytics } from './analytics' import { queueSavePageJob } from './job' @@ -12,7 +14,7 @@ interface UserConfig { folder?: string } -interface RequestBody { +export interface JobData { url: string userId?: string saveRequestId: string @@ -65,8 +67,17 @@ const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files' const NO_CACHE_URLS = [ 'https://deviceandbrowserinfo.com/are_you_a_bot', 'https://deviceandbrowserinfo.com/info_device', + 'https://jacksonh.org', ] +const signToken = promisify(jwt.sign) + +const IMPORTER_METRICS_COLLECTOR_URL = + process.env.IMPORTER_METRICS_COLLECTOR_URL +const JWT_SECRET = process.env.JWT_SECRET + +const MAX_IMPORT_ATTEMPTS = 1 + const uploadToBucket = async (filePath: string, data: string) => { await storage .bucket(bucketName) @@ -175,36 +186,71 @@ const incrementContentFetchFailure = async ( } } -export const contentFetchRequestHandler: RequestHandler = async (req, res) => { +const sendImportStatusUpdate = async ( + userId: string, + taskId: string, + isImported?: boolean +) => { + try { + if (!JWT_SECRET || !IMPORTER_METRICS_COLLECTOR_URL) { + console.error('JWT_SECRET or IMPORTER_METRICS_COLLECTOR_URL is not set') + return + } + + console.log('sending import status update') + const auth = await signToken({ uid: userId }, JWT_SECRET) + + await axios.post( + IMPORTER_METRICS_COLLECTOR_URL, + { + taskId, + status: isImported ? 
'imported' : 'failed', + }, + { + headers: { + Authorization: auth as string, + 'Content-Type': 'application/json', + }, + timeout: 5000, + } + ) + } catch (e) { + console.error('Failed to send import status update', e) + } +} + +export const processFetchContentJob = async ( + redisDataSource: RedisDataSource, + data: JobData, + attemptsMade: number +) => { const functionStartTime = Date.now() - const body = req.body - // users is used when saving article for multiple users - let users = body.users || [] - const userId = body.userId + let users = data.users || [] + const userId = data.userId // userId is used when saving article for a single user if (userId) { users = [ { id: userId, - folder: body.folder, - libraryItemId: body.saveRequestId, + folder: data.folder, + libraryItemId: data.saveRequestId, }, ] } - const articleSavingRequestId = body.saveRequestId - const state = body.state - const labels = body.labels - const source = body.source || 'puppeteer-parse' - const taskId = body.taskId // taskId is used to update import status - const url = body.url - const locale = body.locale - const timezone = body.timezone - const rssFeedUrl = body.rssFeedUrl - const savedAt = body.savedAt - const publishedAt = body.publishedAt - const priority = body.priority + const articleSavingRequestId = data.saveRequestId + const state = data.state + const labels = data.labels + const source = data.source || 'puppeteer-parse' + const taskId = data.taskId // taskId is used to update import status + const url = data.url + const locale = data.locale + const timezone = data.timezone + const rssFeedUrl = data.rssFeedUrl + const savedAt = data.savedAt + const publishedAt = data.publishedAt + const priority = data.priority const logRecord: LogRecord = { url, @@ -214,7 +260,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { }, state, labelsToAdd: labels, - taskId: taskId, + taskId, locale, timezone, rssFeedUrl, @@ -225,25 +271,14 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { console.log(`Article parsing request`, logRecord) - // create redis source - const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, - }) - try { const domain = new URL(url).hostname const isBlocked = await isDomainBlocked(redisDataSource, domain) if (isBlocked) { console.log('domain is blocked', domain) + logRecord.error = 'domain is blocked' - return res.sendStatus(200) + return } const key = cacheKey(url, locale, timezone) @@ -312,7 +347,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { logRecord.error = 'unknown error' } - return res.sendStatus(500) + throw error } finally { logRecord.totalTime = Date.now() - functionStartTime console.log(`parse-page result`, logRecord) @@ -331,8 +366,11 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { } ) - await redisDataSource.shutdown() + const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS + if (logRecord.error && taskId && lastAttempt) { + console.log('sending import status update') + // send failed to import status to update the metrics for importer + await sendImportStatusUpdate(users[0].id, taskId, false) + } } - - res.sendStatus(200) } diff --git a/packages/content-fetch/src/worker.ts b/packages/content-fetch/src/worker.ts new file mode 100644 index 000000000..0d28d4f5a --- /dev/null +++ 
b/packages/content-fetch/src/worker.ts @@ -0,0 +1,77 @@ +import { RedisDataSource } from '@omnivore/utils' +import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq' +import { JobData, processFetchContentJob } from './request_handler' + +export const QUEUE = 'omnivore-content-fetch-queue' + +export const getQueue = async ( + connection: RedisClient, + queueName = QUEUE +): Promise<Queue> => { + const queue = new Queue(queueName, { + connection, + defaultJobOptions: { + backoff: { + type: 'exponential', + delay: 2000, // 2 seconds + }, + removeOnComplete: { + age: 24 * 3600, // keep up to 24 hours + }, + removeOnFail: { + age: 7 * 24 * 3600, // keep up to 7 days + }, + }, + }) + await queue.waitUntilReady() + return queue +} + +export const createWorker = ( + redisDataSource: RedisDataSource, + queueName = QUEUE +) => { + const worker = new Worker( + queueName, + async (job: Job) => { + // process the job + await processFetchContentJob(redisDataSource, job.data, job.attemptsMade) + }, + { + connection: redisDataSource.queueRedisClient, + autorun: true, // start processing jobs immediately + // process up to 10 jobs in a second + limiter: { + max: 10, + duration: 1000, + }, + concurrency: 2, // process up to 2 jobs concurrently + } + ) + + worker.on('error', (err) => { + console.error('worker error:', err) + }) + + const queueEvents = new QueueEvents(queueName, { + connection: redisDataSource.queueRedisClient, + }) + + queueEvents.on('added', (job) => { + console.log('added job:', job.jobId, job.name) + }) + + queueEvents.on('removed', (job) => { + console.log('removed job:', job.jobId) + }) + + queueEvents.on('completed', (job) => { + console.log('completed job:', job.jobId) + }) + + queueEvents.on('failed', (job) => { + console.error('failed job:', job.jobId, job.failedReason) + }) + + return worker +} diff --git a/packages/import-handler/package.json b/packages/import-handler/package.json index e845181a9..b274864c6 100644 --- a/packages/import-handler/package.json +++ b/packages/import-handler/package.json @@ -36,7 +36,6 @@ "@fast-csv/parse": "^5.0.0", "@google-cloud/functions-framework": "3.1.2", "@google-cloud/storage": "^7.0.1", - "@google-cloud/tasks": "^4.0.0", "@omnivore/readability": "1.0.0", "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 672310954..a9cf4b730 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -10,10 +10,9 @@ import * as path from 'path' import { promisify } from 'util' import { v4 as uuid } from 'uuid' import { importCsv } from './csv' -import { queueEmailJob } from './job' +import { enqueueFetchContentJob, queueEmailJob } from './job' import { importMatterArchive } from './matterHistory' import { ImportStatus, updateMetrics } from './metrics' -import { CONTENT_FETCH_URL, createCloudTask } from './task' export enum ArticleSavingRequestStatus { Failed = 'FAILED', @@ -94,6 +93,7 @@ const shouldHandle = (data: StorageEvent) => { } const importURL = async ( + redisDataSource: RedisDataSource, userId: string, url: URL, source: string, @@ -103,18 +103,17 @@ const importURL = async ( savedAt?: Date, publishedAt?: Date ): Promise<string | undefined> => { - return createCloudTask(CONTENT_FETCH_URL, { - userId, - source, + return enqueueFetchContentJob(redisDataSource, { url: url.toString(), - saveRequestId: '', + users: [{ id: userId, libraryItemId: '' }], + source, + taskId, state, labels: labels?.map((l) => { return { name: l } }), - taskId, - savedAt, -
publishedAt, + savedAt: savedAt?.toISOString(), + publishedAt: publishedAt?.toISOString(), }) } @@ -194,6 +193,7 @@ const urlHandler = async ( try { // Imports are stored in the format imports//-.csv const result = await importURL( + ctx.redisDataSource, ctx.userId, url, ctx.source, diff --git a/packages/import-handler/src/job.ts b/packages/import-handler/src/job.ts index 0f040b8f3..c44156cb0 100644 --- a/packages/import-handler/src/job.ts +++ b/packages/import-handler/src/job.ts @@ -1,8 +1,14 @@ import { RedisDataSource } from '@omnivore/utils' import { Queue } from 'bullmq' +import { ArticleSavingRequestStatus } from '.' +import crypto from 'crypto' + +const BACKEND_QUEUE = 'omnivore-backend-queue' +const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue' -const QUEUE_NAME = 'omnivore-backend-queue' export const SEND_EMAIL_JOB = 'send-email' +const FETCH_CONTENT_JOB = 'fetch-content' +const JOB_VERSION = 'v001' interface SendEmailJobData { userId: string @@ -11,13 +17,61 @@ html?: string } +interface FetchContentJobData { + url: string + users: Array<{ + id: string + folder?: string + libraryItemId: string + }> + source: string + taskId: string + state?: ArticleSavingRequestStatus + labels?: Array<{ name: string }> + savedAt?: string + publishedAt?: string +} + +export const stringToHash = (str: string): string => { + return crypto.createHash('md5').update(str).digest('hex') +} + export const queueEmailJob = async ( redisDataSource: RedisDataSource, data: SendEmailJobData ) => { - const queue = new Queue(QUEUE_NAME, { + const queue = new Queue(BACKEND_QUEUE, { connection: redisDataSource.queueRedisClient, }) await queue.add(SEND_EMAIL_JOB, data) } + +export const enqueueFetchContentJob = async ( + redisDataSource: RedisDataSource, + data: FetchContentJobData +): Promise<string> => { + const queue = new Queue(CONTENT_FETCH_QUEUE, { + connection: redisDataSource.queueRedisClient, + }) + + // sort the data to make sure the hash is consistent + const sortedData = JSON.stringify(data, Object.keys(data).sort()) + const jobId = `${FETCH_CONTENT_JOB}_${stringToHash( + sortedData + )}_${JOB_VERSION}` + const job = await queue.add(FETCH_CONTENT_JOB, data, { + jobId, + removeOnComplete: true, + removeOnFail: true, + priority: 100, + attempts: 1, + }) + + if (!job || !job.id) { + console.error('Error while enqueuing fetch-content job', data) + throw new Error('Error while enqueuing fetch-content job') + } + + return job.id +} diff --git a/packages/import-handler/src/task.ts b/packages/import-handler/src/task.ts deleted file mode 100644 index bccca7ecb..000000000 --- a/packages/import-handler/src/task.ts +++ /dev/null @@ -1,48 +0,0 @@ -/* eslint-disable @typescript-eslint/restrict-template-expressions */ -import { CloudTasksClient, protos } from '@google-cloud/tasks' - -const cloudTask = new CloudTasksClient() - -export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL - -export const createCloudTask = async ( - taskHandlerUrl: string | undefined, - payload: unknown, - requestHeaders?: Record<string, string>, - queue = 'omnivore-import-queue' -) => { - const location = process.env.GCP_LOCATION - const project = process.env.GCP_PROJECT_ID - - if (!project || !location || !queue || !taskHandlerUrl) { - throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}` - } - - const serviceAccountEmail = `${project}@appspot.gserviceaccount.com` - - const parent = cloudTask.queuePath(project, location, queue) - const convertedPayload = JSON.stringify(payload) -
const body = Buffer.from(convertedPayload).toString('base64') - const task: protos.google.cloud.tasks.v2.ITask = { - httpRequest: { - httpMethod: 'POST', - url: taskHandlerUrl, - headers: { - 'Content-Type': 'application/json', - ...requestHeaders, - }, - body, - ...(serviceAccountEmail - ? { - oidcToken: { - serviceAccountEmail, - }, - } - : null), - }, - } - - return cloudTask.createTask({ parent, task }).then((result) => { - return result[0].name ?? undefined - }) -} diff --git a/pkg/bull-queue-admin/index.js b/pkg/bull-queue-admin/index.js index fee71ed8f..49b275ec3 100644 --- a/pkg/bull-queue-admin/index.js +++ b/pkg/bull-queue-admin/index.js @@ -59,7 +59,10 @@ const run = async () => { const connection = new Redis(secrets.REDIS_URL, redisOptions(secrets)) console.log('set connection: ', connection) - const rssRefreshFeed = new Queue('omnivore-backend-queue', { + const backendQueue = new Queue('omnivore-backend-queue', { + connection: connection, + }) + const contentFetchQueue = new Queue('omnivore-content-fetch-queue', { connection: connection, }) @@ -67,7 +70,10 @@ const run = async () => { serverAdapter.setBasePath('/ui') createBullBoard({ - queues: [new BullMQAdapter(rssRefreshFeed)], + queues: [ + new BullMQAdapter(backendQueue), + new BullMQAdapter(contentFetchQueue), + ], serverAdapter, }) diff --git a/yarn.lock b/yarn.lock index 7dae9e991..6252be488 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2799,7 +2799,7 @@ google-gax "^3.5.7" protobufjs "^7.2.5" -"@google-cloud/functions-framework@3.1.2", "@google-cloud/functions-framework@^3.0.0": +"@google-cloud/functions-framework@3.1.2": version "3.1.2" resolved "https://registry.yarnpkg.com/@google-cloud/functions-framework/-/functions-framework-3.1.2.tgz#2cd92ce4307bf7f32555d028dca22e398473b410" integrity sha512-pYvEH65/Rqh1JNPdcBmorcV7Xoom2/iOSmbtYza8msro7Inl+qOYxbyMiQfySD2gwAyn38WyWPRqsDRcf/BFLg== @@ -8067,7 +8067,7 @@ dependencies: "@types/express" "*" -"@types/express@*", "@types/express@^4.17.13", "@types/express@^4.17.14", "@types/express@^4.17.21", "@types/express@^4.17.7": +"@types/express@*", "@types/express@^4.17.1", "@types/express@^4.17.13", "@types/express@^4.17.14", "@types/express@^4.17.21", "@types/express@^4.17.7": version "4.17.21" resolved "https://registry.yarnpkg.com/@types/express/-/express-4.17.21.tgz#c26d4a151e60efe0084b23dc3369ebc631ed192d" integrity sha512-ejlPM315qwLpaQlQDTjPdsUFSc6ZsP4AN6AlWnogPjQ7CVi7PYF3YVz+CY3jE2pwYf7E/7HlDAN0rV2GxTG0HQ== @@ -15721,6 +15721,11 @@ expr-eval@^2.0.2: resolved "https://registry.yarnpkg.com/expr-eval/-/expr-eval-2.0.2.tgz#fa6f044a7b0c93fde830954eb9c5b0f7fbc7e201" integrity sha512-4EMSHGOPSwAfBiibw3ndnP0AvjDWLsMvGOvWEZ2F96IGk0bIVdjQisOHxReSkE13mHcfbuCiXw+G4y0zv6N8Eg== +express-async-handler@^1.2.0: + version "1.2.0" + resolved "https://registry.yarnpkg.com/express-async-handler/-/express-async-handler-1.2.0.tgz#ffc9896061d90f8d2e71a2d2b8668db5b0934391" + integrity sha512-rCSVtPXRmQSW8rmik/AIb2P0op6l7r1fMW538yyvTMltCO4xQEWMmobfrIxN2V1/mVrgxB8Az3reYF6yUZw37w== + express-http-context2@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/express-http-context2/-/express-http-context2-1.0.0.tgz#58cd9fb0d233739e0dcd7aabb766d1dc74522d77"
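-- 
A few sketches for whoever picks this change up. First, the producer side: a save request is now a BullMQ job on omnivore-content-fetch-queue instead of a Cloud Tasks HTTP call. A minimal call-site sketch, using only the names this diff exports from packages/api/src/utils/createTask.ts (the user and item ids are placeholders):

    import {
      enqueueFetchContentJob,
      FetchContentJobData,
    } from './utils/createTask'

    const saveExample = async (): Promise<string> => {
      const data: FetchContentJobData = {
        url: 'https://example.com/article',
        users: [{ id: 'user-1', libraryItemId: 'item-1', folder: 'inbox' }],
        priority: 'high',
        source: 'puppeteer-parse',
      }
      // resolves to the deterministic job id, e.g. fetch-content_<md5>_v001
      return enqueueFetchContentJob(data)
    }

Note that the RSS path is the only caller still gated by CONTENT_FETCH_QUEUE_ENABLED; createPageSaveRequest and the import handler switch over unconditionally.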
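The job name doubles as the key into getJobPriority, so the effective ladder for the new job comes out as (lower number runs first):

    // jobName = `fetch-content${data.rssFeedUrl ? '_rss' : ''}_${data.priority}`
    //
    //   fetch-content_high      -> 1   (interactive saves)
    //   fetch-content_low       -> 5
    //   fetch-content_rss_high  -> 5
    //   fetch-content_rss_low   -> 10
    //
    // import jobs bypass this ladder and are pinned at priority 100 on both
    // queues (import-handler/src/job.ts for fetch-content, and
    // content-fetch/src/job.ts for the downstream save-page job)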
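One thing worth double-checking before relying on the deduplication: both copies of enqueueFetchContentJob derive the job id from JSON.stringify(data, Object.keys(data).sort()). An array replacer filters properties at every depth, not just the top level, so nested keys that don't also appear at the top level of the payload type — notably users[].id and users[].libraryItemId — are silently dropped from the serialized form, and two jobs for different users can hash to the same jobId. If that isn't the intended dedup behavior, a recursive canonical stringify avoids it; a sketch (canonicalStringify is illustrative, not in the codebase):

    // Canonical JSON: sort keys at every depth instead of using an array
    // replacer, which filters nested properties as well as ordering them.
    const canonicalStringify = (value: unknown): string => {
      if (Array.isArray(value)) {
        return `[${value.map(canonicalStringify).join(',')}]`
      }
      if (value && typeof value === 'object') {
        const obj = value as Record<string, unknown>
        const body = Object.keys(obj)
          .sort()
          .filter((k) => obj[k] !== undefined) // match JSON.stringify's skipping
          .map((k) => `${JSON.stringify(k)}:${canonicalStringify(obj[k])}`)
          .join(',')
        return `{${body}}`
      }
      return JSON.stringify(value)
    }

Swapping it in for the sorted-replacer call keeps the id stable across key order while still hashing the full payload.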
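A pre-existing wrinkle the rename makes easier to spot: in queue-processor.ts the refresh-all-feeds handler calls queue.getJobCounts('prioritized') and then tests counts.wait, but getJobCounts only returns counts for the states it was asked for, so counts.wait is always undefined and the backpressure guard never trips. If the intent is to skip a refresh round when the queue is deep, something along these lines would do it (a sketch, not part of this patch):

    const counts = await queue.getJobCounts('prioritized', 'wait')
    // skip this round when too much work is already queued up
    if ((counts.prioritized ?? 0) + (counts.wait ?? 0) > 1000) {
      return
    }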
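On the consumer side, the worker caps itself with limiter: { max: 10, duration: 1000 } and concurrency: 2. BullMQ enforces the limiter per queue in Redis rather than per process, so, if that understanding holds, scaling out replicas raises parallelism while the queue still drains at roughly ten fetches per second overall. For poking at it locally, a job can be pushed and awaited end to end; a sketch assuming a local Redis and the queue name above:

    import { Queue, QueueEvents } from 'bullmq'
    import IORedis from 'ioredis'

    const connection = new IORedis(
      process.env.REDIS_URL ?? 'redis://localhost:6379',
      { maxRetriesPerRequest: null } // required for BullMQ's blocking connections
    )

    const smokeTest = async () => {
      const queue = new Queue('omnivore-content-fetch-queue', { connection })
      const events = new QueueEvents('omnivore-content-fetch-queue', { connection })
      await events.waitUntilReady()

      const job = await queue.add('fetch-content', {
        url: 'https://example.com',
        users: [{ id: 'user-1', libraryItemId: 'item-1' }],
        priority: 'low',
      })

      // resolves (or rejects) once the content-fetch worker finishes the job
      await job.waitUntilFinished(events)
      await Promise.all([queue.close(), events.close(), connection.quit()])
    }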