feat: send email notification once the import has finished

This commit is contained in:
Hongbo Wu
2023-05-25 13:43:33 +08:00
parent 19facec4e2
commit 67014e68fd
5 changed files with 96 additions and 20 deletions

View File

@ -6,6 +6,7 @@
import { parse } from '@fast-csv/parse'
import { Stream } from 'stream'
import { ImportContext } from '.'
import { ImportStatus, updateMetrics } from './metrics'
export const importCsv = async (ctx: ImportContext, stream: Stream) => {
const parser = parse()
@ -23,11 +24,39 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => {
.map((l) => l.trim())
.filter((l) => l !== '')
: undefined
// update total counter
await updateMetrics(
ctx.redisClient,
ctx.userId,
ctx.taskId,
ImportStatus.TOTAL,
ctx.source
)
await ctx.urlHandler(ctx, url, state, labels)
ctx.countImported += 1
// update started counter
await updateMetrics(
ctx.redisClient,
ctx.userId,
ctx.taskId,
ImportStatus.STARTED,
ctx.source
)
} catch (error) {
console.log('invalid url', row, error)
ctx.countFailed += 1
// update invalid counter
await updateMetrics(
ctx.redisClient,
ctx.userId,
ctx.taskId,
ImportStatus.INVALID,
ctx.source
)
}
}
}

View File

