Merge pull request #1612 from omnivore-app/feat/email-after-import
Send email to user after a bulk import completes
This commit is contained in:
@ -1,8 +1,5 @@
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-call */
|
||||
import express from 'express'
|
||||
import { CreateArticleErrorCode } from '../generated/graphql'
|
||||
import { isSiteBlockedForParse } from '../utils/blocked'
|
||||
|
||||
61
packages/api/src/routers/user_router.ts
Normal file
61
packages/api/src/routers/user_router.ts
Normal file
@ -0,0 +1,61 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
import express from 'express'
|
||||
import { sendEmail } from '../utils/sendEmail'
|
||||
import { env } from '../env'
|
||||
import { buildLogger } from '../utils/logger'
|
||||
import { getRepository } from '../entity/utils'
|
||||
import { User } from '../entity/user'
|
||||
import { getClaimsByToken } from '../utils/auth'
|
||||
import { corsConfig } from '../utils/corsConfig'
|
||||
import cors from 'cors'
|
||||
|
||||
const logger = buildLogger('app.dispatch')
|
||||
|
||||
export function userRouter() {
|
||||
const router = express.Router()
|
||||
|
||||
router.post('/email', cors<express.Request>(corsConfig), async (req, res) => {
|
||||
logger.info('email to-user router')
|
||||
const token = req?.headers?.authorization
|
||||
const claims = await getClaimsByToken(token)
|
||||
if (!claims) {
|
||||
res.status(401).send('UNAUTHORIZED')
|
||||
return
|
||||
}
|
||||
const from = process.env.SENDER_MESSAGE
|
||||
const { body, subject } = req.body as {
|
||||
body?: string
|
||||
subject?: string
|
||||
}
|
||||
if (!subject || !body || !from) {
|
||||
console.log(subject, body, from)
|
||||
res.status(400).send('Bad Request')
|
||||
return
|
||||
}
|
||||
try {
|
||||
const user = await getRepository(User).findOneBy({ id: claims.uid })
|
||||
if (!user) {
|
||||
res.status(400).send('Bad Request')
|
||||
return
|
||||
}
|
||||
const result = await sendEmail({
|
||||
from: env.sender.message,
|
||||
to: user.email,
|
||||
subject: subject,
|
||||
text: body,
|
||||
})
|
||||
if (!result) {
|
||||
logger.error('Email not sent to user')
|
||||
res.status(500).send('Failed to send email')
|
||||
return
|
||||
}
|
||||
res.status(200).send('Email sent to user')
|
||||
} catch (e) {
|
||||
logger.info(e)
|
||||
res.status(500).send('Email sent to user')
|
||||
}
|
||||
})
|
||||
|
||||
return router
|
||||
}
|
||||
@ -48,6 +48,7 @@ import { integrationsServiceRouter } from './routers/svc/integrations'
|
||||
import { textToSpeechRouter } from './routers/text_to_speech'
|
||||
import * as httpContext from 'express-http-context'
|
||||
import { notificationRouter } from './routers/notification_router'
|
||||
import { userRouter } from './routers/user_router'
|
||||
|
||||
const PORT = process.env.PORT || 4000
|
||||
|
||||
@ -129,6 +130,7 @@ export const createApp = (): {
|
||||
|
||||
app.use('/api/auth', authRouter())
|
||||
app.use('/api/page', pageRouter())
|
||||
app.use('/api/user', userRouter())
|
||||
app.use('/api/article', articleRouter())
|
||||
app.use('/api/mobile-auth', mobileAuthRouter())
|
||||
app.use('/api/text-to-speech', textToSpeechRouter())
|
||||
|
||||
@ -29,8 +29,6 @@
|
||||
"@google-cloud/storage": "^5.18.1",
|
||||
"@google-cloud/tasks": "^3.0.5",
|
||||
"@types/express": "^4.17.13",
|
||||
"axios": "^0.27.2",
|
||||
"concurrently": "^7.0.0",
|
||||
"csv-parser": "^3.0.0",
|
||||
"nodemon": "^2.0.15"
|
||||
}
|
||||
|
||||
@ -8,7 +8,12 @@ import * as path from 'path'
|
||||
import { importMatterHistory } from './matterHistory'
|
||||
import { Stream } from 'node:stream'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
import { createCloudTask } from './task'
|
||||
import { CONTENT_FETCH_URL, createCloudTask, EMAIL_USER_URL } from './task'
|
||||
|
||||
import { promisify } from 'util'
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
|
||||
const signToken = promisify(jwt.sign)
|
||||
|
||||
const storage = new Storage()
|
||||
|
||||
@ -42,7 +47,7 @@ const importURL = async (
|
||||
url: URL,
|
||||
source: string
|
||||
): Promise<string | undefined> => {
|
||||
return createCloudTask({
|
||||
return createCloudTask(CONTENT_FETCH_URL, {
|
||||
userId,
|
||||
source,
|
||||
url: url.toString(),
|
||||
@ -50,6 +55,41 @@ const importURL = async (
|
||||
})
|
||||
}
|
||||
|
||||
const createEmailCloudTask = async (userId: string, payload: unknown) => {
|
||||
if (!process.env.JWT_SECRET) {
|
||||
throw 'Envrionment not setup correctly'
|
||||
}
|
||||
|
||||
const exp = Math.floor(Date.now() / 1000) + 60 * 60 * 24 // 1 day
|
||||
const authToken = (await signToken(
|
||||
{ uid: userId, exp },
|
||||
process.env.JWT_SECRET
|
||||
)) as string
|
||||
const headers = {
|
||||
Authorization: authToken,
|
||||
}
|
||||
|
||||
return createCloudTask(EMAIL_USER_URL, payload, headers)
|
||||
}
|
||||
|
||||
const sendImportFailedEmail = async (userId: string) => {
|
||||
return createEmailCloudTask(userId, {
|
||||
subject: 'Your Omnivore import failed.',
|
||||
body: `There was an error importing your file. Please ensure you uploaded the correct file type, if you need help, please email feedback@omnivore.app`,
|
||||
})
|
||||
}
|
||||
|
||||
const sendImportCompletedEmail = async (
|
||||
userId: string,
|
||||
urlsEnqueued: number,
|
||||
urlsFailed: number
|
||||
) => {
|
||||
return createEmailCloudTask(userId, {
|
||||
subject: 'Your Omnivore import has completed processing',
|
||||
body: `${urlsEnqueued} URLs have been pcoessed and should be available in your library. ${urlsFailed} URLs failed to be parsed.`,
|
||||
})
|
||||
}
|
||||
|
||||
const handlerForFile = (name: string): importHandlerFunc | undefined => {
|
||||
const fileName = path.parse(name).name
|
||||
if (fileName.startsWith('MATTER')) {
|
||||
@ -79,21 +119,36 @@ export const importHandler: EventFunction = async (event, context) => {
|
||||
return
|
||||
}
|
||||
|
||||
const regex = new RegExp('imports/(.*?)/')
|
||||
const groups = regex.exec(data.name)
|
||||
if (!groups || groups.length < 2) {
|
||||
console.log('could not match file pattern: ', data.name)
|
||||
return
|
||||
}
|
||||
const userId = [...groups][1]
|
||||
if (!userId) {
|
||||
console.log('could not extract userId from file name')
|
||||
return
|
||||
}
|
||||
|
||||
let countFailed = 0
|
||||
let countImported = 0
|
||||
await handler(stream, async (url): Promise<void> => {
|
||||
try {
|
||||
// Imports are stored in the format imports/<user id>/<type>-<uuid>.csv
|
||||
const regex = new RegExp('imports/(.*?)/')
|
||||
const groups = regex.exec(data.name)
|
||||
if (!groups || groups.length < 2) {
|
||||
console.log('could not match file pattern: ', data.name)
|
||||
return
|
||||
}
|
||||
const userId = [...groups][1]
|
||||
const result = await importURL(userId, url, 'csv-importer')
|
||||
console.log('import url result', result)
|
||||
countImported = countImported + 1
|
||||
} catch (err) {
|
||||
console.log('error importing url', err)
|
||||
countFailed = countFailed + 1
|
||||
}
|
||||
})
|
||||
|
||||
if (countImported < 1) {
|
||||
await sendImportFailedEmail(userId)
|
||||
} else {
|
||||
await sendImportCompletedEmail(userId, countImported, countFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,20 +1,25 @@
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
import { CloudTasksClient, protos } from '@google-cloud/tasks'
|
||||
|
||||
type TaskPayload = {
|
||||
url: string
|
||||
userId: string
|
||||
saveRequestId: string
|
||||
source: string
|
||||
}
|
||||
|
||||
const cloudTask = new CloudTasksClient()
|
||||
|
||||
export const createCloudTask = async (payload: TaskPayload) => {
|
||||
export const EMAIL_USER_URL = (() => {
|
||||
if (!process.env.INTERNAL_SVC_ENDPOINT) {
|
||||
throw `Environment not configured correctly, no SVC endpoint`
|
||||
}
|
||||
return (process.env.INTERNAL_SVC_ENDPOINT ?? '') + '/api/user/email'
|
||||
})()
|
||||
|
||||
export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL
|
||||
|
||||
export const createCloudTask = async (
|
||||
taskHandlerUrl: string | undefined,
|
||||
payload: unknown,
|
||||
requestHeaders?: Record<string, string>
|
||||
) => {
|
||||
const queue = 'omnivore-import-queue'
|
||||
const location = process.env.GCP_LOCATION
|
||||
const project = process.env.GCP_PROJECT_ID
|
||||
const taskHandlerUrl = process.env.CONTENT_FETCH_GCF_URL
|
||||
|
||||
if (!project || !location || !queue || !taskHandlerUrl) {
|
||||
throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}`
|
||||
@ -40,6 +45,7 @@ export const createCloudTask = async (payload: TaskPayload) => {
|
||||
url: taskHandlerUrl,
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
...requestHeaders,
|
||||
},
|
||||
body,
|
||||
...(serviceAccountEmail
|
||||
|
||||
Reference in New Issue
Block a user