From 444c78f0cb0cdedee8c47870dbb758d0ab7c9189 Mon Sep 17 00:00:00 2001
From: Hongbo Wu
Date: Tue, 27 Aug 2024 14:19:02 +0800
Subject: [PATCH] use async job to handle exporter

---
 packages/api/package.json                 |   2 +
 packages/api/src/jobs/export.ts           | 199 ++++++++++++++++++++--
 packages/api/src/routers/export_router.ts |   5 +-
 packages/api/src/services/send_emails.ts  |  18 ++
 packages/api/src/utils/parser.ts          |  27 ++-
 packages/api/src/utils/uploads.ts         |  19 +-
 packages/export-handler/package.json      |   1 -
 7 files changed, 250 insertions(+), 21 deletions(-)

diff --git a/packages/api/package.json b/packages/api/package.json
index 826a2217c..dfb233d4c 100644
--- a/packages/api/package.json
+++ b/packages/api/package.json
@@ -51,6 +51,7 @@
     "alfaaz": "^1.1.0",
     "apollo-datasource": "^3.3.1",
     "apollo-server-express": "^3.6.3",
+    "archiver": "^7.0.1",
     "axios": "^0.27.2",
     "bcryptjs": "^2.4.3",
     "bullmq": "^5.1.1",
@@ -123,6 +124,7 @@
     "@istanbuljs/nyc-config-typescript": "^1.0.2",
     "@types/addressparser": "^1.0.1",
     "@types/analytics-node": "^3.1.7",
+    "@types/archiver": "^6.0.2",
     "@types/bcryptjs": "^2.4.2",
     "@types/chai": "^4.2.18",
     "@types/chai-as-promised": "^7.1.5",
diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts
index 8d6f6dc5b..3c7884411 100644
--- a/packages/api/src/jobs/export.ts
+++ b/packages/api/src/jobs/export.ts
@@ -1,14 +1,91 @@
-import axios from 'axios'
-import jwt from 'jsonwebtoken'
-import { env } from '../env'
+import archiver, { Archiver } from 'archiver'
+import { v4 as uuidv4 } from 'uuid'
+import { LibraryItem } from '../entity/library_item'
+import { findHighlightsByLibraryItemId } from '../services/highlights'
+import { searchLibraryItems } from '../services/library_item'
+import { sendExportCompletedEmail } from '../services/send_emails'
 import { findActiveUser } from '../services/user'
 import { logger } from '../utils/logger'
+import { highlightToMarkdown } from '../utils/parser'
+import { createGCSFile, generateDownloadSignedUrl } from '../utils/uploads'
 
 export interface ExportJobData {
   userId: string
 }
 
 export const EXPORT_JOB_NAME = 'export'
+const GCS_BUCKET = 'omnivore-export'
+
+/**
+ * Appends one batch of library items to the export archive: a metadata JSON
+ * file for the batch, the readable content of every item and, for items with
+ * highlight annotations, a markdown file with their highlights.
+ *
+ * @param userId owner of the items, used to look up their highlights
+ * @param items batch of library items to add
+ * @param cursor offset of the first item of the batch within the export
+ * @param size number of items in the batch
+ * @param archive zip archive the entries are appended to
+ * @returns the offset at which the next batch starts (cursor + size)
+ */
+const uploadToBucket = async (
+  userId: string,
+  items: Array<LibraryItem>,
+  cursor: number,
+  size: number,
+  archive: Archiver
+): Promise<number> => {
+  // Add the metadata file of this batch to the root of the zip
+  const metadata = items.map((item) => ({
+    id: item.id,
+    slug: item.slug,
+    title: item.title,
+    description: item.description,
+    author: item.author,
+    url: item.originalUrl,
+    state: item.state,
+    readingProgress: item.readingProgressBottomPercent,
+    thumbnail: item.thumbnail,
+    labels: item.labelNames,
+    savedAt: item.savedAt,
+    updatedAt: item.updatedAt,
+    publishedAt: item.publishedAt,
+  }))
+
+  const endCursor = cursor + size
+  archive.append(JSON.stringify(metadata, null, 2), {
+    name: `metadata_${cursor}_to_${endCursor}.json`,
+  })
+
+  // Loop through the items and add files to /content and /highlights directories
+  for (const item of items) {
+    const slug = item.slug
+    // Add content files to /content
+    archive.append(item.readableContent, {
+      name: `content/${slug}.html`,
+    })
+
+    if (item.highlightAnnotations?.length) {
+      const highlights = await findHighlightsByLibraryItemId(item.id, userId)
+      const markdown = highlights.map(highlightToMarkdown).join('\n\n')
+
+      // Add highlight files to /highlights
+      archive.append(markdown, {
+        name: `highlights/${slug}.md`,
+      })
+    }
+  }
+
+  return endCursor
+}
 
