update inbound-email-handler to use @omnivore/utils to create redis connection

This commit is contained in:
Hongbo Wu
2024-07-02 14:59:42 +08:00
parent 4844f9eac2
commit 0d15833511
6 changed files with 45 additions and 113 deletions

View File

@ -35,11 +35,11 @@
"dependencies": {
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/storage": "^7.0.1",
"@omnivore/utils": "1.0.0",
"@sentry/serverless": "^7.77.0",
"addressparser": "^1.0.1",
"bullmq": "^5.1.1",
"dotenv": "^8.2.0",
"ioredis": "^5.3.2",
"parse-headers": "^2.0.4",
"parse-multipart-data": "^1.2.1",
"rfc2047": "^4.0.1",

View File

@ -1,4 +1,5 @@
import { Storage } from '@google-cloud/storage'
import { RedisDataSource } from '@omnivore/utils'
import { v4 as uuid } from 'uuid'
import { EmailJobType, queueEmailJob } from './job'
@ -37,6 +38,7 @@ export const uploadToBucket = async (
}
export const handleAttachments = async (
redisDataSource: RedisDataSource,
from: string,
to: string,
subject: string,
@ -54,7 +56,7 @@ export const handleAttachments = async (
public: false,
})
await queueEmailJob(EmailJobType.SaveAttachment, {
await queueEmailJob(redisDataSource, EmailJobType.SaveAttachment, {
from,
to,
uploadFile: {

View File

@ -2,7 +2,9 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unused-vars */
import { RedisDataSource } from '@omnivore/utils'
import * as Sentry from '@sentry/serverless'
import 'dotenv/config'
import parseHeaders from 'parse-headers'
import * as multipart from 'parse-multipart-data'
import rfc2047 from 'rfc2047'
@ -72,6 +74,17 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
? parseUnsubscribe(unSubHeader)
: undefined
const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
try {
// check if it is a subscription or google confirmation email
const isGoogleConfirmation = isGoogleConfirmationEmail(from, subject)
@ -79,11 +92,16 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
console.debug('handleConfirmation', from, subject)
// we need to parse the confirmation code from the email
if (isGoogleConfirmation) {
await handleGoogleConfirmationEmail(from, to, subject)
await handleGoogleConfirmationEmail(
redisDataSource,
from,
to,
subject
)
}
// forward emails
await queueEmailJob(EmailJobType.ForwardEmail, {
await queueEmailJob(redisDataSource, EmailJobType.ForwardEmail, {
from,
to,
subject,
@ -98,13 +116,19 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
if (attachments.length > 0) {
console.debug('handle attachments', from, to, subject)
// save the attachments as articles
await handleAttachments(from, to, subject, attachments)
await handleAttachments(
redisDataSource,
from,
to,
subject,
attachments
)
return res.send('ok')
}
// all other emails are considered newsletters
// queue newsletter emails
await queueEmailJob(EmailJobType.SaveNewsletter, {
await queueEmailJob(redisDataSource, EmailJobType.SaveNewsletter, {
from,
to,
subject,
@ -128,7 +152,7 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
)
// fallback to forward the email
await queueEmailJob(EmailJobType.ForwardEmail, {
await queueEmailJob(redisDataSource, EmailJobType.ForwardEmail, {
from,
to,
subject,
@ -139,7 +163,9 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
replyTo,
})
res.send('ok')
return res.send('ok')
} finally {
await redisDataSource.shutdown()
}
} catch (e) {
console.error(e)

View File

@ -1,5 +1,5 @@
import { RedisDataSource } from '@omnivore/utils'
import { BulkJobOptions, Queue } from 'bullmq'
import { redisDataSource } from './redis_data_source'
const QUEUE_NAME = 'omnivore-backend-queue'
export enum EmailJobType {
@ -28,10 +28,6 @@ interface EmailJobData {
confirmationCode?: string
}
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
const getPriority = (jobType: EmailJobType): number => {
// we want to prioritized jobs by the expected time to complete
// lower number means higher priority
@ -65,8 +61,13 @@ const getOpts = (jobType: EmailJobType): BulkJobOptions => {
}
export const queueEmailJob = async (
redisDataSource: RedisDataSource,
jobType: EmailJobType,
data: EmailJobData
) => {
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
await queue.add(jobType, data, getOpts(jobType))
}

View File

@ -1,3 +1,4 @@
import { RedisDataSource } from '@omnivore/utils'
import addressparser from 'addressparser'
import { EmailJobType, queueEmailJob } from './job'
@ -40,6 +41,7 @@ export const parseAuthor = (address: string): string => {
}
export const handleGoogleConfirmationEmail = async (
redisDataSource: RedisDataSource,
from: string,
to: string,
subject: string
@ -58,7 +60,7 @@ export const handleGoogleConfirmationEmail = async (
}
const message = { from, to, confirmationCode, subject }
return queueEmailJob(EmailJobType.ConfirmationEmail, message)
return queueEmailJob(redisDataSource, EmailJobType.ConfirmationEmail, message)
}
export const getConfirmationCode = (subject: string): string | undefined => {

View File

@ -1,99 +0,0 @@
import Redis, { RedisOptions } from 'ioredis'
import 'dotenv/config'
type RedisClientType = 'cache' | 'mq'
type RedisDataSourceOption = {
url?: string
cert?: string
}
export type RedisDataSourceOptions = {
[key in RedisClientType]: RedisDataSourceOption
}
export class RedisDataSource {
options: RedisDataSourceOptions
cacheClient: Redis
queueRedisClient: Redis
constructor(options: RedisDataSourceOptions) {
this.options = options
const cacheClient = createIORedisClient('cache', this.options)
if (!cacheClient) throw 'Error initializing cache redis client'
this.cacheClient = cacheClient
this.queueRedisClient =
createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache
}
async shutdown(): Promise<void> {
try {
await this.queueRedisClient?.quit()
await this.cacheClient?.quit()
} catch (err) {
console.error('error while shutting down redis', err)
}
}
}
const createIORedisClient = (
name: RedisClientType,
options: RedisDataSourceOptions
): Redis | undefined => {
const option = options[name]
const redisURL = option.url
if (!redisURL) {
console.log(`no redisURL supplied: ${name}`)
return undefined
}
const redisCert = option.cert
const tls =
redisURL.startsWith('rediss://') && redisCert
? {
ca: redisCert,
rejectUnauthorized: false,
}
: undefined
const redisOptions: RedisOptions = {
tls,
name,
connectTimeout: 10000,
maxRetriesPerRequest: null,
offlineQueue: false,
}
return new Redis(redisURL, redisOptions)
}
export const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
const gracefulShutdown = async (signal: string) => {
console.log(`Received ${signal}, shutting down gracefully...`)
await redisDataSource.shutdown()
console.log('redis shutdown successfully')
process.exit(0)
}
process.on('SIGINT', () => {
;(async () => {
await gracefulShutdown('SIGINT')
})()
})
process.on('SIGTERM', () => {
;(async () => {
await gracefulShutdown('SIGTERM')
})()
})