From 0d1583351121a905b0733a7d7318f4edea0ca90f Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 2 Jul 2024 14:59:42 +0800 Subject: [PATCH] update inbound-email-handler to use @omnivore/utils to create redis connection --- packages/inbound-email-handler/package.json | 2 +- .../inbound-email-handler/src/attachment.ts | 4 +- packages/inbound-email-handler/src/index.ts | 38 +++++-- packages/inbound-email-handler/src/job.ts | 11 ++- .../inbound-email-handler/src/newsletter.ts | 4 +- .../src/redis_data_source.ts | 99 ------------------- 6 files changed, 45 insertions(+), 113 deletions(-) delete mode 100644 packages/inbound-email-handler/src/redis_data_source.ts diff --git a/packages/inbound-email-handler/package.json b/packages/inbound-email-handler/package.json index 2581a61a2..0839a709b 100644 --- a/packages/inbound-email-handler/package.json +++ b/packages/inbound-email-handler/package.json @@ -35,11 +35,11 @@ "dependencies": { "@google-cloud/functions-framework": "3.1.2", "@google-cloud/storage": "^7.0.1", + "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", "addressparser": "^1.0.1", "bullmq": "^5.1.1", "dotenv": "^8.2.0", - "ioredis": "^5.3.2", "parse-headers": "^2.0.4", "parse-multipart-data": "^1.2.1", "rfc2047": "^4.0.1", diff --git a/packages/inbound-email-handler/src/attachment.ts b/packages/inbound-email-handler/src/attachment.ts index db3d0d094..30b3833e7 100644 --- a/packages/inbound-email-handler/src/attachment.ts +++ b/packages/inbound-email-handler/src/attachment.ts @@ -1,4 +1,5 @@ import { Storage } from '@google-cloud/storage' +import { RedisDataSource } from '@omnivore/utils' import { v4 as uuid } from 'uuid' import { EmailJobType, queueEmailJob } from './job' @@ -37,6 +38,7 @@ export const uploadToBucket = async ( } export const handleAttachments = async ( + redisDataSource: RedisDataSource, from: string, to: string, subject: string, @@ -54,7 +56,7 @@ export const handleAttachments = async ( public: false, }) - await queueEmailJob(EmailJobType.SaveAttachment, { + await queueEmailJob(redisDataSource, EmailJobType.SaveAttachment, { from, to, uploadFile: { diff --git a/packages/inbound-email-handler/src/index.ts b/packages/inbound-email-handler/src/index.ts index 0abd80dac..8d3232109 100644 --- a/packages/inbound-email-handler/src/index.ts +++ b/packages/inbound-email-handler/src/index.ts @@ -2,7 +2,9 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unused-vars */ +import { RedisDataSource } from '@omnivore/utils' import * as Sentry from '@sentry/serverless' +import 'dotenv/config' import parseHeaders from 'parse-headers' import * as multipart from 'parse-multipart-data' import rfc2047 from 'rfc2047' @@ -72,6 +74,17 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction( ? parseUnsubscribe(unSubHeader) : undefined + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + try { // check if it is a subscription or google confirmation email const isGoogleConfirmation = isGoogleConfirmationEmail(from, subject) @@ -79,11 +92,16 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction( console.debug('handleConfirmation', from, subject) // we need to parse the confirmation code from the email if (isGoogleConfirmation) { - await handleGoogleConfirmationEmail(from, to, subject) + await handleGoogleConfirmationEmail( + redisDataSource, + from, + to, + subject + ) } // forward emails - await queueEmailJob(EmailJobType.ForwardEmail, { + await queueEmailJob(redisDataSource, EmailJobType.ForwardEmail, { from, to, subject, @@ -98,13 +116,19 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction( if (attachments.length > 0) { console.debug('handle attachments', from, to, subject) // save the attachments as articles - await handleAttachments(from, to, subject, attachments) + await handleAttachments( + redisDataSource, + from, + to, + subject, + attachments + ) return res.send('ok') } // all other emails are considered newsletters // queue newsletter emails - await queueEmailJob(EmailJobType.SaveNewsletter, { + await queueEmailJob(redisDataSource, EmailJobType.SaveNewsletter, { from, to, subject, @@ -128,7 +152,7 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction( ) // fallback to forward the email - await queueEmailJob(EmailJobType.ForwardEmail, { + await queueEmailJob(redisDataSource, EmailJobType.ForwardEmail, { from, to, subject, @@ -139,7 +163,9 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction( replyTo, }) - res.send('ok') + return res.send('ok') + } finally { + await redisDataSource.shutdown() } } catch (e) { console.error(e) diff --git a/packages/inbound-email-handler/src/job.ts b/packages/inbound-email-handler/src/job.ts index 89a2fea40..2d379daf5 100644 --- a/packages/inbound-email-handler/src/job.ts +++ b/packages/inbound-email-handler/src/job.ts @@ -1,5 +1,5 @@ +import { RedisDataSource } from '@omnivore/utils' import { BulkJobOptions, Queue } from 'bullmq' -import { redisDataSource } from './redis_data_source' const QUEUE_NAME = 'omnivore-backend-queue' export enum EmailJobType { @@ -28,10 +28,6 @@ interface EmailJobData { confirmationCode?: string } -const queue = new Queue(QUEUE_NAME, { - connection: redisDataSource.queueRedisClient, -}) - const getPriority = (jobType: EmailJobType): number => { // we want to prioritized jobs by the expected time to complete // lower number means higher priority @@ -65,8 +61,13 @@ const getOpts = (jobType: EmailJobType): BulkJobOptions => { } export const queueEmailJob = async ( + redisDataSource: RedisDataSource, jobType: EmailJobType, data: EmailJobData ) => { + const queue = new Queue(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, + }) + await queue.add(jobType, data, getOpts(jobType)) } diff --git a/packages/inbound-email-handler/src/newsletter.ts b/packages/inbound-email-handler/src/newsletter.ts index b31103ada..f72ee2c00 100644 --- a/packages/inbound-email-handler/src/newsletter.ts +++ b/packages/inbound-email-handler/src/newsletter.ts @@ -1,3 +1,4 @@ +import { RedisDataSource } from '@omnivore/utils' import addressparser from 'addressparser' import { EmailJobType, queueEmailJob } from './job' @@ -40,6 +41,7 @@ export const parseAuthor = (address: string): string => { } export const handleGoogleConfirmationEmail = async ( + redisDataSource: RedisDataSource, from: string, to: string, subject: string @@ -58,7 +60,7 @@ export const handleGoogleConfirmationEmail = async ( } const message = { from, to, confirmationCode, subject } - return queueEmailJob(EmailJobType.ConfirmationEmail, message) + return queueEmailJob(redisDataSource, EmailJobType.ConfirmationEmail, message) } export const getConfirmationCode = (subject: string): string | undefined => { diff --git a/packages/inbound-email-handler/src/redis_data_source.ts b/packages/inbound-email-handler/src/redis_data_source.ts deleted file mode 100644 index 05bf8d26c..000000000 --- a/packages/inbound-email-handler/src/redis_data_source.ts +++ /dev/null @@ -1,99 +0,0 @@ -import Redis, { RedisOptions } from 'ioredis' -import 'dotenv/config' - -type RedisClientType = 'cache' | 'mq' -type RedisDataSourceOption = { - url?: string - cert?: string -} -export type RedisDataSourceOptions = { - [key in RedisClientType]: RedisDataSourceOption -} - -export class RedisDataSource { - options: RedisDataSourceOptions - - cacheClient: Redis - queueRedisClient: Redis - - constructor(options: RedisDataSourceOptions) { - this.options = options - - const cacheClient = createIORedisClient('cache', this.options) - if (!cacheClient) throw 'Error initializing cache redis client' - - this.cacheClient = cacheClient - this.queueRedisClient = - createIORedisClient('mq', this.options) || this.cacheClient // if mq is not defined, use cache - } - - async shutdown(): Promise { - try { - await this.queueRedisClient?.quit() - await this.cacheClient?.quit() - } catch (err) { - console.error('error while shutting down redis', err) - } - } -} - -const createIORedisClient = ( - name: RedisClientType, - options: RedisDataSourceOptions -): Redis | undefined => { - const option = options[name] - const redisURL = option.url - if (!redisURL) { - console.log(`no redisURL supplied: ${name}`) - return undefined - } - - const redisCert = option.cert - const tls = - redisURL.startsWith('rediss://') && redisCert - ? { - ca: redisCert, - rejectUnauthorized: false, - } - : undefined - - const redisOptions: RedisOptions = { - tls, - name, - connectTimeout: 10000, - maxRetriesPerRequest: null, - offlineQueue: false, - } - return new Redis(redisURL, redisOptions) -} - -export const redisDataSource = new RedisDataSource({ - cache: { - url: process.env.REDIS_URL, - cert: process.env.REDIS_CERT, - }, - mq: { - url: process.env.MQ_REDIS_URL, - cert: process.env.MQ_REDIS_CERT, - }, -}) - -const gracefulShutdown = async (signal: string) => { - console.log(`Received ${signal}, shutting down gracefully...`) - - await redisDataSource.shutdown() - console.log('redis shutdown successfully') - - process.exit(0) -} - -process.on('SIGINT', () => { - ;(async () => { - await gracefulShutdown('SIGINT') - })() -}) -process.on('SIGTERM', () => { - ;(async () => { - await gracefulShutdown('SIGTERM') - })() -})