From dc6c047aec8b532ce371853064583aaec59aa12b Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Thu, 14 Mar 2024 16:12:06 +0800 Subject: [PATCH] Add GCS cache --- .../api/src/jobs/process-youtube-video.ts | 152 +++++++++++++++++- 1 file changed, 150 insertions(+), 2 deletions(-) diff --git a/packages/api/src/jobs/process-youtube-video.ts b/packages/api/src/jobs/process-youtube-video.ts index 76bacf23f..7c430fa86 100644 --- a/packages/api/src/jobs/process-youtube-video.ts +++ b/packages/api/src/jobs/process-youtube-video.ts @@ -10,6 +10,11 @@ import { parsePreparedContent } from '../utils/parser' import { OpenAI } from '@langchain/openai' import { PromptTemplate } from '@langchain/core/prompts' import { enqueueProcessYouTubeTranscript } from '../utils/createTask' +import { env } from '../env' +import * as stream from 'stream' + +import { Storage } from '@google-cloud/storage' +import { stringToHash } from '../utils/helpers' export interface ProcessYouTubeVideoJobData { userId: string @@ -68,11 +73,29 @@ export const addTranscriptChapters = ( return transcript } +const createTranscriptHash = (transcript: TranscriptProperties[]): string => { + const rawTranscript = transcript.map((item) => item.text).join(' ') + return stringToHash(rawTranscript) +} + export const createTranscriptHTML = async ( + videoId: string, transcript: TranscriptProperties[] ): Promise => { let transcriptMarkdown = '' + const transcriptHash = createTranscriptHash(transcript) + const promptHash = stringToHash(process.env.YOUTUBE_TRANSCRIPT_PROMPT ?? '') + if (process.env.YOUTUBE_TRANSCRIPT_PROMPT && process.env.OPENAI_API_KEY) { + const cachedTranscriptHTML = await fetchCachedYouTubeTranscript( + videoId, + transcriptHash, + promptHash + ) + if (cachedTranscriptHTML) { + return cachedTranscriptHTML + } + const llm = new OpenAI({ modelName: 'gpt-4', configuration: { @@ -121,7 +144,18 @@ export const createTranscriptHTML = async ( const converter = new showdown.Converter({ backslashEscapesHTMLTags: true, }) - return converter.makeHtml(transcriptMarkdown) + const transcriptHTML = converter.makeHtml(transcriptMarkdown) + + if (process.env.YOUTUBE_TRANSCRIPT_PROMPT && process.env.OPENAI_API_KEY) { + await cacheYouTubeTranscript( + videoId, + transcriptHash, + promptHash, + transcriptHTML + ) + } + + return transcriptHTML } export const addTranscriptToReadableContent = async ( @@ -185,6 +219,117 @@ export const addTranscriptPlaceholdReadableContent = async ( return updatedContent.parsedContent?.content } +async function readStringFromStorage( + bucketName: string, + fileName: string +): Promise { + try { + const storage = env.fileUpload?.gcsUploadSAKeyFilePath + ? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath }) + : new Storage() + + const existsResponse = await storage + .bucket(bucketName) + .file(fileName) + .exists() + const exists = existsResponse[0] + + if (!exists) { + throw new Error( + `File '${fileName}' does not exist in bucket '${bucketName}'.` + ) + } + + // Download the file contents as a string + const fileContentResponse = await storage + .bucket(bucketName) + .file(fileName) + .download() + const fileContent = fileContentResponse[0].toString() + + console.log(`File '${fileName}' downloaded successfully as string.`) + return fileContent + } catch (error) { + console.error('Error downloading file:', error) + throw error + } +} + +const writeStringToStorage = async ( + bucketName: string, + fileName: string, + content: string +): Promise => { + try { + const storage = env.fileUpload?.gcsUploadSAKeyFilePath + ? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath }) + : new Storage() + + const writableStream = storage + .bucket(bucketName) + .file(fileName) + .createWriteStream() + + // Convert the string content to a readable stream + const readableStream = new stream.Readable() + readableStream.push(content) + readableStream.push(null) // Signal the end of the stream + + // Pipe the readable stream to the writable stream to upload the file content + await new Promise((resolve, reject) => { + readableStream + .pipe(writableStream) + .on('finish', resolve) + .on('error', reject) + }) + + console.log( + `File '${fileName}' uploaded successfully to bucket '${bucketName}'.` + ) + } catch (error) { + console.error('Error uploading file:', error) + throw error + } +} + +const fetchCachedYouTubeTranscript = async ( + videoId: string, + transcriptHash: string, + promptHash: string +): Promise => { + const bucketName = env.fileUpload.gcsUploadBucket + + try { + return await readStringFromStorage( + bucketName, + `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html` + ) + } catch (err) { + logger.info(`unable to fetch cached transcript: ${err}`) + } + + return undefined +} + +const cacheYouTubeTranscript = async ( + videoId: string, + transcriptHash: string, + promptHash: string, + transcript: string +): Promise => { + const bucketName = env.fileUpload.gcsUploadBucket + + try { + await writeStringToStorage( + bucketName, + `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`, + transcript + ) + } catch (err) { + logger.info(`unable to cache transcript: ${err}`) + } +} + export const processYouTubeVideo = async ( jobData: ProcessYouTubeVideoJobData ) => { @@ -335,7 +480,10 @@ export const processYouTubeTranscript = async ( if (chapters) { transcript = addTranscriptChapters(chapters, transcript) } - const transcriptHTML = await createTranscriptHTML(transcript) + const transcriptHTML = await createTranscriptHTML( + jobData.videoId, + transcript + ) const updatedContent = await addTranscriptToReadableContent( libraryItem.originalUrl, libraryItem.originalContent,