wait for write stream to finish

This commit is contained in:
Hongbo Wu
2024-08-27 14:57:31 +08:00
parent 444c78f0cb
commit 48b3f736f0
4 changed files with 34 additions and 11 deletions

View File

@ -7,14 +7,13 @@ import { sendExportCompletedEmail } from '../services/send_emails'
import { findActiveUser } from '../services/user'
import { logger } from '../utils/logger'
import { highlightToMarkdown } from '../utils/parser'
import { createGCSFile, generateDownloadSignedUrl } from '../utils/uploads'
import { createGCSFile } from '../utils/uploads'
export interface ExportJobData {
userId: string
}
export const EXPORT_JOB_NAME = 'export'
const GCS_BUCKET = 'omnivore-export'
const uploadToBucket = async (
userId: string,
@ -92,7 +91,7 @@ export const exportJob = async (jobData: ExportJobData) => {
const fileUuid = uuidv4()
const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip`
const file = createGCSFile(GCS_BUCKET, fullPath)
const file = createGCSFile(fullPath)
// Create a write stream
const writeStream = file.createWriteStream({
@ -103,11 +102,11 @@ export const exportJob = async (jobData: ExportJobData) => {
// Handle any errors in the streams
writeStream.on('error', (err) => {
console.error('Error writing to GCS:', err)
logger.error('Error writing to GCS:', err)
})
writeStream.on('finish', () => {
console.log('File successfully written to GCS')
logger.info('File successfully written to GCS')
})
// Initialize archiver for zipping files
@ -117,7 +116,6 @@ export const exportJob = async (jobData: ExportJobData) => {
// Handle any archiver errors
archive.on('error', (err) => {
console.error('Error zipping files:', err)
throw err
})
@ -135,7 +133,7 @@ export const exportJob = async (jobData: ExportJobData) => {
from: cursor,
size: batchSize,
query: 'in:all',
includeContent: false,
includeContent: true,
includeDeleted: false,
includePending: false,
},
@ -151,16 +149,28 @@ export const exportJob = async (jobData: ExportJobData) => {
}
} while (hasNext)
} catch (err) {
console.error('Error exporting data:', err)
logger.error('Error exporting data:', err)
throw err
} finally {
// Finalize the archive
await archive.finalize()
}
// Ensure that the writeStream has finished
await new Promise((resolve, reject) => {
writeStream.on('finish', resolve)
writeStream.on('error', reject)
})
logger.info('export completed', {
userId,
})
// generate a temporary signed url for the zip file
const signedUrl = await generateDownloadSignedUrl(fullPath, {
expires: 60 * 60 * 24, // 24 hours
bucketName: GCS_BUCKET,
const [signedUrl] = await file.getSignedUrl({
action: 'read',
expires: Date.now() + 86400 * 1000, // 15 minutes
})
const job = await sendExportCompletedEmail(userId, signedUrl)

View File

@ -35,6 +35,7 @@ import {
expireFoldersJob,
EXPIRE_FOLDERS_JOB_NAME,
} from './jobs/expire_folders'
import { exportJob, EXPORT_JOB_NAME } from './jobs/export'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import {
generatePreviewContent,
@ -223,6 +224,8 @@ export const createWorker = (connection: ConnectionOptions) =>
return pruneTrashJob(job.data)
case EXPIRE_FOLDERS_JOB_NAME:
return expireFoldersJob()
case EXPORT_JOB_NAME:
return exportJob(job.data)
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
}

View File

@ -24,6 +24,7 @@ import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router'
import { contentRouter } from './routers/content_router'
import { digestRouter } from './routers/digest_router'
import { explainRouter } from './routers/explain_router'
import { exportRouter } from './routers/export_router'
import { integrationRouter } from './routers/integration_router'
import { localDebugRouter } from './routers/local_debug_router'
import { notificationRouter } from './routers/notification_router'
@ -106,6 +107,7 @@ export const createApp = (): Express => {
app.use('/api/tasks', taskRouter())
app.use('/api/digest', digestRouter())
app.use('/api/content', contentRouter())
app.use('/api/export', exportRouter())
app.use('/svc/pubsub/content', contentServiceRouter())
app.use('/svc/pubsub/links', linkServiceRouter())

View File

@ -64,6 +64,14 @@ export const generateUploadSignedUrl = async (
return url
}
const createSignedUrl = async (file: File): Promise<string> => {
const signedUrl = await file.getSignedUrl({
action: 'read',
expires: Date.now() + 15 * 60 * 1000, // 15 minutes
})
return signedUrl[0]
}
export const generateDownloadSignedUrl = async (
filePathName: string,
config?: {