@ -5,12 +5,18 @@ import axios from 'axios'
import * as jwt from 'jsonwebtoken'
import { Stream } from 'node:stream'
import * as path from 'path'
import { createClient } from 'redis'
import { promisify } from 'util'
import { v4 as uuid } from 'uuid'
import { importCsv } from './csv'
import { importMatterArchive } from './matterHistory'
import { createMetrics } from './metrics'
import { createRedisClient } from './redis'
import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task'
// explicitly create the return type of RedisClient
type RedisClient = ReturnType<typeof createClient>
export enum ArticleSavingRequestStatus {
Failed = 'FAILED',
Processing = 'PROCESSING',
@ -51,6 +57,9 @@ export type ImportContext = {
countFailed: number
urlHandler: UrlHandler
contentHandler: ContentHandler
redisClient: RedisClient
taskId: string
source: string
}
type importHandlerFunc = (ctx: ImportContext, stream: Stream) => Promise<void>
@ -118,14 +127,27 @@ const sendImportFailedEmail = async (userId: string) => {
})
}
const sendImportCompletedEmail = async (
export const sendImportStartedEmail = async (
userId: string,
urlsEnqueued: number,
urlsFailed: number
) => {
return createEmailCloudTask(userId, {
subject: 'Your Omnivore import has completed processing',
body: `${urlsEnqueued} URLs have been processed and should be available in your library. ${urlsFailed} URLs failed to be parsed.`,
subject: 'Your Omnivore import has started',
body: `We have started processing ${urlsEnqueued} URLs. ${urlsFailed} URLs are invalid.`,
})
}
/**
 * Enqueues a cloud task that emails the user a completion summary once their
 * import has finished processing.
 *
 * The email body reports the grand total (urlsImported + urlsFailed) followed
 * by the per-outcome breakdown: how many URLs were added to the library and
 * how many failed to parse.
 *
 * @param userId - id of the user who owns the import; used to address the email
 * @param urlsImported - count of URLs successfully added to the library
 * @param urlsFailed - count of URLs that failed to be parsed
 * @returns the promise from createEmailCloudTask (task creation, not delivery)
 */
export const sendImportCompletedEmail = async (
  userId: string,
  urlsImported: number,
  urlsFailed: number
) => {
  return createEmailCloudTask(userId, {
    subject: 'Your Omnivore import has finished',
    body: `We have finished processing ${
      urlsImported + urlsFailed
    } URLs. ${urlsImported} URLs have been added to your library. ${urlsFailed} URLs failed to be parsed.`,
  })
}
@ -222,7 +244,7 @@ const contentHandler = async (
return Promise.resolve()
}
const handleEvent = async (data: StorageEvent) => {
const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => {
if (shouldHandle(data)) {
const handler = handlerForFile(data.name)
if (!handler) {
@ -253,12 +275,18 @@ const handleEvent = async (data: StorageEvent) => {
countFailed: 0,
urlHandler,
contentHandler,
redisClient,
taskId: data.name,
source: 'csv-importer',
}
// create metrics in redis
await createMetrics(redisClient, ctx.userId, ctx.taskId, ctx.source)
await handler(ctx, stream)
if (ctx.countImported > 0) {
await sendImportCompletedEmail(userId, ctx.countImported, ctx.countFailed)
await sendImportStartedEmail(userId, ctx.countImported, ctx.countFailed)
} else {
await sendImportFailedEmail(userId)
}
@ -285,11 +313,19 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction(
const pubSubMessage = req.body.message.data as string
const obj = getStorageEvent(pubSubMessage)
if (obj) {
// create redis client
const redisClient = await createRedisClient(
process.env.REDIS_URL,
process.env.REDIS_CERT
)
try {
await handleEvent(obj)
await handleEvent(obj, redisClient)
} catch (err) {
console.log('error handling event', { err, obj })
throw err
} finally {
// close redis client
await redisClient.quit()
}
}
} else {

View File

@ -27,5 +27,8 @@ if (status == "imported" or status == "failed") then
-- all the records have been processed
-- update the metrics
redis.call('HSET', key, 'end_time', timestamp, 'state', 1);
return 1
end
end
return 0;

View File

@ -1,10 +1,11 @@
import { createClient } from 'redis'
import { sendImportCompletedEmail } from '.'
import { lua } from './redis'
// explicitly create the return type of RedisClient
type RedisClient = ReturnType<typeof createClient>
enum ImportStatus {
export enum ImportStatus {
STARTED = 'started',
INVALID = 'invalid',
IMPORTED = 'imported',
@ -23,24 +24,24 @@ interface ImportMetrics {
imported: number
failed: number
total: number
importer: string
source: string
state: ImportTaskState
startTime: number
endTime: number
}
export const startImport = async (
export const createMetrics = async (
redisClient: RedisClient,
userId: string,
taskId: string,
importer: string
source: string
) => {
const key = `import:${userId}:${taskId}`
try {
// set multiple fields
await redisClient.hSet(key, {
['start_time']: Date.now(), // unix timestamp in seconds
['importer']: importer,
['start_time']: Date.now(),
['source']: source,
['state']: ImportTaskState.STARTED,
})
} catch (error) {
@ -52,16 +53,25 @@ export const updateMetrics = async (
redisClient: RedisClient,
userId: string,
taskId: string,
status: ImportStatus
status: ImportStatus,
source: string
) => {
const key = `import:${userId}:${taskId}`
try {
// use lua script to increment hash field
await redisClient.evalSha(lua.sha, {
const state = await redisClient.evalSha(lua.sha, {
keys: [key],
arguments: [status, Date.now().toString()],
arguments: [status, Date.now().toString(), source],
})
// if the task is finished, send email
if (state == ImportTaskState.FINISHED) {
const metrics = await getMetrics(redisClient, userId, taskId)
if (metrics) {
await sendImportCompletedEmail(userId, metrics.imported, metrics.failed)
}
}
} catch (error) {
console.error('Redis Error', error)
}
@ -83,7 +93,7 @@ export const getMetrics = async (
imported: parseInt(metrics.imported, 10),
failed: parseInt(metrics.failed, 10),
total: parseInt(metrics.total, 10),
importer: metrics.importer,
source: metrics.source,
state: parseInt(metrics.state, 10),
startTime: parseInt(metrics.start_time, 10),
endTime: parseInt(metrics.end_time, 10),

View File

@ -224,7 +224,7 @@ const sendSavePageMutation = async (userId, input) => {
}
}`,
variables: {
input,
input: Object.assign({}, input , { source: 'puppeteer-parse' }),
},
});
@ -256,7 +256,7 @@ async function fetchContent(req, res) {
const articleSavingRequestId = (req.query ? req.query.saveRequestId : undefined) || (req.body ? req.body.saveRequestId : undefined);
const state = req.body.state
const labels = req.body.labels
const source = req.body.source || 'puppeteer-parse';
const source = req.body.source || 'parseContent';
let logRecord = {
url,
@ -343,7 +343,6 @@ async function fetchContent(req, res) {
parseResult: readabilityResult,
state,
labels,
source,
});
logRecord.totalTime = Date.now() - functionStartTime;
@ -380,7 +379,6 @@ async function fetchContent(req, res) {
parseResult: readabilityResult,
state,
labels,
source,
});
logRecord.totalTime = Date.now() - functionStartTime;