Merge pull request #4136 from omnivore-app/fix/pdf-handler

fix: pdf handler sends update-content-job to the wrong redis
This commit is contained in:
Hongbo Wu
2024-07-02 15:36:40 +08:00
committed by GitHub
21 changed files with 137 additions and 150 deletions

View File

@ -11,11 +11,14 @@ COPY .eslintrc .
COPY /packages/inbound-email-handler/package.json ./packages/inbound-email-handler/package.json COPY /packages/inbound-email-handler/package.json ./packages/inbound-email-handler/package.json
COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json
COPY /packages/utils/package.json ./packages/utils/package.json
RUN yarn install --pure-lockfile RUN yarn install --pure-lockfile
ADD /packages/inbound-email-handler ./packages/inbound-email-handler ADD /packages/inbound-email-handler ./packages/inbound-email-handler
ADD /packages/content-handler ./packages/content-handler ADD /packages/content-handler ./packages/content-handler
ADD /packages/utils ./packages/utils
RUN yarn workspace @omnivore/utils build
RUN yarn workspace @omnivore/content-handler build RUN yarn workspace @omnivore/content-handler build
RUN yarn workspace @omnivore/inbound-email-handler build RUN yarn workspace @omnivore/inbound-email-handler build

View File

@ -1,6 +1,5 @@
{ {
"extension": ["ts"], "extension": ["ts"],
"spec": "test/**/*.test.ts", "spec": "test/**/*.test.ts",
"require": ["test/global-teardown.ts"],
"timeout": 10000 "timeout": 10000
} }

View File

