migrate inbound-email tasks from pubsub to bullmq

This commit is contained in:
Hongbo Wu
2024-04-11 10:44:57 +08:00
parent cd6594a2fd
commit f66d3fa36c
15 changed files with 608 additions and 303 deletions

View File

@ -2,5 +2,13 @@
"extends": "../../.eslintrc",
"parserOptions": {
"project": "tsconfig.json"
},
"rules": {
"@typescript-eslint/no-floating-promises": [
"error",
{
"ignoreIIFE": true
}
]
}
}
}

View File

@ -32,17 +32,16 @@
},
"dependencies": {
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/pubsub": "^4.0.0",
"@omnivore/content-handler": "1.0.0",
"@sendgrid/client": "^7.6.0",
"@google-cloud/storage": "^7.0.1",
"@sentry/serverless": "^7.77.0",
"addressparser": "^1.0.1",
"axios": "^0.27.2",
"jsonwebtoken": "^8.5.1",
"bullmq": "^5.1.1",
"dotenv": "^8.2.0",
"ioredis": "^5.3.2",
"parse-headers": "^2.0.4",
"parse-multipart-data": "^1.2.1",
"rfc2047": "^4.0.1",
"showdown": "^2.1.0"
"uuid": "^8.3.1"
},
"volta": {
"extends": "../../package.json"

View File

@ -1,8 +1,11 @@
import axios, { AxiosResponse } from 'axios'
import * as jwt from 'jsonwebtoken'
import { promisify } from 'util'
import { Storage } from '@google-cloud/storage'
import { v4 as uuid } from 'uuid'
import { EmailJobType, queueEmailJob } from './job'
const signToken = promisify(jwt.sign)
const storage = process.env.GCS_UPLOAD_SA_KEY_FILE_PATH
? new Storage({ keyFilename: process.env.GCS_UPLOAD_SA_KEY_FILE_PATH })
: new Storage()
const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files'
export interface Attachment {
contentType: string
@ -10,11 +13,6 @@ export interface Attachment {
filename: string | undefined
}
type UploadResponse = {
id: string
url: string
}
export const isAttachment = (contentType: string, data: Buffer): boolean => {
return (
(contentType === 'application/pdf' ||
@ -23,11 +21,26 @@ export const isAttachment = (contentType: string, data: Buffer): boolean => {
)
}
export const uploadToBucket = async (
fileName: string,
data: Buffer,
options?: { contentType?: string; public?: boolean }
) => {
const uploadFileId = uuid()
await storage
.bucket(bucketName)
.file(`u/${uploadFileId}/${fileName}`)
.save(data, { ...options, timeout: 30000 })
return uploadFileId
}
export const handleAttachments = async (
email: string,
from: string,
to: string,
subject: string,
attachments: Attachment[],
receivedEmailId: string
attachments: Attachment[]
): Promise<void> => {
for await (const attachment of attachments) {
const { contentType, data } = attachment
@ -36,98 +49,20 @@ export const handleAttachments = async (
? 'attachment.pdf'
: 'attachment.epub'
try {
const uploadResult = await getUploadIdAndSignedUrl(
email,
filename,
contentType
)
if (!uploadResult.url || !uploadResult.id) {
console.log('failed to create upload request', uploadResult)
return
}
await uploadToSignedUrl(uploadResult.url, data, contentType)
await createArticle(email, uploadResult.id, subject, receivedEmailId)
} catch (error) {
console.error('handleAttachments error', error)
}
}
}
const uploadFileId = await uploadToBucket(filename, data, {
contentType,
public: false,
})
const getUploadIdAndSignedUrl = async (
email: string,
fileName: string,
contentType: string
): Promise<UploadResponse> => {
if (process.env.JWT_SECRET === undefined) {
throw new Error('JWT_SECRET is not defined')
}
const auth = await signToken(email, process.env.JWT_SECRET)
const data = {
fileName,
email,
contentType,
}
if (process.env.INTERNAL_SVC_ENDPOINT === undefined) {
throw new Error('REST_BACKEND_ENDPOINT is not defined')
}
const response = await axios.post(
`${process.env.INTERNAL_SVC_ENDPOINT}svc/email-attachment/upload`,
data,
{
headers: {
Authorization: `${auth as string}`,
'Content-Type': 'application/json',
await queueEmailJob(EmailJobType.SaveAttachment, {
from,
to,
uploadFile: {
fileName: filename,
contentType,
id: uploadFileId,
},
}
)
return response.data as UploadResponse
}
const uploadToSignedUrl = async (
uploadUrl: string,
data: Buffer,
contentType: string
): Promise<AxiosResponse> => {
return axios.put(uploadUrl, data, {
headers: {
'Content-Type': contentType,
},
maxBodyLength: 1000000000,
maxContentLength: 100000000,
})
}
const createArticle = async (
email: string,
uploadFileId: string,
subject: string,
receivedEmailId: string
): Promise<AxiosResponse> => {
const data = {
email,
uploadFileId,
subject,
receivedEmailId,
subject,
})
}
if (process.env.JWT_SECRET === undefined) {
throw new Error('JWT_SECRET is not defined')
}
const auth = await signToken(email, process.env.JWT_SECRET)
if (process.env.INTERNAL_SVC_ENDPOINT === undefined) {
throw new Error('REST_BACKEND_ENDPOINT is not defined')
}
return axios.post(
`${process.env.INTERNAL_SVC_ENDPOINT}svc/email-attachment/create-article`,
data,
{
headers: {
Authorization: `${auth as string}`,
'Content-Type': 'application/json',
},
}
)
}

View File

@ -2,92 +2,29 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unused-vars */
import { PubSub } from '@google-cloud/pubsub'
import { handleNewsletter } from '@omnivore/content-handler'
import { generateUniqueUrl } from '@omnivore/content-handler/build/src/content-handler'
import * as Sentry from '@sentry/serverless'
import axios from 'axios'
import * as jwt from 'jsonwebtoken'
import parseHeaders from 'parse-headers'
import * as multipart from 'parse-multipart-data'
import rfc2047 from 'rfc2047'
import { Converter } from 'showdown'
import { promisify } from 'util'
import { Attachment, handleAttachments, isAttachment } from './attachment'
import { EmailJobType, queueEmailJob } from './job'
import {
handleGoogleConfirmationEmail,
isGoogleConfirmationEmail,
isSubscriptionConfirmationEmail,
parseAuthor,
parseUnsubscribe,
} from './newsletter'
interface SaveReceivedEmailResponse {
id: string
}
interface Envelope {
to: string[]
from: string
}
const signToken = promisify(jwt.sign)
Sentry.GCPFunction.init({
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 0,
})
const NEWSLETTER_EMAIL_RECEIVED_TOPIC = 'newsletterEmailReceived'
const NON_NEWSLETTER_EMAIL_TOPIC = 'nonNewsletterEmailReceived'
const pubsub = new PubSub()
const converter = new Converter()
export const plainTextToHtml = (text: string): string => {
return converter.makeHtml(text)
}
export const publishMessage = async (
topic: string,
message: any
): Promise<string | undefined> => {
return pubsub
.topic(topic)
.publishMessage({ json: message })
.catch((err) => {
console.log('error publishing message:', err)
return undefined
})
}
const saveReceivedEmail = async (
email: string,
data: any
): Promise<SaveReceivedEmailResponse> => {
if (process.env.JWT_SECRET === undefined) {
throw new Error('JWT_SECRET is not defined')
}
const auth = await signToken(email, process.env.JWT_SECRET)
if (process.env.INTERNAL_SVC_ENDPOINT === undefined) {
throw new Error('REST_BACKEND_ENDPOINT is not defined')
}
const response = await axios.post(
`${process.env.INTERNAL_SVC_ENDPOINT}svc/pubsub/emails/save`,
data,
{
headers: {
Authorization: `${auth as string}`,
'Content-Type': 'application/json',
},
}
)
return response.data as SaveReceivedEmailResponse
}
export const parsedTo = (parsed: Record<string, string>): string => {
// envelope to contains the real recipient email address
try {
@ -135,97 +72,77 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
? parseUnsubscribe(unSubHeader)
: undefined
const { id: receivedEmailId } = await saveReceivedEmail(to, {
from,
to,
subject,
html,
text,
replyTo,
})
try {
// check if it is a subscription or google confirmation email
const isGoogleConfirmation = isGoogleConfirmationEmail(from, subject)
if (isGoogleConfirmation || isSubscriptionConfirmationEmail(subject)) {
console.debug('handleConfirmation', from, subject)
// we need to parse the confirmation code from the email
isGoogleConfirmation &&
(await handleGoogleConfirmationEmail(to, subject))
// queue non-newsletter emails
await pubsub.topic(NON_NEWSLETTER_EMAIL_TOPIC).publishMessage({
json: {
from,
to,
subject,
html,
text,
unsubMailTo: unsubscribe?.mailTo,
unsubHttpUrl: unsubscribe?.httpUrl,
forwardedFrom,
receivedEmailId,
},
if (isGoogleConfirmation) {
await handleGoogleConfirmationEmail(from, to, subject)
}
// forward emails
await queueEmailJob(EmailJobType.ForwardEmail, {
from,
to,
subject,
html,
text,
headers,
forwardedFrom,
replyTo,
})
return res.send('ok')
}
if (attachments.length > 0) {
console.debug('handle attachments', from, to, subject)
// save the attachments as articles
await handleAttachments(to, subject, attachments, receivedEmailId)
await handleAttachments(from, to, subject, attachments)
return res.send('ok')
}
// convert text to html if html is not available
const content = html || plainTextToHtml(text)
// all other emails are considered newsletters
const newsletterMessage = await handleNewsletter({
// queue newsletter emails
await queueEmailJob(EmailJobType.SaveNewsletter, {
from,
to,
subject,
html: content,
html,
text,
headers,
unsubMailTo: unsubscribe?.mailTo,
unsubHttpUrl: unsubscribe?.httpUrl,
forwardedFrom,
replyTo,
})
// queue newsletter emails
await pubsub.topic(NEWSLETTER_EMAIL_RECEIVED_TOPIC).publishMessage({
json: {
email: to,
content,
url: generateUniqueUrl(),
title: subject,
author: parseAuthor(from),
unsubMailTo: unsubscribe?.mailTo,
unsubHttpUrl: unsubscribe?.httpUrl,
receivedEmailId,
...newsletterMessage,
},
})
res.send('newsletter received')
} catch (error) {
console.log(
console.error(
'error handling emails, will forward.',
from,
to,
subject,
error
)
// queue error emails
await pubsub.topic(NON_NEWSLETTER_EMAIL_TOPIC).publishMessage({
json: {
from,
to,
subject,
html,
text,
forwardedFrom,
receivedEmailId,
},
// fallback to forward the email
await queueEmailJob(EmailJobType.ForwardEmail, {
from,
to,
subject,
html,
text,
headers,
forwardedFrom,
replyTo,
})
res.send('ok')
}
} catch (e) {
console.log(e)
console.error(e)
res.send(e)
}
}

View File

@ -0,0 +1,72 @@
import { BulkJobOptions, Queue } from 'bullmq'
import { redisDataSource } from './redis_data_source'
const QUEUE_NAME = 'omnivore-backend-queue'
export enum EmailJobType {
ForwardEmail = 'forward-email',
SaveNewsletter = 'save-newsletter',
ConfirmationEmail = 'confirmation-email',
SaveAttachment = 'save-attachment',
}
interface EmailJobData {
from: string
to: string
subject: string
html?: string
text?: string
headers?: Record<string, string | string[]>
unsubMailTo?: string
unsubHttpUrl?: string
forwardedFrom?: string
replyTo?: string
uploadFile?: {
fileName: string
contentType: string
id: string
}
confirmationCode?: string
}
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
const getPriority = (jobType: EmailJobType): number => {
// we want to prioritized jobs by the expected time to complete
// lower number means higher priority
// priority 1: jobs that are expected to finish immediately
// priority 5: jobs that are expected to finish in less than 10 second
// priority 10: jobs that are expected to finish in less than 10 minutes
// priority 100: jobs that are expected to finish in less than 1 hour
switch (jobType) {
case EmailJobType.ForwardEmail:
case EmailJobType.ConfirmationEmail:
return 1
case EmailJobType.SaveAttachment:
case EmailJobType.SaveNewsletter:
return 5
default:
throw new Error(`unknown job type: ${jobType as string}`)
}
}
const getOpts = (jobType: EmailJobType): BulkJobOptions => {
return {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
priority: getPriority(jobType),
backoff: {
type: 'exponential',
delay: 2000,
},
}
}
export const queueEmailJob = async (
jobType: EmailJobType,
data: EmailJobData
) => {
await queue.add(jobType, data, getOpts(jobType))
}

View File

@ -1,12 +1,11 @@
import addressparser from 'addressparser'
import { publishMessage } from './index'
import { EmailJobType, queueEmailJob } from './job'
interface Unsubscribe {
mailTo?: string
httpUrl?: string
}
const GOOGLE_CONFIRMATION_CODE_RECEIVED_TOPIC = 'emailConfirmationCodeReceived'
const GOOGLE_CONFIRMATION_EMAIL_SENDER_ADDRESS = 'forwarding-noreply@google.com'
// check unicode parentheses too
const GOOGLE_CONFIRMATION_CODE_PATTERN = /\d+/u
@ -41,24 +40,25 @@ export const parseAuthor = (address: string): string => {
}
export const handleGoogleConfirmationEmail = async (
email: string,
from: string,
to: string,
subject: string
) => {
console.log('confirmation email', email, subject)
console.log('confirmation email', from, to, subject)
const confirmationCode = getConfirmationCode(subject)
if (!email || !confirmationCode) {
if (!to || !confirmationCode) {
console.log(
'confirmation email error, user email:',
email,
to,
'confirmationCode',
confirmationCode
)
throw new Error('invalid confirmation email')
}
const message = { emailAddress: email, confirmationCode: confirmationCode }
return publishMessage(GOOGLE_CONFIRMATION_CODE_RECEIVED_TOPIC, message)
const message = { from, to, confirmationCode, subject }
return queueEmailJob(EmailJobType.ConfirmationEmail, message)
}
export const getConfirmationCode = (subject: string): string | undefined => {

View File

@ -0,0 +1,102 @@
import Redis, { RedisOptions } from 'ioredis'
import 'dotenv/config'
type RedisClientType = 'cache' | 'mq'
type RedisDataSourceOption = {
url?: string
cert?: string
}
export type RedisDataSourceOptions = {
[key in RedisClientType]: RedisDataSourceOption
}
export class RedisDataSource {
options: RedisDataSourceOptions
cacheClient: Redis
queueRedisClient: Redis
constructor(options: RedisDataSourceOptions) {
this.options = options
const cacheClient = createIORedisClient('cache', this.options)
if (!cacheClient) throw 'Error initializing cache redis client'
this.cacheClient = cacheClient
this.queueRedisClient =
createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache
}
async shutdown(): Promise<void> {
try {
await this.queueRedisClient?.quit()
await this.cacheClient?.quit()
} catch (err) {
console.error('error while shutting down redis', err)
}
}
}
const createIORedisClient = (
name: RedisClientType,
options: RedisDataSourceOptions
): Redis | undefined => {
const option = options[name]
const redisURL = option.url
if (!redisURL) {
console.log(`no redisURL supplied: ${name}`)
return undefined
}
const redisCert = option.cert
const tls =
redisURL.startsWith('rediss://') && redisCert
? {
ca: redisCert,
rejectUnauthorized: false,
}
: undefined
const redisOptions: RedisOptions = {
tls,
name,
connectTimeout: 10000,
maxRetriesPerRequest: null,
offlineQueue: false,
}
return new Redis(redisURL, redisOptions)
}
export const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
const gracefulShutdown = async (signal: string) => {
console.log(`Received ${signal}, shutting down gracefully...`)
try {
await redisDataSource.shutdown()
console.log('redis shutdown successfully')
} catch (error) {
console.error('error while shutting down redis', error)
}
process.exit(0)
}
process.on('SIGINT', () => {
;(async () => {
await gracefulShutdown('SIGINT')
})()
})
process.on('SIGTERM', () => {
;(async () => {
await gracefulShutdown('SIGTERM')
})()
})