exports/{userId}/{date}/{uuid}.zip
- metadata_{start_page}_to_{end_page}.json
- /content
  - {slug}.html
- /highlights
  - {slug}.md
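The metadata files are named from the running cursor, one per fetched batch; with the batch size of 20 used below, a full export would contain, for example, metadata_0_to_20.json, metadata_20_to_40.json, and so on.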
@@ -18,6 +18,7 @@
     "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
   },
   "devDependencies": {
+    "@types/archiver": "^6.0.2",
     "@types/chai": "^4.3.4",
     "@types/mocha": "^10.0.1",
     "eslint-plugin-prettier": "^4.0.0"
@@ -28,6 +29,7 @@
     "@omnivore-app/api": "^1.0.4",
     "@omnivore/utils": "1.0.0",
     "@sentry/serverless": "^7.77.0",
+    "archiver": "^7.0.1",
     "csv-stringify": "^6.4.0",
     "dotenv": "^16.0.1",
     "jsonwebtoken": "^8.5.1",
@@ -1,10 +1,11 @@
 import { File, Storage } from '@google-cloud/storage'
-import { Omnivore } from '@omnivore-app/api'
+import { Highlight, Omnivore } from '@omnivore-app/api'
 import { RedisDataSource } from '@omnivore/utils'
 import * as Sentry from '@sentry/serverless'
 import { stringify } from 'csv-stringify'
+import archiver from 'archiver'
 import * as dotenv from 'dotenv'
 import * as jwt from 'jsonwebtoken'
 import { PassThrough } from 'stream'
 import { v4 as uuidv4 } from 'uuid'
 import { queueEmailJob } from './job'
@@ -35,7 +36,7 @@ const createSignedUrl = async (file: File): Promise<string> => {
   return signedUrl[0]
 }

-export const sendExportCompletedEmail = async (
+const sendExportCompletedEmail = async (
   redisDataSource: RedisDataSource,
   userId: string,
   urlToDownload: string
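For reference, createSignedUrl itself is unchanged, so only its tail shows as context above. A minimal sketch of such a helper, assuming the standard File#getSignedUrl API from @google-cloud/storage; the 24-hour expiry is an invented placeholder, not a value taken from this commit:

import { File } from '@google-cloud/storage'

// Sketch only: produces a temporary read URL for an uploaded object.
const createSignedUrl = async (file: File): Promise<string> => {
  const signedUrl = await file.getSignedUrl({
    action: 'read',
    expires: Date.now() + 24 * 60 * 60 * 1000, // hypothetical 24h validity
  })
  return signedUrl[0] // getSignedUrl resolves to a one-element array
}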
@@ -47,6 +48,24 @@ export const sendExportCompletedEmail = async (
   })
 }

+const formatHighlightQuote = (quote: string): string => {
+  // replace all empty lines with blockquote '>' to preserve paragraphs
+  return quote.replace(/^(?=\n)$|^\s*?\n/gm, '> ')
+}
+
+const highlightToMarkdown = (highlight: Highlight): string => {
+  if (highlight.type === 'HIGHLIGHT' && highlight.quote) {
+    const quote = formatHighlightQuote(highlight.quote)
+    const labels = highlight.labels?.map((label) => `#${label.name}`).join(' ')
+    const note = highlight.annotation
+    return `> ${quote} ${labels ? `\n\n${labels}` : ''}${
+      note ? `\n\n${note}` : ''
+    }`
+  }
+
+  return ''
+}
+
 export const exporter = Sentry.GCPFunction.wrapHttpFunction(
   async (req, res) => {
     console.log('start to export')
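To make the two helpers above concrete, here is a hypothetical highlight (all field values invented, trimmed to the fields the helpers read) and the markdown it should produce:

import { Highlight } from '@omnivore-app/api'

const sample = {
  type: 'HIGHLIGHT',
  quote: 'First paragraph.\n\nSecond paragraph.',
  labels: [{ name: 'reading' }],
  annotation: 'My note',
} as unknown as Highlight // remaining Highlight fields omitted for brevity

console.log(highlightToMarkdown(sample))
// Roughly:
// > First paragraph.
// >
// Second paragraph.
//
// #reading
//
// My note

The regex in formatHighlightQuote rewrites each empty line inside the quote as a '> ' line, so a multi-paragraph quote stays inside one blockquote instead of being cut off at the first blank line.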
@@ -81,103 +100,132 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction(
     })

     try {
-      // write the exported data to a csv file and upload it to gcs
-      // path style: exports/<uid>/<date>/<uuid>.csv
+      // export data as a zip file:
+      // exports/{userId}/{date}/{uuid}.zip
+      //   - metadata.json
+      //   - /content
+      //     - {slug}.html
+      //   - /highlights
+      //     - {slug}.md
       const dateStr = new Date().toISOString()
       const fileUuid = uuidv4()
-      const fullPath = `exports/${claims.uid}/${dateStr}/${fileUuid}.csv`
+      const fullPath = `exports/${claims.uid}/${dateStr}/${fileUuid}.zip`

       const file = createGCSFile(GCS_BUCKET, fullPath)

-      // stringify the data and pipe it to the write_stream
-      const stringifier = stringify({
-        header: true,
-        columns: [
-          'id',
-          'title',
-          'description',
-          'labels',
-          'author',
-          'site_name',
-          'original_url',
-          'slug',
-          'updated_at',
-          'saved_at',
-          'type',
-          'published_at',
-          'url',
-          'thumbnail',
-          'read_at',
-          'word_count',
-          'reading_progress_percent',
-          'archived_at',
-        ],
-      })
+      // Create a PassThrough stream
+      const passthroughStream = new PassThrough()

-      stringifier
-        .pipe(
-          file.createWriteStream({
-            contentType: 'text/csv',
-          })
-        )
-        .on('error', (err) => {
-          console.error('error writing to file', err)
-        })
-        .on('finish', () => {
-          console.log('done writing to file')
-        })
+      // Pipe the PassThrough stream to the GCS file write stream
+      const writeStream = file.createWriteStream({
+        metadata: {
+          contentType: 'application/zip',
+        },
+      })
+
+      passthroughStream.pipe(writeStream)
+
+      // Handle any errors in the streams
+      writeStream.on('error', (err) => {
+        console.error('Error writing to GCS:', err)
+      })
+
+      writeStream.on('finish', () => {
+        console.log('File successfully written to GCS')
+      })
+
+      // Initialize archiver for zipping files
+      const archive = archiver('zip', {
+        zlib: { level: 9 }, // Compression level
+      })
+
+      // Handle any archiver errors
+      archive.on('error', (err) => {
+        throw err
+      })
+
+      // Pipe the archiver output to the PassThrough stream
+      archive.pipe(passthroughStream)

       // fetch data from the database
       const omnivore = new Omnivore({
         apiKey: claims.token,
       })

+      const batchSize = 20
       let cursor = 0
       let hasNext = false
       do {
         const response = await omnivore.items.search({
-          first: 100,
+          first: batchSize,
           after: cursor,
-          includeContent: false,
+          includeContent: true,
           query: 'in:all',
         })

         const items = response.edges.map((edge) => edge.node)
-        cursor = response.pageInfo.endCursor
-          ? parseInt(response.pageInfo.endCursor)
-          : 0
-        hasNext = response.pageInfo.hasNextPage

-        // write data to the csv file
-        if (items.length > 0) {
-          // write the list of urls, state and labels to the stream
-          items.forEach((item) =>
-            stringifier.write({
-              id: item.id,
-              title: item.title,
-              description: item.description,
-              labels: item.labels?.map((label) => label.name).join(','),
-              author: item.author,
-              site_name: item.siteName,
-              original_url: item.originalArticleUrl,
-              slug: item.slug,
-              updated_at: item.updatedAt,
-              saved_at: item.savedAt,
-              type: item.pageType,
-              published_at: item.publishedAt,
-              url: item.url,
-              thumbnail: item.image,
-              read_at: item.readAt,
-              word_count: item.wordsCount,
-              reading_progress_percent: item.readingProgressPercent,
-              archived_at: item.archivedAt,
-            })
-          )
+        const size = items.length
+        if (size > 0) {
+          // Add the metadata.json file to the root of the zip
+          const metadata = items.map((item) => ({
+            id: item.id,
+            slug: item.slug,
+            title: item.title,
+            description: item.description,
+            author: item.author,
+            url: item.originalArticleUrl,
+            state: item.isArchived ? 'archived' : 'active',
+            readingProgress: item.readingProgressPercent,
+            thumbnail: item.image,
+            labels: item.labels?.map((label) => label.name),
+            savedAt: item.savedAt,
+            updatedAt: item.updatedAt,
+            publishedAt: item.publishedAt,
+          }))
+
+          // sleep for 1 second to avoid rate limiting
+          await new Promise((resolve) => setTimeout(resolve, 1000))
+          archive.append(JSON.stringify(metadata, null, 2), {
+            name: `metadata_${cursor}_to_${cursor + size}.json`,
+          })
+
+          // Loop through the items and add files to /content and /highlights directories
+          items.forEach((item) => {
+            const slug = item.slug
+            const content = item.content
+            const highlights = item.highlights
+            if (content) {
+              // Add content files to /content
+              archive.append(content, {
+                name: `content/${slug}.html`,
+              })
+            }
+
+            if (highlights?.length) {
+              const markdown = highlights.map(highlightToMarkdown).join('\n\n')
+
+              // Add highlight files to /highlights
+              archive.append(markdown, {
+                name: `highlights/${slug}.md`,
+              })
+            }
+          })
+
+          cursor = response.pageInfo.endCursor
+            ? parseInt(response.pageInfo.endCursor)
+            : 0
+          hasNext = response.pageInfo.hasNextPage
         }
       } while (hasNext)

-      stringifier.end()
+      // Finalize the archive
+      await archive.finalize()
+
+      // Wait until the zip file is completely written
+      await new Promise((resolve, reject) => {
+        writeStream.on('finish', resolve)
+        writeStream.on('error', reject)
+      })

       // generate a temporary signed url for the csv file
       const signedUrl = await createSignedUrl(file)
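Stripped of the export-specific details, the streaming pattern this hunk introduces is: archiver output is piped through a PassThrough into a GCS write stream, entries are appended incrementally, and the upload is only treated as done once the write stream finishes. A self-contained sketch of that shape; the bucket name, object path, and sample entry are placeholders:

import { Storage } from '@google-cloud/storage'
import archiver from 'archiver'
import { PassThrough } from 'stream'

const uploadZip = async (bucketName: string, objectPath: string) => {
  const file = new Storage().bucket(bucketName).file(objectPath)

  // GCS write stream fed through a PassThrough, as in the commit
  const passthroughStream = new PassThrough()
  const writeStream = file.createWriteStream({
    metadata: { contentType: 'application/zip' },
  })
  passthroughStream.pipe(writeStream)

  const archive = archiver('zip', { zlib: { level: 9 } })
  archive.on('error', (err) => {
    throw err
  })
  archive.pipe(passthroughStream)

  // Entries can be strings, buffers, or streams
  archive.append('<html><body>placeholder</body></html>', {
    name: 'content/example.html',
  })

  // finalize() flushes archiver's output; the upload itself is only
  // complete once the GCS write stream emits 'finish'
  await archive.finalize()
  await new Promise<void>((resolve, reject) => {
    writeStream.on('finish', () => resolve())
    writeStream.on('error', reject)
  })
}

Note on the design: the PassThrough is not strictly required (archive.pipe(writeStream) would also work), but it gives a stable seam between the archiver and the destination stream, which is the structure the commit uses.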
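Finally, the fetch loop is plain cursor pagination over the @omnivore-app/api client. A trimmed sketch of just the loop, with the API key and the per-batch processing left as placeholders:

import { Omnivore } from '@omnivore-app/api'

const fetchAllItems = async (
  apiKey: string,
  processBatch: (items: unknown[]) => Promise<void> // placeholder callback
) => {
  const omnivore = new Omnivore({ apiKey })
  const batchSize = 20
  let cursor = 0
  let hasNext = false
  do {
    const response = await omnivore.items.search({
      first: batchSize,
      after: cursor,
      includeContent: true,
      query: 'in:all',
    })

    const items = response.edges.map((edge) => edge.node)
    if (items.length > 0) {
      await processBatch(items)
      // pause between batches to stay under the API rate limit
      await new Promise((resolve) => setTimeout(resolve, 1000))
    }

    cursor = response.pageInfo.endCursor
      ? parseInt(response.pageInfo.endCursor)
      : 0
    hasNext = response.pageInfo.hasNextPage
  } while (hasNext)
}

One difference from the committed code: here the cursor and hasNext are updated even when a page comes back empty, whereas the commit updates them inside the size > 0 guard; if the API ever returned an empty page with hasNextPage still true, the committed loop would stall on the same cursor.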