+/**
+ * Exports the library items of a user as a zip file uploaded to GCS, then
+ * queues an email with a signed link to download it.
+ *
+ * @param jobData identifies the user whose library is exported
+ * @throws when building or uploading the archive fails, or when the
+ *   notification email cannot be queued
+ */
 export const exportJob = async (jobData: ExportJobData) => {
   const { userId } = jobData
@@ -24,17 +101,113 @@ export const exportJob = async (jobData: ExportJobData) => {
     userId,
   })
 
-  const token = jwt.sign(
-    {
-      uid: userId,
-    },
-    env.server.jwtSecret,
-    { expiresIn: '1d' }
-  )
+  // export data as a zip file:
+  // exports/{userId}/{date}/{uuid}.zip
+  //  - metadata_{from}_to_{to}.json
+  //  - /content
+  //    - {slug}.html
+  //  - /highlights
+  //    - {slug}.md
+  const dateStr = new Date().toISOString()
+  const fileUuid = uuidv4()
+  const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip`
 
-  await axios.post(env.queue.exportTaskHandlerUrl, undefined, {
-    headers: {
-      OmnivoreAuthorizationHeader: token,
+  // createGCSFile takes the file name first, then the bucket
+  const file = createGCSFile(fullPath, GCS_BUCKET)
+
+  // Create a write stream
+  const writeStream = file.createWriteStream({
+    metadata: {
+      contentType: 'application/zip',
     },
   })
+
+  // Settles when the upload is fully written to GCS and rejects on the first
+  // stream error, so a link to a partial file is never sent
+  const uploadFinished = new Promise<void>((resolve, reject) => {
+    writeStream.on('finish', () => {
+      logger.info('File successfully written to GCS', { userId, fullPath })
+      resolve()
+    })
+    writeStream.on('error', (err) => {
+      logger.error('Error writing to GCS', { userId, error: err })
+      reject(err)
+    })
+  })
+  // the rejection is surfaced when the promise is awaited below; this only
+  // prevents an unhandled rejection if the stream fails before that point
+  uploadFinished.catch(() => undefined)
+
+  // Initialize archiver for zipping files
+  const archive = archiver('zip', {
+    zlib: { level: 9 }, // Compression level
+  })
+
+  // An exception thrown from an event listener cannot be caught by the
+  // try/catch below; fail the upload stream instead so the job rejects
+  archive.on('error', (err) => {
+    logger.error('Error zipping files', { userId, error: err })
+    writeStream.destroy(err)
+  })
+
+  // Pipe the archiver output to the write stream
+  archive.pipe(writeStream)
+
+  try {
+    // fetch data from the database
+    const batchSize = 20
+    let cursor = 0
+    let hasNext = false
+    do {
+      const items = await searchLibraryItems(
+        {
+          from: cursor,
+          size: batchSize,
+          query: 'in:all',
+          // NOTE(review): readableContent is written to the archive, so it is
+          // requested here; confirm this flag is what loads that column
+          includeContent: true,
+          includeDeleted: false,
+          includePending: false,
+        },
+        userId
+      )
+
+      const size = items.length
+      // add this batch to the zip archive
+      if (size > 0) {
+        cursor = await uploadToBucket(userId, items, cursor, size, archive)
+      }
+
+      // evaluated for every batch: an empty batch must end the loop
+      hasNext = size === batchSize
+    } while (hasNext)
+
+    // Finalize the archive and wait until GCS has received all of it
+    await Promise.all([archive.finalize(), uploadFinished])
+  } catch (err) {
+    logger.error('Error exporting data', { userId, error: err })
+    // stop producing data and close the upload before failing the job
+    archive.abort()
+    writeStream.destroy()
+
+    throw err
+  }
+
+  // generate a temporary signed url for the zip file
+  const signedUrl = await generateDownloadSignedUrl(fullPath, {
+    // `expires` is an absolute timestamp in milliseconds, not a duration
+    expires: Date.now() + 24 * 60 * 60 * 1000, // 24 hours
+    bucketName: GCS_BUCKET,
+  })
+
+  const job = await sendExportCompletedEmail(userId, signedUrl)
+  if (!job) {
+    logger.error('failed to send export completed email', {
+      userId,
+      signedUrl,
+    })
+
+    throw new Error('failed to send export completed email')
+  }
 }
diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts
index 495d1a3fc..2b5edb359 100644
--- a/packages/api/src/routers/export_router.ts
+++ b/packages/api/src/routers/export_router.ts
@@ -1,5 +1,6 @@
 import cors from 'cors'
 import express, { Router } from 'express'
+import { jobStateToTaskState } from '../queue-processor'
 import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
 import { corsConfig } from '../utils/corsConfig'
 import { queueExportJob } from '../utils/createTask'
@@ -26,7 +27,7 @@ export function exportRouter() {
 
     try {
       const job = await queueExportJob(userId)
-      if (!job) {
+      if (!job || !job.id) {
         logger.error('Failed to queue export job', {
           userId,
         })
@@ -40,8 +41,10 @@ export function exportRouter() {
         jobId: job.id,
       })
 
+      const jobState = await job.getState()
       res.send({
         jobId: job.id,
+        state: jobStateToTaskState(jobState),
       })
     } catch (error) {
       logger.error('Error exporting all items', {
diff --git a/packages/api/src/services/send_emails.ts b/packages/api/src/services/send_emails.ts
index f1138cc3a..3d814f708 100644
--- a/packages/api/src/services/send_emails.ts
+++ b/packages/api/src/services/send_emails.ts
@@ -113,3 +113,21 @@ export const sendPasswordResetEmail = async (user: {
 
   return !!result
 }
+
+/**
+ * Queues the email telling a user that their export can be downloaded.
+ *
+ * @param userId recipient of the email
+ * @param urlToDownload link to the exported archive, embedded in the body
+ * @returns the result of enqueueSendEmail
+ */
+export const sendExportCompletedEmail = async (
+  userId: string,
+  urlToDownload: string
+) => {
+  return enqueueSendEmail({
+    userId,
+    subject: 'Your Omnivore export is ready',
+    html: `<p>Your export is ready. You can download it from the following link: <a href="${urlToDownload}">${urlToDownload}</a></p>`,
+  })
+}
diff --git a/packages/api/src/utils/parser.ts b/packages/api/src/utils/parser.ts
index 51aa15d06..3acd06886 100644
--- a/packages/api/src/utils/parser.ts
+++ b/packages/api/src/utils/parser.ts
@@ -20,7 +20,7 @@ import showdown from 'showdown'
 import { ILike } from 'typeorm'
 import { promisify } from 'util'
 import { v4 as uuid } from 'uuid'
-import { Highlight } from '../entity/highlight'
+import { Highlight, HighlightType } from '../entity/highlight'
 import { StatusType } from '../entity/user'
 import { env } from '../env'
 import { PageType, PreparedDocumentInput } from '../generated/graphql'
@@ -865,3 +865,28 @@ export const parseFeed = async (
     return null
   }
 }
+
+const formatHighlightQuote = (quote: string): string => {
+  // replace all empty lines with blockquote '>' to preserve paragraphs
+  return quote.replace(/^(?=\n)$|^\s*?\n/gm, '> ')
+}
+
+/**
+ * Renders a highlight as a markdown blockquote followed by its labels (as
+ * #tags) and its annotation, when present.
+ *
+ * @returns the markdown, or an empty string when the highlight is not of type
+ *   Highlight or has no quote
+ */
+export const highlightToMarkdown = (highlight: Highlight): string => {
+  if (highlight.highlightType === HighlightType.Highlight && highlight.quote) {
+    const quote = formatHighlightQuote(highlight.quote)
+    const labels = highlight.labels?.map((label) => `#${label.name}`).join(' ')
+    const note = highlight.annotation
+    return `> ${quote} ${labels ? `\n\n${labels}` : ''}${
+      note ? `\n\n${note}` : ''
+    }`
+  }
+
+  return ''
+}
diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts
index 801f4f623..c4b71b5ca 100644
--- a/packages/api/src/utils/uploads.ts
+++ b/packages/api/src/utils/uploads.ts
@@ -67,17 +67,17 @@ export const generateUploadSignedUrl = async (
 export const generateDownloadSignedUrl = async (
   filePathName: string,
   config?: {
+    bucketName?: string
     expires?: number
   }
 ): Promise<string> => {
   const options: GetSignedUrlConfig = {
     version: 'v4',
     action: 'read',
-    expires: Date.now() + 240 * 60 * 1000, // four hours
-    ...config,
+    expires: config?.expires ?? Date.now() + 240 * 60 * 1000, // four hours
   }
   const [url] = await storage
-    .bucket(bucketName)
+    .bucket(config?.bucketName || bucketName)
     .file(filePathName)
     .getSignedUrl(options)
   logger.info(`generating download signed url: ${url}`)
@@ -116,8 +116,17 @@ export const uploadToBucket = async (
     .save(data, { timeout: 30000, ...options }) // default timeout 30s
 }
 
-export const createGCSFile = (filename: string): File => {
-  return storage.bucket(bucketName).file(filename)
+/**
+ * Returns a handle to a file in GCS.
+ *
+ * @param filename path of the file inside the bucket
+ * @param selectedBucket bucket holding the file, defaults to bucketName
+ */
+export const createGCSFile = (
+  filename: string,
+  selectedBucket = bucketName
+): File => {
+  return storage.bucket(selectedBucket).file(filename)
 }
 
 export const downloadFromUrl = async (
diff --git a/packages/export-handler/package.json b/packages/export-handler/package.json
index 42d22ba73..b9c546bdb 100644
--- a/packages/export-handler/package.json
+++ b/packages/export-handler/package.json
@@ -30,7 +30,6 @@
     "@omnivore/utils": "1.0.0",
     "@sentry/serverless": "^7.77.0",
     "archiver": "^7.0.1",
-    "csv-stringify": "^6.4.0",
     "dotenv": "^16.0.1",
     "jsonwebtoken": "^8.5.1",
     "nodemon": "^2.0.15",