diff --git a/packages/api/src/jobs/email/inbound_emails.ts b/packages/api/src/jobs/email/inbound_emails.ts new file mode 100644 index 000000000..db8fc02eb --- /dev/null +++ b/packages/api/src/jobs/email/inbound_emails.ts @@ -0,0 +1,306 @@ +import { handleNewsletter } from '@omnivore/content-handler' +import { Converter } from 'showdown' +import { ContentReaderType, LibraryItemState } from '../../entity/library_item' +import { SubscriptionStatus } from '../../entity/subscription' +import { UploadFile } from '../../entity/upload_file' +import { env } from '../../env' +import { PageType, UploadFileStatus } from '../../generated/graphql' +import { authTrx } from '../../repository' +import { createOrUpdateLibraryItem } from '../../services/library_item' +import { + findNewsletterEmailByAddress, + updateConfirmationCode, +} from '../../services/newsletters' +import { + saveReceivedEmail, + updateReceivedEmail, +} from '../../services/received_emails' +import { saveNewsletter } from '../../services/save_newsletter_email' +import { saveUrlFromEmail } from '../../services/save_url' +import { getSubscriptionByName } from '../../services/subscriptions' +import { analytics } from '../../utils/analytics' +import { enqueueSendEmail } from '../../utils/createTask' +import { generateSlug, isUrl } from '../../utils/helpers' +import { logger } from '../../utils/logger' +import { + parseEmailAddress, + isProbablyArticle, + getTitleFromEmailSubject, + generateUniqueUrl, +} from '../../utils/parser' +import { + generateUploadFilePathName, + getStorageFileDetails, +} from '../../utils/uploads' + +interface EmailJobData { + from: string + to: string + subject: string + html: string + text: string + headers: Record + unsubMailTo?: string + unsubHttpUrl?: string + forwardedFrom?: string + replyTo?: string + confirmationCode?: string + uploadFile?: { + fileName: string + contentType: string + id: string + } +} + +const converter = new Converter() +export const FORWARD_EMAIL_JOB = 'forward-email' +export const SAVE_NEWSLETTER_JOB = 'save-newsletter' +export const CONFIRM_EMAIL_JOB = 'confirmation-email' +export const SAVE_ATTACHMENT_JOB = 'save-attachment' + +export const plainTextToHtml = (text: string): string => { + return converter.makeHtml(text) +} + +export const forwardEmailJob = async (data: EmailJobData) => { + const { from, to, subject, html, text, replyTo, forwardedFrom } = data + + // get user from newsletter email + const newsletterEmail = await findNewsletterEmailByAddress(to) + + if (!newsletterEmail) { + logger.error(`newsletter email not found: ${to}`) + return false + } + + const user = newsletterEmail.user + const parsedFrom = parseEmailAddress(from) + + const { id: receivedEmailId } = await saveReceivedEmail( + from, + to, + subject, + text, + html, + user.id, + 'non-article', + replyTo + ) + + if ( + await isProbablyArticle( + forwardedFrom || parsedFrom.address || from, + subject + ) + ) { + logger.info('handling as article') + const savedNewsletter = await saveNewsletter( + { + title: getTitleFromEmailSubject(subject), + author: parsedFrom.name || from, + url: generateUniqueUrl(), + content: html || text, + receivedEmailId, + email: newsletterEmail.address, + }, + newsletterEmail + ) + if (!savedNewsletter) { + logger.error('Failed to save email', { from, to, subject }) + return false + } + + // update received email type + await updateReceivedEmail(receivedEmailId, 'article', user.id) + + return true + } + + analytics.capture({ + distinctId: user.id, + event: 'non_newsletter_email_received', + properties: { + env: env.server.apiEnv, + }, + }) + + // forward non-newsletter emails to the registered email address + const result = await enqueueSendEmail({ + from: env.sender.message, + to: user.email, + subject: `Fwd: ${subject}`, + html, + text, + replyTo: replyTo || from, + }) + + return !!result +} + +export const saveNewsletterJob = async (data: EmailJobData) => { + const { + from, + to, + subject, + html, + text, + replyTo, + headers, + unsubMailTo, + unsubHttpUrl, + } = data + + // get user from newsletter email + const newsletterEmail = await findNewsletterEmailByAddress(to) + if (!newsletterEmail) { + logger.error(`newsletter email not found: ${to}`) + + return false + } + + const user = newsletterEmail.user + const { id: receivedEmailId } = await saveReceivedEmail( + from, + to, + subject, + text, + html, + user.id, + 'article', + replyTo + ) + + if (isUrl(subject)) { + // save url if the title is a parsable url + const result = await saveUrlFromEmail( + subject, + receivedEmailId, + newsletterEmail.user.id + ) + + return result + } + + // convert text to html if html is not available + const content = html || plainTextToHtml(text) + const newsletter = await handleNewsletter({ + from, + to, + subject, + html: content, + headers, + }) + + const parsedFrom = parseEmailAddress(from) + const author = parsedFrom.name || from + + // do not subscribe if subscription already exists and is unsubscribed + const existingSubscription = await getSubscriptionByName( + author, + newsletterEmail.user.id + ) + if (existingSubscription?.status === SubscriptionStatus.Unsubscribed) { + logger.info(`newsletter already unsubscribed: ${from}`) + return false + } + + // save newsletter instead + const result = await saveNewsletter( + { + email: newsletterEmail.address, + content, + url: generateUniqueUrl(), + title: subject, + author, + unsubMailTo, + unsubHttpUrl, + receivedEmailId, + ...newsletter, + }, + newsletterEmail + ) + + return result +} + +export const saveAttachmentJob = async (data: EmailJobData) => { + const { from, to, subject, html, text, replyTo, uploadFile } = data + + // get user from newsletter email + const newsletterEmail = await findNewsletterEmailByAddress(to) + if (!newsletterEmail) { + logger.error(`newsletter email not found: ${to}`) + + return false + } + + const user = newsletterEmail.user + await saveReceivedEmail( + from, + to, + subject, + text, + html, + user.id, + 'article', + replyTo + ) + + const uploadFileData = await authTrx( + (tx) => + tx.getRepository(UploadFile).save({ + ...uploadFile, + status: UploadFileStatus.Completed, + user: { id: user.id }, + }), + undefined, + user.id + ) + + const uploadFileDetails = await getStorageFileDetails( + uploadFileData.id, + uploadFileData.fileName + ) + + const uploadFilePathName = generateUploadFilePathName( + uploadFileData.id, + uploadFileData.fileName + ) + + const uploadFileUrlOverride = `https://omnivore.app/attachments/${uploadFilePathName}` + const uploadFileHash = uploadFileDetails.md5Hash + const itemType = + uploadFileData.contentType === 'application/pdf' + ? PageType.File + : PageType.Book + const title = subject || uploadFileData.fileName + const itemToCreate = { + originalUrl: uploadFileUrlOverride, + itemType, + textContentHash: uploadFileHash, + uploadFile: { id: uploadFileData.id }, + title, + readableContent: '', + slug: generateSlug(title), + state: LibraryItemState.Succeeded, + user: { id: user.id }, + contentReader: + itemType === PageType.File + ? ContentReaderType.PDF + : ContentReaderType.EPUB, + } + + await createOrUpdateLibraryItem(itemToCreate, user.id) + + return true +} + +export const confirmEmailJob = async (data: EmailJobData) => { + const { confirmationCode, to } = data + if (!confirmationCode) { + logger.error('confirmation code not provided') + return false + } + + return updateConfirmationCode(to, confirmationCode) +} diff --git a/packages/api/src/jobs/send_email.ts b/packages/api/src/jobs/email/send_email.ts similarity index 75% rename from packages/api/src/jobs/send_email.ts rename to packages/api/src/jobs/email/send_email.ts index 1c94174f1..130e2b14a 100644 --- a/packages/api/src/jobs/send_email.ts +++ b/packages/api/src/jobs/email/send_email.ts @@ -1,8 +1,8 @@ -import { env } from '../env' -import { sendWithMailJet } from '../services/send_emails' -import { Merge } from '../util' -import { logger } from '../utils/logger' -import { sendEmail } from '../utils/sendEmail' +import { env } from '../../env' +import { sendWithMailJet } from '../../services/send_emails' +import { Merge } from '../../util' +import { logger } from '../../utils/logger' +import { sendEmail } from '../../utils/sendEmail' export const SEND_EMAIL_JOB = 'send-email' @@ -16,6 +16,7 @@ export type SendEmailJobData = Merge< text?: string templateId?: string dynamicTemplateData?: Record + replyTo?: string }, ContentType > diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index d13e934de..136a854f4 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -36,7 +36,7 @@ import { import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' -import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/send_email' +import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email' import { syncReadPositionsJob, SYNC_READ_POSITIONS_JOB_NAME, @@ -53,6 +53,16 @@ import { redisDataSource } from './redis_data_source' import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' import { getJobPriority } from './utils/createTask' import { logger } from './utils/logger' +import { + confirmEmailJob, + CONFIRM_EMAIL_JOB, + forwardEmailJob, + FORWARD_EMAIL_JOB, + saveAttachmentJob, + saveNewsletterJob, + SAVE_ATTACHMENT_JOB, + SAVE_NEWSLETTER_JOB, +} from './jobs/email/inbound_emails' export const QUEUE_NAME = 'omnivore-backend-queue' export const JOB_VERSION = 'v001' @@ -160,6 +170,14 @@ export const createWorker = (connection: ConnectionOptions) => return exportAllItems(job.data) case SEND_EMAIL_JOB: return sendEmailJob(job.data) + case CONFIRM_EMAIL_JOB: + return confirmEmailJob(job.data) + case SAVE_ATTACHMENT_JOB: + return saveAttachmentJob(job.data) + case SAVE_NEWSLETTER_JOB: + return saveNewsletterJob(job.data) + case FORWARD_EMAIL_JOB: + return forwardEmailJob(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/resolvers/recent_emails/index.ts b/packages/api/src/resolvers/recent_emails/index.ts index 84aca707a..b8bbd275c 100644 --- a/packages/api/src/resolvers/recent_emails/index.ts +++ b/packages/api/src/resolvers/recent_emails/index.ts @@ -89,7 +89,7 @@ export const markEmailAsItemResolver = authorized< title: recentEmail.subject, content: recentEmail.html, url: generateUniqueUrl(), - author: parseEmailAddress(recentEmail.from).name, + author: parseEmailAddress(recentEmail.from).name || recentEmail.from, receivedEmailId: recentEmail.id, }, newsletterEmail diff --git a/packages/api/src/routers/svc/emails.ts b/packages/api/src/routers/svc/emails.ts index eda01a541..47f8f0b77 100644 --- a/packages/api/src/routers/svc/emails.ts +++ b/packages/api/src/routers/svc/emails.ts @@ -83,7 +83,7 @@ export function emailsServiceRouter() { const savedNewsletter = await saveNewsletter( { title: getTitleFromEmailSubject(data.subject), - author: parsedFrom.name, + author: parsedFrom.name || data.from, url: generateUniqueUrl(), content: data.html || data.text, receivedEmailId: data.receivedEmailId, diff --git a/packages/api/src/services/save_newsletter_email.ts b/packages/api/src/services/save_newsletter_email.ts index 08d6fa67b..f11601288 100644 --- a/packages/api/src/services/save_newsletter_email.ts +++ b/packages/api/src/services/save_newsletter_email.ts @@ -76,59 +76,5 @@ export const saveNewsletter = async ( return false } - // sends push notification - // const deviceTokens = await getDeviceTokensByUserId(newsletterEmail.user.id) - // if (!deviceTokens) { - // logger.info('Device tokens not set:', newsletterEmail.user.id) - // return true - // } - - // const multicastMessage = messageForLink(page, deviceTokens) - // await sendMulticastPushNotifications( - // newsletterEmail.user.id, - // multicastMessage, - // 'newsletter' - // ) - return true } - -// const messageForLink = ( -// link: Page, -// deviceTokens: UserDeviceToken[] -// ): MulticastMessage => { -// let title = '📫 - An article was added to your Omnivore Inbox' - -// if (link.author) { -// title = `📫 - ${link.author} has published a new article` -// } - -// const pushData = !link -// ? undefined -// : { -// link: Buffer.from( -// JSON.stringify({ -// id: link.id, -// url: link.url, -// slug: link.slug, -// title: link.title, -// image: link.image, -// author: link.author, -// isArchived: !!link.archivedAt, -// contentReader: ContentReader.Web, -// readingProgressPercent: link.readingProgressPercent, -// readingProgressAnchorIndex: link.readingProgressAnchorIndex, -// }) -// ).toString('base64'), -// } - -// return { -// notification: { -// title: title, -// body: link.title, -// imageUrl: link.image || undefined, -// }, -// data: pushData, -// tokens: deviceTokens.map((token) => token.token), -// } -// } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index c2ce23fa9..001daee64 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -35,7 +35,7 @@ import { REFRESH_ALL_FEEDS_JOB_NAME, REFRESH_FEED_JOB_NAME, } from '../jobs/rss/refreshAllFeeds' -import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/send_email' +import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email' import { SYNC_READ_POSITIONS_JOB_NAME } from '../jobs/sync_read_positions' import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' import { diff --git a/packages/inbound-email-handler/.eslintrc b/packages/inbound-email-handler/.eslintrc index e006282a6..301be9795 100644 --- a/packages/inbound-email-handler/.eslintrc +++ b/packages/inbound-email-handler/.eslintrc @@ -2,5 +2,13 @@ "extends": "../../.eslintrc", "parserOptions": { "project": "tsconfig.json" + }, + "rules": { + "@typescript-eslint/no-floating-promises": [ + "error", + { + "ignoreIIFE": true + } + ] } -} \ No newline at end of file +} diff --git a/packages/inbound-email-handler/package.json b/packages/inbound-email-handler/package.json index 2330a4308..b2aa9ba52 100644 --- a/packages/inbound-email-handler/package.json +++ b/packages/inbound-email-handler/package.json @@ -32,17 +32,16 @@ }, "dependencies": { "@google-cloud/functions-framework": "3.1.2", - "@google-cloud/pubsub": "^4.0.0", - "@omnivore/content-handler": "1.0.0", - "@sendgrid/client": "^7.6.0", + "@google-cloud/storage": "^7.0.1", "@sentry/serverless": "^7.77.0", "addressparser": "^1.0.1", - "axios": "^0.27.2", - "jsonwebtoken": "^8.5.1", + "bullmq": "^5.1.1", + "dotenv": "^8.2.0", + "ioredis": "^5.3.2", "parse-headers": "^2.0.4", "parse-multipart-data": "^1.2.1", "rfc2047": "^4.0.1", - "showdown": "^2.1.0" + "uuid": "^8.3.1" }, "volta": { "extends": "../../package.json" diff --git a/packages/inbound-email-handler/src/attachment.ts b/packages/inbound-email-handler/src/attachment.ts index 273555bae..db3d0d094 100644 --- a/packages/inbound-email-handler/src/attachment.ts +++ b/packages/inbound-email-handler/src/attachment.ts @@ -1,8 +1,11 @@ -import axios, { AxiosResponse } from 'axios' -import * as jwt from 'jsonwebtoken' -import { promisify } from 'util' +import { Storage } from '@google-cloud/storage' +import { v4 as uuid } from 'uuid' +import { EmailJobType, queueEmailJob } from './job' -const signToken = promisify(jwt.sign) +const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH + ? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH }) + : new Storage() +const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files' export interface Attachment { contentType: string @@ -10,11 +13,6 @@ export interface Attachment { filename: string | undefined } -type UploadResponse = { - id: string - url: string -} - export const isAttachment = (contentType: string, data: Buffer): boolean => { return ( (contentType === 'application/pdf' || @@ -23,11 +21,26 @@ export const isAttachment = (contentType: string, data: Buffer): boolean => { ) } +export const uploadToBucket = async ( + fileName: string, + data: Buffer, + options?: { contentType?: string; public?: boolean } +) => { + const uploadFileId = uuid() + + await storage + .bucket(bucketName) + .file(`u/${uploadFileId}/${fileName}`) + .save(data, { ...options, timeout: 30000 }) + + return uploadFileId +} + export const handleAttachments = async ( - email: string, + from: string, + to: string, subject: string, - attachments: Attachment[], - receivedEmailId: string + attachments: Attachment[] ): Promise => { for await (const attachment of attachments) { const { contentType, data } = attachment @@ -36,98 +49,20 @@ export const handleAttachments = async ( ? 'attachment.pdf' : 'attachment.epub' - try { - const uploadResult = await getUploadIdAndSignedUrl( - email, - filename, - contentType - ) - if (!uploadResult.url || !uploadResult.id) { - console.log('failed to create upload request', uploadResult) - return - } - await uploadToSignedUrl(uploadResult.url, data, contentType) - await createArticle(email, uploadResult.id, subject, receivedEmailId) - } catch (error) { - console.error('handleAttachments error', error) - } - } -} + const uploadFileId = await uploadToBucket(filename, data, { + contentType, + public: false, + }) -const getUploadIdAndSignedUrl = async ( - email: string, - fileName: string, - contentType: string -): Promise => { - if (process.env.JWT_SECRET === undefined) { - throw new Error('JWT_SECRET is not defined') - } - const auth = await signToken(email, process.env.JWT_SECRET) - const data = { - fileName, - email, - contentType, - } - - if (process.env.INTERNAL_SVC_ENDPOINT === undefined) { - throw new Error('REST_BACKEND_ENDPOINT is not defined') - } - const response = await axios.post( - `${process.env.INTERNAL_SVC_ENDPOINT}svc/email-attachment/upload`, - data, - { - headers: { - Authorization: `${auth as string}`, - 'Content-Type': 'application/json', + await queueEmailJob(EmailJobType.SaveAttachment, { + from, + to, + uploadFile: { + fileName: filename, + contentType, + id: uploadFileId, }, - } - ) - return response.data as UploadResponse -} - -const uploadToSignedUrl = async ( - uploadUrl: string, - data: Buffer, - contentType: string -): Promise => { - return axios.put(uploadUrl, data, { - headers: { - 'Content-Type': contentType, - }, - maxBodyLength: 1000000000, - maxContentLength: 100000000, - }) -} - -const createArticle = async ( - email: string, - uploadFileId: string, - subject: string, - receivedEmailId: string -): Promise => { - const data = { - email, - uploadFileId, - subject, - receivedEmailId, + subject, + }) } - - if (process.env.JWT_SECRET === undefined) { - throw new Error('JWT_SECRET is not defined') - } - const auth = await signToken(email, process.env.JWT_SECRET) - - if (process.env.INTERNAL_SVC_ENDPOINT === undefined) { - throw new Error('REST_BACKEND_ENDPOINT is not defined') - } - return axios.post( - `${process.env.INTERNAL_SVC_ENDPOINT}svc/email-attachment/create-article`, - data, - { - headers: { - Authorization: `${auth as string}`, - 'Content-Type': 'application/json', - }, - } - ) } diff --git a/packages/inbound-email-handler/src/index.ts b/packages/inbound-email-handler/src/index.ts index 391244e35..0abd80dac 100644 --- a/packages/inbound-email-handler/src/index.ts +++ b/packages/inbound-email-handler/src/index.ts @@ -2,92 +2,29 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unused-vars */ - -import { PubSub } from '@google-cloud/pubsub' -import { handleNewsletter } from '@omnivore/content-handler' -import { generateUniqueUrl } from '@omnivore/content-handler/build/src/content-handler' import * as Sentry from '@sentry/serverless' -import axios from 'axios' -import * as jwt from 'jsonwebtoken' import parseHeaders from 'parse-headers' import * as multipart from 'parse-multipart-data' import rfc2047 from 'rfc2047' -import { Converter } from 'showdown' -import { promisify } from 'util' import { Attachment, handleAttachments, isAttachment } from './attachment' +import { EmailJobType, queueEmailJob } from './job' import { handleGoogleConfirmationEmail, isGoogleConfirmationEmail, isSubscriptionConfirmationEmail, - parseAuthor, parseUnsubscribe, } from './newsletter' -interface SaveReceivedEmailResponse { - id: string -} - interface Envelope { to: string[] from: string } -const signToken = promisify(jwt.sign) - Sentry.GCPFunction.init({ dsn: process.env.SENTRY_DSN, tracesSampleRate: 0, }) -const NEWSLETTER_EMAIL_RECEIVED_TOPIC = 'newsletterEmailReceived' -const NON_NEWSLETTER_EMAIL_TOPIC = 'nonNewsletterEmailReceived' -const pubsub = new PubSub() -const converter = new Converter() - -export const plainTextToHtml = (text: string): string => { - return converter.makeHtml(text) -} - -export const publishMessage = async ( - topic: string, - message: any -): Promise => { - return pubsub - .topic(topic) - .publishMessage({ json: message }) - .catch((err) => { - console.log('error publishing message:', err) - return undefined - }) -} - -const saveReceivedEmail = async ( - email: string, - data: any -): Promise => { - if (process.env.JWT_SECRET === undefined) { - throw new Error('JWT_SECRET is not defined') - } - const auth = await signToken(email, process.env.JWT_SECRET) - - if (process.env.INTERNAL_SVC_ENDPOINT === undefined) { - throw new Error('REST_BACKEND_ENDPOINT is not defined') - } - - const response = await axios.post( - `${process.env.INTERNAL_SVC_ENDPOINT}svc/pubsub/emails/save`, - data, - { - headers: { - Authorization: `${auth as string}`, - 'Content-Type': 'application/json', - }, - } - ) - - return response.data as SaveReceivedEmailResponse -} - export const parsedTo = (parsed: Record): string => { // envelope to contains the real recipient email address try { @@ -135,97 +72,77 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction( ? parseUnsubscribe(unSubHeader) : undefined - const { id: receivedEmailId } = await saveReceivedEmail(to, { - from, - to, - subject, - html, - text, - replyTo, - }) - try { // check if it is a subscription or google confirmation email const isGoogleConfirmation = isGoogleConfirmationEmail(from, subject) if (isGoogleConfirmation || isSubscriptionConfirmationEmail(subject)) { console.debug('handleConfirmation', from, subject) // we need to parse the confirmation code from the email - isGoogleConfirmation && - (await handleGoogleConfirmationEmail(to, subject)) - // queue non-newsletter emails - await pubsub.topic(NON_NEWSLETTER_EMAIL_TOPIC).publishMessage({ - json: { - from, - to, - subject, - html, - text, - unsubMailTo: unsubscribe?.mailTo, - unsubHttpUrl: unsubscribe?.httpUrl, - forwardedFrom, - receivedEmailId, - }, + if (isGoogleConfirmation) { + await handleGoogleConfirmationEmail(from, to, subject) + } + + // forward emails + await queueEmailJob(EmailJobType.ForwardEmail, { + from, + to, + subject, + html, + text, + headers, + forwardedFrom, + replyTo, }) return res.send('ok') } if (attachments.length > 0) { console.debug('handle attachments', from, to, subject) // save the attachments as articles - await handleAttachments(to, subject, attachments, receivedEmailId) + await handleAttachments(from, to, subject, attachments) return res.send('ok') } - // convert text to html if html is not available - const content = html || plainTextToHtml(text) - // all other emails are considered newsletters - const newsletterMessage = await handleNewsletter({ + // queue newsletter emails + await queueEmailJob(EmailJobType.SaveNewsletter, { from, to, subject, - html: content, + html, + text, headers, + unsubMailTo: unsubscribe?.mailTo, + unsubHttpUrl: unsubscribe?.httpUrl, + forwardedFrom, + replyTo, }) - // queue newsletter emails - await pubsub.topic(NEWSLETTER_EMAIL_RECEIVED_TOPIC).publishMessage({ - json: { - email: to, - content, - url: generateUniqueUrl(), - title: subject, - author: parseAuthor(from), - unsubMailTo: unsubscribe?.mailTo, - unsubHttpUrl: unsubscribe?.httpUrl, - receivedEmailId, - ...newsletterMessage, - }, - }) res.send('newsletter received') } catch (error) { - console.log( + console.error( 'error handling emails, will forward.', from, to, subject, error ) - // queue error emails - await pubsub.topic(NON_NEWSLETTER_EMAIL_TOPIC).publishMessage({ - json: { - from, - to, - subject, - html, - text, - forwardedFrom, - receivedEmailId, - }, + + // fallback to forward the email + await queueEmailJob(EmailJobType.ForwardEmail, { + from, + to, + subject, + html, + text, + headers, + forwardedFrom, + replyTo, }) + res.send('ok') } } catch (e) { - console.log(e) + console.error(e) res.send(e) } } diff --git a/packages/inbound-email-handler/src/job.ts b/packages/inbound-email-handler/src/job.ts new file mode 100644 index 000000000..89a2fea40 --- /dev/null +++ b/packages/inbound-email-handler/src/job.ts @@ -0,0 +1,72 @@ +import { BulkJobOptions, Queue } from 'bullmq' +import { redisDataSource } from './redis_data_source' + +const QUEUE_NAME = 'omnivore-backend-queue' +export enum EmailJobType { + ForwardEmail = 'forward-email', + SaveNewsletter = 'save-newsletter', + ConfirmationEmail = 'confirmation-email', + SaveAttachment = 'save-attachment', +} + +interface EmailJobData { + from: string + to: string + subject: string + html?: string + text?: string + headers?: Record + unsubMailTo?: string + unsubHttpUrl?: string + forwardedFrom?: string + replyTo?: string + uploadFile?: { + fileName: string + contentType: string + id: string + } + confirmationCode?: string +} + +const queue = new Queue(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, +}) + +const getPriority = (jobType: EmailJobType): number => { + // we want to prioritized jobs by the expected time to complete + // lower number means higher priority + // priority 1: jobs that are expected to finish immediately + // priority 5: jobs that are expected to finish in less than 10 second + // priority 10: jobs that are expected to finish in less than 10 minutes + // priority 100: jobs that are expected to finish in less than 1 hour + switch (jobType) { + case EmailJobType.ForwardEmail: + case EmailJobType.ConfirmationEmail: + return 1 + case EmailJobType.SaveAttachment: + case EmailJobType.SaveNewsletter: + return 5 + default: + throw new Error(`unknown job type: ${jobType as string}`) + } +} + +const getOpts = (jobType: EmailJobType): BulkJobOptions => { + return { + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + priority: getPriority(jobType), + backoff: { + type: 'exponential', + delay: 2000, + }, + } +} + +export const queueEmailJob = async ( + jobType: EmailJobType, + data: EmailJobData +) => { + await queue.add(jobType, data, getOpts(jobType)) +} diff --git a/packages/inbound-email-handler/src/newsletter.ts b/packages/inbound-email-handler/src/newsletter.ts index 54be5d3dd..b31103ada 100644 --- a/packages/inbound-email-handler/src/newsletter.ts +++ b/packages/inbound-email-handler/src/newsletter.ts @@ -1,12 +1,11 @@ import addressparser from 'addressparser' -import { publishMessage } from './index' +import { EmailJobType, queueEmailJob } from './job' interface Unsubscribe { mailTo?: string httpUrl?: string } -const GOOGLE_CONFIRMATION_CODE_RECEIVED_TOPIC = 'emailConfirmationCodeReceived' const GOOGLE_CONFIRMATION_EMAIL_SENDER_ADDRESS = 'forwarding-noreply@google.com' // check unicode parentheses too const GOOGLE_CONFIRMATION_CODE_PATTERN = /\d+/u @@ -41,24 +40,25 @@ export const parseAuthor = (address: string): string => { } export const handleGoogleConfirmationEmail = async ( - email: string, + from: string, + to: string, subject: string ) => { - console.log('confirmation email', email, subject) + console.log('confirmation email', from, to, subject) const confirmationCode = getConfirmationCode(subject) - if (!email || !confirmationCode) { + if (!to || !confirmationCode) { console.log( 'confirmation email error, user email:', - email, + to, 'confirmationCode', confirmationCode ) throw new Error('invalid confirmation email') } - const message = { emailAddress: email, confirmationCode: confirmationCode } - return publishMessage(GOOGLE_CONFIRMATION_CODE_RECEIVED_TOPIC, message) + const message = { from, to, confirmationCode, subject } + return queueEmailJob(EmailJobType.ConfirmationEmail, message) } export const getConfirmationCode = (subject: string): string | undefined => { diff --git a/packages/inbound-email-handler/src/redis_data_source.ts b/packages/inbound-email-handler/src/redis_data_source.ts new file mode 100644 index 000000000..b8ca0ce6b --- /dev/null +++ b/packages/inbound-email-handler/src/redis_data_source.ts @@ -0,0 +1,102 @@ +import Redis, { RedisOptions } from 'ioredis' +import 'dotenv/config' + +type RedisClientType = 'cache' | 'mq' +type RedisDataSourceOption = { + url?: string + cert?: string +} +export type RedisDataSourceOptions = { + [key in RedisClientType]: RedisDataSourceOption +} + +export class RedisDataSource { + options: RedisDataSourceOptions + + cacheClient: Redis + queueRedisClient: Redis + + constructor(options: RedisDataSourceOptions) { + this.options = options + + const cacheClient = createIORedisClient('cache', this.options) + if (!cacheClient) throw 'Error initializing cache redis client' + + this.cacheClient = cacheClient + this.queueRedisClient = + createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache + } + + async shutdown(): Promise { + try { + await this.queueRedisClient?.quit() + await this.cacheClient?.quit() + } catch (err) { + console.error('error while shutting down redis', err) + } + } +} + +const createIORedisClient = ( + name: RedisClientType, + options: RedisDataSourceOptions +): Redis | undefined => { + const option = options[name] + const redisURL = option.url + if (!redisURL) { + console.log(`no redisURL supplied: ${name}`) + return undefined + } + + const redisCert = option.cert + const tls = + redisURL.startsWith('rediss://') && redisCert + ? { + ca: redisCert, + rejectUnauthorized: false, + } + : undefined + + const redisOptions: RedisOptions = { + tls, + name, + connectTimeout: 10000, + maxRetriesPerRequest: null, + offlineQueue: false, + } + return new Redis(redisURL, redisOptions) +} + +export const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, +}) + +const gracefulShutdown = async (signal: string) => { + console.log(`Received ${signal}, shutting down gracefully...`) + + try { + await redisDataSource.shutdown() + console.log('redis shutdown successfully') + } catch (error) { + console.error('error while shutting down redis', error) + } + process.exit(0) +} + +process.on('SIGINT', () => { + ;(async () => { + await gracefulShutdown('SIGINT') + })() +}) +process.on('SIGTERM', () => { + ;(async () => { + await gracefulShutdown('SIGTERM') + })() +}) diff --git a/yarn.lock b/yarn.lock index 074fd4c76..46b7be829 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5783,7 +5783,7 @@ lodash "^4.17.4" read-pkg-up "^7.0.0" -"@sendgrid/client@^7.6.0", "@sendgrid/client@^7.7.0": +"@sendgrid/client@^7.7.0": version "7.7.0" resolved "https://registry.yarnpkg.com/@sendgrid/client/-/client-7.7.0.tgz#f8f67abd604205a0d0b1af091b61517ef465fdbf" integrity sha512-SxH+y8jeAQSnDavrTD0uGDXYIIkFylCo+eDofVmZLQ0f862nnqbC3Vd1ej6b7Le7lboyzQF6F7Fodv02rYspuA== @@ -11679,6 +11679,7 @@ caniuse-lite@^1.0.30001109, caniuse-lite@^1.0.30001251, caniuse-lite@^1.0.300012 version "1.0.30001600" resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001600.tgz" integrity sha512-+2S9/2JFhYmYaDpZvo0lKkfvuKIglrx68MwOBqMGHhQsNkLjB5xtc/TGoEPs+MxjSyN/72qer2g97nzR641mOQ== + capital-case@^1.0.4: version "1.0.4" resolved "https://registry.yarnpkg.com/capital-case/-/capital-case-1.0.4.tgz#9d130292353c9249f6b00fa5852bee38a717e669"