diff --git a/packages/api/package.json b/packages/api/package.json index 469586434..cea023808 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -113,6 +113,7 @@ "voca": "^1.4.0", "winston": "^3.3.3", "word-counting": "^1.1.4", + "yaml": "^2.4.1", "youtubei": "1.4.0" }, "devDependencies": { diff --git a/packages/api/src/jobs/ai/create_digest.ts b/packages/api/src/jobs/ai/create_digest.ts index 2026e73ee..feb12f78d 100644 --- a/packages/api/src/jobs/ai/create_digest.ts +++ b/packages/api/src/jobs/ai/create_digest.ts @@ -4,17 +4,68 @@ import { v4 as uuid } from 'uuid' import { OpenAI } from '@langchain/openai' import { PromptTemplate } from '@langchain/core/prompts' import { LibraryItem } from '../../entity/library_item' -import { CreateDigestJobData } from '../../services/digest' -import { htmlToSsmlItems } from '@omnivore/text-to-speech-handler' +import { htmlToSpeechFile, SpeechFile } from '@omnivore/text-to-speech-handler' +import axios from 'axios' +import { searchLibraryItems } from '../../services/library_item' +import { redisDataSource } from '../../redis_data_source' +import { htmlToMarkdown } from '../../utils/parser' +import yaml from 'yaml' +import { JsonOutputParser } from '@langchain/core/output_parsers' +import showdown from 'showdown' +import { Digest, writeDigest } from '../../services/digest' -const USER_PROFILE_PROMPT = - 'Create a user profile based on the supplied titles\n\ntitles:\n{titles}' +interface Selector { + query: string + count: number + reason: string +} -const SUMMARIZE_PROMPT = - 'Summarize the supplied article.\n\ntitle: {title}\nauthor: {author}\ncontent: {content}' +interface ZeroShotDefinition { + userPreferencesProfilePrompt: string + rankPrompt: string +} -// TODO: Makes multiple DB queries and combines the results -const getPreferencesList = (userId: string): Promise => { +interface DigestDefinition { + name: string + preferenceSelectors: Selector[] + candidateSelectors: Selector[] + contentFeaturesPrompt: string + contentRatingPrompt: string + summaryPrompt: string + assemblePrompt: string + + zeroShot: ZeroShotDefinition +} + +export interface CreateDigestJobData { + userId: string +} + +export interface CreateDigestJobResponse { + jobId: string +} + +export const CREATE_DIGEST_JOB = 'create-digest' + +let digestDefinition: DigestDefinition + +const fetchDigestDefinition = async (): Promise => { + const promptFileUrl = process.env.PROMPT_FILE_URL + if (!promptFileUrl) { + const msg = 'PROMPT_FILE_URL not set' + logger.error(msg) + throw new Error(msg) + } + + // fetch the yaml file + const response = await axios.get(promptFileUrl) + + // parse the yaml file + return yaml.parse(response.data) as DigestDefinition +} + +// Makes multiple DB queries and combines the results +const getPreferencesList = async (userId: string): Promise => { // use the queries from the digest definitions to lookup preferences // There should be a list of multiple queries we use. For now we can // hardcode these queries: @@ -24,21 +75,72 @@ const getPreferencesList = (userId: string): Promise => { // - query: "in:all is:read OR has:highlights sort:saved-asc wordsCount:>=20" // count: 4 // reason: "some older items that were interacted with" - return Promise.resolve([]) + + const preferences = await Promise.all( + digestDefinition.preferenceSelectors.map(async (selector) => { + // use the selector to fetch items + const results = await searchLibraryItems( + { + query: selector.query, + size: selector.count, + }, + userId + ) + + return results.libraryItems + }) + ) + + // deduplicate and flatten the items + const dedupedPreferences = preferences + .flat() + .filter( + (item, index, self) => index === self.findIndex((t) => t.id === item.id) + ) + + return dedupedPreferences } -// TODO: Makes multiple DB queries and combines the results -const getCandidatesList = (userId: string): Promise => { +// Makes multiple DB queries and combines the results +const getCandidatesList = async (userId: string): Promise => { // use the queries from the digest definitions to lookup preferences // There should be a list of multiple queries we use. For now we can // hardcode these queries: // - query: "in:all is:unread saved:last24hrs sort:saved-desc wordsCount:>=500" // count: 100 // reason: "most recent 100 items saved over 500 words - return Promise.resolve([]) + + const candidates = await Promise.all( + digestDefinition.candidateSelectors.map(async (selector) => { + // use the selector to fetch items + const results = await searchLibraryItems( + { + includeContent: true, + query: selector.query, + size: selector.count, + }, + userId + ) + + return results.libraryItems + }) + ) + + // deduplicate and flatten the items + const dedupedCandidates = candidates + .flat() + .filter( + (item, index, self) => index === self.findIndex((t) => t.id === item.id) + ) + .map((item) => ({ + ...item, + readableContent: htmlToMarkdown(item.readableContent), + })) // convert the html content to markdown + + return dedupedCandidates } -// TODO: Takes a list of library items, and uses a prompt to generate +// Takes a list of library items, and uses a prompt to generate // a text representation of a user profile const createUserProfile = async ( preferences: LibraryItem[] @@ -50,39 +152,68 @@ const createUserProfile = async ( }, }) - const contextualTemplate = PromptTemplate.fromTemplate(USER_PROFILE_PROMPT) + const contextualTemplate = PromptTemplate.fromTemplate( + digestDefinition.zeroShot.userPreferencesProfilePrompt + ) const chain = contextualTemplate.pipe(llm) const result = await chain.invoke({ - titles: preferences.map((item) => `* ${item}`).join('\n'), + titles: preferences.map((item) => `* ${item.title}`).join('\n'), }) return result } -// TODO: Checks redis for a user profile, if not found creates one and writes +// Checks redis for a user profile, if not found creates one and writes // it to redis const findOrCreateUserProfile = async (userId: string): Promise => { // check redis for user profile, return if found + const key = `userProfile:${userId}` + const existingProfile = await redisDataSource.redisClient?.get(key) + if (existingProfile) { + return existingProfile + } + // if not found const preferences = await getPreferencesList(userId) const profile = await createUserProfile(preferences) - // TODO: write to redis here + + // write to redis here + await redisDataSource.redisClient?.set(key, profile) + return profile } type RankedItem = { topic: string - summary?: string + summary: string libraryItem: LibraryItem } -// TODO: Uses OpenAI to rank all the titles based on the user profiles +// Uses OpenAI to rank all the titles based on the user profiles const rankCandidates = async ( candidates: LibraryItem[], userProfile: string ): Promise => { - return Promise.resolve([]) + const llm = new OpenAI({ + modelName: 'gpt-4-0125-preview', + configuration: { + apiKey: process.env.OPENAI_API_KEY, + }, + }) + + const contextualTemplate = PromptTemplate.fromTemplate( + digestDefinition.zeroShot.rankPrompt + ) + + const outputParser = new JsonOutputParser() + const chain = contextualTemplate.pipe(llm).pipe(outputParser) + const contextStr = await chain.invoke({ + userProfile, + titles: JSON.stringify(candidates.map((item) => item.title)), + }) + + return contextStr as RankedItem[] } // Does some grouping by topic while trying to maintain ranking @@ -107,8 +238,8 @@ const chooseRankedSelections = (rankedCandidates: RankedItem[]) => { } } - console.log('rankedTopics: ', rankedTopics) - console.log('finalSelections: ', selected) + logger.info('rankedTopics: ', rankedTopics) + logger.info('finalSelections: ', selected) const finalSelections = [] @@ -117,12 +248,11 @@ const chooseRankedSelections = (rankedCandidates: RankedItem[]) => { finalSelections.push(...matches) } - console.log('finalSelections: ', finalSelections) + logger.info('finalSelections: ', finalSelections) return finalSelections } -// TODO: we could paralleize this step sending all the ranked candidates to openAI at once const summarizeItems = async ( rankedCandidates: RankedItem[] ): Promise => { @@ -133,83 +263,78 @@ const summarizeItems = async ( }, }) - for (const item of rankedCandidates) { - const contextualTemplate = PromptTemplate.fromTemplate(SUMMARIZE_PROMPT) + const contextualTemplate = PromptTemplate.fromTemplate( + digestDefinition.summaryPrompt + ) + const chain = contextualTemplate.pipe(llm) - const chain = contextualTemplate.pipe(llm) - const summary = await chain.invoke({ + // send all the ranked candidates to openAI at once in a batch + const summaries = await chain.batch( + rankedCandidates.map((item) => ({ title: item.libraryItem.title, author: item.libraryItem.author ?? '', content: item.libraryItem.readableContent, // markdown content - }) - item.summary = summary - } + })) + ) + + summaries.forEach( + (summary, index) => (rankedCandidates[index].summary = summary) + ) return rankedCandidates } -// TODO: we can use something more sophisticated to generate titles -const generateTitle = (selections: RankedItem[]): Promise => { - return Promise.resolve( - 'Omnivore digest: ' + - selections.map((item) => item.libraryItem.title).join(',') - ) +// generate speech files from the summaries +const generateSpeechFiles = (rankedItems: RankedItem[]): SpeechFile[] => { + // convert the summaries from markdown to HTML + const converter = new showdown.Converter({ + backslashEscapesHTMLTags: true, + }) + + const speechFiles = rankedItems.map((item) => { + const html = converter.makeHtml(item.summary) + return htmlToSpeechFile({ + content: html, + options: {}, + }) + }) + + return speechFiles } -// TODO: write the digest to redis here -// export interface Digest { -// jobState: string +// TODO: we should have a QA step here that does some +// basic checks to make sure the summaries are good. +const filterSummaries = (summaries: RankedItem[]): RankedItem[] => { + return summaries.filter((item) => item.summary.length > 100) +} -// url?: string -// title?: string -// content?: string -// chapters?: Chapter[] +// TODO: we can use something more sophisticated to generate titles +const generateTitle = (selections: RankedItem[]): string => + 'Omnivore digest: ' + + selections.map((item) => item.libraryItem.title).join(',') -// urlsToAudio?: string[] -// speechFiles?: SpeechFile[] -// } -// export interface SpeechFile { -// wordCount: number; -// language: string; -// defaultVoice: string; -// utterances: Utterance[]; -//} -const writeDigest = async (userId: string, selections: RankedItem[]) => { - const title = await generateTitle(selections) - const speechFiles = selections.map((selection) => { - // convert the summary item to a SpeechFile here - return { - wordCount: 0, - language: 'en', - defaultVoice: 'Josh', - utterances: [], - } - }) - const digest = { +export const createDigestJob = async (jobData: CreateDigestJobData) => { + digestDefinition = await fetchDigestDefinition() + + const candidates = await getCandidatesList(jobData.userId) + const userProfile = await findOrCreateUserProfile(jobData.userId) + const rankedCandidates = await rankCandidates(candidates, userProfile) + const selections = chooseRankedSelections(rankedCandidates) + + const summaries = await summarizeItems(selections) + + const filteredSummaries = filterSummaries(summaries) + + const speechFiles = generateSpeechFiles(filteredSummaries) + const title = generateTitle(summaries) + const digest: Digest = { id: uuid(), - title: title, + title, content: 'content', urlsToAudio: [], jobState: 'completed', - speechFiles: speechFiles, - } - // write to redis -} - -export const CreateDigestJob = async (jobData: CreateDigestJobData) => { - try { - const candidates = await getCandidatesList(jobData.userId) - const userProfile = await findOrCreateUserProfile(jobData.userId) - const rankedCandidates = await rankCandidates(candidates, userProfile) - const selections = chooseRankedSelections(rankedCandidates) - - const summaries = await summarizeItems(selections) - - // TODO: we should have a QA step here that does some - // basic checks to make sure the summaries are good. - - await writeDigest(jobData.userId, summaries) - } catch (err) { - console.log('error creating digest: ', err) + speechFiles, } + + await writeDigest(jobData.userId, digest) } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index dac1f74d9..5f0fc6a21 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -16,6 +16,7 @@ import { appDataSource } from './data_source' import { env } from './env' import { TaskState } from './generated/graphql' import { aiSummarize, AI_SUMMARIZE_JOB_NAME } from './jobs/ai-summarize' +import { createDigestJob, CREATE_DIGEST_JOB } from './jobs/ai/create_digest' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { callWebhook, CALL_WEBHOOK_JOB_NAME } from './jobs/call_webhook' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' @@ -65,7 +66,6 @@ import { } from './jobs/email/inbound_emails' export const QUEUE_NAME = 'omnivore-backend-queue' -export const LONG_RUNNING_QUEUE_NAME = 'omnivore-backend-long-running-queue' export const JOB_VERSION = 'v001' export const getBackendQueue = async ( @@ -180,6 +180,8 @@ export const createWorker = (connection: ConnectionOptions) => return saveNewsletterJob(job.data) case FORWARD_EMAIL_JOB: return forwardEmailJob(job.data) + case CREATE_DIGEST_JOB: + return createDigestJob(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/routers/digest_router.ts b/packages/api/src/routers/digest_router.ts index f2c5ec65d..0a7de6cf9 100644 --- a/packages/api/src/routers/digest_router.ts +++ b/packages/api/src/routers/digest_router.ts @@ -1,13 +1,9 @@ import cors from 'cors' import express from 'express' import { env } from '../env' -import { - createJobId, - getJob, - jobStateToTaskState, - LONG_RUNNING_QUEUE_NAME, -} from '../queue-processor' -import { CREATE_DIGEST_JOB, getDigest } from '../services/digest' +import { CREATE_DIGEST_JOB } from '../jobs/ai/create_digest' +import { createJobId, getJob, jobStateToTaskState } from '../queue-processor' +import { getDigest } from '../services/digest' import { findActiveUser } from '../services/user' import { analytics } from '../utils/analytics' import { getClaimsByToken, getTokenByRequest } from '../utils/auth' @@ -70,7 +66,7 @@ export function digestRouter() { // if yes then return 202 accepted // else enqueue job const jobId = createJobId(CREATE_DIGEST_JOB, userId) - const existingJob = await getJob(jobId, LONG_RUNNING_QUEUE_NAME) + const existingJob = await getJob(jobId) if (existingJob) { logger.info(`Job already in queue: ${jobId}`) return res.sendStatus(202) @@ -118,7 +114,7 @@ export function digestRouter() { // get job by user id const jobId = createJobId(CREATE_DIGEST_JOB, userId) - const job = await getJob(jobId, LONG_RUNNING_QUEUE_NAME) + const job = await getJob(jobId) if (job) { // if job is in queue then return job state const jobState = await job.getState() diff --git a/packages/api/src/services/digest.ts b/packages/api/src/services/digest.ts index a41bb23f8..271e79765 100644 --- a/packages/api/src/services/digest.ts +++ b/packages/api/src/services/digest.ts @@ -1,17 +1,9 @@ import { redisDataSource } from '../redis_data_source' import { SpeechFile } from '@omnivore/text-to-speech-handler' - -export const CREATE_DIGEST_JOB = 'create-digest' - -export interface CreateDigestJobData { - userId: string -} - -export interface CreateDigestJobResponse { - jobId: string -} +import { logger } from '../utils/logger' export interface Digest { + id: string jobState: string url?: string @@ -30,42 +22,20 @@ interface Chapter { const digestKey = (userId: string) => `digest:${userId}` export const getDigest = async (userId: string): Promise => { - await redisDataSource.redisClient?.set( - digestKey(userId), - JSON.stringify({ - id: 'BB3D5D89-70A2-4AE1-ADDC-713232B1281D', - title: - 'SOTU response collapses, Trump hits new low, Biden fundraising explodes 3/11/24 TDPS Podcast', - content: 'content', - urlsToAudio: [], - jobState: 'completed', - speechFiles: [ - { - pageId: '1234', - wordCount: 2124, - language: 'en-US', - defaultVoice: 'en-US-ChristopherNeural', - utterances: [ - { - idx: '', - text: 'TOP prospect JOINS Canucks - Team SPEAKS OUT on Demko Injury | Canucks News', - wordOffset: 0, - wordCount: 14, - voice: 'en-US-ChristopherNeural', - }, - { - idx: '4', - text: 'Intro', - wordOffset: 14, - wordCount: 1, - voice: 'en-US-ChristopherNeural', - }, - ], - }, - ], - }) - ) - const digest = await redisDataSource.redisClient?.get(digestKey(userId)) return digest ? (JSON.parse(digest) as Digest) : null } + +export const writeDigest = async (userId: string, digest: Digest) => { + // write to redis + const result = await redisDataSource.redisClient?.set( + digestKey(userId), + JSON.stringify(digest) + ) + + if (!result) { + const msg = `Error while writing digest to redis: ${userId}` + logger.error(msg) + throw new Error(msg) + } +} diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index d64471252..cb835da13 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -16,6 +16,11 @@ import { CreateLabelInput, } from '../generated/graphql' import { AISummarizeJobData, AI_SUMMARIZE_JOB_NAME } from '../jobs/ai-summarize' +import { + CreateDigestJobData, + CreateDigestJobResponse, + CREATE_DIGEST_JOB, +} from '../jobs/ai/create_digest' import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' @@ -44,18 +49,8 @@ import { UPDATE_HIGHLIGHT_JOB, UPDATE_LABELS_JOB, } from '../jobs/update_db' -import { - createJobId, - getBackendQueue, - JOB_VERSION, - LONG_RUNNING_QUEUE_NAME, -} from '../queue-processor' +import { createJobId, getBackendQueue, JOB_VERSION } from '../queue-processor' import { redisDataSource } from '../redis_data_source' -import { - CreateDigestJobData, - CreateDigestJobResponse, - CREATE_DIGEST_JOB, -} from '../services/digest' import { signFeatureToken } from '../services/features' import { OmnivoreAuthorizationHeader } from './auth' import { CreateTaskError } from './errors' @@ -93,6 +88,7 @@ export const getJobPriority = (jobName: string): number => { case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME: case `${REFRESH_FEED_JOB_NAME}_low`: case EXPORT_ITEM_JOB_NAME: + case CREATE_DIGEST_JOB: return 50 case EXPORT_ALL_ITEMS_JOB_NAME: case REFRESH_ALL_FEEDS_JOB_NAME: @@ -865,7 +861,7 @@ export const enqueueSendEmail = async (jobData: SendEmailJobData) => { export const enqueueCreateDigest = async ( data: CreateDigestJobData ): Promise => { - const queue = await getBackendQueue(LONG_RUNNING_QUEUE_NAME) + const queue = await getBackendQueue() if (!queue) { throw new Error('No queue found') } @@ -876,6 +872,7 @@ export const enqueueCreateDigest = async ( removeOnComplete: true, removeOnFail: true, attempts: 3, + priority: getJobPriority(CREATE_DIGEST_JOB), }) logger.info('create digest job enqueued', { jobId: job.id }) diff --git a/packages/api/src/utils/parser.ts b/packages/api/src/utils/parser.ts index 395363634..3122a72bd 100644 --- a/packages/api/src/utils/parser.ts +++ b/packages/api/src/utils/parser.ts @@ -703,6 +703,10 @@ export const htmlToMarkdown = (html: string) => { return nhm.translate(/* html */ html) } +export const markdownToHtml = (markdown: string) => { + return nhm.translate(/* markdown */ markdown) +} + export const getDistillerResult = async ( uid: string, html: string diff --git a/yarn.lock b/yarn.lock index 034c6ee5c..9c651f55f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -32048,6 +32048,11 @@ yaml@^2.2.1: resolved "https://registry.yarnpkg.com/yaml/-/yaml-2.4.0.tgz#2376db1083d157f4b3a452995803dbcf43b08140" integrity sha512-j9iR8g+/t0lArF4V6NE/QCfT+CO7iLqrXAHZbJdo+LfjqP1vR8Fg5bSiaq6Q2lOD1AUEVrEVIgABvBFYojJVYQ== +yaml@^2.4.1: + version "2.4.1" + resolved "https://registry.yarnpkg.com/yaml/-/yaml-2.4.1.tgz#2e57e0b5e995292c25c75d2658f0664765210eed" + integrity sha512-pIXzoImaqmfOrL7teGUBt/T7ZDnyeGBWyXQBvOVhLkWLN37GXv8NMLK406UY6dS51JfcQHsmcW5cJ441bHg6Lg== + yargs-parser@20.2.4: version "20.2.4" resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-20.2.4.tgz#b42890f14566796f85ae8e3a25290d205f154a54"