Save export tasks in the DB and check the DB before starting an export

This commit is contained in:
Hongbo Wu
2024-08-27 17:48:05 +08:00
parent 0e523d8c73
commit f77ded31e1
7 changed files with 328 additions and 114 deletions

View File

@ -0,0 +1,51 @@
/*
CREATE TABLE omnivore.export (
id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(),
user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE,
task_id TEXT NOT NULL,
state TEXT NOT NULL,
total_items INT DEFAULT 0,
processed_items INT DEFAULT 0,
signed_url TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
*/
import {
Column,
CreateDateColumn,
Entity,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from 'typeorm'
/**
 * One row per user-initiated library export task; maps to the
 * omnivore.export table (schema shown in the SQL comment above).
 */
@Entity()
export class Export {
  @PrimaryGeneratedColumn('uuid')
  id!: string

  // Owning user's id (FK with ON DELETE CASCADE at the DB level).
  @Column('uuid')
  userId!: string

  // Queue job id for the export run.
  @Column('text')
  taskId!: string

  // Task state persisted as text — presumably a TaskState value; verify against callers.
  @Column('text')
  state!: string

  // Progress counters for the export run.
  @Column('int', { default: 0 })
  totalItems!: number

  @Column('int', { default: 0 })
  processedItems!: number

  // Signed download URL for the finished archive, when available.
  @Column('text', { nullable: true })
  signedUrl?: string

  @CreateDateColumn({ type: 'timestamptz' })
  createdAt!: Date

  @UpdateDateColumn({ type: 'timestamptz' })
  updatedAt!: Date
}

View File

@ -1,12 +1,14 @@
import archiver, { Archiver } from 'archiver'
import { v4 as uuidv4 } from 'uuid'
import { LibraryItem } from '../entity/library_item'
import { LibraryItem, LibraryItemState } from '../entity/library_item'
import { TaskState } from '../generated/graphql'
import { findExportById, saveExport } from '../services/export'
import { findHighlightsByLibraryItemId } from '../services/highlights'
import {
findLibraryItemById,
searchLibraryItems,
} from '../services/library_item'
import { sendExportCompletedEmail } from '../services/send_emails'
import { sendExportJobEmail } from '../services/send_emails'
import { findActiveUser } from '../services/user'
import { logger } from '../utils/logger'
import { highlightToMarkdown } from '../utils/parser'
@ -14,10 +16,23 @@ import { contentFilePath, createGCSFile } from '../utils/uploads'
/** Payload handed to the export background job. */
export interface ExportJobData {
  userId: string
  // Row id in omnivore.export used to track state for this run.
  exportId: string
}
/** Queue job name used when enqueuing/consuming export jobs. */
export const EXPORT_JOB_NAME = 'export'
/**
 * Translate an internal LibraryItemState into the label written to the
 * export metadata: 'Archived', 'Active', or 'Unknown'.
 */
const itemStateMappping = (state: LibraryItemState) => {
  if (state === LibraryItemState.Archived) {
    return 'Archived'
  }
  if (
    state === LibraryItemState.ContentNotFetched ||
    state === LibraryItemState.Succeeded
  ) {
    return 'Active'
  }
  return 'Unknown'
}
const uploadContent = async (
userId: string,
libraryItem: LibraryItem,
@ -77,7 +92,7 @@ const uploadToBucket = async (
description: item.description,
author: item.author,
url: item.originalUrl,
state: item.state,
state: itemStateMappping(item.state),
readingProgress: item.readingProgressBottomPercent,
thumbnail: item.thumbnail,
labels: item.labelNames,
@ -111,124 +126,162 @@ const uploadToBucket = async (
}
export const exportJob = async (jobData: ExportJobData) => {
const { userId } = jobData
const user = await findActiveUser(userId)
if (!user) {
logger.error('user not found', {
userId,
})
return
}
logger.info('exporting all items...', {
userId,
})
// export data as a zip file:
// exports/{userId}/{date}/{uuid}.zip
// - metadata.json
// - /content
// - {slug}.html
// - /highlights
// - {slug}.md
const dateStr = new Date().toISOString()
const fileUuid = uuidv4()
const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip`
const file = createGCSFile(fullPath)
// Create a write stream
const writeStream = file.createWriteStream({
metadata: {
contentType: 'application/zip',
},
})
// Handle any errors in the streams
writeStream.on('error', (err) => {
logger.error('Error writing to GCS:', err)
})
writeStream.on('finish', () => {
logger.info('File successfully written to GCS')
})
// Initialize archiver for zipping files
const archive = archiver('zip', {
zlib: { level: 9 }, // Compression level
})
// Handle any archiver errors
archive.on('error', (err) => {
throw err
})
// Pipe the archiver output to the write stream
archive.pipe(writeStream)
const { userId, exportId } = jobData
try {
// fetch data from the database
const batchSize = 20
let cursor = 0
let hasNext = false
do {
const items = await searchLibraryItems(
{
from: cursor,
size: batchSize,
query: 'in:all',
includeContent: false,
includeDeleted: false,
includePending: false,
},
userId
)
const user = await findActiveUser(userId)
if (!user) {
logger.error('user not found', {
userId,
})
return
}
const size = items.length
// write data to the csv file
if (size > 0) {
cursor = await uploadToBucket(userId, items, cursor, size, archive)
const exportTask = await findExportById(exportId, userId)
if (!exportTask) {
logger.error('export task not found', {
userId,
exportId,
})
return
}
hasNext = size === batchSize
}
} while (hasNext)
} catch (err) {
logger.error('Error exporting data:', err)
await saveExport(userId, {
id: exportId,
state: TaskState.Running,
})
throw err
} finally {
// Finalize the archive
await archive.finalize()
}
const emailJob = await sendExportJobEmail(userId, 'started')
if (!emailJob) {
logger.error('Failed to send export job email', {
userId,
})
return
}
// Ensure that the writeStream has finished
await new Promise((resolve, reject) => {
writeStream.on('finish', resolve)
writeStream.on('error', reject)
})
logger.info('exporting all items...', {
userId,
})
logger.info('export completed', {
userId,
})
// export data as a zip file:
// exports/{userId}/{date}/{uuid}.zip
// - metadata.json
// - /content
// - {slug}.html
// - /highlights
// - {slug}.md
const dateStr = new Date().toISOString()
const fileUuid = uuidv4()
const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip`
// generate a temporary signed url for the zip file
const [signedUrl] = await file.getSignedUrl({
action: 'read',
expires: Date.now() + 86400 * 1000, // 15 minutes
})
const file = createGCSFile(fullPath)
logger.info('signed url for export:', {
userId,
signedUrl,
})
// Create a write stream
const writeStream = file.createWriteStream({
metadata: {
contentType: 'application/zip',
},
})
const job = await sendExportCompletedEmail(userId, signedUrl)
if (!job) {
logger.error('failed to send export completed email', {
// Handle any errors in the streams
writeStream.on('error', (err) => {
logger.error('Error writing to GCS:', err)
})
writeStream.on('finish', () => {
logger.info('File successfully written to GCS')
})
// Initialize archiver for zipping files
const archive = archiver('zip', {
zlib: { level: 9 }, // Compression level
})
// Handle any archiver errors
archive.on('error', (err) => {
throw err
})
// Pipe the archiver output to the write stream
archive.pipe(writeStream)
try {
// fetch data from the database
const batchSize = 20
let cursor = 0
let hasNext = false
do {
const items = await searchLibraryItems(
{
from: cursor,
size: batchSize,
query: 'in:all',
includeContent: false,
includeDeleted: false,
includePending: false,
},
userId
)
const size = items.length
// write data to the csv file
if (size > 0) {
cursor = await uploadToBucket(userId, items, cursor, size, archive)
hasNext = size === batchSize
}
} while (hasNext)
} finally {
// Finalize the archive
await archive.finalize()
}
// Ensure that the writeStream has finished
await new Promise((resolve, reject) => {
writeStream.on('finish', resolve)
writeStream.on('error', reject)
})
logger.info('export completed', {
userId,
})
// generate a temporary signed url for the zip file
const [signedUrl] = await file.getSignedUrl({
action: 'read',
expires: Date.now() + 86400 * 1000, // 15 minutes
})
logger.info('signed url for export:', {
userId,
signedUrl,
})
throw new Error('failed to send export completed email')
await saveExport(userId, {
id: exportId,
state: TaskState.Succeeded,
})
const job = await sendExportJobEmail(userId, 'completed', signedUrl)
if (!job) {
logger.error('failed to send export completed email', {
userId,
signedUrl,
})
}
} catch (error) {
logger.error('export failed', error)
await saveExport(userId, {
id: exportId,
state: TaskState.Failed,
})
const job = await sendExportJobEmail(userId, 'failed')
if (!job) {
logger.error('failed to send export failed email', {
userId,
})
}
}
}

View File

@ -1,6 +1,8 @@
import cors from 'cors'
import express, { Router } from 'express'
import { jobStateToTaskState } from '../queue-processor'
import { countExportsWithin24Hours, saveExport } from '../services/export'
import { sendExportJobEmail } from '../services/send_emails'
import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { queueExportJob } from '../utils/createTask'
@ -25,6 +27,17 @@ export function exportRouter() {
const userId = claims.uid
try {
const exportsWithin24Hours = await countExportsWithin24Hours(userId)
if (exportsWithin24Hours >= 3) {
logger.error('User has reached the limit of exports within 24 hours', {
userId,
exportsWithin24Hours,
})
return res.status(400).send({
error: 'EXPORT_LIMIT_REACHED',
})
}
const job = await queueExportJob(userId)
if (!job || !job.id) {
@ -41,10 +54,17 @@ export function exportRouter() {
jobId: job.id,
})
const taskId = job.id
const jobState = await job.getState()
const state = jobStateToTaskState(jobState)
await saveExport(userId, {
taskId: job.id,
state,
})
res.send({
jobId: job.id,
state: jobStateToTaskState(jobState),
taskId,
state,
})
} catch (error) {
logger.error('Error exporting all items', {

View File

@ -0,0 +1,34 @@
import { In, MoreThan } from 'typeorm'
import { Export } from '../entity/export'
import { TaskState } from '../generated/graphql'
import { getRepository } from '../repository'
/**
 * Insert or update an export row for the given user. Fields present in
 * exportData (including id for updates) are passed through to save().
 */
export const saveExport = async (
  userId: string,
  exportData: Partial<Export>
): Promise<Export> => {
  const record = {
    userId,
    ...exportData,
  }
  const repo = getRepository(Export)
  return repo.save(record)
}
/**
 * Count this user's exports created in the last 24 hours that are pending,
 * running, or succeeded — used by the router to enforce the export quota.
 */
export const countExportsWithin24Hours = async (
  userId: string
): Promise<number> => {
  const oneDayMs = 24 * 60 * 60 * 1000
  const since = new Date(Date.now() - oneDayMs)
  const countedStates = [
    TaskState.Pending,
    TaskState.Running,
    TaskState.Succeeded,
  ]
  return getRepository(Export).countBy({
    userId,
    createdAt: MoreThan(since),
    state: In(countedStates),
  })
}
/** Look up a single export row by id, scoped to its owning user. */
export const findExportById = async (
  id: string,
  userId: string
): Promise<Export | null> => {
  const repo = getRepository(Export)
  return repo.findOneBy({ id, userId })
}

View File

@ -114,13 +114,39 @@ export const sendPasswordResetEmail = async (user: {
return !!result
}
export const sendExportCompletedEmail = async (
export const sendExportJobEmail = async (
userId: string,
urlToDownload: string
state: 'completed' | 'failed' | 'started',
urlToDownload?: string
) => {
let subject = ''
let html = ''
switch (state) {
case 'completed':
if (!urlToDownload) {
throw new Error('urlToDownload is required')
}
subject = 'Your Omnivore export is ready'
html = `<p>Your export is ready. You can download it from the following link: <a href="${urlToDownload}">${urlToDownload}</a></p>`
break
case 'failed':
subject = 'Your Omnivore export failed'
html = '<p>Your export failed. Please try again later.</p>'
break
case 'started':
subject = 'Your Omnivore export has started'
html =
'<p>Your export has started. You will receive an email once it is completed.</p>'
break
default:
throw new Error('Invalid state')
}
return enqueueSendEmail({
userId,
subject: 'Your Omnivore export is ready',
html: `<p>Your export is ready. You can download it from the following link: <a href="${urlToDownload}">${urlToDownload}</a></p>`,
subject,
html,
})
}

View File

@ -0,0 +1,21 @@
-- Type: DO
-- Name: create_export_table
-- Description: Create a table to store the export information
BEGIN;
-- One row per user-initiated export task; mirrors the Export TypeORM entity.
CREATE TABLE omnivore.export (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(),
    -- Owner; rows are removed when the user is deleted.
    user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE,
    -- Queue job id driving this export.
    task_id TEXT NOT NULL,
    -- Task state stored as free text (values come from the application).
    state TEXT NOT NULL,
    total_items INT DEFAULT 0,
    processed_items INT DEFAULT 0,
    -- Temporary signed download URL once the archive is ready.
    signed_url TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Supports per-user lookups and the 24-hour rate-limit count.
CREATE INDEX export_user_id_idx ON omnivore.export(user_id);
COMMIT;

View File

@ -0,0 +1,9 @@
-- Type: UNDO
-- Name: create_export_table
-- Description: Create a table to store the export information
BEGIN;
-- Reverses the DO migration; the export_user_id_idx index drops with the table.
DROP TABLE omnivore.export;
COMMIT;