add export job and get api

This commit is contained in:
Hongbo Wu
2024-08-26 19:05:09 +08:00
parent d4aee94d7e
commit a75acc5185
4 changed files with 122 additions and 2 deletions

View File

@ -0,0 +1,40 @@
import axios from 'axios'
import jwt from 'jsonwebtoken'
import { env } from '../env'
import { findActiveUser } from '../services/user'
import { logger } from '../utils/logger'
export interface ExportJobData {
userId: string
}
export const EXPORT_JOB_NAME = 'export'
export const exportJob = async (jobData: ExportJobData) => {
const { userId } = jobData
const user = await findActiveUser(userId)
if (!user) {
logger.error('user not found', {
userId,
})
return
}
logger.info('exporting all items...', {
userId,
})
const token = jwt.sign(
{
uid: userId,
},
env.server.jwtSecret,
{ expiresIn: '1d' }
)
await axios.post(env.queue.exportTaskHandlerUrl, undefined, {
headers: {
OmnivoreAuthorizationHeader: token,
},
})
}

View File

@ -0,0 +1,58 @@
import cors from 'cors'
import express, { Router } from 'express'
import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { queueExportJob } from '../utils/createTask'
import { logger } from '../utils/logger'
export function exportRouter() {
const router = Router()
// eslint-disable-next-line @typescript-eslint/no-misused-promises
router.get('/', cors<express.Request>(corsConfig), async (req, res) => {
const token = getTokenByRequest(req)
// get claims from token
const claims = await getClaimsByToken(token)
if (!claims) {
logger.error('Token not found')
return res.status(401).send({
error: 'UNAUTHORIZED',
})
}
// get user by uid from claims
const userId = claims.uid
try {
const job = await queueExportJob(userId)
if (!job) {
logger.error('Failed to queue export job', {
userId,
})
return res.status(500).send({
error: 'INTERNAL_ERROR',
})
}
logger.info('Export job queued', {
userId,
jobId: job.id,
})
res.send({
jobId: job.id,
})
} catch (error) {
logger.error('Error exporting all items', {
userId,
error,
})
return res.status(500).send({
error: 'INTERNAL_ERROR',
})
}
})
return router
}

View File

@ -87,6 +87,7 @@ export interface BackendEnv {
integrationExporterUrl: string
integrationImporterUrl: string
importerMetricsUrl: string
exportTaskHandlerUrl: string
}
fileUpload: {
gcsUploadBucket: string
@ -199,6 +200,7 @@ const nullableEnvVars = [
'INTERCOM_WEB_SECRET',
'INTERCOM_IOS_SECRET',
'INTERCOM_ANDROID_SECRET',
'EXPORT_TASK_HANDLER_URL',
] // Allow some vars to be null/empty
const envParser =
@ -300,6 +302,7 @@ export function getEnv(): BackendEnv {
integrationExporterUrl: parse('INTEGRATION_EXPORTER_URL'),
integrationImporterUrl: parse('INTEGRATION_IMPORTER_URL'),
importerMetricsUrl: parse('IMPORTER_METRICS_COLLECTOR_URL'),
exportTaskHandlerUrl: parse('EXPORT_TASK_HANDLER_URL'),
}
const imageProxy = {
url: parse('IMAGE_PROXY_URL'),

View File

@ -29,6 +29,7 @@ import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email'
import { EXPIRE_FOLDERS_JOB_NAME } from '../jobs/expire_folders'
import { EXPORT_JOB_NAME } from '../jobs/export'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { GENERATE_PREVIEW_CONTENT_JOB } from '../jobs/generate_preview_content'
import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items'
@ -113,14 +114,13 @@ export const getJobPriority = (jobName: string): number => {
case THUMBNAIL_JOB:
return 10
case `${REFRESH_FEED_JOB_NAME}_low`:
case EXPORT_ITEM_JOB_NAME:
case CREATE_DIGEST_JOB:
return 50
case EXPORT_ALL_ITEMS_JOB_NAME:
case REFRESH_ALL_FEEDS_JOB_NAME:
case GENERATE_PREVIEW_CONTENT_JOB:
case PRUNE_TRASH_JOB:
case EXPIRE_FOLDERS_JOB_NAME:
case EXPORT_JOB_NAME:
return 100
default:
@ -1073,4 +1073,23 @@ export const enqueueExpireFoldersJob = async () => {
)
}
export const queueExportJob = async (userId: string) => {
const queue = await getQueue()
if (!queue) {
return undefined
}
return queue.add(
EXPORT_JOB_NAME,
{ userId },
{
jobId: `${EXPORT_JOB_NAME}_${userId}_${JOB_VERSION}`,
removeOnComplete: true,
removeOnFail: true,
priority: getJobPriority(EXPORT_JOB_NAME),
attempts: 1,
}
)
}
export default createHttpTaskWithToken