always update the state of the pdf item to succeeded/failed, even if we fail to parse the content
This commit is contained in:
@ -8,16 +8,17 @@ import { findUploadFileById, setFileUploadComplete } from './upload_file'
|
||||
|
||||
export interface UpdateContentMessage {
|
||||
fileId: string
|
||||
content: string
|
||||
content?: string
|
||||
title?: string
|
||||
author?: string
|
||||
description?: string
|
||||
state?: LibraryItemState
|
||||
}
|
||||
|
||||
export const isUpdateContentMessage = (
|
||||
data: any
|
||||
): data is UpdateContentMessage => {
|
||||
return 'fileId' in data && 'content' in data
|
||||
return 'fileId' in data
|
||||
}
|
||||
|
||||
export const updateContentForFileItem = async (msg: UpdateContentMessage) => {
|
||||
@ -51,16 +52,13 @@ export const updateContentForFileItem = async (msg: UpdateContentMessage) => {
|
||||
}
|
||||
|
||||
const itemToUpdate: QueryDeepPartialEntity<LibraryItem> = {
|
||||
...msg,
|
||||
originalContent: msg.content,
|
||||
// This event is fired after the file is fully uploaded,
|
||||
// so along with updating content, we mark it as
|
||||
// succeeded or failed based on the message state
|
||||
state: msg.state || LibraryItemState.Succeeded,
|
||||
}
|
||||
if (msg.title) itemToUpdate.title = msg.title
|
||||
if (msg.author) itemToUpdate.author = msg.author
|
||||
if (msg.description) itemToUpdate.description = msg.description
|
||||
|
||||
// This event is fired after the file is fully uploaded,
|
||||
// so along with updating content, we mark it as
|
||||
// succeeded.
|
||||
itemToUpdate.state = LibraryItemState.Succeeded
|
||||
|
||||
try {
|
||||
const uploadFileData = await setFileUploadComplete(
|
||||
@ -80,7 +78,8 @@ export const updateContentForFileItem = async (msg: UpdateContentMessage) => {
|
||||
logger.info('Updating library item text', {
|
||||
id: libraryItem.id,
|
||||
result,
|
||||
content: msg.content.substring(0, 20),
|
||||
content: msg.content?.substring(0, 20),
|
||||
state: msg.state,
|
||||
})
|
||||
|
||||
return true
|
||||
|
||||
@ -18,6 +18,7 @@ import {
|
||||
} from '../utils/uploads'
|
||||
import { validateUrl } from './create_page_save_request'
|
||||
import { createOrUpdateLibraryItem } from './library_item'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
|
||||
const isFileUrl = (url: string): boolean => {
|
||||
const parsedUrl = new URL(url)
|
||||
@ -90,8 +91,18 @@ export const uploadFile = async (
|
||||
}
|
||||
}
|
||||
|
||||
let url = input.url
|
||||
|
||||
const uploadFileId = uuid()
|
||||
const uploadFilePathName = generateUploadFilePathName(uploadFileId, fileName)
|
||||
// If this is a file URL, we swap in a special URL
|
||||
if (isFileUrl(url)) {
|
||||
url = `https://omnivore.app/attachments/${uploadFilePathName}`
|
||||
}
|
||||
|
||||
const uploadFileData = await authTrx((t) =>
|
||||
t.getRepository(UploadFile).save({
|
||||
id: uploadFileId,
|
||||
url: input.url,
|
||||
user: { id: uid },
|
||||
fileName,
|
||||
@ -99,24 +110,11 @@ export const uploadFile = async (
|
||||
contentType: input.contentType,
|
||||
})
|
||||
)
|
||||
const uploadFileId = uploadFileData.id
|
||||
const uploadFilePathName = generateUploadFilePathName(uploadFileId, fileName)
|
||||
const uploadSignedUrl = await generateUploadSignedUrl(
|
||||
uploadFilePathName,
|
||||
input.contentType
|
||||
)
|
||||
|
||||
// If this is a file URL, we swap in a special URL
|
||||
const attachmentUrl = `https://omnivore.app/attachments/${uploadFilePathName}`
|
||||
if (isFileUrl(input.url)) {
|
||||
await authTrx(async (tx) => {
|
||||
await tx.getRepository(UploadFile).update(uploadFileId, {
|
||||
url: attachmentUrl,
|
||||
status: UploadFileStatus.Initialized,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const itemType = itemTypeForContentType(input.contentType)
|
||||
if (input.createPageEntry) {
|
||||
// If we have a file:// URL, don't try to match it
|
||||
@ -125,7 +123,7 @@ export const uploadFile = async (
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
{
|
||||
id: input.clientRequestId || undefined,
|
||||
originalUrl: isFileUrl(input.url) ? attachmentUrl : input.url,
|
||||
originalUrl: url,
|
||||
user: { id: uid },
|
||||
title,
|
||||
readableContent: '',
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import { GetSignedUrlConfig, Storage } from '@google-cloud/storage'
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import { parsePdf } from './pdf'
|
||||
import { queueUpdatePageJob } from './job'
|
||||
import { queueUpdatePageJob, State } from './job'
|
||||
|
||||
Sentry.GCPFunction.init({
|
||||
dsn: process.env.SENTRY_DSN,
|
||||
@ -50,10 +50,11 @@ const getDocumentUrl = async (
|
||||
|
||||
export const updatePageContent = async (
|
||||
fileId: string,
|
||||
content: string,
|
||||
content?: string,
|
||||
title?: string,
|
||||
author?: string,
|
||||
description?: string
|
||||
description?: string,
|
||||
state?: State
|
||||
): Promise<string | undefined> => {
|
||||
const job = await queueUpdatePageJob({
|
||||
fileId,
|
||||
@ -61,6 +62,7 @@ export const updatePageContent = async (
|
||||
title,
|
||||
author,
|
||||
description,
|
||||
state,
|
||||
})
|
||||
return job.id
|
||||
}
|
||||
@ -86,45 +88,68 @@ export const pdfHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
if ('message' in req.body && 'data' in req.body.message) {
|
||||
const pubSubMessage = req.body.message.data as string
|
||||
const data = getStorageEventData(pubSubMessage)
|
||||
if (data) {
|
||||
try {
|
||||
if (shouldHandle(data)) {
|
||||
console.log('handling pdf data', data)
|
||||
|
||||
const url = await getDocumentUrl(data)
|
||||
console.log('PDF url: ', url)
|
||||
if (!url) {
|
||||
console.log('Could not fetch PDF', data.bucket, data.name)
|
||||
return res.status(404).send('Could not fetch PDF')
|
||||
}
|
||||
|
||||
const parsed = await parsePdf(url)
|
||||
const result = await updatePageContent(
|
||||
data.name,
|
||||
parsed.content,
|
||||
parsed.title,
|
||||
parsed.author,
|
||||
parsed.description
|
||||
)
|
||||
console.log(
|
||||
'publish result',
|
||||
result,
|
||||
'title',
|
||||
parsed.title,
|
||||
'author',
|
||||
parsed.author
|
||||
)
|
||||
} else {
|
||||
console.log('not handling pdf data', data)
|
||||
}
|
||||
} catch (err) {
|
||||
console.log('error handling event', { err, data })
|
||||
return res.status(500).send('Error handling event')
|
||||
}
|
||||
if (!data) {
|
||||
console.log('no data found in pubsub message')
|
||||
return res.send('ok')
|
||||
}
|
||||
|
||||
if (!shouldHandle(data)) {
|
||||
console.log('not handling pdf data', data)
|
||||
return res.send('ok')
|
||||
}
|
||||
|
||||
console.log('handling pdf data', data)
|
||||
|
||||
let content,
|
||||
title,
|
||||
author,
|
||||
description,
|
||||
state: State = 'SUCCEEDED' // Default to succeeded even if we fail to parse
|
||||
|
||||
try {
|
||||
const url = await getDocumentUrl(data)
|
||||
console.log('PDF url: ', url)
|
||||
if (!url) {
|
||||
console.log('Could not fetch PDF', data.bucket, data.name)
|
||||
// If we can't fetch the PDF, mark it as failed
|
||||
state = 'FAILED'
|
||||
|
||||
return res.status(404).send('Could not fetch PDF')
|
||||
}
|
||||
|
||||
// Parse the PDF to update the content and metadata
|
||||
const parsed = await parsePdf(url)
|
||||
content = parsed.content
|
||||
title = parsed.title
|
||||
author = parsed.author
|
||||
description = parsed.description
|
||||
} catch (err) {
|
||||
console.log('error parsing pdf', { err, data })
|
||||
|
||||
return res.status(500).send('Error parsing pdf')
|
||||
} finally {
|
||||
// Always update the state, even if we fail to parse
|
||||
const result = await updatePageContent(
|
||||
data.name,
|
||||
content,
|
||||
title,
|
||||
author,
|
||||
description,
|
||||
state
|
||||
)
|
||||
console.log(
|
||||
'publish result',
|
||||
result,
|
||||
'title',
|
||||
title,
|
||||
'author',
|
||||
author,
|
||||
'state',
|
||||
state
|
||||
)
|
||||
}
|
||||
} else {
|
||||
console.log('no pubsub message')
|
||||
}
|
||||
|
||||
res.send('ok')
|
||||
}
|
||||
)
|
||||
|
||||
@ -8,12 +8,15 @@ const queue = new Queue(QUEUE_NAME, {
|
||||
connection: redisDataSource.queueRedisClient,
|
||||
})
|
||||
|
||||
export type State = 'SUCCEEDED' | 'FAILED'
|
||||
|
||||
type UpdatePageJobData = {
|
||||
fileId: string
|
||||
content: string
|
||||
content?: string
|
||||
title?: string
|
||||
author?: string
|
||||
description?: string
|
||||
state?: State
|
||||
}
|
||||
|
||||
export const queueUpdatePageJob = async (data: UpdatePageJobData) => {
|
||||
|
||||
Reference in New Issue
Block a user