From 67014e68fd2b82258ad3c69ae9279c7ed4cb603b Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 25 May 2023 13:43:33 +0800 Subject: [PATCH] feat: send email notification once the import has finished --- packages/import-handler/src/csv.ts | 29 +++++++++++ packages/import-handler/src/index.ts | 48 ++++++++++++++++--- .../src/luaScripts/updateMetrics.lua | 3 ++ packages/import-handler/src/metrics.ts | 30 ++++++++---- packages/puppeteer-parse/index.js | 6 +-- 5 files changed, 96 insertions(+), 20 deletions(-) diff --git a/packages/import-handler/src/csv.ts b/packages/import-handler/src/csv.ts index 140cf00f0..2ff059735 100644 --- a/packages/import-handler/src/csv.ts +++ b/packages/import-handler/src/csv.ts @@ -6,6 +6,7 @@ import { parse } from '@fast-csv/parse' import { Stream } from 'stream' import { ImportContext } from '.' +import { ImportStatus, updateMetrics } from './metrics' export const importCsv = async (ctx: ImportContext, stream: Stream) => { const parser = parse() @@ -23,11 +24,39 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => { .map((l) => l.trim()) .filter((l) => l !== '') : undefined + + // update total counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.TOTAL, + ctx.source + ) + await ctx.urlHandler(ctx, url, state, labels) + ctx.countImported += 1 + // update started counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.STARTED, + ctx.source + ) } catch (error) { console.log('invalid url', row, error) + ctx.countFailed += 1 + // update invalid counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.INVALID, + ctx.source + ) } } } diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 89b469ad1..53d02e926 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -5,12 +5,18 @@ import axios from 'axios' import * as jwt from 'jsonwebtoken' import { Stream } from 'node:stream' import * as path from 'path' +import { createClient } from 'redis' import { promisify } from 'util' import { v4 as uuid } from 'uuid' import { importCsv } from './csv' import { importMatterArchive } from './matterHistory' +import { createMetrics } from './metrics' +import { createRedisClient } from './redis' import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task' +// explicitly create the return type of RedisClient +type RedisClient = ReturnType + export enum ArticleSavingRequestStatus { Failed = 'FAILED', Processing = 'PROCESSING', @@ -51,6 +57,9 @@ export type ImportContext = { countFailed: number urlHandler: UrlHandler contentHandler: ContentHandler + redisClient: RedisClient + taskId: string + source: string } type importHandlerFunc = (ctx: ImportContext, stream: Stream) => Promise @@ -118,14 +127,27 @@ const sendImportFailedEmail = async (userId: string) => { }) } -const sendImportCompletedEmail = async ( +export const sendImportStartedEmail = async ( userId: string, urlsEnqueued: number, urlsFailed: number ) => { return createEmailCloudTask(userId, { - subject: 'Your Omnivore import has completed processing', - body: `${urlsEnqueued} URLs have been processed and should be available in your library. ${urlsFailed} URLs failed to be parsed.`, + subject: 'Your Omnivore import has started', + body: `We have started processing ${urlsEnqueued} URLs. ${urlsFailed} URLs are invalid.`, + }) +} + +export const sendImportCompletedEmail = async ( + userId: string, + urlsImported: number, + urlsFailed: number +) => { + return createEmailCloudTask(userId, { + subject: 'Your Omnivore import has finished', + body: `We have finished processing ${ + urlsImported + urlsFailed + } URLs. ${urlsImported} URLs have been added to your library. ${urlsFailed} URLs failed to be parsed.`, }) } @@ -222,7 +244,7 @@ const contentHandler = async ( return Promise.resolve() } -const handleEvent = async (data: StorageEvent) => { +const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => { if (shouldHandle(data)) { const handler = handlerForFile(data.name) if (!handler) { @@ -253,12 +275,18 @@ const handleEvent = async (data: StorageEvent) => { countFailed: 0, urlHandler, contentHandler, + redisClient, + taskId: data.name, + source: 'csv-importer', } + // create metrics in redis + await createMetrics(redisClient, ctx.userId, ctx.taskId, ctx.source) + await handler(ctx, stream) if (ctx.countImported > 0) { - await sendImportCompletedEmail(userId, ctx.countImported, ctx.countFailed) + await sendImportStartedEmail(userId, ctx.countImported, ctx.countFailed) } else { await sendImportFailedEmail(userId) } @@ -285,11 +313,19 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction( const pubSubMessage = req.body.message.data as string const obj = getStorageEvent(pubSubMessage) if (obj) { + // create redis client + const redisClient = await createRedisClient( + process.env.REDIS_URL, + process.env.REDIS_CERT + ) try { - await handleEvent(obj) + await handleEvent(obj, redisClient) } catch (err) { console.log('error handling event', { err, obj }) throw err + } finally { + // close redis client + await redisClient.quit() } } } else { diff --git a/packages/import-handler/src/luaScripts/updateMetrics.lua b/packages/import-handler/src/luaScripts/updateMetrics.lua index 94c45e1ab..4e7b08f26 100644 --- a/packages/import-handler/src/luaScripts/updateMetrics.lua +++ b/packages/import-handler/src/luaScripts/updateMetrics.lua @@ -27,5 +27,8 @@ if (status == "imported" or status == "failed") then -- all the records have been processed -- update the metrics redis.call('HSET', key, 'end_time', timestamp, 'state', 1); + return 1 end end + +return 0; diff --git a/packages/import-handler/src/metrics.ts b/packages/import-handler/src/metrics.ts index 7be5edff5..75086639d 100644 --- a/packages/import-handler/src/metrics.ts +++ b/packages/import-handler/src/metrics.ts @@ -1,10 +1,11 @@ import { createClient } from 'redis' +import { sendImportCompletedEmail } from '.' import { lua } from './redis' // explicitly create the return type of RedisClient type RedisClient = ReturnType -enum ImportStatus { +export enum ImportStatus { STARTED = 'started', INVALID = 'invalid', IMPORTED = 'imported', @@ -23,24 +24,24 @@ interface ImportMetrics { imported: number failed: number total: number - importer: string + source: string state: ImportTaskState startTime: number endTime: number } -export const startImport = async ( +export const createMetrics = async ( redisClient: RedisClient, userId: string, taskId: string, - importer: string + source: string ) => { const key = `import:${userId}:${taskId}` try { // set multiple fields await redisClient.hSet(key, { - ['start_time']: Date.now(), // unix timestamp in seconds - ['importer']: importer, + ['start_time']: Date.now(), + ['source']: source, ['state']: ImportTaskState.STARTED, }) } catch (error) { @@ -52,16 +53,25 @@ export const updateMetrics = async ( redisClient: RedisClient, userId: string, taskId: string, - status: ImportStatus + status: ImportStatus, + source: string ) => { const key = `import:${userId}:${taskId}` try { // use lua script to increment hash field - await redisClient.evalSha(lua.sha, { + const state = await redisClient.evalSha(lua.sha, { keys: [key], - arguments: [status, Date.now().toString()], + arguments: [status, Date.now().toString(), source], }) + + // if the task is finished, send email + if (state == ImportTaskState.FINISHED) { + const metrics = await getMetrics(redisClient, userId, taskId) + if (metrics) { + await sendImportCompletedEmail(userId, metrics.imported, metrics.failed) + } + } } catch (error) { console.error('Redis Error', error) } @@ -83,7 +93,7 @@ export const getMetrics = async ( imported: parseInt(metrics.imported, 10), failed: parseInt(metrics.failed, 10), total: parseInt(metrics.total, 10), - importer: metrics.importer, + source: metrics.source, state: parseInt(metrics.state, 10), startTime: parseInt(metrics.start_time, 10), endTime: parseInt(metrics.end_time, 10), diff --git a/packages/puppeteer-parse/index.js b/packages/puppeteer-parse/index.js index b25498790..405f0a219 100644 --- a/packages/puppeteer-parse/index.js +++ b/packages/puppeteer-parse/index.js @@ -224,7 +224,7 @@ const sendSavePageMutation = async (userId, input) => { } }`, variables: { - input, + input: Object.assign({}, input , { source: 'puppeteer-parse' }), }, }); @@ -256,7 +256,7 @@ async function fetchContent(req, res) { const articleSavingRequestId = (req.query ? req.query.saveRequestId : undefined) || (req.body ? req.body.saveRequestId : undefined); const state = req.body.state const labels = req.body.labels - const source = req.body.source || 'puppeteer-parse'; + const source = req.body.source || 'parseContent'; let logRecord = { url, @@ -343,7 +343,6 @@ async function fetchContent(req, res) { parseResult: readabilityResult, state, labels, - source, }); logRecord.totalTime = Date.now() - functionStartTime; @@ -380,7 +379,6 @@ async function fetchContent(req, res) { parseResult: readabilityResult, state, labels, - source, }); logRecord.totalTime = Date.now() - functionStartTime;