Merge pull request #4327 from omnivore-app/feature/exporter
feat: update exporter
@@ -51,6 +51,7 @@
    "alfaaz": "^1.1.0",
    "apollo-datasource": "^3.3.1",
    "apollo-server-express": "^3.6.3",
    "archiver": "^7.0.1",
    "axios": "^0.27.2",
    "bcryptjs": "^2.4.3",
    "bullmq": "^5.1.1",
@@ -123,6 +124,7 @@
    "@istanbuljs/nyc-config-typescript": "^1.0.2",
    "@types/addressparser": "^1.0.1",
    "@types/analytics-node": "^3.1.7",
    "@types/archiver": "^6.0.2",
    "@types/bcryptjs": "^2.4.2",
    "@types/chai": "^4.2.18",
    "@types/chai-as-promised": "^7.1.5",
packages/api/src/entity/export.ts (new file, 37 lines)
@@ -0,0 +1,37 @@
import {
  Column,
  CreateDateColumn,
  Entity,
  PrimaryGeneratedColumn,
  UpdateDateColumn,
} from 'typeorm'

@Entity()
export class Export {
  @PrimaryGeneratedColumn('uuid')
  id!: string

  @Column('uuid')
  userId!: string

  @Column('text', { nullable: true })
  taskId?: string

  @Column('text')
  state!: string

  @Column('int', { default: 0 })
  totalItems!: number

  @Column('int', { default: 0 })
  processedItems!: number

  @Column('text', { nullable: true })
  signedUrl?: string

  @CreateDateColumn({ type: 'timestamptz' })
  createdAt!: Date

  @UpdateDateColumn({ type: 'timestamptz' })
  updatedAt!: Date
}
packages/api/src/jobs/export.ts (new file, 287 lines)
@@ -0,0 +1,287 @@
import archiver, { Archiver } from 'archiver'
import { v4 as uuidv4 } from 'uuid'
import { LibraryItem, LibraryItemState } from '../entity/library_item'
import { TaskState } from '../generated/graphql'
import { findExportById, saveExport } from '../services/export'
import { findHighlightsByLibraryItemId } from '../services/highlights'
import {
  findLibraryItemById,
  searchLibraryItems,
} from '../services/library_item'
import { sendExportJobEmail } from '../services/send_emails'
import { findActiveUser } from '../services/user'
import { logger } from '../utils/logger'
import { highlightToMarkdown } from '../utils/parser'
import { contentFilePath, createGCSFile } from '../utils/uploads'

export interface ExportJobData {
  userId: string
  exportId: string
}

export const EXPORT_JOB_NAME = 'export'

const itemStateMapping = (state: LibraryItemState) => {
  switch (state) {
    case LibraryItemState.Archived:
      return 'Archived'
    case LibraryItemState.ContentNotFetched:
    case LibraryItemState.Succeeded:
      return 'Active'
    default:
      return 'Unknown'
  }
}

const uploadContent = async (
  userId: string,
  libraryItem: LibraryItem,
  archive: Archiver
) => {
  const filePath = contentFilePath({
    userId,
    libraryItemId: libraryItem.id,
    format: 'readable',
    savedAt: libraryItem.savedAt,
    updatedAt: libraryItem.updatedAt,
  })

  const file = createGCSFile(filePath)

  // check if file is already uploaded
  const [exists] = await file.exists()
  if (!exists) {
    logger.info(`File not found: ${filePath}`)

    // upload the content to GCS
    const item = await findLibraryItemById(libraryItem.id, userId, {
      select: ['readableContent'],
    })
    if (!item?.readableContent) {
      logger.error('Item not found', {
        userId,
        libraryItemId: libraryItem.id,
      })
      return
    }

    await file.save(item.readableContent, {
      contentType: 'text/html',
      private: true,
    })
  }

  // append the existing file to the archive
  archive.append(file.createReadStream(), {
    name: `content/${libraryItem.slug}.html`,
  })
}

const uploadToBucket = async (
  userId: string,
  items: Array<LibraryItem>,
  cursor: number,
  size: number,
  archive: Archiver
): Promise<number> => {
  // Add the metadata.json file to the root of the zip
  const metadata = items.map((item) => ({
    id: item.id,
    slug: item.slug,
    title: item.title,
    description: item.description,
    author: item.author,
    url: item.originalUrl,
    state: itemStateMapping(item.state),
    readingProgress: item.readingProgressBottomPercent,
    thumbnail: item.thumbnail,
    labels: item.labelNames,
    savedAt: item.savedAt,
    updatedAt: item.updatedAt,
    publishedAt: item.publishedAt,
  }))

  const endCursor = cursor + size
  archive.append(JSON.stringify(metadata, null, 2), {
    name: `metadata_${cursor}_to_${endCursor}.json`,
  })

  // Loop through the items and add files to /content and /highlights directories
  for (const item of items) {
    // Add content files to /content
    await uploadContent(userId, item, archive)

    if (item.highlightAnnotations?.length) {
      const highlights = await findHighlightsByLibraryItemId(item.id, userId)
      const markdown = highlights.map(highlightToMarkdown).join('\n\n')

      // Add highlight files to /highlights
      archive.append(markdown, {
        name: `highlights/${item.slug}.md`,
      })
    }
  }

  return endCursor
}

export const exportJob = async (jobData: ExportJobData) => {
  const { userId, exportId } = jobData

  try {
    const user = await findActiveUser(userId)
    if (!user) {
      logger.error('user not found', {
        userId,
      })
      return
    }

    const exportTask = await findExportById(exportId, userId)
    if (!exportTask) {
      logger.error('export task not found', {
        userId,
        exportId,
      })
      return
    }

    await saveExport(userId, {
      id: exportId,
      state: TaskState.Running,
    })

    const emailJob = await sendExportJobEmail(userId, 'started')
    if (!emailJob) {
      logger.error('Failed to send export job email', {
        userId,
      })
      return
    }

    logger.info('exporting all items...', {
      userId,
    })

    // export data as a zip file:
    // exports/{userId}/{date}/{uuid}.zip
    // - metadata.json
    // - /content
    //   - {slug}.html
    // - /highlights
    //   - {slug}.md
    const dateStr = new Date().toISOString()
    const fileUuid = uuidv4()
    const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip`

    const file = createGCSFile(fullPath)

    // Create a write stream
    const writeStream = file.createWriteStream({
      metadata: {
        contentType: 'application/zip',
      },
    })

    // Handle any errors in the streams
    writeStream.on('error', (err) => {
      logger.error('Error writing to GCS:', err)
    })

    writeStream.on('finish', () => {
      logger.info('File successfully written to GCS')
    })

    // Initialize archiver for zipping files
    const archive = archiver('zip', {
      zlib: { level: 9 }, // Compression level
    })

    // Handle any archiver errors
    archive.on('error', (err) => {
      throw err
    })

    // Pipe the archiver output to the write stream
    archive.pipe(writeStream)

    try {
      // fetch data from the database
      const batchSize = 20
      let cursor = 0
      let hasNext = false
      do {
        const items = await searchLibraryItems(
          {
            from: cursor,
            size: batchSize,
            query: 'in:all',
            includeContent: false,
            includeDeleted: false,
            includePending: false,
          },
          userId
        )

        const size = items.length
        // write data to the zip file
        if (size > 0) {
          cursor = await uploadToBucket(userId, items, cursor, size, archive)
        }
        // a short or empty batch means we have reached the end
        hasNext = size === batchSize
      } while (hasNext)
    } finally {
      // Finalize the archive
      await archive.finalize()
    }

    // Ensure that the writeStream has finished
    await new Promise((resolve, reject) => {
      writeStream.on('finish', resolve)
      writeStream.on('error', reject)
    })

    logger.info('export completed', {
      userId,
    })

    // generate a temporary signed url for the zip file
    const [signedUrl] = await file.getSignedUrl({
      action: 'read',
      expires: Date.now() + 86400 * 1000, // 24 hours
    })

    logger.info('signed url for export:', {
      userId,
      signedUrl,
    })

    await saveExport(userId, {
      id: exportId,
      state: TaskState.Succeeded,
    })

    const job = await sendExportJobEmail(userId, 'completed', signedUrl)
    if (!job) {
      logger.error('failed to send export completed email', {
        userId,
        signedUrl,
      })
    }
  } catch (error) {
    logger.error('export failed', error)

    await saveExport(userId, {
      id: exportId,
      state: TaskState.Failed,
    })

    const job = await sendExportJobEmail(userId, 'failed')
    if (!job) {
      logger.error('failed to send export failed email', {
        userId,
      })
    }
  }
}
@@ -35,6 +35,7 @@ import {
  expireFoldersJob,
  EXPIRE_FOLDERS_JOB_NAME,
} from './jobs/expire_folders'
import { exportJob, EXPORT_JOB_NAME } from './jobs/export'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import {
  generatePreviewContent,
@@ -223,6 +224,8 @@ export const createWorker = (connection: ConnectionOptions) =>
        return pruneTrashJob(job.data)
      case EXPIRE_FOLDERS_JOB_NAME:
        return expireFoldersJob()
      case EXPORT_JOB_NAME:
        return exportJob(job.data)
      default:
        logger.warning(`[queue-processor] unhandled job: ${job.name}`)
    }
packages/api/src/routers/export_router.ts (new file, 87 lines)
@@ -0,0 +1,87 @@
import cors from 'cors'
import express, { Router } from 'express'
import { TaskState } from '../generated/graphql'
import { jobStateToTaskState } from '../queue-processor'
import { countExportsWithin24Hours, saveExport } from '../services/export'
import { sendExportJobEmail } from '../services/send_emails'
import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { queueExportJob } from '../utils/createTask'
import { logger } from '../utils/logger'

export function exportRouter() {
  const router = Router()

  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  router.get('/', cors<express.Request>(corsConfig), async (req, res) => {
    const token = getTokenByRequest(req)
    // get claims from token
    const claims = await getClaimsByToken(token)
    if (!claims) {
      logger.error('Token not found')
      return res.status(401).send({
        error: 'UNAUTHORIZED',
      })
    }

    // get user by uid from claims
    const userId = claims.uid

    try {
      const exportsWithin24Hours = await countExportsWithin24Hours(userId)
      if (exportsWithin24Hours >= 3) {
        logger.error('User has reached the limit of exports within 24 hours', {
          userId,
          exportsWithin24Hours,
        })
        return res.status(400).send({
          error: 'EXPORT_LIMIT_REACHED',
        })
      }

      const exportTask = await saveExport(userId, {
        state: TaskState.Pending,
      })

      const job = await queueExportJob(userId, exportTask.id)

      if (!job || !job.id) {
        logger.error('Failed to queue export job', {
          userId,
        })
        return res.status(500).send({
          error: 'INTERNAL_ERROR',
        })
      }

      logger.info('Export job queued', {
        userId,
        jobId: job.id,
      })

      const taskId = job.id
      const jobState = await job.getState()
      const state = jobStateToTaskState(jobState)
      await saveExport(userId, {
        id: exportTask.id,
        taskId,
        state,
      })

      res.send({
        taskId,
        state,
      })
    } catch (error) {
      logger.error('Error exporting all items', {
        userId,
        error,
      })
      return res.status(500).send({
        error: 'INTERNAL_ERROR',
      })
    }
  })

  return router
}
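The router above is mounted at /api/export (see the app wiring below), so starting an export is a single authenticated GET. A minimal client sketch, assuming the auth token is accepted in an Authorization header and using a placeholder host (both are assumptions for illustration):

async function requestExport(apiToken: string) {
  // GET /api/export enqueues the export job and returns its task id and state
  const res = await fetch('https://api.example.com/api/export', {
    headers: { Authorization: apiToken }, // assumed auth transport
  })
  if (!res.ok) {
    // 401 unauthorized, 400 EXPORT_LIMIT_REACHED, or 500 INTERNAL_ERROR
    throw new Error(`export request failed: ${res.status}`)
  }
  return (await res.json()) as { taskId: string; state: string }
}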
@@ -24,6 +24,7 @@ import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router'
import { contentRouter } from './routers/content_router'
import { digestRouter } from './routers/digest_router'
import { explainRouter } from './routers/explain_router'
import { exportRouter } from './routers/export_router'
import { integrationRouter } from './routers/integration_router'
import { localDebugRouter } from './routers/local_debug_router'
import { notificationRouter } from './routers/notification_router'
@@ -106,6 +107,7 @@ export const createApp = (): Express => {
  app.use('/api/tasks', taskRouter())
  app.use('/api/digest', digestRouter())
  app.use('/api/content', contentRouter())
  app.use('/api/export', exportRouter())

  app.use('/svc/pubsub/content', contentServiceRouter())
  app.use('/svc/pubsub/links', linkServiceRouter())
packages/api/src/services/export.ts (new file, 34 lines)
@@ -0,0 +1,34 @@
import { In, MoreThan } from 'typeorm'
import { Export } from '../entity/export'
import { TaskState } from '../generated/graphql'
import { getRepository } from '../repository'

export const saveExport = async (
  userId: string,
  exportData: Partial<Export>
): Promise<Export> => {
  return getRepository(Export).save({
    ...exportData,
    userId,
  })
}

export const countExportsWithin24Hours = async (
  userId: string
): Promise<number> => {
  return getRepository(Export).countBy({
    userId,
    createdAt: MoreThan(new Date(Date.now() - 24 * 60 * 60 * 1000)),
    state: In([TaskState.Pending, TaskState.Running, TaskState.Succeeded]),
  })
}

export const findExportById = async (
  id: string,
  userId: string
): Promise<Export | null> => {
  return getRepository(Export).findOneBy({
    id,
    userId,
  })
}
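A note on how these helpers are used elsewhere in the PR: saveExport leans on TypeORM's save(), which inserts when no id is supplied and updates the matching row when one is. A sketch of the lifecycle the router and the job drive, with a made-up task id:

// router: create the row with no id -> insert, state Pending
const exportTask = await saveExport(userId, { state: TaskState.Pending })
// router: record the queued BullMQ job id ('job-123' is hypothetical)
await saveExport(userId, { id: exportTask.id, taskId: 'job-123', state: TaskState.Running })
// job: mark the final outcome
await saveExport(userId, { id: exportTask.id, state: TaskState.Succeeded })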
@@ -113,3 +113,40 @@ export const sendPasswordResetEmail = async (user: {

  return !!result
}

export const sendExportJobEmail = async (
  userId: string,
  state: 'completed' | 'failed' | 'started',
  urlToDownload?: string
) => {
  let subject = ''
  let html = ''

  switch (state) {
    case 'completed':
      if (!urlToDownload) {
        throw new Error('urlToDownload is required')
      }

      subject = 'Your Omnivore export is ready'
      html = `<p>Your export is ready. You can download it from the following link: <a href="${urlToDownload}">${urlToDownload}</a></p>`
      break
    case 'failed':
      subject = 'Your Omnivore export failed'
      html = '<p>Your export failed. Please try again later.</p>'
      break
    case 'started':
      subject = 'Your Omnivore export has started'
      html =
        '<p>Your export has started. You will receive an email once it is completed.</p>'
      break
    default:
      throw new Error('Invalid state')
  }

  return enqueueSendEmail({
    userId,
    subject,
    html,
  })
}
@@ -87,6 +87,7 @@ export interface BackendEnv {
    integrationExporterUrl: string
    integrationImporterUrl: string
    importerMetricsUrl: string
    exportTaskHandlerUrl: string
  }
  fileUpload: {
    gcsUploadBucket: string
@@ -199,6 +200,7 @@ const nullableEnvVars = [
  'INTERCOM_WEB_SECRET',
  'INTERCOM_IOS_SECRET',
  'INTERCOM_ANDROID_SECRET',
  'EXPORT_TASK_HANDLER_URL',
] // Allow some vars to be null/empty

const envParser =
@@ -300,6 +302,7 @@ export function getEnv(): BackendEnv {
    integrationExporterUrl: parse('INTEGRATION_EXPORTER_URL'),
    integrationImporterUrl: parse('INTEGRATION_IMPORTER_URL'),
    importerMetricsUrl: parse('IMPORTER_METRICS_COLLECTOR_URL'),
    exportTaskHandlerUrl: parse('EXPORT_TASK_HANDLER_URL'),
  }
  const imageProxy = {
    url: parse('IMAGE_PROXY_URL'),
@@ -29,6 +29,7 @@ import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email'
import { EXPIRE_FOLDERS_JOB_NAME } from '../jobs/expire_folders'
import { EXPORT_JOB_NAME } from '../jobs/export'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { GENERATE_PREVIEW_CONTENT_JOB } from '../jobs/generate_preview_content'
import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items'
@@ -113,14 +114,13 @@ export const getJobPriority = (jobName: string): number => {
    case THUMBNAIL_JOB:
      return 10
    case `${REFRESH_FEED_JOB_NAME}_low`:
    case EXPORT_ITEM_JOB_NAME:
    case CREATE_DIGEST_JOB:
      return 50
    case EXPORT_ALL_ITEMS_JOB_NAME:
    case REFRESH_ALL_FEEDS_JOB_NAME:
    case GENERATE_PREVIEW_CONTENT_JOB:
    case PRUNE_TRASH_JOB:
    case EXPIRE_FOLDERS_JOB_NAME:
    case EXPORT_JOB_NAME:
      return 100

    default:
@@ -1073,4 +1073,23 @@ export const enqueueExpireFoldersJob = async () => {
  )
}

export const queueExportJob = async (userId: string, exportId: string) => {
  const queue = await getQueue()
  if (!queue) {
    return undefined
  }

  return queue.add(
    EXPORT_JOB_NAME,
    { userId, exportId },
    {
      jobId: `${EXPORT_JOB_NAME}_${userId}_${JOB_VERSION}`,
      removeOnComplete: true,
      removeOnFail: true,
      priority: getJobPriority(EXPORT_JOB_NAME),
      attempts: 1,
    }
  )
}

export default createHttpTaskWithToken
@@ -20,7 +20,7 @@ import showdown from 'showdown'
import { ILike } from 'typeorm'
import { promisify } from 'util'
import { v4 as uuid } from 'uuid'
import { Highlight } from '../entity/highlight'
import { Highlight, HighlightType } from '../entity/highlight'
import { StatusType } from '../entity/user'
import { env } from '../env'
import { PageType, PreparedDocumentInput } from '../generated/graphql'
@@ -865,3 +865,21 @@ export const parseFeed = async (
    return null
  }
}

const formatHighlightQuote = (quote: string): string => {
  // replace all empty lines with blockquote '>' to preserve paragraphs
  return quote.replace(/^(?=\n)$|^\s*?\n/gm, '> ')
}

export const highlightToMarkdown = (highlight: Highlight): string => {
  if (highlight.highlightType === HighlightType.Highlight && highlight.quote) {
    const quote = formatHighlightQuote(highlight.quote)
    const labels = highlight.labels?.map((label) => `#${label.name}`).join(' ')
    const note = highlight.annotation
    return `> ${quote} ${labels ? `\n\n${labels}` : ''}${
      note ? `\n\n${note}` : ''
    }`
  }

  return ''
}
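For reference, a quick sketch of what highlightToMarkdown produces. The sample highlight object is hypothetical and cast loosely for illustration (only the fields the function reads are filled in), and the import paths assume a caller sitting next to the jobs module under packages/api/src:

import { Highlight, HighlightType } from '../entity/highlight'
import { highlightToMarkdown } from '../utils/parser'

const sample = {
  highlightType: HighlightType.Highlight,
  quote: 'Reading is thinking with someone else\'s head.',
  labels: [{ name: 'philosophy' }],
  annotation: 'Schopenhauer on reading.',
} as unknown as Highlight

// "> Reading is thinking with someone else's head. \n\n#philosophy\n\nSchopenhauer on reading."
console.log(highlightToMarkdown(sample))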
@@ -67,17 +67,17 @@ export const generateUploadSignedUrl = async (
export const generateDownloadSignedUrl = async (
  filePathName: string,
  config?: {
    bucketName?: string
    expires?: number
  }
): Promise<string> => {
  const options: GetSignedUrlConfig = {
    version: 'v4',
    action: 'read',
    expires: Date.now() + 240 * 60 * 1000, // four hours
    ...config,
    expires: config?.expires ?? Date.now() + 240 * 60 * 1000, // four hours
  }
  const [url] = await storage
    .bucket(bucketName)
    .bucket(config?.bucketName || bucketName)
    .file(filePathName)
    .getSignedUrl(options)
  logger.info(`generating download signed url: ${url}`)
@@ -116,8 +116,11 @@ export const uploadToBucket = async (
    .save(data, { timeout: 30000, ...options }) // default timeout 30s
}

export const createGCSFile = (filename: string): File => {
  return storage.bucket(bucketName).file(filename)
export const createGCSFile = (
  filename: string,
  selectedBucket = bucketName
): File => {
  return storage.bucket(selectedBucket).file(filename)
}

export const downloadFromUrl = async (
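With the new optional parameters, callers can target a bucket other than the default and control the signed-url expiry. A small sketch (the bucket name and object path are made up for illustration):

const zipPath = 'exports/user-1/2024-01-01/abc.zip'
// second argument is optional and falls back to the standard upload bucket
const file = createGCSFile(zipPath, 'omnivore-exports')
const url = await generateDownloadSignedUrl(zipPath, {
  bucketName: 'omnivore-exports',
  expires: Date.now() + 60 * 60 * 1000, // one hour
})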
packages/db/migrations/0186.do.create_export_table.sql (new executable file, 23 lines)
@@ -0,0 +1,23 @@
-- Type: DO
-- Name: create_export_table
-- Description: Create a table to store the export information

BEGIN;

CREATE TABLE omnivore.export (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(),
    user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE,
    state TEXT NOT NULL,
    total_items INT DEFAULT 0,
    processed_items INT DEFAULT 0,
    task_id TEXT,
    signed_url TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX export_user_id_idx ON omnivore.export(user_id);

GRANT SELECT, INSERT, UPDATE, DELETE ON omnivore.export TO omnivore_user;

COMMIT;
packages/db/migrations/0186.undo.create_export_table.sql (new executable file, 9 lines)
@@ -0,0 +1,9 @@
-- Type: UNDO
-- Name: create_export_table
-- Description: Create a table to store the export information

BEGIN;

DROP TABLE omnivore.export;

COMMIT;
@@ -18,6 +18,7 @@
    "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
  },
  "devDependencies": {
    "@types/archiver": "^6.0.2",
    "@types/chai": "^4.3.4",
    "@types/mocha": "^10.0.1",
    "eslint-plugin-prettier": "^4.0.0"
@@ -28,7 +29,7 @@
    "@omnivore-app/api": "^1.0.4",
    "@omnivore/utils": "1.0.0",
    "@sentry/serverless": "^7.77.0",
    "csv-stringify": "^6.4.0",
    "archiver": "^7.0.1",
    "dotenv": "^16.0.1",
    "jsonwebtoken": "^8.5.1",
    "nodemon": "^2.0.15",
@@ -1,10 +1,11 @@
import { File, Storage } from '@google-cloud/storage'
import { Omnivore } from '@omnivore-app/api'
import { Highlight, Omnivore } from '@omnivore-app/api'
import { RedisDataSource } from '@omnivore/utils'
import * as Sentry from '@sentry/serverless'
import { stringify } from 'csv-stringify'
import archiver from 'archiver'
import * as dotenv from 'dotenv'
import * as jwt from 'jsonwebtoken'
import { PassThrough } from 'stream'
import { v4 as uuidv4 } from 'uuid'
import { queueEmailJob } from './job'

@@ -35,7 +36,7 @@ const createSignedUrl = async (file: File): Promise<string> => {
  return signedUrl[0]
}

export const sendExportCompletedEmail = async (
const sendExportCompletedEmail = async (
  redisDataSource: RedisDataSource,
  userId: string,
  urlToDownload: string
@@ -47,6 +48,24 @@ export const sendExportCompletedEmail = async (
  })
}

const formatHighlightQuote = (quote: string): string => {
  // replace all empty lines with blockquote '>' to preserve paragraphs
  return quote.replace(/^(?=\n)$|^\s*?\n/gm, '> ')
}

const highlightToMarkdown = (highlight: Highlight): string => {
  if (highlight.type === 'HIGHLIGHT' && highlight.quote) {
    const quote = formatHighlightQuote(highlight.quote)
    const labels = highlight.labels?.map((label) => `#${label.name}`).join(' ')
    const note = highlight.annotation
    return `> ${quote} ${labels ? `\n\n${labels}` : ''}${
      note ? `\n\n${note}` : ''
    }`
  }

  return ''
}

export const exporter = Sentry.GCPFunction.wrapHttpFunction(
  async (req, res) => {
    console.log('start to export')
@@ -81,103 +100,132 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction(
    })

    try {
      // write the exported data to a csv file and upload it to gcs
      // path style: exports/<uid>/<date>/<uuid>.csv
      // export data as a zip file:
      // exports/{userId}/{date}/{uuid}.zip
      // - metadata.json
      // - /content
      //   - {slug}.html
      // - /highlights
      //   - {slug}.md
      const dateStr = new Date().toISOString()
      const fileUuid = uuidv4()
      const fullPath = `exports/${claims.uid}/${dateStr}/${fileUuid}.csv`
      const fullPath = `exports/${claims.uid}/${dateStr}/${fileUuid}.zip`

      const file = createGCSFile(GCS_BUCKET, fullPath)

      // stringify the data and pipe it to the write_stream
      const stringifier = stringify({
        header: true,
        columns: [
          'id',
          'title',
          'description',
          'labels',
          'author',
          'site_name',
          'original_url',
          'slug',
          'updated_at',
          'saved_at',
          'type',
          'published_at',
          'url',
          'thumbnail',
          'read_at',
          'word_count',
          'reading_progress_percent',
          'archived_at',
        ],
      // Create a PassThrough stream
      const passthroughStream = new PassThrough()

      // Pipe the PassThrough stream to the GCS file write stream
      const writeStream = file.createWriteStream({
        metadata: {
          contentType: 'application/zip',
        },
      })

      stringifier
        .pipe(
          file.createWriteStream({
            contentType: 'text/csv',
          })
        )
        .on('error', (err) => {
          console.error('error writing to file', err)
        })
        .on('finish', () => {
          console.log('done writing to file')
        })
      passthroughStream.pipe(writeStream)

      // Handle any errors in the streams
      writeStream.on('error', (err) => {
        console.error('Error writing to GCS:', err)
      })

      writeStream.on('finish', () => {
        console.log('File successfully written to GCS')
      })

      // Initialize archiver for zipping files
      const archive = archiver('zip', {
        zlib: { level: 9 }, // Compression level
      })

      // Handle any archiver errors
      archive.on('error', (err) => {
        throw err
      })

      // Pipe the archiver output to the PassThrough stream
      archive.pipe(passthroughStream)

      // fetch data from the database
      const omnivore = new Omnivore({
        apiKey: claims.token,
      })

      const batchSize = 20
      let cursor = 0
      let hasNext = false
      do {
        const response = await omnivore.items.search({
          first: 100,
          first: batchSize,
          after: cursor,
          includeContent: false,
          includeContent: true,
          query: 'in:all',
        })

        const items = response.edges.map((edge) => edge.node)
        cursor = response.pageInfo.endCursor
          ? parseInt(response.pageInfo.endCursor)
          : 0
        hasNext = response.pageInfo.hasNextPage

        const size = items.length
        // write data to the csv file
        if (items.length > 0) {
          // write the list of urls, state and labels to the stream
          items.forEach((item) =>
            stringifier.write({
              id: item.id,
              title: item.title,
              description: item.description,
              labels: item.labels?.map((label) => label.name).join(','),
              author: item.author,
              site_name: item.siteName,
              original_url: item.originalArticleUrl,
              slug: item.slug,
              updated_at: item.updatedAt,
              saved_at: item.savedAt,
              type: item.pageType,
              published_at: item.publishedAt,
              url: item.url,
              thumbnail: item.image,
              read_at: item.readAt,
              word_count: item.wordsCount,
              reading_progress_percent: item.readingProgressPercent,
              archived_at: item.archivedAt,
            })
          )
        if (size > 0) {
          // Add the metadata.json file to the root of the zip
          const metadata = items.map((item) => ({
            id: item.id,
            slug: item.slug,
            title: item.title,
            description: item.description,
            author: item.author,
            url: item.originalArticleUrl,
            state: item.isArchived ? 'archived' : 'active',
            readingProgress: item.readingProgressPercent,
            thumbnail: item.image,
            labels: item.labels?.map((label) => label.name),
            savedAt: item.savedAt,
            updatedAt: item.updatedAt,
            publishedAt: item.publishedAt,
          }))

          // sleep for 1 second to avoid rate limiting
          await new Promise((resolve) => setTimeout(resolve, 1000))
          archive.append(JSON.stringify(metadata, null, 2), {
            name: `metadata_${cursor}_to_${cursor + size}.json`,
          })

          // Loop through the items and add files to /content and /highlights directories
          items.forEach((item) => {
            const slug = item.slug
            const content = item.content
            const highlights = item.highlights
            if (content) {
              // Add content files to /content
              archive.append(content, {
                name: `content/${slug}.html`,
              })
            }

            if (highlights?.length) {
              const markdown = highlights.map(highlightToMarkdown).join('\n\n')

              // Add highlight files to /highlights
              archive.append(markdown, {
                name: `highlights/${slug}.md`,
              })
            }
          })

          cursor = response.pageInfo.endCursor
            ? parseInt(response.pageInfo.endCursor)
            : 0
          hasNext = response.pageInfo.hasNextPage
        }
      } while (hasNext)

      stringifier.end()
      // Finalize the archive
      await archive.finalize()

      // Wait until the zip file is completely written
      await new Promise((resolve, reject) => {
        writeStream.on('finish', resolve)
        writeStream.on('error', reject)
      })

      // generate a temporary signed url for the csv file
      const signedUrl = await createSignedUrl(file)