fix youtube transcript
This commit is contained in:
@ -1,21 +1,25 @@
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { PromptTemplate } from '@langchain/core/prompts'
|
||||
import { OpenAI } from '@langchain/openai'
|
||||
import { parseHTML } from 'linkedom'
|
||||
import showdown from 'showdown'
|
||||
import * as stream from 'stream'
|
||||
import { Chapter, Client as YouTubeClient } from 'youtubei'
|
||||
import { LibraryItem, LibraryItemState } from '../entity/library_item'
|
||||
import { env } from '../env'
|
||||
import { authTrx } from '../repository'
|
||||
import { libraryItemRepository } from '../repository/library_item'
|
||||
import { FeatureName, findGrantedFeatureByName } from '../services/features'
|
||||
import {
|
||||
findLibraryItemById,
|
||||
updateLibraryItem,
|
||||
} from '../services/library_item'
|
||||
import { OPENAI_MODEL } from '../utils/ai'
|
||||
import { enqueueProcessYouTubeTranscript } from '../utils/createTask'
|
||||
import { stringToHash } from '../utils/helpers'
|
||||
import { logger } from '../utils/logger'
|
||||
import { parsePreparedContent } from '../utils/parser'
|
||||
import {
|
||||
downloadFromBucket,
|
||||
isFileExists,
|
||||
uploadToBucket,
|
||||
} from '../utils/uploads'
|
||||
import { videoIdFromYouTubeUrl } from '../utils/youtube'
|
||||
import { OPENAI_MODEL } from '../utils/ai'
|
||||
|
||||
export interface ProcessYouTubeVideoJobData {
|
||||
userId: string
|
||||
@ -143,132 +147,44 @@ export const addTranscriptToReadableContent = async (
|
||||
originalHTML: string,
|
||||
transcriptHTML: string
|
||||
): Promise<string | undefined> => {
|
||||
const html = parseHTML(originalHTML)
|
||||
const document = parseHTML(originalHTML).document
|
||||
|
||||
const transcriptNode = html.document.querySelector(
|
||||
'#_omnivore_youtube_transcript'
|
||||
)
|
||||
const rootElement = document.querySelector('#readability-page-1')
|
||||
if (!rootElement) {
|
||||
logger.warning('no readability-page-1 element found')
|
||||
return undefined
|
||||
}
|
||||
|
||||
const transcriptNode =
|
||||
rootElement.querySelector('#_omnivore_youtube_transcript') ||
|
||||
rootElement.querySelector('._omnivore_youtube_transcript')
|
||||
|
||||
if (transcriptNode) {
|
||||
transcriptNode.innerHTML = transcriptHTML
|
||||
} else {
|
||||
const div = html.document.createElement('div')
|
||||
const div = document.createElement('div')
|
||||
div.innerHTML = transcriptHTML
|
||||
html.document.body.appendChild(div)
|
||||
}
|
||||
div.className = '_omnivore_youtube_transcript'
|
||||
|
||||
const preparedDocument = {
|
||||
document: html.document.toString(),
|
||||
pageInfo: {},
|
||||
}
|
||||
const updatedContent = await parsePreparedContent(
|
||||
originalUrl,
|
||||
preparedDocument,
|
||||
true
|
||||
)
|
||||
return updatedContent.parsedContent?.content
|
||||
}
|
||||
|
||||
export const addTranscriptPlaceholdReadableContent = async (
|
||||
originalUrl: string,
|
||||
originalHTML: string
|
||||
): Promise<string | undefined> => {
|
||||
const html = parseHTML(originalHTML)
|
||||
|
||||
const transcriptNode = html.document.querySelector(
|
||||
'#_omnivore_youtube_transcript'
|
||||
)
|
||||
|
||||
if (transcriptNode) {
|
||||
transcriptNode.innerHTML = TRANSCRIPT_PLACEHOLDER_TEXT
|
||||
} else {
|
||||
const div = html.document.createElement('div')
|
||||
div.innerHTML = TRANSCRIPT_PLACEHOLDER_TEXT
|
||||
html.document.body.appendChild(div)
|
||||
}
|
||||
|
||||
const preparedDocument = {
|
||||
document: html.document.toString(),
|
||||
pageInfo: {},
|
||||
}
|
||||
const updatedContent = await parsePreparedContent(
|
||||
originalUrl,
|
||||
preparedDocument,
|
||||
true
|
||||
)
|
||||
return updatedContent.parsedContent?.content
|
||||
}
|
||||
|
||||
async function readStringFromStorage(
|
||||
bucketName: string,
|
||||
fileName: string
|
||||
): Promise<string | undefined> {
|
||||
try {
|
||||
const storage = env.fileUpload?.gcsUploadSAKeyFilePath
|
||||
? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath })
|
||||
: new Storage()
|
||||
|
||||
const existsResponse = await storage
|
||||
.bucket(bucketName)
|
||||
.file(fileName)
|
||||
.exists()
|
||||
const exists = existsResponse[0]
|
||||
|
||||
if (!exists) {
|
||||
throw new Error(
|
||||
`File '${fileName}' does not exist in bucket '${bucketName}'.`
|
||||
)
|
||||
const videoElement = rootElement.querySelector('#_omnivore_youtube')
|
||||
if (!videoElement) {
|
||||
logger.warning('no video element found')
|
||||
return undefined
|
||||
}
|
||||
|
||||
// Download the file contents as a string
|
||||
const fileContentResponse = await storage
|
||||
.bucket(bucketName)
|
||||
.file(fileName)
|
||||
.download()
|
||||
const fileContent = fileContentResponse[0].toString()
|
||||
return fileContent
|
||||
} catch (error) {
|
||||
// This isn't a catastrophic error it just means the file doesn't exist
|
||||
logger.info('Error downloading file:', error)
|
||||
return undefined
|
||||
videoElement.appendChild(div)
|
||||
}
|
||||
}
|
||||
|
||||
const writeStringToStorage = async (
|
||||
bucketName: string,
|
||||
fileName: string,
|
||||
content: string
|
||||
): Promise<void> => {
|
||||
try {
|
||||
const storage = env.fileUpload?.gcsUploadSAKeyFilePath
|
||||
? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath })
|
||||
: new Storage()
|
||||
|
||||
const writableStream = storage
|
||||
.bucket(bucketName)
|
||||
.file(fileName)
|
||||
.createWriteStream()
|
||||
|
||||
// Convert the string content to a readable stream
|
||||
const readableStream = new stream.Readable()
|
||||
readableStream.push(content)
|
||||
readableStream.push(null) // Signal the end of the stream
|
||||
|
||||
// Pipe the readable stream to the writable stream to upload the file content
|
||||
await new Promise((resolve, reject) => {
|
||||
readableStream
|
||||
.pipe(writableStream)
|
||||
.on('finish', resolve)
|
||||
.on('error', reject)
|
||||
})
|
||||
|
||||
logger.info(
|
||||
`File '${fileName}' uploaded successfully to bucket '${bucketName}'.`
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error('Error uploading file:', error)
|
||||
throw error
|
||||
const preparedDocument = {
|
||||
document: `<html><body>${rootElement.innerHTML}</body></html>`,
|
||||
pageInfo: {},
|
||||
}
|
||||
const updatedContent = await parsePreparedContent(
|
||||
originalUrl,
|
||||
preparedDocument,
|
||||
true
|
||||
)
|
||||
return updatedContent.parsedContent?.content
|
||||
}
|
||||
|
||||
const fetchCachedYouTubeTranscript = async (
|
||||
@ -276,13 +192,16 @@ const fetchCachedYouTubeTranscript = async (
|
||||
transcriptHash: string,
|
||||
promptHash: string
|
||||
): Promise<string | undefined> => {
|
||||
const bucketName = env.fileUpload.gcsUploadBucket
|
||||
|
||||
try {
|
||||
return await readStringFromStorage(
|
||||
bucketName,
|
||||
`youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`
|
||||
)
|
||||
const filePath = `youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`
|
||||
const exists = await isFileExists(filePath)
|
||||
if (!exists) {
|
||||
logger.info(`cached transcript not found: ${filePath}`)
|
||||
return undefined
|
||||
}
|
||||
|
||||
const buffer = await downloadFromBucket(filePath)
|
||||
return buffer.toString()
|
||||
} catch (err) {
|
||||
logger.info(`unable to fetch cached transcript`, { error: err })
|
||||
}
|
||||
@ -296,124 +215,105 @@ const cacheYouTubeTranscript = async (
|
||||
promptHash: string,
|
||||
transcript: string
|
||||
): Promise<void> => {
|
||||
const bucketName = env.fileUpload.gcsUploadBucket
|
||||
|
||||
try {
|
||||
await writeStringToStorage(
|
||||
bucketName,
|
||||
`youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`,
|
||||
transcript
|
||||
)
|
||||
} catch (err) {
|
||||
logger.info(`unable to cache transcript`, { error: err })
|
||||
}
|
||||
await uploadToBucket(
|
||||
`youtube-transcripts/${videoId}/${transcriptHash}.${promptHash}.html`,
|
||||
Buffer.from(transcript)
|
||||
)
|
||||
}
|
||||
|
||||
export const processYouTubeVideo = async (
|
||||
jobData: ProcessYouTubeVideoJobData
|
||||
) => {
|
||||
let videoURL: URL | undefined
|
||||
try {
|
||||
const libraryItem = await authTrx(
|
||||
async (tx) =>
|
||||
tx
|
||||
.withRepository(libraryItemRepository)
|
||||
.findById(jobData.libraryItemId),
|
||||
undefined,
|
||||
const libraryItem = await findLibraryItemById(
|
||||
jobData.libraryItemId,
|
||||
jobData.userId,
|
||||
{
|
||||
select: [
|
||||
'id',
|
||||
'originalUrl',
|
||||
'description',
|
||||
'wordCount',
|
||||
'publishedAt',
|
||||
'state',
|
||||
'readableContent',
|
||||
],
|
||||
}
|
||||
)
|
||||
if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) {
|
||||
logger.info(
|
||||
`Not ready to get YouTube metadata job state: ${
|
||||
libraryItem?.state ?? 'null'
|
||||
}`
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
const videoURL = new URL(libraryItem.originalUrl)
|
||||
const videoId = videoIdFromYouTubeUrl(videoURL.href)
|
||||
|
||||
if (!videoId) {
|
||||
logger.warning('no video id for supplied youtube url', {
|
||||
url: libraryItem.originalUrl,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const updatedLibraryItem: Partial<LibraryItem> = {}
|
||||
const youtube = new YouTubeClient()
|
||||
const video = await youtube.getVideo(videoId)
|
||||
if (!video) {
|
||||
logger.warning('no video found for youtube url', {
|
||||
url: libraryItem.originalUrl,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (video.description && libraryItem.description !== video.description) {
|
||||
updatedLibraryItem.description = video.description
|
||||
}
|
||||
|
||||
let duration = -1
|
||||
if ('duration' in video && video.duration > 0) {
|
||||
updatedLibraryItem.wordCount = calculateWordCount(video.duration)
|
||||
duration = video.duration
|
||||
}
|
||||
|
||||
if (video.uploadDate && !Number.isNaN(Date.parse(video.uploadDate))) {
|
||||
updatedLibraryItem.publishedAt = new Date(video.uploadDate)
|
||||
}
|
||||
|
||||
if (
|
||||
await findGrantedFeatureByName(
|
||||
FeatureName.YouTubeTranscripts,
|
||||
jobData.userId
|
||||
)
|
||||
if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) {
|
||||
logger.info(
|
||||
`Not ready to get YouTube metadata job state: ${
|
||||
libraryItem?.state ?? 'null'
|
||||
}`
|
||||
) {
|
||||
if ('getTranscript' in video && duration > 0 && duration < 1801) {
|
||||
// If the video has a transcript available, put a placehold in and
|
||||
// enqueue a job to process the full transcript
|
||||
const updatedContent = await addTranscriptToReadableContent(
|
||||
libraryItem.originalUrl,
|
||||
libraryItem.readableContent,
|
||||
TRANSCRIPT_PLACEHOLDER_TEXT
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
videoURL = new URL(libraryItem.originalUrl)
|
||||
const videoId = videoIdFromYouTubeUrl(libraryItem.originalUrl)
|
||||
|
||||
if (!videoId) {
|
||||
logger.warning('no video id for supplied youtube url', {
|
||||
url: libraryItem.originalUrl,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
let needsUpdate = false
|
||||
const youtube = new YouTubeClient()
|
||||
const video = await youtube.getVideo(videoId)
|
||||
if (!video) {
|
||||
logger.warning('no video found for youtube url', {
|
||||
url: libraryItem.originalUrl,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (video.description && libraryItem.description !== video.description) {
|
||||
needsUpdate = true
|
||||
libraryItem.description = video.description
|
||||
}
|
||||
|
||||
let duration = -1
|
||||
if ('duration' in video && video.duration > 0) {
|
||||
needsUpdate = true
|
||||
libraryItem.wordCount = calculateWordCount(video.duration)
|
||||
duration = video.duration
|
||||
}
|
||||
|
||||
if (video.uploadDate && !Number.isNaN(Date.parse(video.uploadDate))) {
|
||||
needsUpdate = true
|
||||
libraryItem.publishedAt = new Date(video.uploadDate)
|
||||
}
|
||||
|
||||
if (
|
||||
await findGrantedFeatureByName(
|
||||
FeatureName.YouTubeTranscripts,
|
||||
jobData.userId
|
||||
)
|
||||
) {
|
||||
if ('getTranscript' in video && duration > 0 && duration < 1801) {
|
||||
// If the video has a transcript available, put a placehold in and
|
||||
// enqueue a job to process the full transcript
|
||||
const updatedContent = await addTranscriptPlaceholdReadableContent(
|
||||
libraryItem.originalUrl,
|
||||
libraryItem.readableContent
|
||||
)
|
||||
|
||||
if (updatedContent) {
|
||||
needsUpdate = true
|
||||
libraryItem.readableContent = updatedContent
|
||||
}
|
||||
|
||||
await enqueueProcessYouTubeTranscript({
|
||||
videoId,
|
||||
...jobData,
|
||||
})
|
||||
if (updatedContent) {
|
||||
updatedLibraryItem.readableContent = updatedContent
|
||||
}
|
||||
}
|
||||
|
||||
if (needsUpdate) {
|
||||
const updated = await authTrx(
|
||||
async (t) => {
|
||||
return t
|
||||
.getRepository(LibraryItem)
|
||||
.update(jobData.libraryItemId, libraryItem)
|
||||
},
|
||||
undefined,
|
||||
jobData.userId
|
||||
)
|
||||
if (!updated) {
|
||||
logger.warning('could not updated library item')
|
||||
}
|
||||
await enqueueProcessYouTubeTranscript({
|
||||
videoId,
|
||||
...jobData,
|
||||
})
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warning('error getting youtube metadata: ', {
|
||||
err,
|
||||
jobData,
|
||||
videoURL,
|
||||
})
|
||||
}
|
||||
|
||||
if (updatedLibraryItem !== {}) {
|
||||
await updateLibraryItem(
|
||||
jobData.libraryItemId,
|
||||
updatedLibraryItem,
|
||||
jobData.userId
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -426,79 +326,63 @@ export interface ProcessYouTubeTranscriptJobData {
|
||||
export const processYouTubeTranscript = async (
|
||||
jobData: ProcessYouTubeTranscriptJobData
|
||||
) => {
|
||||
try {
|
||||
const libraryItem = await authTrx(
|
||||
async (tx) =>
|
||||
tx
|
||||
.withRepository(libraryItemRepository)
|
||||
.findById(jobData.libraryItemId),
|
||||
undefined,
|
||||
jobData.userId
|
||||
const libraryItem = await findLibraryItemById(
|
||||
jobData.libraryItemId,
|
||||
jobData.userId,
|
||||
{
|
||||
select: ['id', 'originalUrl', 'readableContent', 'state'],
|
||||
}
|
||||
)
|
||||
if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) {
|
||||
logger.info(
|
||||
`Not ready to get YouTube metadata job state: ${
|
||||
libraryItem?.state ?? 'null'
|
||||
}`
|
||||
)
|
||||
if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) {
|
||||
logger.info(
|
||||
`Not ready to get YouTube metadata job state: ${
|
||||
libraryItem?.state ?? 'null'
|
||||
}`
|
||||
)
|
||||
return
|
||||
return
|
||||
}
|
||||
|
||||
const youtube = new YouTubeClient()
|
||||
const video = await youtube.getVideo(jobData.videoId)
|
||||
if (!video) {
|
||||
logger.warning('no video found for youtube url', {
|
||||
url: libraryItem.originalUrl,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
let chapters: Chapter[] = []
|
||||
if ('chapters' in video) {
|
||||
chapters = video.chapters
|
||||
}
|
||||
|
||||
let transcript: TranscriptProperties[] | undefined = undefined
|
||||
if ('getTranscript' in video) {
|
||||
transcript = await video.getTranscript()
|
||||
}
|
||||
|
||||
if (transcript) {
|
||||
if (chapters) {
|
||||
transcript = addTranscriptChapters(chapters, transcript)
|
||||
}
|
||||
const transcriptHTML = await createTranscriptHTML(
|
||||
jobData.videoId,
|
||||
transcript
|
||||
)
|
||||
const updatedContent = await addTranscriptToReadableContent(
|
||||
libraryItem.originalUrl,
|
||||
libraryItem.readableContent,
|
||||
transcriptHTML
|
||||
)
|
||||
|
||||
let needsUpdate = false
|
||||
const youtube = new YouTubeClient()
|
||||
const video = await youtube.getVideo(jobData.videoId)
|
||||
if (!video) {
|
||||
logger.warning('no video found for youtube url', {
|
||||
url: libraryItem.originalUrl,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
let chapters: Chapter[] = []
|
||||
if ('chapters' in video) {
|
||||
chapters = video.chapters
|
||||
}
|
||||
|
||||
let transcript: TranscriptProperties[] | undefined = undefined
|
||||
if ('getTranscript' in video) {
|
||||
transcript = await video.getTranscript()
|
||||
}
|
||||
|
||||
if (transcript) {
|
||||
if (chapters) {
|
||||
transcript = addTranscriptChapters(chapters, transcript)
|
||||
}
|
||||
const transcriptHTML = await createTranscriptHTML(
|
||||
jobData.videoId,
|
||||
transcript
|
||||
)
|
||||
const updatedContent = await addTranscriptToReadableContent(
|
||||
libraryItem.originalUrl,
|
||||
libraryItem.readableContent,
|
||||
transcriptHTML
|
||||
)
|
||||
|
||||
if (updatedContent) {
|
||||
needsUpdate = true
|
||||
libraryItem.readableContent = updatedContent
|
||||
}
|
||||
}
|
||||
|
||||
if (needsUpdate) {
|
||||
const updated = await authTrx(
|
||||
async (t) => {
|
||||
return t
|
||||
.getRepository(LibraryItem)
|
||||
.update(jobData.libraryItemId, libraryItem)
|
||||
if (updatedContent) {
|
||||
await updateLibraryItem(
|
||||
jobData.libraryItemId,
|
||||
{
|
||||
readableContent: updatedContent,
|
||||
},
|
||||
undefined,
|
||||
jobData.userId
|
||||
)
|
||||
if (!updated) {
|
||||
logger.warning('could not updated library item')
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warning('error getting youtube transcript: ', { err, jobData })
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user