From 95657e3d3ea2d01145db02120119e53cf463907e Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Sun, 21 Jul 2024 11:11:29 +0800 Subject: [PATCH] queue email sending jobs in exporter and importer --- packages/api/src/jobs/ai/create_digest.ts | 1 + packages/api/src/jobs/email/inbound_emails.ts | 1 + packages/api/src/jobs/email/send_email.ts | 14 +++++- .../api/src/resolvers/recent_emails/index.ts | 1 + packages/api/src/server.ts | 1 - packages/api/src/services/send_emails.ts | 3 ++ packages/import-handler/src/csv.ts | 13 ++++-- packages/import-handler/src/index.ts | 46 +++++++++++++------ packages/import-handler/src/job.ts | 23 ++++++++++ packages/import-handler/src/matterHistory.ts | 14 +++--- packages/import-handler/src/metrics.ts | 10 +++- packages/import-handler/test/csv/csv.test.ts | 2 +- .../test/matter/matter_importer.test.ts | 2 +- packages/import-handler/test/util.ts | 8 ++-- 14 files changed, 104 insertions(+), 35 deletions(-) create mode 100644 packages/import-handler/src/job.ts diff --git a/packages/api/src/jobs/ai/create_digest.ts b/packages/api/src/jobs/ai/create_digest.ts index 7bdbd8870..f265fd34d 100644 --- a/packages/api/src/jobs/ai/create_digest.ts +++ b/packages/api/src/jobs/ai/create_digest.ts @@ -623,6 +623,7 @@ const sendEmail = async (user: User, digest: Digest, channels: Channel[]) => { ` await enqueueSendEmail({ + userId: user.id, to: user.email, from: env.sender.message, subject: subTitle, diff --git a/packages/api/src/jobs/email/inbound_emails.ts b/packages/api/src/jobs/email/inbound_emails.ts index 6f1721660..5a047d51b 100644 --- a/packages/api/src/jobs/email/inbound_emails.ts +++ b/packages/api/src/jobs/email/inbound_emails.ts @@ -117,6 +117,7 @@ export const forwardEmailJob = async (data: EmailJobData) => { // forward non-newsletter emails to the registered email address const result = await enqueueSendEmail({ + userId: user.id, from: env.sender.message, to: user.email, subject: `Fwd: ${subject}`, diff --git a/packages/api/src/jobs/email/send_email.ts b/packages/api/src/jobs/email/send_email.ts index 130e2b14a..416cf94f0 100644 --- a/packages/api/src/jobs/email/send_email.ts +++ b/packages/api/src/jobs/email/send_email.ts @@ -1,5 +1,6 @@ import { env } from '../../env' import { sendWithMailJet } from '../../services/send_emails' +import { findActiveUser } from '../../services/user' import { Merge } from '../../util' import { logger } from '../../utils/logger' import { sendEmail } from '../../utils/sendEmail' @@ -9,7 +10,8 @@ export const SEND_EMAIL_JOB = 'send-email' type ContentType = { html: string } | { text: string } | { templateId: string } export type SendEmailJobData = Merge< { - to: string + userId: string + to?: string from?: string subject?: string html?: string @@ -22,6 +24,16 @@ export type SendEmailJobData = Merge< > export const sendEmailJob = async (data: SendEmailJobData) => { + if (!data.to) { + const user = await findActiveUser(data.userId) + if (!user) { + logger.error('user not found', data.userId) + return false + } + + data.to = user.email + } + if (process.env.USE_MAILJET && data.dynamicTemplateData) { return sendWithMailJet(data.to, data.dynamicTemplateData.link) } diff --git a/packages/api/src/resolvers/recent_emails/index.ts b/packages/api/src/resolvers/recent_emails/index.ts index cd19c0d11..de3538436 100644 --- a/packages/api/src/resolvers/recent_emails/index.ts +++ b/packages/api/src/resolvers/recent_emails/index.ts @@ -127,6 +127,7 @@ export const replyToEmailResolver = authorized< } const result = await enqueueSendEmail({ + userId: uid, to: recentEmail.replyTo || recentEmail.from, // send to the reply-to address if it exists or the from address subject: 'Re: ' + recentEmail.subject, text: reply, diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 37df57fcc..13630af62 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -95,7 +95,6 @@ export const createApp = (): Express => { app.use('/api/auth', authLimiter, authRouter()) app.use('/api/mobile-auth', authLimiter, mobileAuthRouter()) app.use('/api/page', pageRouter()) - app.use('/api/user', userRouter()) app.use('/api/shortcuts', shortcutsRouter()) app.use('/api/article', articleRouter()) app.use('/api/ai-summary', aiSummariesRouter()) diff --git a/packages/api/src/services/send_emails.ts b/packages/api/src/services/send_emails.ts index 69a537bdb..988ed9183 100644 --- a/packages/api/src/services/send_emails.ts +++ b/packages/api/src/services/send_emails.ts @@ -18,6 +18,7 @@ export const sendNewAccountVerificationEmail = async (user: { } const result = await enqueueSendEmail({ + userId: user.id, to: user.email, dynamicTemplateData: dynamicTemplateData, templateId: env.sendgrid.confirmationTemplateId, @@ -78,6 +79,7 @@ export const sendAccountChangeEmail = async (user: { } const result = await enqueueSendEmail({ + userId: user.id, to: user.email, dynamicTemplateData: dynamicTemplateData, templateId: env.sendgrid.verificationTemplateId, @@ -100,6 +102,7 @@ export const sendPasswordResetEmail = async (user: { } const result = await enqueueSendEmail({ + userId: user.id, to: user.email, dynamicTemplateData: dynamicTemplateData, templateId: env.sendgrid.resetPasswordTemplateId, diff --git a/packages/import-handler/src/csv.ts b/packages/import-handler/src/csv.ts index fa3d209e1..7acdff4ce 100644 --- a/packages/import-handler/src/csv.ts +++ b/packages/import-handler/src/csv.ts @@ -46,7 +46,12 @@ const parseDate = (date: string): Date => { export const importCsv = async (ctx: ImportContext, stream: Stream) => { // create metrics in redis - await createMetrics(ctx.redisClient, ctx.userId, ctx.taskId, ctx.source) + await createMetrics( + ctx.redisDataSource.cacheClient, + ctx.userId, + ctx.taskId, + ctx.source + ) const parser = parse({ headers: true, @@ -68,7 +73,7 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => { // update total counter await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.TOTAL @@ -79,7 +84,7 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => { ctx.countImported += 1 // update started counter await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.STARTED @@ -96,7 +101,7 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => { ctx.countFailed += 1 // update invalid counter await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.INVALID diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index de3379a26..06617dfe2 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -4,13 +4,13 @@ import { RedisDataSource } from '@omnivore/utils' import * as Sentry from '@sentry/serverless' import axios from 'axios' import 'dotenv/config' -import Redis from 'ioredis' import * as jwt from 'jsonwebtoken' import { Stream } from 'node:stream' import * as path from 'path' import { promisify } from 'util' import { v4 as uuid } from 'uuid' import { importCsv } from './csv' +import { queueEmailJob } from './job' import { importMatterArchive } from './matterHistory' import { ImportStatus, updateMetrics } from './metrics' import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task' @@ -57,7 +57,7 @@ export type ImportContext = { countFailed: number urlHandler: UrlHandler contentHandler: ContentHandler - redisClient: Redis + redisDataSource: RedisDataSource taskId: string source: string } @@ -140,32 +140,40 @@ const createEmailCloudTask = async (userId: string, payload: unknown) => { ) } -const sendImportFailedEmail = async (userId: string) => { - return createEmailCloudTask(userId, { +const sendImportFailedEmail = async ( + redisDataSource: RedisDataSource, + userId: string +) => { + return queueEmailJob(redisDataSource, { + userId, subject: 'Your Omnivore import failed.', - body: `There was an error importing your file. Please ensure you uploaded the correct file type, if you need help, please email feedback@omnivore.app`, + html: `There was an error importing your file. Please ensure you uploaded the correct file type, if you need help, please email feedback@omnivore.app`, }) } export const sendImportStartedEmail = async ( + redisDataSource: RedisDataSource, userId: string, urlsEnqueued: number, urlsFailed: number ) => { - return createEmailCloudTask(userId, { + return queueEmailJob(redisDataSource, { + userId, subject: 'Your Omnivore import has started', - body: `We have started processing ${urlsEnqueued} URLs. ${urlsFailed} URLs are invalid.`, + html: `We have started processing ${urlsEnqueued} URLs. ${urlsFailed} URLs are invalid.`, }) } export const sendImportCompletedEmail = async ( + redisDataSource: RedisDataSource, userId: string, urlsImported: number, urlsFailed: number ) => { - return createEmailCloudTask(userId, { + return queueEmailJob(redisDataSource, { + userId, subject: 'Your Omnivore import has finished', - body: `We have finished processing ${ + html: `We have finished processing ${ urlsImported + urlsFailed } URLs. ${urlsImported} URLs have been added to your library. ${urlsFailed} URLs failed to be parsed.`, }) @@ -298,7 +306,10 @@ const contentHandler = async ( return Promise.resolve() } -const handleEvent = async (data: StorageEvent, redisClient: Redis) => { +const handleEvent = async ( + data: StorageEvent, + redisDataSource: RedisDataSource +) => { if (shouldHandle(data)) { const handler = handlerForFile(data.name) if (!handler) { @@ -329,7 +340,7 @@ const handleEvent = async (data: StorageEvent, redisClient: Redis) => { countFailed: 0, urlHandler, contentHandler, - redisClient, + redisDataSource, taskId: data.name, source: importSource(data.name), } @@ -337,9 +348,14 @@ const handleEvent = async (data: StorageEvent, redisClient: Redis) => { await handler(ctx, stream) if (ctx.countImported > 0) { - await sendImportStartedEmail(userId, ctx.countImported, ctx.countFailed) + await sendImportStartedEmail( + ctx.redisDataSource, + userId, + ctx.countImported, + ctx.countFailed + ) } else { - await sendImportFailedEmail(userId) + await sendImportFailedEmail(ctx.redisDataSource, userId) } } } @@ -377,7 +393,7 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction( }) try { - await handleEvent(obj, redisDataSource.cacheClient) + await handleEvent(obj, redisDataSource) } catch (err) { console.log('error handling event', { err, obj }) throw err @@ -436,7 +452,7 @@ export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction( try { // update metrics await updateMetrics( - redisDataSource.cacheClient, + redisDataSource, userId, req.body.taskId, req.body.status diff --git a/packages/import-handler/src/job.ts b/packages/import-handler/src/job.ts new file mode 100644 index 000000000..0f040b8f3 --- /dev/null +++ b/packages/import-handler/src/job.ts @@ -0,0 +1,23 @@ +import { RedisDataSource } from '@omnivore/utils' +import { Queue } from 'bullmq' + +const QUEUE_NAME = 'omnivore-backend-queue' +export const SEND_EMAIL_JOB = 'send-email' + +interface SendEmailJobData { + userId: string + from?: string + subject?: string + html?: string +} + +export const queueEmailJob = async ( + redisDataSource: RedisDataSource, + data: SendEmailJobData +) => { + const queue = new Queue(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, + }) + + await queue.add(SEND_EMAIL_JOB, data) +} diff --git a/packages/import-handler/src/matterHistory.ts b/packages/import-handler/src/matterHistory.ts index 378f9d51e..2d7ee9e99 100644 --- a/packages/import-handler/src/matterHistory.ts +++ b/packages/import-handler/src/matterHistory.ts @@ -37,7 +37,7 @@ export const importMatterHistoryCsv = async ( const url = new URL(row['URL']) // update total counter await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.TOTAL @@ -46,7 +46,7 @@ export const importMatterHistoryCsv = async ( ctx.countImported += 1 // update started counter await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.STARTED @@ -219,7 +219,7 @@ const handleMatterHistoryRow = async ( ctx.countFailed += 1 // update failed counter await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.FAILED @@ -254,7 +254,7 @@ export const importMatterArchive = async ( try { // create metrics in redis await createMetrics( - ctx.redisClient, + ctx.redisDataSource.cacheClient, ctx.userId, ctx.taskId, 'matter-importer' @@ -273,7 +273,7 @@ export const importMatterArchive = async ( try { // update total metrics await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.TOTAL @@ -284,7 +284,7 @@ export const importMatterArchive = async ( ctx.countImported += 1 // update started metrics await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.STARTED @@ -294,7 +294,7 @@ export const importMatterArchive = async ( ctx.countFailed += 1 // update failed metrics await updateMetrics( - ctx.redisClient, + ctx.redisDataSource, ctx.userId, ctx.taskId, ImportStatus.FAILED diff --git a/packages/import-handler/src/metrics.ts b/packages/import-handler/src/metrics.ts index 41e4c734c..c737527b6 100644 --- a/packages/import-handler/src/metrics.ts +++ b/packages/import-handler/src/metrics.ts @@ -47,13 +47,14 @@ export const createMetrics = async ( } export const updateMetrics = async ( - redisClient: Redis, + redisDataSource: RedisDataSource, userId: string, taskId: string, status: ImportStatus ) => { const key = `import:${userId}:${taskId}` + const redisClient = redisDataSource.cacheClient /** * Define our command */ @@ -109,7 +110,12 @@ export const updateMetrics = async ( if ((state as ImportTaskState) == ImportTaskState.FINISHED) { const metrics = await getMetrics(redisClient, userId, taskId) if (metrics) { - await sendImportCompletedEmail(userId, metrics.imported, metrics.failed) + await sendImportCompletedEmail( + redisDataSource, + userId, + metrics.imported, + metrics.failed + ) } } } catch (error) { diff --git a/packages/import-handler/test/csv/csv.test.ts b/packages/import-handler/test/csv/csv.test.ts index 582444cab..66de76b01 100644 --- a/packages/import-handler/test/csv/csv.test.ts +++ b/packages/import-handler/test/csv/csv.test.ts @@ -26,7 +26,7 @@ describe('Test csv importer', () => { }, }) - stub = stubImportCtx(redisDataSource.cacheClient) + stub = stubImportCtx(redisDataSource) }) afterEach(async () => { diff --git a/packages/import-handler/test/matter/matter_importer.test.ts b/packages/import-handler/test/matter/matter_importer.test.ts index 3d6a84c00..649bb8f0f 100644 --- a/packages/import-handler/test/matter/matter_importer.test.ts +++ b/packages/import-handler/test/matter/matter_importer.test.ts @@ -30,7 +30,7 @@ describe('matter importer', () => { }, }) - stub = stubImportCtx(redisDataSource.cacheClient) + stub = stubImportCtx(redisDataSource) }) afterEach(async () => { diff --git a/packages/import-handler/test/util.ts b/packages/import-handler/test/util.ts index 14550783d..dbdd948c7 100644 --- a/packages/import-handler/test/util.ts +++ b/packages/import-handler/test/util.ts @@ -1,8 +1,10 @@ import { Readability } from '@omnivore/readability' -import Redis from 'ioredis' +import { RedisDataSource } from '@omnivore/utils' import { ArticleSavingRequestStatus, ImportContext } from '../src' -export const stubImportCtx = (redisClient: Redis): ImportContext => { +export const stubImportCtx = ( + redisDataSource: RedisDataSource +): ImportContext => { return { userId: '', countImported: 0, @@ -24,7 +26,7 @@ export const stubImportCtx = (redisClient: Redis): ImportContext => { ): Promise => { return Promise.resolve() }, - redisClient, + redisDataSource, taskId: '', source: 'csv-importer', }