diff --git a/packages/api/src/jobs/process-youtube-video.ts b/packages/api/src/jobs/process-youtube-video.ts index 427444959..916f7fdb0 100644 --- a/packages/api/src/jobs/process-youtube-video.ts +++ b/packages/api/src/jobs/process-youtube-video.ts @@ -1,21 +1,25 @@ -import { Storage } from '@google-cloud/storage' import { PromptTemplate } from '@langchain/core/prompts' import { OpenAI } from '@langchain/openai' import { parseHTML } from 'linkedom' import showdown from 'showdown' -import * as stream from 'stream' import { Chapter, Client as YouTubeClient } from 'youtubei' import { LibraryItem, LibraryItemState } from '../entity/library_item' -import { env } from '../env' -import { authTrx } from '../repository' -import { libraryItemRepository } from '../repository/library_item' import { FeatureName, findGrantedFeatureByName } from '../services/features' +import { + findLibraryItemById, + updateLibraryItem, +} from '../services/library_item' +import { OPENAI_MODEL } from '../utils/ai' import { enqueueProcessYouTubeTranscript } from '../utils/createTask' import { stringToHash } from '../utils/helpers' import { logger } from '../utils/logger' import { parsePreparedContent } from '../utils/parser' +import { + downloadFromBucket, + isFileExists, + uploadToBucket, +} from '../utils/uploads' import { videoIdFromYouTubeUrl } from '../utils/youtube' -import { OPENAI_MODEL } from '../utils/ai' export interface ProcessYouTubeVideoJobData { userId: string @@ -143,132 +147,44 @@ export const addTranscriptToReadableContent = async ( originalHTML: string, transcriptHTML: string ): Promise => { - const html = parseHTML(originalHTML) + const document = parseHTML(originalHTML).document - const transcriptNode = html.document.querySelector( - '#_omnivore_youtube_transcript' - ) + const rootElement = document.querySelector('#readability-page-1') + if (!rootElement) { + logger.warning('no readability-page-1 element found') + return undefined + } + + const transcriptNode = + rootElement.querySelector('#_omnivore_youtube_transcript') || + rootElement.querySelector('._omnivore_youtube_transcript') if (transcriptNode) { transcriptNode.innerHTML = transcriptHTML } else { - const div = html.document.createElement('div') + const div = document.createElement('div') div.innerHTML = transcriptHTML - html.document.body.appendChild(div) - } + div.className = '_omnivore_youtube_transcript' - const preparedDocument = { - document: html.document.toString(), - pageInfo: {}, - } - const updatedContent = await parsePreparedContent( - originalUrl, - preparedDocument, - true - ) - return updatedContent.parsedContent?.content -} - -export const addTranscriptPlaceholdReadableContent = async ( - originalUrl: string, - originalHTML: string -): Promise => { - const html = parseHTML(originalHTML) - - const transcriptNode = html.document.querySelector( - '#_omnivore_youtube_transcript' - ) - - if (transcriptNode) { - transcriptNode.innerHTML = TRANSCRIPT_PLACEHOLDER_TEXT - } else { - const div = html.document.createElement('div') - div.innerHTML = TRANSCRIPT_PLACEHOLDER_TEXT - html.document.body.appendChild(div) - } - - const preparedDocument = { - document: html.document.toString(), - pageInfo: {}, - } - const updatedContent = await parsePreparedContent( - originalUrl, - preparedDocument, - true - ) - return updatedContent.parsedContent?.content -} - -async function readStringFromStorage( - bucketName: string, - fileName: string -): Promise { - try { - const storage = env.fileUpload?.gcsUploadSAKeyFilePath - ? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath }) - : new Storage() - - const existsResponse = await storage - .bucket(bucketName) - .file(fileName) - .exists() - const exists = existsResponse[0] - - if (!exists) { - throw new Error( - `File '${fileName}' does not exist in bucket '${bucketName}'.` - ) + const videoElement = rootElement.querySelector('#_omnivore_youtube') + if (!videoElement) { + logger.warning('no video element found') + return undefined } - // Download the file contents as a string - const fileContentResponse = await storage - .bucket(bucketName) - .file(fileName) - .download() - const fileContent = fileContentResponse[0].toString() - return fileContent - } catch (error) { - // This isn't a catastrophic error it just means the file doesn't exist - logger.info('Error downloading file:', error) - return undefined + videoElement.appendChild(div) } -} -const writeStringToStorage = async ( - bucketName: string, - fileName: string, - content: string -): Promise => { - try { - const storage = env.fileUpload?.gcsUploadSAKeyFilePath - ? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath }) - : new Storage() - - const writableStream = storage - .bucket(bucketName) - .file(fileName) - .createWriteStream() - - // Convert the string content to a readable stream - const readableStream = new stream.Readable() - readableStream.push(content) - readableStream.push(null) // Signal the end of the stream - - // Pipe the readable stream to the writable stream to upload the file content - await new Promise((resolve, reject) => { - readableStream - .pipe(writableStream) - .on('finish', resolve) - .on('error', reject) - }) - - logger.info( - `File '${fileName}' uploaded successfully to bucket '${bucketName}'.` - ) - } catch (error) { - logger.error('Error uploading file:', error) - throw error + const preparedDocument = { + document: `${rootElement.innerHTML}`, + pageInfo: {}, } + const updatedContent = await parsePreparedContent( + originalUrl, + preparedDocument, + true + ) + return updatedContent.parsedContent?.content } const fetchCachedYouTubeTranscript = async ( @@ -276,13 +192,16 @@ const fetchCachedYouTubeTranscript = async ( transcriptHash: string, promptHash: string ): Promise => { - const bucketName = env.fileUpload.gcsUploadBucket - try { - return await readStringFromStorage( - bucketName, - `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html` - ) + const filePath = `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html` + const exists = await isFileExists(filePath) + if (!exists) { + logger.info(`cached transcript not found: ${filePath}`) + return undefined + } + + const buffer = await downloadFromBucket(filePath) + return buffer.toString() } catch (err) { logger.info(`unable to fetch cached transcript`, { error: err }) } @@ -296,124 +215,105 @@ const cacheYouTubeTranscript = async ( promptHash: string, transcript: string ): Promise => { - const bucketName = env.fileUpload.gcsUploadBucket - - try { - await writeStringToStorage( - bucketName, - `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`, - transcript - ) - } catch (err) { - logger.info(`unable to cache transcript`, { error: err }) - } + await uploadToBucket( + `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`, + Buffer.from(transcript) + ) } export const processYouTubeVideo = async ( jobData: ProcessYouTubeVideoJobData ) => { - let videoURL: URL | undefined - try { - const libraryItem = await authTrx( - async (tx) => - tx - .withRepository(libraryItemRepository) - .findById(jobData.libraryItemId), - undefined, + const libraryItem = await findLibraryItemById( + jobData.libraryItemId, + jobData.userId, + { + select: [ + 'id', + 'originalUrl', + 'description', + 'wordCount', + 'publishedAt', + 'state', + 'readableContent', + ], + } + ) + if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) { + logger.info( + `Not ready to get YouTube metadata job state: ${ + libraryItem?.state ?? 'null' + }` + ) + return + } + + const videoURL = new URL(libraryItem.originalUrl) + const videoId = videoIdFromYouTubeUrl(videoURL.href) + + if (!videoId) { + logger.warning('no video id for supplied youtube url', { + url: libraryItem.originalUrl, + }) + return + } + + const updatedLibraryItem: Partial = {} + const youtube = new YouTubeClient() + const video = await youtube.getVideo(videoId) + if (!video) { + logger.warning('no video found for youtube url', { + url: libraryItem.originalUrl, + }) + return + } + + if (video.description && libraryItem.description !== video.description) { + updatedLibraryItem.description = video.description + } + + let duration = -1 + if ('duration' in video && video.duration > 0) { + updatedLibraryItem.wordCount = calculateWordCount(video.duration) + duration = video.duration + } + + if (video.uploadDate && !Number.isNaN(Date.parse(video.uploadDate))) { + updatedLibraryItem.publishedAt = new Date(video.uploadDate) + } + + if ( + await findGrantedFeatureByName( + FeatureName.YouTubeTranscripts, jobData.userId ) - if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) { - logger.info( - `Not ready to get YouTube metadata job state: ${ - libraryItem?.state ?? 'null' - }` + ) { + if ('getTranscript' in video && duration > 0 && duration < 1801) { + // If the video has a transcript available, put a placehold in and + // enqueue a job to process the full transcript + const updatedContent = await addTranscriptToReadableContent( + libraryItem.originalUrl, + libraryItem.readableContent, + TRANSCRIPT_PLACEHOLDER_TEXT ) - return - } - videoURL = new URL(libraryItem.originalUrl) - const videoId = videoIdFromYouTubeUrl(libraryItem.originalUrl) - - if (!videoId) { - logger.warning('no video id for supplied youtube url', { - url: libraryItem.originalUrl, - }) - return - } - - let needsUpdate = false - const youtube = new YouTubeClient() - const video = await youtube.getVideo(videoId) - if (!video) { - logger.warning('no video found for youtube url', { - url: libraryItem.originalUrl, - }) - return - } - - if (video.description && libraryItem.description !== video.description) { - needsUpdate = true - libraryItem.description = video.description - } - - let duration = -1 - if ('duration' in video && video.duration > 0) { - needsUpdate = true - libraryItem.wordCount = calculateWordCount(video.duration) - duration = video.duration - } - - if (video.uploadDate && !Number.isNaN(Date.parse(video.uploadDate))) { - needsUpdate = true - libraryItem.publishedAt = new Date(video.uploadDate) - } - - if ( - await findGrantedFeatureByName( - FeatureName.YouTubeTranscripts, - jobData.userId - ) - ) { - if ('getTranscript' in video && duration > 0 && duration < 1801) { - // If the video has a transcript available, put a placehold in and - // enqueue a job to process the full transcript - const updatedContent = await addTranscriptPlaceholdReadableContent( - libraryItem.originalUrl, - libraryItem.readableContent - ) - - if (updatedContent) { - needsUpdate = true - libraryItem.readableContent = updatedContent - } - - await enqueueProcessYouTubeTranscript({ - videoId, - ...jobData, - }) + if (updatedContent) { + updatedLibraryItem.readableContent = updatedContent } - } - if (needsUpdate) { - const updated = await authTrx( - async (t) => { - return t - .getRepository(LibraryItem) - .update(jobData.libraryItemId, libraryItem) - }, - undefined, - jobData.userId - ) - if (!updated) { - logger.warning('could not updated library item') - } + await enqueueProcessYouTubeTranscript({ + videoId, + ...jobData, + }) } - } catch (err) { - logger.warning('error getting youtube metadata: ', { - err, - jobData, - videoURL, - }) + } + + if (updatedLibraryItem !== {}) { + await updateLibraryItem( + jobData.libraryItemId, + updatedLibraryItem, + jobData.userId + ) } } @@ -426,79 +326,63 @@ export interface ProcessYouTubeTranscriptJobData { export const processYouTubeTranscript = async ( jobData: ProcessYouTubeTranscriptJobData ) => { - try { - const libraryItem = await authTrx( - async (tx) => - tx - .withRepository(libraryItemRepository) - .findById(jobData.libraryItemId), - undefined, - jobData.userId + const libraryItem = await findLibraryItemById( + jobData.libraryItemId, + jobData.userId, + { + select: ['id', 'originalUrl', 'readableContent', 'state'], + } + ) + if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) { + logger.info( + `Not ready to get YouTube metadata job state: ${ + libraryItem?.state ?? 'null' + }` ) - if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) { - logger.info( - `Not ready to get YouTube metadata job state: ${ - libraryItem?.state ?? 'null' - }` - ) - return + return + } + + const youtube = new YouTubeClient() + const video = await youtube.getVideo(jobData.videoId) + if (!video) { + logger.warning('no video found for youtube url', { + url: libraryItem.originalUrl, + }) + return + } + + let chapters: Chapter[] = [] + if ('chapters' in video) { + chapters = video.chapters + } + + let transcript: TranscriptProperties[] | undefined = undefined + if ('getTranscript' in video) { + transcript = await video.getTranscript() + } + + if (transcript) { + if (chapters) { + transcript = addTranscriptChapters(chapters, transcript) } + const transcriptHTML = await createTranscriptHTML( + jobData.videoId, + transcript + ) + const updatedContent = await addTranscriptToReadableContent( + libraryItem.originalUrl, + libraryItem.readableContent, + transcriptHTML + ) - let needsUpdate = false - const youtube = new YouTubeClient() - const video = await youtube.getVideo(jobData.videoId) - if (!video) { - logger.warning('no video found for youtube url', { - url: libraryItem.originalUrl, - }) - return - } - - let chapters: Chapter[] = [] - if ('chapters' in video) { - chapters = video.chapters - } - - let transcript: TranscriptProperties[] | undefined = undefined - if ('getTranscript' in video) { - transcript = await video.getTranscript() - } - - if (transcript) { - if (chapters) { - transcript = addTranscriptChapters(chapters, transcript) - } - const transcriptHTML = await createTranscriptHTML( - jobData.videoId, - transcript - ) - const updatedContent = await addTranscriptToReadableContent( - libraryItem.originalUrl, - libraryItem.readableContent, - transcriptHTML - ) - - if (updatedContent) { - needsUpdate = true - libraryItem.readableContent = updatedContent - } - } - - if (needsUpdate) { - const updated = await authTrx( - async (t) => { - return t - .getRepository(LibraryItem) - .update(jobData.libraryItemId, libraryItem) + if (updatedContent) { + await updateLibraryItem( + jobData.libraryItemId, + { + readableContent: updatedContent, }, - undefined, jobData.userId ) - if (!updated) { - logger.warning('could not updated library item') - } } - } catch (err) { - logger.warning('error getting youtube transcript: ', { err, jobData }) } }