use async job to handle exporter
This commit is contained in:
@ -51,6 +51,7 @@
|
||||
"alfaaz": "^1.1.0",
|
||||
"apollo-datasource": "^3.3.1",
|
||||
"apollo-server-express": "^3.6.3",
|
||||
"archiver": "^7.0.1",
|
||||
"axios": "^0.27.2",
|
||||
"bcryptjs": "^2.4.3",
|
||||
"bullmq": "^5.1.1",
|
||||
@ -123,6 +124,7 @@
|
||||
"@istanbuljs/nyc-config-typescript": "^1.0.2",
|
||||
"@types/addressparser": "^1.0.1",
|
||||
"@types/analytics-node": "^3.1.7",
|
||||
"@types/archiver": "^6.0.2",
|
||||
"@types/bcryptjs": "^2.4.2",
|
||||
"@types/chai": "^4.2.18",
|
||||
"@types/chai-as-promised": "^7.1.5",
|
||||
|
||||
@ -1,14 +1,71 @@
|
||||
import axios from 'axios'
|
||||
import jwt from 'jsonwebtoken'
|
||||
import { env } from '../env'
|
||||
import archiver, { Archiver } from 'archiver'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { LibraryItem } from '../entity/library_item'
|
||||
import { findHighlightsByLibraryItemId } from '../services/highlights'
|
||||
import { searchLibraryItems } from '../services/library_item'
|
||||
import { sendExportCompletedEmail } from '../services/send_emails'
|
||||
import { findActiveUser } from '../services/user'
|
||||
import { logger } from '../utils/logger'
|
||||
import { highlightToMarkdown } from '../utils/parser'
|
||||
import { createGCSFile, generateDownloadSignedUrl } from '../utils/uploads'
|
||||
|
||||
/**
 * Payload handed to the export job.
 */
export interface ExportJobData {
  /** ID of the user whose library items are exported. */
  userId: string
}
|
||||
|
||||
// Name of the export job (presumably the key it is queued under; confirm
// against the queue processor).
export const EXPORT_JOB_NAME = 'export'

// Bucket that receives the generated export archives and from which the
// download link is signed.
const GCS_BUCKET = 'omnivore-export'
|
||||
|
||||
/**
 * Appends one batch of library items to the export archive.
 *
 * Three kinds of entries are added, in this order:
 *  1. `metadata_<cursor>_to_<cursor + size>.json` describing every item of the batch,
 *  2. `content/<slug>.html` holding each item's readable content,
 *  3. `highlights/<slug>.md` for items that carry highlight annotations,
 *     rendered with `highlightToMarkdown` and separated by blank lines.
 *
 * @param userId - owner of the items; forwarded to the highlight lookup
 * @param items - the batch of library items to add
 * @param cursor - offset of the first item of this batch
 * @param size - number of items in this batch
 * @param archive - archive the entries are appended to
 * @returns the offset just past this batch, i.e. `cursor + size`
 */
