feat: add importer metrics collector

This commit is contained in:
Hongbo Wu
2023-05-25 12:15:05 +08:00
parent d69e732579
commit 19facec4e2
6 changed files with 224 additions and 4 deletions

View File

@ -12,9 +12,10 @@
"test": "yarn mocha -r ts-node/register --config mocha-config.json",
"lint": "eslint src --ext ts,js,tsx,jsx",
"compile": "tsc",
"build": "tsc",
"build": "tsc && yarn copy-files",
"start": "functions-framework --target=importHandler",
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"",
"copy-files": "copyfiles -u 1 src/**/*.lua build/src"
},
"devDependencies": {
"@types/chai": "^4.3.4",
@ -28,6 +29,7 @@
"@types/unzip-stream": "^0.3.1",
"@types/urlsafe-base64": "^1.0.28",
"@types/uuid": "^9.0.0",
"copyfiles": "^2.4.1",
"eslint-plugin-prettier": "^4.0.0"
},
"dependencies": {
@ -46,6 +48,7 @@
"jsonwebtoken": "^8.5.1",
"linkedom": "^0.14.21",
"nodemon": "^2.0.15",
"redis": "^4.6.6",
"unzip-stream": "^0.3.1",
"urlsafe-base64": "^1.0.0",
"uuid": "^9.0.0"

View File

@ -0,0 +1,31 @@
local key = tostring(KEYS[1]);
local status = tostring(ARGV[1]);
local timestamp = tonumber(ARGV[2]);
-- increment the status counter
redis.call('HINCRBY', key, status, 1);
if (status == "imported" or status == "failed") then
-- get the current metrics
local bulk = redis.call('HGETALL', key);
-- get the total, imported and failed counters
local result = {}
local nextkey
for i, v in ipairs(bulk) do
if i % 2 == 1 then
nextkey = v
else
result[nextkey] = v
end
end
local imported = tonumber(result['imported']) or 0;
local failed = tonumber(result['failed']) or 0;
local total = tonumber(result['total']) or 0;
local state = tonumber(result['state']) or 0;
if (state == 0 and imported + failed >= total) then
-- all the records have been processed
-- update the metrics
redis.call('HSET', key, 'end_time', timestamp, 'state', 1);
end
end

View File

@ -0,0 +1,95 @@
import { createClient } from 'redis'
import { lua } from './redis'
// explicitly create the return type of RedisClient
type RedisClient = ReturnType<typeof createClient>
enum ImportStatus {
STARTED = 'started',
INVALID = 'invalid',
IMPORTED = 'imported',
FAILED = 'failed',
TOTAL = 'total',
}
enum ImportTaskState {
STARTED,
FINISHED,
}
interface ImportMetrics {
started: number
invalid: number
imported: number
failed: number
total: number
importer: string
state: ImportTaskState
startTime: number
endTime: number
}
export const startImport = async (
redisClient: RedisClient,
userId: string,
taskId: string,
importer: string
) => {
const key = `import:${userId}:${taskId}`
try {
// set multiple fields
await redisClient.hSet(key, {
['start_time']: Date.now(), // unix timestamp in seconds
['importer']: importer,
['state']: ImportTaskState.STARTED,
})
} catch (error) {
console.error('Redis Error', error)
}
}
export const updateMetrics = async (
redisClient: RedisClient,
userId: string,
taskId: string,
status: ImportStatus
) => {
const key = `import:${userId}:${taskId}`
try {
// use lua script to increment hash field
await redisClient.evalSha(lua.sha, {
keys: [key],
arguments: [status, Date.now().toString()],
})
} catch (error) {
console.error('Redis Error', error)
}
}
export const getMetrics = async (
redisClient: RedisClient,
userId: string,
taskId: string
): Promise<ImportMetrics | null> => {
const key = `import:${userId}:${taskId}`
try {
const metrics = await redisClient.hGetAll(key)
return {
// convert to integer
started: parseInt(metrics.started, 10),
invalid: parseInt(metrics.invalid, 10),
imported: parseInt(metrics.imported, 10),
failed: parseInt(metrics.failed, 10),
total: parseInt(metrics.total, 10),
importer: metrics.importer,
state: parseInt(metrics.state, 10),
startTime: parseInt(metrics.start_time, 10),
endTime: parseInt(metrics.end_time, 10),
}
} catch (error) {
console.error('Redis Error', error)
return null
}
}

View File

@ -0,0 +1,37 @@
import fs from 'fs'
import { createClient } from 'redis'
// load lua script
export const lua = {
script: fs.readFileSync('./luaScripts/updateMetrics.lua', 'utf8'),
sha: '',
}
export const createRedisClient = async (url?: string, cert?: string) => {
const redisClient = createClient({
url,
socket: {
tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS
cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line
rejectUnauthorized: false, // for self-signed certs
connectTimeout: 10000, // 10 seconds
reconnectStrategy(retries: number): number | Error {
if (retries > 10) {
return new Error('Retries exhausted')
}
return 1000
},
},
})
redisClient.on('error', (err) => console.error('Redis Client Error', err))
await redisClient.connect()
console.log('Redis Client Connected:', url)
// load script to redis
lua.sha = await redisClient.scriptLoad(lua.script)
console.log('Redis Lua Script Loaded', lua.sha)
return redisClient
}