@ -35,11 +35,11 @@
"dependencies": { "dependencies": {
"@google-cloud/functions-framework": "3.1.2", "@google-cloud/functions-framework": "3.1.2",
"@google-cloud/storage": "^7.0.1", "@google-cloud/storage": "^7.0.1",
"@omnivore/utils": "1.0.0",
"@sentry/serverless": "^7.77.0", "@sentry/serverless": "^7.77.0",
"addressparser": "^1.0.1", "addressparser": "^1.0.1",
"bullmq": "^5.1.1", "bullmq": "^5.1.1",
"dotenv": "^8.2.0", "dotenv": "^8.2.0",
"ioredis": "^5.3.2",
"parse-headers": "^2.0.4", "parse-headers": "^2.0.4",
"parse-multipart-data": "^1.2.1", "parse-multipart-data": "^1.2.1",
"rfc2047": "^4.0.1", "rfc2047": "^4.0.1",

View File

@ -1,4 +1,5 @@
import { Storage } from '@google-cloud/storage' import { Storage } from '@google-cloud/storage'
import { RedisDataSource } from '@omnivore/utils'
import { v4 as uuid } from 'uuid' import { v4 as uuid } from 'uuid'
import { EmailJobType, queueEmailJob } from './job' import { EmailJobType, queueEmailJob } from './job'
@ -37,6 +38,7 @@ export const uploadToBucket = async (
} }
export const handleAttachments = async ( export const handleAttachments = async (
redisDataSource: RedisDataSource,
from: string, from: string,
to: string, to: string,
subject: string, subject: string,
@ -54,7 +56,7 @@ export const handleAttachments = async (
public: false, public: false,
}) })
await queueEmailJob(EmailJobType.SaveAttachment, { await queueEmailJob(redisDataSource, EmailJobType.SaveAttachment, {
from, from,
to, to,
uploadFile: { uploadFile: {

View File

@ -2,7 +2,9 @@
/* eslint-disable @typescript-eslint/no-explicit-any */ /* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-unused-vars */
import { RedisDataSource } from '@omnivore/utils'
import * as Sentry from '@sentry/serverless' import * as Sentry from '@sentry/serverless'
import 'dotenv/config'
import parseHeaders from 'parse-headers' import parseHeaders from 'parse-headers'
import * as multipart from 'parse-multipart-data' import * as multipart from 'parse-multipart-data'
import rfc2047 from 'rfc2047' import rfc2047 from 'rfc2047'
@ -72,6 +74,17 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
? parseUnsubscribe(unSubHeader) ? parseUnsubscribe(unSubHeader)
: undefined : undefined
const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
try { try {
// check if it is a subscription or google confirmation email // check if it is a subscription or google confirmation email
const isGoogleConfirmation = isGoogleConfirmationEmail(from, subject) const isGoogleConfirmation = isGoogleConfirmationEmail(from, subject)
@ -79,11 +92,16 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
console.debug('handleConfirmation', from, subject) console.debug('handleConfirmation', from, subject)
// we need to parse the confirmation code from the email // we need to parse the confirmation code from the email
if (isGoogleConfirmation) { if (isGoogleConfirmation) {
await handleGoogleConfirmationEmail(from, to, subject) await handleGoogleConfirmationEmail(
redisDataSource,
from,
to,
subject
)
} }
// forward emails // forward emails
await queueEmailJob(EmailJobType.ForwardEmail, { await queueEmailJob(redisDataSource, EmailJobType.ForwardEmail, {
from, from,
to, to,
subject, subject,
@ -98,13 +116,19 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
if (attachments.length > 0) { if (attachments.length > 0) {
console.debug('handle attachments', from, to, subject) console.debug('handle attachments', from, to, subject)
// save the attachments as articles // save the attachments as articles
await handleAttachments(from, to, subject, attachments) await handleAttachments(
redisDataSource,
from,
to,
subject,
attachments
)
return res.send('ok') return res.send('ok')
} }
// all other emails are considered newsletters // all other emails are considered newsletters
// queue newsletter emails // queue newsletter emails
await queueEmailJob(EmailJobType.SaveNewsletter, { await queueEmailJob(redisDataSource, EmailJobType.SaveNewsletter, {
from, from,
to, to,
subject, subject,
@ -128,7 +152,7 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
) )
// fallback to forward the email // fallback to forward the email
await queueEmailJob(EmailJobType.ForwardEmail, { await queueEmailJob(redisDataSource, EmailJobType.ForwardEmail, {
from, from,
to, to,
subject, subject,
@ -139,7 +163,9 @@ export const inboundEmailHandler = Sentry.GCPFunction.wrapHttpFunction(
replyTo, replyTo,
}) })
res.send('ok') return res.send('ok')
} finally {
await redisDataSource.shutdown()
} }
} catch (e) { } catch (e) {
console.error(e) console.error(e)

View File

@ -1,5 +1,5 @@
import { RedisDataSource } from '@omnivore/utils'
import { BulkJobOptions, Queue } from 'bullmq' import { BulkJobOptions, Queue } from 'bullmq'
import { redisDataSource } from './redis_data_source'
const QUEUE_NAME = 'omnivore-backend-queue' const QUEUE_NAME = 'omnivore-backend-queue'
export enum EmailJobType { export enum EmailJobType {
@ -28,10 +28,6 @@ interface EmailJobData {
confirmationCode?: string confirmationCode?: string
} }
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
const getPriority = (jobType: EmailJobType): number => { const getPriority = (jobType: EmailJobType): number => {
// we want to prioritized jobs by the expected time to complete // we want to prioritized jobs by the expected time to complete
// lower number means higher priority // lower number means higher priority
@ -65,8 +61,13 @@ const getOpts = (jobType: EmailJobType): BulkJobOptions => {
} }
export const queueEmailJob = async ( export const queueEmailJob = async (
redisDataSource: RedisDataSource,
jobType: EmailJobType, jobType: EmailJobType,
data: EmailJobData data: EmailJobData
) => { ) => {
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
await queue.add(jobType, data, getOpts(jobType)) await queue.add(jobType, data, getOpts(jobType))
} }

View File

@ -1,3 +1,4 @@
import { RedisDataSource } from '@omnivore/utils'
import addressparser from 'addressparser' import addressparser from 'addressparser'
import { EmailJobType, queueEmailJob } from './job' import { EmailJobType, queueEmailJob } from './job'
@ -40,6 +41,7 @@ export const parseAuthor = (address: string): string => {
} }
export const handleGoogleConfirmationEmail = async ( export const handleGoogleConfirmationEmail = async (
redisDataSource: RedisDataSource,
from: string, from: string,
to: string, to: string,
subject: string subject: string
@ -58,7 +60,7 @@ export const handleGoogleConfirmationEmail = async (
} }
const message = { from, to, confirmationCode, subject } const message = { from, to, confirmationCode, subject }
return queueEmailJob(EmailJobType.ConfirmationEmail, message) return queueEmailJob(redisDataSource, EmailJobType.ConfirmationEmail, message)
} }
export const getConfirmationCode = (subject: string): string | undefined => { export const getConfirmationCode = (subject: string): string | undefined => {

View File

@ -1,5 +0,0 @@
import { redisDataSource } from '../src/redis_data_source'
export const mochaGlobalTeardown = async () => {
await redisDataSource.shutdown()
}

View File

@ -10,10 +10,13 @@ COPY .prettierrc .
COPY .eslintrc . COPY .eslintrc .
COPY /packages/pdf-handler/package.json ./packages/pdf-handler/package.json COPY /packages/pdf-handler/package.json ./packages/pdf-handler/package.json
COPY /packages/utils/package.json ./packages/utils/package.json
RUN yarn install --pure-lockfile RUN yarn install --pure-lockfile
ADD /packages/pdf-handler ./packages/pdf-handler ADD /packages/pdf-handler ./packages/pdf-handler
ADD /packages/utils ./packages/utils
RUN yarn workspace @omnivore/utils build
RUN yarn workspace @omnivore/pdf-handler build RUN yarn workspace @omnivore/pdf-handler build
# After building, fetch the production dependencies # After building, fetch the production dependencies

View File

@ -30,12 +30,12 @@
"@google-cloud/functions-framework": "3.1.2", "@google-cloud/functions-framework": "3.1.2",
"@google-cloud/pubsub": "^4.0.0", "@google-cloud/pubsub": "^4.0.0",
"@google-cloud/storage": "^7.0.1", "@google-cloud/storage": "^7.0.1",
"@omnivore/utils": "1.0.0",
"@sentry/serverless": "^7.77.0", "@sentry/serverless": "^7.77.0",
"axios": "^0.27.2", "axios": "^0.27.2",
"bullmq": "^5.1.4", "bullmq": "^5.1.4",
"concurrently": "^7.0.0", "concurrently": "^7.0.0",
"dotenv": "^8.2.0", "dotenv": "^8.2.0",
"ioredis": "^5.3.2",
"pdfjs-dist": "^2.9.359" "pdfjs-dist": "^2.9.359"
}, },
"volta": { "volta": {

View File

@ -1,7 +1,9 @@
import { GetSignedUrlConfig, Storage } from '@google-cloud/storage' import { GetSignedUrlConfig, Storage } from '@google-cloud/storage'
import { RedisDataSource } from '@omnivore/utils'
import * as Sentry from '@sentry/serverless' import * as Sentry from '@sentry/serverless'
import { parsePdf } from './pdf' import 'dotenv/config'
import { queueUpdatePageJob, State } from './job' import { queueUpdatePageJob, State } from './job'
import { parsePdf } from './pdf'
Sentry.GCPFunction.init({ Sentry.GCPFunction.init({
dsn: process.env.SENTRY_DSN, dsn: process.env.SENTRY_DSN,
@ -49,6 +51,7 @@ const getDocumentUrl = async (
} }
export const updatePageContent = async ( export const updatePageContent = async (
redisDataSource: RedisDataSource,
fileId: string, fileId: string,
content?: string, content?: string,
title?: string, title?: string,
@ -56,7 +59,7 @@ export const updatePageContent = async (
description?: string, description?: string,
state?: State state?: State
): Promise<string | undefined> => { ): Promise<string | undefined> => {
const job = await queueUpdatePageJob({ const job = await queueUpdatePageJob(redisDataSource, {
fileId, fileId,
content, content,
title, title,
@ -106,6 +109,17 @@ export const pdfHandler = Sentry.GCPFunction.wrapHttpFunction(
description, description,
state: State = 'SUCCEEDED' // Default to succeeded even if we fail to parse state: State = 'SUCCEEDED' // Default to succeeded even if we fail to parse
const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
try { try {
const url = await getDocumentUrl(data) const url = await getDocumentUrl(data)
console.log('PDF url: ', url) console.log('PDF url: ', url)
@ -130,6 +144,7 @@ export const pdfHandler = Sentry.GCPFunction.wrapHttpFunction(
} finally { } finally {
// Always update the state, even if we fail to parse // Always update the state, even if we fail to parse
const result = await updatePageContent( const result = await updatePageContent(
redisDataSource,
data.name, data.name,
content, content,
title, title,
@ -147,6 +162,8 @@ export const pdfHandler = Sentry.GCPFunction.wrapHttpFunction(
'state', 'state',
state state
) )
await redisDataSource.shutdown()
} }
} }

View File

@ -1,13 +1,9 @@
import { RedisDataSource } from '@omnivore/utils'
import { Queue } from 'bullmq' import { Queue } from 'bullmq'
import { redisDataSource } from './redis_data_source'
const QUEUE_NAME = 'omnivore-backend-queue' const QUEUE_NAME = 'omnivore-backend-queue'
const JOB_NAME = 'update-pdf-content' const JOB_NAME = 'update-pdf-content'
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
export type State = 'SUCCEEDED' | 'FAILED' export type State = 'SUCCEEDED' | 'FAILED'
type UpdatePageJobData = { type UpdatePageJobData = {
@ -19,7 +15,14 @@ type UpdatePageJobData = {
state?: State state?: State
} }
export const queueUpdatePageJob = async (data: UpdatePageJobData) => { export const queueUpdatePageJob = async (
redisDataSource: RedisDataSource,
data: UpdatePageJobData
) => {
const queue = new Queue(QUEUE_NAME, {
connection: redisDataSource.queueRedisClient,
})
return queue.add(JOB_NAME, data, { return queue.add(JOB_NAME, data, {
priority: 5, priority: 5,
attempts: 3, attempts: 3,

View File

@ -1,89 +0,0 @@
import Redis, { RedisOptions } from 'ioredis'
import 'dotenv/config'
// Connection settings shared by both redis clients owned by a RedisDataSource.
// REDIS_CERT, when present, enables TLS (see createRedisClient below).
export type RedisDataSourceOptions = {
  REDIS_URL?: string
  REDIS_CERT?: string
}
/**
 * Owns the pair of redis connections used by this service: one client for
 * caching and one for the background-job queue. Both are opened eagerly in
 * the constructor and closed together via shutdown().
 */
export class RedisDataSource {
  options: RedisDataSourceOptions
  cacheClient: Redis
  queueRedisClient: Redis

  constructor(options: RedisDataSourceOptions) {
    this.options = options
    // Two separate connections: blocking queue commands must not starve cache reads.
    this.cacheClient = createRedisClient('cache', this.options)
    this.queueRedisClient = createRedisClient('queue', this.options)
  }

  /** Replaces the stored options. NOTE(review): does not reconnect existing clients. */
  setOptions(options: RedisDataSourceOptions): void {
    this.options = options
  }

  /** Gracefully quits both clients; failures are logged, never thrown. */
  async shutdown(): Promise<void> {
    try {
      await this.queueRedisClient?.quit()
      await this.cacheClient?.quit()
    } catch (error) {
      console.error('error while shutting down redis', error)
    }
  }
}
/**
 * Creates a configured ioredis client.
 *
 * @param name - connection label ('cache' or 'queue'); passed to ioredis as the
 *   connection `name` option and echoed in the lifecycle log lines below
 * @param options - REDIS_URL (required) plus optional REDIS_CERT for TLS
 * @returns a Redis client with connect/error/close logging attached
 * @throws Error when options.REDIS_URL is missing
 */
const createRedisClient = (name: string, options: RedisDataSourceOptions) => {
  const redisURL = options.REDIS_URL
  const cert = options.REDIS_CERT?.replace(/\\n/g, '\n') // replace \n with new line
  if (!redisURL) {
    // Throw a real Error (not a bare string) so callers get a stack trace
    // and `instanceof Error` narrowing works in catch blocks.
    throw new Error('no redisURL supplied')
  }
  const redisOptions: RedisOptions = {
    name,
    connectTimeout: 10000, // 10 seconds
    tls: cert
      ? {
          cert,
          rejectUnauthorized: false, // for self-signed certs
        }
      : undefined,
    // null disables the per-request retry cap — presumably required by the
    // BullMQ queue consumer of this client; confirm before changing.
    maxRetriesPerRequest: null,
    offlineQueue: false,
  }
  const redis = new Redis(redisURL, redisOptions)
  redis.on('connect', () => {
    console.log('Redis connected', name)
  })
  redis.on('error', (err) => {
    console.error('Redis error', err, name)
  })
  redis.on('close', () => {
    console.log('Redis closed', name)
  })
  return redis
}
// Module-level singleton: importing this file opens both redis connections
// as a side effect, configured from the process environment.
export const redisDataSource = new RedisDataSource({
  REDIS_URL: process.env.REDIS_URL,
  REDIS_CERT: process.env.REDIS_CERT,
})
// Close redis connections cleanly on Ctrl-C before exiting.
// eslint-disable-next-line @typescript-eslint/no-misused-promises
process.on('SIGINT', async () => {
  console.log('SIGINT signal received.')
  try {
    await redisDataSource.shutdown()
  } catch (error) {
    console.error('error while shutting down redis', error)
  }
  // exit(0) runs even if shutdown fails — the catch above only logs.
  process.exit(0)
})

View File

@ -0,0 +1,2 @@
node_modules/
build/

6
packages/utils/.eslintrc Normal file
View File

@ -0,0 +1,6 @@
{
"extends": "../../.eslintrc",
"parserOptions": {
"project": "tsconfig.json"
}
}

View File

@ -0,0 +1,5 @@
{
"extension": ["ts"],
"spec": "test/**/*.test.ts",
"timeout": 10000
}

View File

@ -0,0 +1,25 @@
{
"name": "@omnivore/utils",
"version": "1.0.0",
"description": "Utility functions for Omnivore packages.",
"main": "./build/src/index.js",
"types": "./build/src/index.d.ts",
"scripts": {
"test": "yarn mocha -r ts-node/register --config mocha-config.json",
"lint": "eslint src --ext ts,js,tsx,jsx",
"build": "tsc"
},
"devDependencies": {
"@types/chai": "^4.3.6",
"@types/mocha": "^10.0.0",
"chai": "^4.3.6",
"eslint-plugin-prettier": "^4.0.0",
"mocha": "^10.0.0"
},
"dependencies": {
"ioredis": "^5.3.2"
},
"volta": {
"extends": "../../package.json"
}
}

View File

@ -0,0 +1 @@
export * from './redis_data_source'

View File

@ -1,5 +1,4 @@
import Redis, { RedisOptions } from 'ioredis' import Redis, { RedisOptions } from 'ioredis'
import 'dotenv/config'
type RedisClientType = 'cache' | 'mq' type RedisClientType = 'cache' | 'mq'
type RedisDataSourceOption = { type RedisDataSourceOption = {
@ -31,6 +30,8 @@ export class RedisDataSource {
try { try {
await this.queueRedisClient?.quit() await this.queueRedisClient?.quit()
await this.cacheClient?.quit() await this.cacheClient?.quit()
console.log('redis shutdown complete')
} catch (err) { } catch (err) {
console.error('error while shutting down redis', err) console.error('error while shutting down redis', err)
} }
@ -66,34 +67,3 @@ const createIORedisClient = (
} }
return new Redis(redisURL, redisOptions) return new Redis(redisURL, redisOptions)
} }
export const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
const gracefulShutdown = async (signal: string) => {
console.log(`Received ${signal}, shutting down gracefully...`)
await redisDataSource.shutdown()
console.log('redis shutdown successfully')
process.exit(0)
}
process.on('SIGINT', () => {
;(async () => {
await gracefulShutdown('SIGINT')
})()
})
process.on('SIGTERM', () => {
;(async () => {
await gracefulShutdown('SIGTERM')
})()
})

View File

@ -0,0 +1,8 @@
import 'mocha'
import { expect } from 'chai'
// Placeholder spec so the package's mocha run has at least one passing test.
describe('stub test', function () {
  it('should pass', function () {
    expect(true).to.be.true
  })
})

View File

@ -0,0 +1,8 @@
{
"extends": "./../../tsconfig.json",
"compilerOptions": {
"declaration": true,
"outDir": "build",
},
"include": ["src", "test"]
}