const uploadToBucket = async (
  userId: string,
  items: Array<LibraryItem>,
  cursor: number,
  size: number,
  archive: Archiver
): Promise<number> => {
  const nextCursor = cursor + size

  // One metadata file per batch, named after the offsets it covers.
  const batchMetadata = items.map((entry) => {
    return {
      id: entry.id,
      slug: entry.slug,
      title: entry.title,
      description: entry.description,
      author: entry.author,
      url: entry.originalUrl,
      state: entry.state,
      readingProgress: entry.readingProgressBottomPercent,
      thumbnail: entry.thumbnail,
      labels: entry.labelNames,
      savedAt: entry.savedAt,
      updatedAt: entry.updatedAt,
      publishedAt: entry.publishedAt,
    }
  })
  const metadataName = `metadata_${cursor}_to_${nextCursor}.json`
  archive.append(JSON.stringify(batchMetadata, null, 2), { name: metadataName })

  // Items are processed one after another so entries keep the batch order.
  for (const entry of items) {
    const entrySlug = entry.slug

    archive.append(entry.readableContent, {
      name: `content/${entrySlug}.html`,
    })

    // Only items that actually have highlights get a markdown file.
    if (!entry.highlightAnnotations?.length) {
      continue
    }

    const entryHighlights = await findHighlightsByLibraryItemId(
      entry.id,
      userId
    )
    const highlightsMarkdown = entryHighlights
      .map(highlightToMarkdown)
      .join('\n\n')

    archive.append(highlightsMarkdown, {
      name: `highlights/${entrySlug}.md`,
    })
  }

  return nextCursor
}
|
||||
|
||||
export const exportJob = async (jobData: ExportJobData) => {
|
||||
const { userId } = jobData
|
||||
@ -24,17 +81,95 @@ export const exportJob = async (jobData: ExportJobData) => {
|
||||
userId,
|
||||
})
|
||||
|
||||
const token = jwt.sign(
|
||||
{
|
||||
uid: userId,
|
||||
},
|
||||
env.server.jwtSecret,
|
||||
{ expiresIn: '1d' }
|
||||
)
|
||||
// export data as a zip file:
|
||||
// exports/{userId}/{date}/{uuid}.zip
|
||||
// - metadata_{from}_to_{to}.json (one per batch of items)
|
||||
// - /content
|
||||
// - {slug}.html
|
||||
// - /highlights
|
||||
// - {slug}.md
|
||||
const dateStr = new Date().toISOString()
|
||||
const fileUuid = uuidv4()
|
||||
const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip`
|
||||
|
||||
await axios.post(env.queue.exportTaskHandlerUrl, undefined, {
|
||||
headers: {
|
||||
OmnivoreAuthorizationHeader: token,
|
||||
const file = createGCSFile(GCS_BUCKET, fullPath)
|
||||
|
||||
// Create a write stream
|
||||
const writeStream = file.createWriteStream({
|
||||
metadata: {
|
||||
contentType: 'application/zip',
|
||||
},
|
||||
})
|
||||
|
||||
// Handle any errors in the streams
|
||||
writeStream.on('error', (err) => {
|
||||
console.error('Error writing to GCS:', err)
|
||||
})
|
||||
|
||||
writeStream.on('finish', () => {
|
||||
console.log('File successfully written to GCS')
|
||||
})
|
||||
|
||||
// Initialize archiver for zipping files
|
||||
const archive = archiver('zip', {
|
||||
zlib: { level: 9 }, // Compression level
|
||||
})
|
||||
|
||||
// Handle any archiver errors
|
||||
archive.on('error', (err) => {
|
||||
console.error('Error zipping files:', err)
|
||||
throw err
|
||||
})
|
||||
|
||||
// Pipe the archiver output to the write stream
|
||||
archive.pipe(writeStream)
|
||||
|
||||
try {
|
||||
// fetch data from the database
|
||||
const batchSize = 20
|
||||
let cursor = 0
|
||||
let hasNext = false
|
||||
do {
|
||||
const items = await searchLibraryItems(
|
||||
{
|
||||
from: cursor,
|
||||
size: batchSize,
|
||||
query: 'in:all',
|
||||
includeContent: false,
|
||||
includeDeleted: false,
|
||||
includePending: false,
|
||||
},
|
||||
userId
|
||||
)
|
||||
|
||||
const size = items.length
|
||||
// add this batch's metadata, content and highlights to the zip archive
|
||||
if (size > 0) {
|
||||
cursor = await uploadToBucket(userId, items, cursor, size, archive)
|
||||
|
||||
hasNext = size === batchSize
|
||||
}
|
||||
} while (hasNext)
|
||||
} catch (err) {
|
||||
console.error('Error exporting data:', err)
|
||||
} finally {
|
||||
// Finalize the archive
|
||||
await archive.finalize()
|
||||
}
|
||||
|
||||
// generate a temporary signed url for the zip file
|
||||
const signedUrl = await generateDownloadSignedUrl(fullPath, {
|
||||
expires: 60 * 60 * 24, // 24 hours
|
||||
bucketName: GCS_BUCKET,
|
||||
})
|
||||
|
||||
const job = await sendExportCompletedEmail(userId, signedUrl)
|
||||
if (!job) {
|
||||
logger.error('failed to send export completed email', {
|
||||
userId,
|
||||
signedUrl,
|
||||
})
|
||||
|
||||
throw new Error('failed to send export completed email')
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import cors from 'cors'
|
||||
import express, { Router } from 'express'
|
||||
import { jobStateToTaskState } from '../queue-processor'
|
||||
import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
|
||||
import { corsConfig } from '../utils/corsConfig'
|
||||
import { queueExportJob } from '../utils/createTask'
|
||||
@ -26,7 +27,7 @@ export function exportRouter() {
|
||||
try {
|
||||
const job = await queueExportJob(userId)
|
||||
|
||||
if (!job) {
|
||||
if (!job || !job.id) {
|
||||
logger.error('Failed to queue export job', {
|
||||
userId,
|
||||
})
|
||||
@ -40,8 +41,10 @@ export function exportRouter() {
|
||||
jobId: job.id,
|
||||
})
|
||||
|
||||
const jobState = await job.getState()
|
||||
res.send({
|
||||
jobId: job.id,
|
||||
state: jobStateToTaskState(jobState),
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Error exporting all items', {
|
||||
|
||||
@ -113,3 +113,14 @@ export const sendPasswordResetEmail = async (user: {
|
||||
|
||||
return !!result
|
||||
}
|
||||
|
||||
/**
 * Queues the "export is ready" email for a user.
 *
 * The message body contains the download link both as the anchor target and
 * as the visible link text.
 *
 * @param userId - recipient of the email
 * @param urlToDownload - link to the exported archive
 * @returns whatever `enqueueSendEmail` resolves to
 */
export const sendExportCompletedEmail = async (
  userId: string,
  urlToDownload: string
) => {
  const subject = 'Your Omnivore export is ready'
  const html = `<p>Your export is ready. You can download it from the following link: <a href="${urlToDownload}">${urlToDownload}</a></p>`

  return enqueueSendEmail({ userId, subject, html })
}
|
||||
|
||||
@ -20,7 +20,7 @@ import showdown from 'showdown'
|
||||
import { ILike } from 'typeorm'
|
||||
import { promisify } from 'util'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
import { Highlight } from '../entity/highlight'
|
||||
import { Highlight, HighlightType } from '../entity/highlight'
|
||||
import { StatusType } from '../entity/user'
|
||||
import { env } from '../env'
|
||||
import { PageType, PreparedDocumentInput } from '../generated/graphql'
|
||||
@ -865,3 +865,21 @@ export const parseFeed = async (
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Prepares a highlight quote for use inside a markdown blockquote.
 *
 * An empty line gets a `> ` marker inserted (its line break is kept), while a
 * line containing only whitespace is replaced, together with its line break,
 * by `> `.
 *
 * @param quote - raw quote text
 * @returns the quote with blank lines turned into blockquote markers
 */
const formatHighlightQuote = (quote: string): string => {
  // First alternative: zero-width match on a truly empty line.
  // Second alternative: a whitespace-only line including its newline.
  const blankLine = /^(?=\n)$|^\s*?\n/gm
  const marker = '> '
  return quote.replace(blankLine, marker)
}
|
||||
|
||||
/**
 * Renders a single highlight as markdown.
 *
 * Output shape: the quote as a blockquote (`> <quote> `), then, when present,
 * the labels as space-separated `#name` tags and the annotation, each
 * preceded by a blank line.
 *
 * @param highlight - the highlight to render
 * @returns the markdown text, or an empty string when the entry is not of
 *          type `HighlightType.Highlight` or has no quote
 */
