diff --git a/packages/api/src/entity/export.ts b/packages/api/src/entity/export.ts new file mode 100644 index 000000000..defb0edd2 --- /dev/null +++ b/packages/api/src/entity/export.ts @@ -0,0 +1,51 @@ +/* +CREATE TABLE omnivore.export ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(), + user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE, + task_id TEXT NOT NULL, + state TEXT NOT NULL, + total_items INT DEFAULT 0, + processed_items INT DEFAULT 0, + signed_url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); +*/ + +import { + Column, + CreateDateColumn, + Entity, + PrimaryGeneratedColumn, + UpdateDateColumn, +} from 'typeorm' + +@Entity() +export class Export { + @PrimaryGeneratedColumn('uuid') + id!: string + + @Column('uuid') + userId!: string + + @Column('text') + taskId!: string + + @Column('text') + state!: string + + @Column('int', { default: 0 }) + totalItems!: number + + @Column('int', { default: 0 }) + processedItems!: number + + @Column('text', { nullable: true }) + signedUrl?: string + + @CreateDateColumn({ type: 'timestamptz' }) + createdAt!: Date + + @UpdateDateColumn({ type: 'timestamptz' }) + updatedAt!: Date +} diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts index 666af1981..41b15a84e 100644 --- a/packages/api/src/jobs/export.ts +++ b/packages/api/src/jobs/export.ts @@ -1,12 +1,14 @@ import archiver, { Archiver } from 'archiver' import { v4 as uuidv4 } from 'uuid' -import { LibraryItem } from '../entity/library_item' +import { LibraryItem, LibraryItemState } from '../entity/library_item' +import { TaskState } from '../generated/graphql' +import { findExportById, saveExport } from '../services/export' import { findHighlightsByLibraryItemId } from '../services/highlights' import { findLibraryItemById, searchLibraryItems, } from '../services/library_item' -import { sendExportCompletedEmail } from '../services/send_emails' 
+import { sendExportJobEmail } from '../services/send_emails' import { findActiveUser } from '../services/user' import { logger } from '../utils/logger' import { highlightToMarkdown } from '../utils/parser' @@ -14,10 +16,23 @@ import { contentFilePath, createGCSFile } from '../utils/uploads' export interface ExportJobData { userId: string + exportId: string } export const EXPORT_JOB_NAME = 'export' +const itemStateMappping = (state: LibraryItemState) => { + switch (state) { + case LibraryItemState.Archived: + return 'Archived' + case LibraryItemState.ContentNotFetched: + case LibraryItemState.Succeeded: + return 'Active' + default: + return 'Unknown' + } +} + const uploadContent = async ( userId: string, libraryItem: LibraryItem, @@ -77,7 +92,7 @@ const uploadToBucket = async ( description: item.description, author: item.author, url: item.originalUrl, - state: item.state, + state: itemStateMappping(item.state), readingProgress: item.readingProgressBottomPercent, thumbnail: item.thumbnail, labels: item.labelNames, @@ -111,124 +126,162 @@ const uploadToBucket = async ( } export const exportJob = async (jobData: ExportJobData) => { - const { userId } = jobData - const user = await findActiveUser(userId) - if (!user) { - logger.error('user not found', { - userId, - }) - return - } - - logger.info('exporting all items...', { - userId, - }) - - // export data as a zip file: - // exports/{userId}/{date}/{uuid}.zip - // - metadata.json - // - /content - // - {slug}.html - // - /highlights - // - {slug}.md - const dateStr = new Date().toISOString() - const fileUuid = uuidv4() - const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` - - const file = createGCSFile(fullPath) - - // Create a write stream - const writeStream = file.createWriteStream({ - metadata: { - contentType: 'application/zip', - }, - }) - - // Handle any errors in the streams - writeStream.on('error', (err) => { - logger.error('Error writing to GCS:', err) - }) - - writeStream.on('finish', () => { 
- logger.info('File successfully written to GCS') - }) - - // Initialize archiver for zipping files - const archive = archiver('zip', { - zlib: { level: 9 }, // Compression level - }) - - // Handle any archiver errors - archive.on('error', (err) => { - throw err - }) - - // Pipe the archiver output to the write stream - archive.pipe(writeStream) + const { userId, exportId } = jobData try { - // fetch data from the database - const batchSize = 20 - let cursor = 0 - let hasNext = false - do { - const items = await searchLibraryItems( - { - from: cursor, - size: batchSize, - query: 'in:all', - includeContent: false, - includeDeleted: false, - includePending: false, - }, - userId - ) + const user = await findActiveUser(userId) + if (!user) { + logger.error('user not found', { + userId, + }) + return + } - const size = items.length - // write data to the csv file - if (size > 0) { - cursor = await uploadToBucket(userId, items, cursor, size, archive) + const exportTask = await findExportById(exportId, userId) + if (!exportTask) { + logger.error('export task not found', { + userId, + exportId, + }) + return + } - hasNext = size === batchSize - } - } while (hasNext) - } catch (err) { - logger.error('Error exporting data:', err) + await saveExport(userId, { + id: exportId, + state: TaskState.Running, + }) - throw err - } finally { - // Finalize the archive - await archive.finalize() - } + const emailJob = await sendExportJobEmail(userId, 'started') + if (!emailJob) { + logger.error('Failed to send export job email', { + userId, + }) + return + } - // Ensure that the writeStream has finished - await new Promise((resolve, reject) => { - writeStream.on('finish', resolve) - writeStream.on('error', reject) - }) + logger.info('exporting all items...', { + userId, + }) - logger.info('export completed', { - userId, - }) + // export data as a zip file: + // exports/{userId}/{date}/{uuid}.zip + // - metadata.json + // - /content + // - {slug}.html + // - /highlights + // - {slug}.md 
+ const dateStr = new Date().toISOString() + const fileUuid = uuidv4() + const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` - // generate a temporary signed url for the zip file - const [signedUrl] = await file.getSignedUrl({ - action: 'read', - expires: Date.now() + 86400 * 1000, // 15 minutes - }) + const file = createGCSFile(fullPath) - logger.info('signed url for export:', { - userId, - signedUrl, - }) + // Create a write stream + const writeStream = file.createWriteStream({ + metadata: { + contentType: 'application/zip', + }, + }) - const job = await sendExportCompletedEmail(userId, signedUrl) - if (!job) { - logger.error('failed to send export completed email', { + // Handle any errors in the streams + writeStream.on('error', (err) => { + logger.error('Error writing to GCS:', err) + }) + + writeStream.on('finish', () => { + logger.info('File successfully written to GCS') + }) + + // Initialize archiver for zipping files + const archive = archiver('zip', { + zlib: { level: 9 }, // Compression level + }) + + // Handle any archiver errors + archive.on('error', (err) => { + throw err + }) + + // Pipe the archiver output to the write stream + archive.pipe(writeStream) + + try { + // fetch data from the database + const batchSize = 20 + let cursor = 0 + let hasNext = false + do { + const items = await searchLibraryItems( + { + from: cursor, + size: batchSize, + query: 'in:all', + includeContent: false, + includeDeleted: false, + includePending: false, + }, + userId + ) + + const size = items.length + // write data to the csv file + if (size > 0) { + cursor = await uploadToBucket(userId, items, cursor, size, archive) + + hasNext = size === batchSize + } + } while (hasNext) + } finally { + // Finalize the archive + await archive.finalize() + } + + // Ensure that the writeStream has finished + await new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + }) + + logger.info('export completed', { + userId, 
+ }) + + // generate a temporary signed url for the zip file + const [signedUrl] = await file.getSignedUrl({ + action: 'read', + expires: Date.now() + 86400 * 1000, // 15 minutes + }) + + logger.info('signed url for export:', { userId, signedUrl, }) - throw new Error('failed to send export completed email') + await saveExport(userId, { + id: exportId, + state: TaskState.Succeeded, + }) + + const job = await sendExportJobEmail(userId, 'completed', signedUrl) + if (!job) { + logger.error('failed to send export completed email', { + userId, + signedUrl, + }) + } + } catch (error) { + logger.error('export failed', error) + + await saveExport(userId, { + id: exportId, + state: TaskState.Failed, + }) + + const job = await sendExportJobEmail(userId, 'failed') + if (!job) { + logger.error('failed to send export failed email', { + userId, + }) + } } } diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts index 2b5edb359..03f751ed4 100644 --- a/packages/api/src/routers/export_router.ts +++ b/packages/api/src/routers/export_router.ts @@ -1,6 +1,8 @@ import cors from 'cors' import express, { Router } from 'express' import { jobStateToTaskState } from '../queue-processor' +import { countExportsWithin24Hours, saveExport } from '../services/export' +import { sendExportJobEmail } from '../services/send_emails' import { getClaimsByToken, getTokenByRequest } from '../utils/auth' import { corsConfig } from '../utils/corsConfig' import { queueExportJob } from '../utils/createTask' @@ -25,6 +27,17 @@ export function exportRouter() { const userId = claims.uid try { + const exportsWithin24Hours = await countExportsWithin24Hours(userId) + if (exportsWithin24Hours >= 3) { + logger.error('User has reached the limit of exports within 24 hours', { + userId, + exportsWithin24Hours, + }) + return res.status(400).send({ + error: 'EXPORT_LIMIT_REACHED', + }) + } + const job = await queueExportJob(userId) if (!job || !job.id) { @@ -41,10 +54,17 @@ 
export function exportRouter() {
        jobId: job.id,
      })

+      const taskId = job.id
       const jobState = await job.getState()
+      const state = jobStateToTaskState(jobState)
+      await saveExport(userId, {
+        taskId: job.id,
+        state,
+      })
+
       res.send({
-        jobId: job.id,
-        state: jobStateToTaskState(jobState),
+        taskId,
+        state,
       })
     } catch (error) {
       logger.error('Error exporting all items', {
diff --git a/packages/api/src/services/export.ts b/packages/api/src/services/export.ts
new file mode 100644
index 000000000..f6ea147c4
--- /dev/null
+++ b/packages/api/src/services/export.ts
@@ -0,0 +1,34 @@
+import { In, MoreThan } from 'typeorm'
+import { Export } from '../entity/export'
+import { TaskState } from '../generated/graphql'
+import { getRepository } from '../repository'
+
+export const saveExport = async (
+  userId: string,
+  exportData: Partial<Export>
+): Promise<Export> => {
+  return getRepository(Export).save({
+    userId,
+    ...exportData,
+  })
+}
+
+export const countExportsWithin24Hours = async (
+  userId: string
+): Promise<number> => {
+  return getRepository(Export).countBy({
+    userId,
+    createdAt: MoreThan(new Date(Date.now() - 24 * 60 * 60 * 1000)),
+    state: In([TaskState.Pending, TaskState.Running, TaskState.Succeeded]),
+  })
+}
+
+export const findExportById = async (
+  id: string,
+  userId: string
+): Promise<Export | null> => {
+  return getRepository(Export).findOneBy({
+    id,
+    userId,
+  })
+}
diff --git a/packages/api/src/services/send_emails.ts b/packages/api/src/services/send_emails.ts
index 3d814f708..ed5c64307 100644
--- a/packages/api/src/services/send_emails.ts
+++ b/packages/api/src/services/send_emails.ts
@@ -114,13 +114,39 @@ export const sendPasswordResetEmail = async (user: {
   return !!result
 }

-export const sendExportCompletedEmail = async (
+export const sendExportJobEmail = async (
   userId: string,
-  urlToDownload: string
+  state: 'completed' | 'failed' | 'started',
+  urlToDownload?: string
 ) => {
+  let subject = ''
+  let html = ''
+
+  switch (state) {
+    case 'completed':
+      if (!urlToDownload) {
+
throw new Error('urlToDownload is required') + } + + subject = 'Your Omnivore export is ready' + html = `
<p>Your export is ready. You can download it from the following link: ${urlToDownload}</p>
` + break + case 'failed': + subject = 'Your Omnivore export failed' + html = '
<p>Your export failed. Please try again later.</p>
' + break + case 'started': + subject = 'Your Omnivore export has started' + html = + '
<p>Your export has started. You will receive an email once it is completed.</p>
' + break + default: + throw new Error('Invalid state') + } + return enqueueSendEmail({ userId, - subject: 'Your Omnivore export is ready', - html: `
<p>Your export is ready. You can download it from the following link: ${urlToDownload}</p>
`, + subject, + html, }) } diff --git a/packages/db/migrations/0186.do.create_export_table.sql b/packages/db/migrations/0186.do.create_export_table.sql new file mode 100755 index 000000000..8770f494e --- /dev/null +++ b/packages/db/migrations/0186.do.create_export_table.sql @@ -0,0 +1,21 @@ +-- Type: DO +-- Name: create_export_table +-- Description: Create a table to store the export information + +BEGIN; + +CREATE TABLE omnivore.export ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(), + user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE, + task_id TEXT NOT NULL, + state TEXT NOT NULL, + total_items INT DEFAULT 0, + processed_items INT DEFAULT 0, + signed_url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX export_user_id_idx ON omnivore.export(user_id); + +COMMIT; diff --git a/packages/db/migrations/0186.undo.create_export_table.sql b/packages/db/migrations/0186.undo.create_export_table.sql new file mode 100755 index 000000000..b481b771e --- /dev/null +++ b/packages/db/migrations/0186.undo.create_export_table.sql @@ -0,0 +1,9 @@ +-- Type: UNDO +-- Name: create_export_table +-- Description: Create a table to store the export information + +BEGIN; + +DROP TABLE omnivore.export; + +COMMIT;