diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts new file mode 100644 index 000000000..8d6f6dc5b --- /dev/null +++ b/packages/api/src/jobs/export.ts @@ -0,0 +1,40 @@ +import axios from 'axios' +import jwt from 'jsonwebtoken' +import { env } from '../env' +import { findActiveUser } from '../services/user' +import { logger } from '../utils/logger' + +export interface ExportJobData { + userId: string +} + +export const EXPORT_JOB_NAME = 'export' + +export const exportJob = async (jobData: ExportJobData) => { + const { userId } = jobData + const user = await findActiveUser(userId) + if (!user) { + logger.error('user not found', { + userId, + }) + return + } + + logger.info('exporting all items...', { + userId, + }) + + const token = jwt.sign( + { + uid: userId, + }, + env.server.jwtSecret, + { expiresIn: '1d' } + ) + + await axios.post(env.queue.exportTaskHandlerUrl, undefined, { + headers: { + OmnivoreAuthorizationHeader: token, + }, + }) +} diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts new file mode 100644 index 000000000..495d1a3fc --- /dev/null +++ b/packages/api/src/routers/export_router.ts @@ -0,0 +1,58 @@ +import cors from 'cors' +import express, { Router } from 'express' +import { getClaimsByToken, getTokenByRequest } from '../utils/auth' +import { corsConfig } from '../utils/corsConfig' +import { queueExportJob } from '../utils/createTask' +import { logger } from '../utils/logger' + +export function exportRouter() { + const router = Router() + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + router.get('/', cors(corsConfig), async (req, res) => { + const token = getTokenByRequest(req) + // get claims from token + const claims = await getClaimsByToken(token) + if (!claims) { + logger.error('Token not found') + return res.status(401).send({ + error: 'UNAUTHORIZED', + }) + } + + // get user by uid from claims + const userId = claims.uid + + try { + const job = await queueExportJob(userId) + + if (!job) { + logger.error('Failed to queue export job', { + userId, + }) + return res.status(500).send({ + error: 'INTERNAL_ERROR', + }) + } + + logger.info('Export job queued', { + userId, + jobId: job.id, + }) + + res.send({ + jobId: job.id, + }) + } catch (error) { + logger.error('Error exporting all items', { + userId, + error, + }) + return res.status(500).send({ + error: 'INTERNAL_ERROR', + }) + } + }) + + return router +} diff --git a/packages/api/src/util.ts b/packages/api/src/util.ts index 9590c2cd6..8d0eefc5c 100755 --- a/packages/api/src/util.ts +++ b/packages/api/src/util.ts @@ -87,6 +87,7 @@ export interface BackendEnv { integrationExporterUrl: string integrationImporterUrl: string importerMetricsUrl: string + exportTaskHandlerUrl: string } fileUpload: { gcsUploadBucket: string @@ -199,6 +200,7 @@ const nullableEnvVars = [ 'INTERCOM_WEB_SECRET', 'INTERCOM_IOS_SECRET', 'INTERCOM_ANDROID_SECRET', + 'EXPORT_TASK_HANDLER_URL', ] // Allow some vars to be null/empty const envParser = @@ -300,6 +302,7 @@ export function getEnv(): BackendEnv { integrationExporterUrl: parse('INTEGRATION_EXPORTER_URL'), integrationImporterUrl: parse('INTEGRATION_IMPORTER_URL'), importerMetricsUrl: parse('IMPORTER_METRICS_COLLECTOR_URL'), + exportTaskHandlerUrl: parse('EXPORT_TASK_HANDLER_URL'), } const imageProxy = { url: parse('IMAGE_PROXY_URL'), diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 2b21e63b0..4b704d9c0 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -29,6 +29,7 @@ import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email' import { EXPIRE_FOLDERS_JOB_NAME } from '../jobs/expire_folders' +import { EXPORT_JOB_NAME } from '../jobs/export' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { GENERATE_PREVIEW_CONTENT_JOB } from '../jobs/generate_preview_content' import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items' @@ -113,14 +114,13 @@ export const getJobPriority = (jobName: string): number => { case THUMBNAIL_JOB: return 10 case `${REFRESH_FEED_JOB_NAME}_low`: - case EXPORT_ITEM_JOB_NAME: case CREATE_DIGEST_JOB: return 50 - case EXPORT_ALL_ITEMS_JOB_NAME: case REFRESH_ALL_FEEDS_JOB_NAME: case GENERATE_PREVIEW_CONTENT_JOB: case PRUNE_TRASH_JOB: case EXPIRE_FOLDERS_JOB_NAME: + case EXPORT_JOB_NAME: return 100 default: @@ -1073,4 +1073,23 @@ export const enqueueExpireFoldersJob = async () => { ) } +export const queueExportJob = async (userId: string) => { + const queue = await getQueue() + if (!queue) { + return undefined + } + + return queue.add( + EXPORT_JOB_NAME, + { userId }, + { + jobId: `${EXPORT_JOB_NAME}_${userId}_${JOB_VERSION}`, + removeOnComplete: true, + removeOnFail: true, + priority: getJobPriority(EXPORT_JOB_NAME), + attempts: 1, + } + ) +} + export default createHttpTaskWithToken