export const highlightToMarkdown = (highlight: Highlight): string => {
  // Only real highlights with quoted text produce output.
  if (highlight.highlightType !== HighlightType.Highlight || !highlight.quote) {
    return ''
  }

  const quotedText = formatHighlightQuote(highlight.quote)
  const tags = highlight.labels?.map((label) => `#${label.name}`).join(' ')
  const annotation = highlight.annotation

  const tagsPart = tags ? `\n\n${tags}` : ''
  const annotationPart = annotation ? `\n\n${annotation}` : ''

  return `> ${quotedText} ${tagsPart}${annotationPart}`
}
|
||||
|
||||
@ -67,17 +67,17 @@ export const generateUploadSignedUrl = async (
|
||||
export const generateDownloadSignedUrl = async (
|
||||
filePathName: string,
|
||||
config?: {
|
||||
bucketName?: string
|
||||
expires?: number
|
||||
}
|
||||
): Promise<string> => {
|
||||
const options: GetSignedUrlConfig = {
|
||||
version: 'v4',
|
||||
action: 'read',
|
||||
expires: Date.now() + 240 * 60 * 1000, // four hours
|
||||
...config,
|
||||
expires: config?.expires ?? Date.now() + 240 * 60 * 1000, // four hours
|
||||
}
|
||||
const [url] = await storage
|
||||
.bucket(bucketName)
|
||||
.bucket(config?.bucketName || bucketName)
|
||||
.file(filePathName)
|
||||
.getSignedUrl(options)
|
||||
logger.info(`generating download signed url: ${url}`)
|
||||
@ -116,8 +116,11 @@ export const uploadToBucket = async (
|
||||
.save(data, { timeout: 30000, ...options }) // default timeout 30s
|
||||
}
|
||||
|
||||
export const createGCSFile = (filename: string): File => {
|
||||
return storage.bucket(bucketName).file(filename)
|
||||
/**
 * Returns a handle to a file in a storage bucket.
 *
 * NOTE(review): the parameter order is (filename, bucket). The export job
 * calls `createGCSFile(GCS_BUCKET, fullPath)`, which looks reversed relative
 * to this signature. Confirm and fix the call site.
 *
 * @param filename - path of the file inside the bucket
 * @param selectedBucket - bucket to use; defaults to the module's `bucketName`
 * @returns the file handle obtained from `storage.bucket(...).file(...)`
 */
export const createGCSFile = (
  filename: string,
  selectedBucket = bucketName
): File => {
  const bucket = storage.bucket(selectedBucket)
  return bucket.file(filename)
}
|
||||
|
||||
export const downloadFromUrl = async (
|
||||
|
||||
@ -30,7 +30,6 @@
|
||||
"@omnivore/utils": "1.0.0",
|
||||
"@sentry/serverless": "^7.77.0",
|
||||
"archiver": "^7.0.1",
|
||||
"csv-stringify": "^6.4.0",
|
||||
"dotenv": "^16.0.1",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"nodemon": "^2.0.15",
|
||||
|
||||
Reference in New Issue
Block a user