Add bulk upload original content job

This commit is contained in:
Hongbo Wu
2024-05-10 14:37:05 +08:00
parent a56562041d
commit 01ebcbb16b
12 changed files with 189 additions and 48 deletions

View File

@ -127,7 +127,9 @@ export const _findThumbnail = (imagesSizes: (ImageSize | null)[]) => {
export const findThumbnail = async (data: Data) => {
const { libraryItemId, userId } = data
const item = await findLibraryItemById(libraryItemId, userId)
const item = await findLibraryItemById(libraryItemId, userId, {
select: ['thumbnail', 'readableContent'],
})
if (!item) {
logger.info('page not found')
return false

View File

@ -12,6 +12,7 @@ import { saveFile } from '../services/save_file'
import { savePage } from '../services/save_page'
import { uploadFile } from '../services/upload_file'
import { logError, logger } from '../utils/logger'
import { downloadFromUrl, uploadToSignedUrl } from '../utils/uploads'
const signToken = promisify(jwt.sign)
@ -47,39 +48,6 @@ const isFetchResult = (obj: unknown): obj is FetchResult => {
return typeof obj === 'object' && obj !== null && 'finalUrl' in obj
}
const uploadToSignedUrl = async (
uploadSignedUrl: string,
contentType: string,
contentObjUrl: string
) => {
const maxContentLength = 10 * 1024 * 1024 // 10MB
logger.info('downloading content', {
contentObjUrl,
})
// download the content as stream and max 10MB
const response = await axios.get(contentObjUrl, {
responseType: 'stream',
maxContentLength,
timeout: REQUEST_TIMEOUT,
})
logger.info('uploading to signed url', {
uploadSignedUrl,
contentType,
})
// upload the stream to the signed url
await axios.put(uploadSignedUrl, response.data, {
headers: {
'Content-Type': contentType,
},
maxBodyLength: maxContentLength,
timeout: REQUEST_TIMEOUT,
})
}
const uploadPdf = async (
url: string,
userId: string,
@ -98,7 +66,19 @@ const uploadPdf = async (
throw new Error('error while getting upload id and signed url')
}
await uploadToSignedUrl(result.uploadSignedUrl, 'application/pdf', url)
logger.info('downloading content', {
url,
})
const data = await downloadFromUrl(url, REQUEST_TIMEOUT)
const uploadSignedUrl = result.uploadSignedUrl
const contentType = 'application/pdf'
logger.info('uploading to signed url', {
uploadSignedUrl,
contentType,
})
await uploadToSignedUrl(uploadSignedUrl, data, contentType, REQUEST_TIMEOUT)
logger.info('pdf uploaded successfully', {
url,

View File

@ -0,0 +1,56 @@
import { findLibraryItemById } from '../services/library_item'
import { htmlToHighlightedMarkdown, htmlToMarkdown } from '../utils/parser'
import { uploadToSignedUrl } from '../utils/uploads'
export const UPLOAD_CONTENT_JOB = 'UPLOAD_CONTENT_JOB'

// Output formats a library item's original HTML content can be exported as.
type ContentFormat = 'markdown' | 'highlightedMarkdown' | 'original'

export interface UploadContentJobData {
  libraryItemId: string
  userId: string
  format: ContentFormat
  uploadUrl: string
}

/**
 * Converts the stored original HTML content into the requested format.
 *
 * @param content - the original HTML content of the library item
 * @param format - target format; 'original' returns the content unchanged
 * @returns the converted content
 * @throws Error when the format is not a supported ContentFormat
 */
const convertContent = (content: string, format: ContentFormat): string => {
  switch (format) {
    case 'markdown':
      return htmlToMarkdown(content)
    case 'highlightedMarkdown':
      return htmlToHighlightedMarkdown(content)
    case 'original':
      return content
    default: {
      // Exhaustiveness guard: adding a new ContentFormat member without a
      // case above becomes a compile-time error here instead of a silent
      // runtime failure.
      const _exhaustive: never = format
      throw new Error('Unsupported format')
    }
  }
}

// Content-Type header used when uploading each format to the signed URL.
// `satisfies` forces an entry for every ContentFormat while keeping the
// literal value types (no widening to a plain Record).
const CONTENT_TYPES = {
  markdown: 'text/markdown',
  highlightedMarkdown: 'text/markdown',
  original: 'text/html',
} satisfies Record<ContentFormat, string>
export const uploadContentJob = async (data: UploadContentJobData) => {
const { libraryItemId, userId, format, uploadUrl } = data
const libraryItem = await findLibraryItemById(libraryItemId, userId, {
select: ['originalContent'],
})
if (!libraryItem) {
throw new Error('Library item not found')
}
if (!libraryItem.originalContent) {
throw new Error('Original content not found')
}
const content = convertContent(libraryItem.originalContent, format)
// 1 minute timeout
const timeout = 60000
const contentType = CONTENT_TYPES[format]
await uploadToSignedUrl(uploadUrl, Buffer.from(content), contentType, timeout)
}

View File

@ -60,6 +60,7 @@ import {
UPDATE_LABELS_JOB,
} from './jobs/update_db'
import { updatePDFContentJob } from './jobs/update_pdf_content'
import { uploadContentJob, UPLOAD_CONTENT_JOB } from './jobs/upload_content'
import { redisDataSource } from './redis_data_source'
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
import { getJobPriority } from './utils/createTask'
@ -182,6 +183,8 @@ export const createWorker = (connection: ConnectionOptions) =>
return forwardEmailJob(job.data)
case CREATE_DIGEST_JOB:
return createDigest(job.data)
case UPLOAD_CONTENT_JOB:
return uploadContentJob(job.data)
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
}

View File

@ -399,6 +399,10 @@ export const getArticleResolver = authorized<
'recommendations.recommender',
'recommendations_recommender'
)
.leftJoinAndSelect(
'recommendations_recommender.profile',
'recommendations_recommender_profile'
)
.where('libraryItem.user_id = :uid', { uid })
// We allow the backend to use the ID instead of a slug to fetch the article

View File

@ -82,7 +82,21 @@ export const articleSavingRequestResolver = authorized<
let libraryItem: LibraryItem | null = null
if (id) {
libraryItem = await findLibraryItemById(id, uid)
libraryItem = await findLibraryItemById(id, uid, {
select: [
'id',
'state',
'originalUrl',
'slug',
'title',
'author',
'createdAt',
'updatedAt',
],
relations: {
user: true,
},
})
} else if (url) {
libraryItem = await findLibraryItemByUrl(cleanUrl(url), uid)
}

View File

@ -141,7 +141,14 @@ export const recommendResolver = authorized<
MutationRecommendArgs
>(async (_, { input }, { uid, log, signToken }) => {
try {
const item = await findLibraryItemById(input.pageId, uid)
const item = await findLibraryItemById(input.pageId, uid, {
select: ['id'],
relations: {
highlights: {
user: true,
},
},
})
if (!item) {
return {
errorCodes: [RecommendErrorCode.NotFound],
@ -259,7 +266,9 @@ export const recommendHighlightsResolver = authorized<
}
}
const item = await findLibraryItemById(input.pageId, uid)
const item = await findLibraryItemById(input.pageId, uid, {
select: ['id'],
})
if (!item) {
return {
errorCodes: [RecommendHighlightsErrorCode.NotFound],

View File

@ -94,7 +94,9 @@ export function articleRouter() {
})
try {
const item = await findLibraryItemById(articleId, uid)
const item = await findLibraryItemById(articleId, uid, {
select: ['title', 'readableContent', 'itemLanguage'],
})
if (!item) {
return res.status(404).send('Page not found')
}

View File

@ -146,7 +146,11 @@ export function pageRouter() {
return res.status(400).send({ errorCode: 'BAD_DATA' })
}
const item = await findLibraryItemById(itemId, claims.uid)
const item = await findLibraryItemById(itemId, claims.uid, {
relations: {
highlights: true,
},
})
if (!item) {
return res.status(404).send({ errorCode: 'NOT_FOUND' })
}

View File

@ -782,17 +782,27 @@ export const findLibraryItemsByIds = async (ids: string[], userId: string) => {
export const findLibraryItemById = async (
id: string,
userId: string
userId: string,
options?: {
select?: (keyof LibraryItem)[]
relations?: {
user?: boolean
labels?: boolean
highlights?:
| {
user?: boolean
}
| boolean
}
}
): Promise<LibraryItem | null> => {
return authTrx(
async (tx) =>
tx
.createQueryBuilder(LibraryItem, 'library_item')
.leftJoinAndSelect('library_item.labels', 'labels')
.leftJoinAndSelect('library_item.highlights', 'highlights')
.leftJoinAndSelect('highlights.user', 'user')
.where('library_item.id = :id', { id })
.getOne(),
tx.withRepository(libraryItemRepository).findOne({
select: options?.select,
where: { id },
relations: options?.relations,
}),
undefined,
userId
)

View File

@ -53,6 +53,10 @@ import {
UPDATE_HIGHLIGHT_JOB,
UPDATE_LABELS_JOB,
} from '../jobs/update_db'
import {
UploadContentJobData,
UPLOAD_CONTENT_JOB,
} from '../jobs/upload_content'
import { getBackendQueue, JOB_VERSION } from '../queue-processor'
import { redisDataSource } from '../redis_data_source'
import { writeDigest } from '../services/digest'
@ -94,6 +98,7 @@ export const getJobPriority = (jobName: string): number => {
case `${REFRESH_FEED_JOB_NAME}_low`:
case EXPORT_ITEM_JOB_NAME:
case CREATE_DIGEST_JOB:
case UPLOAD_CONTENT_JOB:
return 50
case EXPORT_ALL_ITEMS_JOB_NAME:
case REFRESH_ALL_FEEDS_JOB_NAME:
@ -953,4 +958,24 @@ export const enqueueCreateDigest = async (
}
}
/**
 * Enqueues one UPLOAD_CONTENT_JOB per payload entry on the backend queue.
 *
 * @param data - the payloads, one per job to enqueue
 * @returns the created jobs, or an empty string when no queue is available
 */
export const enqueueBulkUploadContentJob = async (
  data: UploadContentJobData[]
) => {
  const queue = await getBackendQueue()
  if (!queue) {
    return ''
  }

  return queue.addBulk(
    data.map((jobData) => ({
      name: UPLOAD_CONTENT_JOB,
      data: jobData,
      opts: {
        attempts: 3,
        priority: getJobPriority(UPLOAD_CONTENT_JOB),
      },
    }))
  )
}
export default createHttpTaskWithToken

View File

@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Readable } from 'stream'
import { File, GetSignedUrlConfig, Storage } from '@google-cloud/storage'
import axios from 'axios'
import { ContentReaderType } from '../entity/library_item'
import { env } from '../env'
import { PageType } from '../generated/graphql'
@ -33,6 +34,7 @@ const storage = env.fileUpload?.gcsUploadSAKeyFilePath
? new Storage({ keyFilename: env.fileUpload.gcsUploadSAKeyFilePath })
: new Storage()
const bucketName = env.fileUpload.gcsUploadBucket
const maxContentLength = 10 * 1024 * 1024 // 10MB
export const countOfFilesWithPrefix = async (prefix: string) => {
const [files] = await storage.bucket(bucketName).getFiles({ prefix })
@ -112,3 +114,33 @@ export const uploadToBucket = async (
export const createGCSFile = (filename: string): File => {
return storage.bucket(bucketName).file(filename)
}
/**
 * Downloads content from a URL as a stream, capped at 10MB.
 *
 * With `responseType: 'stream'` axios resolves `response.data` to a Node
 * readable stream, NOT a Buffer, so the response is typed as `Readable`
 * (the previous `axios.get<Buffer>` annotation was incorrect).
 *
 * @param contentObjUrl - the URL to download from
 * @param timeout - optional request timeout in milliseconds
 * @returns the response body as a readable stream
 */
export const downloadFromUrl = async (
  contentObjUrl: string,
  timeout?: number
) => {
  // download the content as stream and max 10MB
  const response = await axios.get<Readable>(contentObjUrl, {
    responseType: 'stream',
    maxContentLength,
    timeout,
  })
  return response.data
}
/**
 * Uploads data to a signed URL via HTTP PUT.
 *
 * The payload may be an in-memory Buffer or a readable stream (callers such
 * as uploadPdf pass the stream returned by downloadFromUrl, so the previous
 * `Buffer`-only type was too narrow). Widening to `Buffer | Readable` is
 * backward compatible for existing Buffer callers.
 *
 * @param uploadSignedUrl - the signed URL to upload to
 * @param data - the content to upload (Buffer or readable stream)
 * @param contentType - value for the Content-Type request header
 * @param timeout - optional request timeout in milliseconds
 */
export const uploadToSignedUrl = async (
  uploadSignedUrl: string,
  data: Buffer | Readable,
  contentType: string,
  timeout?: number
) => {
  // upload the stream to the signed url
  await axios.put(uploadSignedUrl, data, {
    headers: {
      'Content-Type': contentType,
    },
    maxBodyLength: maxContentLength,
    timeout,
  })
}