From bbe6204b91092d65961f1b2c39eaadc5f8915074 Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Tue, 27 Feb 2024 09:42:03 +0800 Subject: [PATCH] summaries --- packages/api/package.json | 1 + packages/api/src/pubsub.ts | 13 +++++++++++++ packages/api/src/queue-processor.ts | 3 +++ packages/api/src/services/features.ts | 2 +- packages/api/src/utils/createTask.ts | 14 ++++++++++++++ 5 files changed, 32 insertions(+), 1 deletion(-) diff --git a/packages/api/package.json b/packages/api/package.json index 1c45f926e..b8c605609 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -77,6 +77,7 @@ "ioredis": "^5.3.2", "jsonwebtoken": "^8.5.1", "jwks-rsa": "^2.0.3", + "langchain": "^0.1.21", "linkedom": "^0.14.9", "lodash": "^4.17.21", "luxon": "^3.2.1", diff --git a/packages/api/src/pubsub.ts b/packages/api/src/pubsub.ts index 5ca9d2693..7bf25d921 100644 --- a/packages/api/src/pubsub.ts +++ b/packages/api/src/pubsub.ts @@ -5,12 +5,18 @@ import { env } from './env' import { ReportType } from './generated/graphql' import { Merge } from './util' import { + enqueueAISummarizeJob, enqueueExportItem, enqueueTriggerRuleJob, enqueueWebhookJob, } from './utils/createTask' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' +import { + FeatureName, + findFeatureByName, + getFeatureName, +} from './services/features' const logger = buildLogger('pubsub') @@ -82,6 +88,13 @@ export const createPubSubClient = (): PubsubClient => { data, }) + if (await findFeatureByName(FeatureName.AISummaries, userId)) { + await enqueueAISummarizeJob({ + userId, + libraryItemId, + }) + } + return publish( 'entityCreated', Buffer.from(JSON.stringify({ type, userId, ...cleanData })) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index cbd06787b..6cd13b8a0 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -43,6 +43,7 @@ import { redisDataSource } from './redis_data_source' import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' import { getJobPriority } from './utils/createTask' import { logger } from './utils/logger' +import { AI_SUMMARIZE_JOB_NAME, aiSummarize } from './jobs/ai-summarize' export const QUEUE_NAME = 'omnivore-backend-queue' export const JOB_VERSION = 'v001' @@ -113,6 +114,8 @@ export const createWorker = (connection: ConnectionOptions) => return callWebhook(job.data) case EXPORT_ITEM_JOB_NAME: return exportItem(job.data) + case AI_SUMMARIZE_JOB_NAME: + return aiSummarize(job.data) case EXPORT_ALL_ITEMS_JOB_NAME: return exportAllItems(job.data) } diff --git a/packages/api/src/services/features.ts b/packages/api/src/services/features.ts index e0a9b337a..3d91115c1 100644 --- a/packages/api/src/services/features.ts +++ b/packages/api/src/services/features.ts @@ -7,6 +7,7 @@ import { getRepository } from '../repository' import { logger } from '../utils/logger' export enum FeatureName { + AISummaries = 'ai-summaries', UltraRealisticVoice = 'ultra-realistic-voice', } @@ -21,7 +22,6 @@ export const optInFeature = async ( if (name === FeatureName.UltraRealisticVoice) { return optInUltraRealisticVoice(uid) } - return undefined } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 95777a240..e63cd6c7f 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -44,6 +44,7 @@ import { CreateTaskError } from './errors' import { stringToHash } from './helpers' import { logger } from './logger' import View = google.cloud.tasks.v2.Task.View +import { AISummarizeJobData, AI_SUMMARIZE_JOB_NAME } from '../jobs/ai-summarize' // Instantiates a client. const client = new CloudTasksClient() @@ -66,6 +67,7 @@ export const getJobPriority = (jobName: string): number => { case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: case EXPORT_ITEM_JOB_NAME: + case AI_SUMMARIZE_JOB_NAME: return 5 case BULK_ACTION_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_high`: @@ -694,6 +696,18 @@ export const enqueueWebhookJob = async (data: CallWebhookJobData) => { }) } +export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + return queue.add(AI_SUMMARIZE_JOB_NAME, data, { + priority: getJobPriority(AI_SUMMARIZE_JOB_NAME), + attempts: 3, + }) +} + export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => { const queue = await getBackendQueue() if (!queue) {