From 48b3f736f0fc0606b5ae6cfb55d356fa6541d71a Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 27 Aug 2024 14:57:31 +0800 Subject: [PATCH] wait for write stream to finish --- packages/api/src/jobs/export.ts | 32 +++++++++++++++++++---------- packages/api/src/queue-processor.ts | 3 +++ packages/api/src/server.ts | 2 ++ packages/api/src/utils/uploads.ts | 8 ++++++++ 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts index 3c7884411..400d7ecbb 100644 --- a/packages/api/src/jobs/export.ts +++ b/packages/api/src/jobs/export.ts @@ -7,14 +7,13 @@ import { sendExportCompletedEmail } from '../services/send_emails' import { findActiveUser } from '../services/user' import { logger } from '../utils/logger' import { highlightToMarkdown } from '../utils/parser' -import { createGCSFile, generateDownloadSignedUrl } from '../utils/uploads' +import { createGCSFile } from '../utils/uploads' export interface ExportJobData { userId: string } export const EXPORT_JOB_NAME = 'export' -const GCS_BUCKET = 'omnivore-export' const uploadToBucket = async ( userId: string, @@ -92,7 +91,7 @@ export const exportJob = async (jobData: ExportJobData) => { const fileUuid = uuidv4() const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` - const file = createGCSFile(GCS_BUCKET, fullPath) + const file = createGCSFile(fullPath) // Create a write stream const writeStream = file.createWriteStream({ @@ -103,11 +102,11 @@ export const exportJob = async (jobData: ExportJobData) => { // Handle any errors in the streams writeStream.on('error', (err) => { - console.error('Error writing to GCS:', err) + logger.error('Error writing to GCS:', err) }) writeStream.on('finish', () => { - console.log('File successfully written to GCS') + logger.info('File successfully written to GCS') }) // Initialize archiver for zipping files @@ -117,7 +116,6 @@ export const exportJob = async (jobData: ExportJobData) => { // Handle any archiver 
errors archive.on('error', (err) => { - console.error('Error zipping files:', err) throw err }) @@ -135,7 +133,7 @@ export const exportJob = async (jobData: ExportJobData) => { from: cursor, size: batchSize, query: 'in:all', - includeContent: false, + includeContent: true, includeDeleted: false, includePending: false, }, @@ -151,16 +149,28 @@ export const exportJob = async (jobData: ExportJobData) => { } } while (hasNext) } catch (err) { - console.error('Error exporting data:', err) + logger.error('Error exporting data:', err) + + throw err } finally { // Finalize the archive await archive.finalize() } + // Ensure that the writeStream has finished + await new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + }) + + logger.info('export completed', { + userId, + }) + // generate a temporary signed url for the zip file - const signedUrl = await generateDownloadSignedUrl(fullPath, { - expires: 60 * 60 * 24, // 24 hours - bucketName: GCS_BUCKET, + const [signedUrl] = await file.getSignedUrl({ + action: 'read', + expires: Date.now() + 86400 * 1000, // 24 hours }) const job = await sendExportCompletedEmail(userId, signedUrl) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index c0030300f..cdc7e9817 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -35,6 +35,7 @@ import { expireFoldersJob, EXPIRE_FOLDERS_JOB_NAME, } from './jobs/expire_folders' +import { exportJob, EXPORT_JOB_NAME } from './jobs/export' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { generatePreviewContent, @@ -223,6 +224,8 @@ export const createWorker = (connection: ConnectionOptions) => return pruneTrashJob(job.data) case EXPIRE_FOLDERS_JOB_NAME: return expireFoldersJob() + case EXPORT_JOB_NAME: + return exportJob(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/server.ts 
b/packages/api/src/server.ts index 069f1f9cf..09ba82946 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -24,6 +24,7 @@ import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router' import { contentRouter } from './routers/content_router' import { digestRouter } from './routers/digest_router' import { explainRouter } from './routers/explain_router' +import { exportRouter } from './routers/export_router' import { integrationRouter } from './routers/integration_router' import { localDebugRouter } from './routers/local_debug_router' import { notificationRouter } from './routers/notification_router' @@ -106,6 +107,7 @@ export const createApp = (): Express => { app.use('/api/tasks', taskRouter()) app.use('/api/digest', digestRouter()) app.use('/api/content', contentRouter()) + app.use('/api/export', exportRouter()) app.use('/svc/pubsub/content', contentServiceRouter()) app.use('/svc/pubsub/links', linkServiceRouter()) diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts index c4b71b5ca..b39745172 100644 --- a/packages/api/src/utils/uploads.ts +++ b/packages/api/src/utils/uploads.ts @@ -64,6 +64,14 @@ export const generateUploadSignedUrl = async ( return url } +const createSignedUrl = async (file: File): Promise<string> => { + const signedUrl = await file.getSignedUrl({ + action: 'read', + expires: Date.now() + 15 * 60 * 1000, // 15 minutes + }) + return signedUrl[0] +} + export const generateDownloadSignedUrl = async ( filePathName: string, config?: {