replace redis client library with ioredis
This commit is contained in:
@ -46,10 +46,10 @@
|
||||
"dompurify": "^2.4.3",
|
||||
"fs-extra": "^11.1.0",
|
||||
"glob": "^8.1.0",
|
||||
"ioredis": "^5.3.2",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"linkedom": "^0.14.21",
|
||||
"nodemon": "^2.0.15",
|
||||
"redis": "^4.3.1",
|
||||
"unzip-stream": "^0.3.1",
|
||||
"urlsafe-base64": "^1.0.0",
|
||||
"uuid": "^9.0.0"
|
||||
|
||||
@ -2,10 +2,10 @@ import { Storage } from '@google-cloud/storage'
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import axios from 'axios'
|
||||
import Redis from 'ioredis'
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import { Stream } from 'node:stream'
|
||||
import * as path from 'path'
|
||||
import { createClient } from 'redis'
|
||||
import { promisify } from 'util'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
import { importCsv } from './csv'
|
||||
@ -14,9 +14,6 @@ import { ImportStatus, updateMetrics } from './metrics'
|
||||
import { createRedisClient } from './redis'
|
||||
import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task'
|
||||
|
||||
// explicitly create the return type of RedisClient
|
||||
type RedisClient = ReturnType<typeof createClient>
|
||||
|
||||
export enum ArticleSavingRequestStatus {
|
||||
Failed = 'FAILED',
|
||||
Processing = 'PROCESSING',
|
||||
@ -59,7 +56,7 @@ export type ImportContext = {
|
||||
countFailed: number
|
||||
urlHandler: UrlHandler
|
||||
contentHandler: ContentHandler
|
||||
redisClient: RedisClient
|
||||
redisClient: Redis
|
||||
taskId: string
|
||||
source: string
|
||||
}
|
||||
@ -300,7 +297,7 @@ const contentHandler = async (
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => {
|
||||
const handleEvent = async (data: StorageEvent, redisClient: Redis) => {
|
||||
if (shouldHandle(data)) {
|
||||
const handler = handlerForFile(data.name)
|
||||
if (!handler) {
|
||||
@ -367,7 +364,7 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
const obj = getStorageEvent(pubSubMessage)
|
||||
if (obj) {
|
||||
// create redis client
|
||||
const redisClient = await createRedisClient(
|
||||
const redisClient = createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
@ -416,7 +413,7 @@ export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction(
|
||||
return res.status(400).send('Bad Request')
|
||||
}
|
||||
|
||||
const redisClient = await createRedisClient(
|
||||
const redisClient = createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
|
||||
@ -1,10 +1,7 @@
|
||||
import { createClient } from 'redis'
|
||||
import Redis from 'ioredis'
|
||||
import { sendImportCompletedEmail } from '.'
|
||||
import { lua } from './redis'
|
||||
|
||||
// explicitly create the return type of RedisClient
|
||||
type RedisClient = ReturnType<typeof createClient>
|
||||
|
||||
export enum ImportStatus {
|
||||
STARTED = 'started',
|
||||
INVALID = 'invalid',
|
||||
@ -31,7 +28,7 @@ interface ImportMetrics {
|
||||
}
|
||||
|
||||
export const createMetrics = async (
|
||||
redisClient: RedisClient,
|
||||
redisClient: Redis,
|
||||
userId: string,
|
||||
taskId: string,
|
||||
source: string
|
||||
@ -39,7 +36,7 @@ export const createMetrics = async (
|
||||
const key = `import:${userId}:${taskId}`
|
||||
try {
|
||||
// set multiple fields
|
||||
await redisClient.hSet(key, {
|
||||
await redisClient.hset(key, {
|
||||
['start_time']: Date.now(),
|
||||
['source']: source,
|
||||
['state']: ImportTaskState.STARTED,
|
||||
@ -50,7 +47,7 @@ export const createMetrics = async (
|
||||
}
|
||||
|
||||
export const updateMetrics = async (
|
||||
redisClient: RedisClient,
|
||||
redisClient: Redis,
|
||||
userId: string,
|
||||
taskId: string,
|
||||
status: ImportStatus
|
||||
@ -59,10 +56,13 @@ export const updateMetrics = async (
|
||||
|
||||
try {
|
||||
// use lua script to increment hash field
|
||||
const state = await redisClient.evalSha(lua.sha, {
|
||||
keys: [key],
|
||||
arguments: [status, Date.now().toString()],
|
||||
})
|
||||
const state = await redisClient.evalsha(
|
||||
lua.sha,
|
||||
1,
|
||||
key,
|
||||
status,
|
||||
Date.now().toString()
|
||||
)
|
||||
|
||||
// if the task is finished, send email
|
||||
if (state == ImportTaskState.FINISHED) {
|
||||
@ -77,13 +77,13 @@ export const updateMetrics = async (
|
||||
}
|
||||
|
||||
export const getMetrics = async (
|
||||
redisClient: RedisClient,
|
||||
redisClient: Redis,
|
||||
userId: string,
|
||||
taskId: string
|
||||
): Promise<ImportMetrics | null> => {
|
||||
const key = `import:${userId}:${taskId}`
|
||||
try {
|
||||
const metrics = await redisClient.hGetAll(key)
|
||||
const metrics = await redisClient.hgetall(key)
|
||||
|
||||
return {
|
||||
// convert to integer
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { readFileSync } from 'fs'
|
||||
import { Redis } from 'ioredis'
|
||||
import path from 'path'
|
||||
import { createClient } from 'redis'
|
||||
|
||||
// load lua script
|
||||
export const lua = {
|
||||
@ -11,31 +11,35 @@ export const lua = {
|
||||
sha: '',
|
||||
}
|
||||
|
||||
export const createRedisClient = async (url?: string, cert?: string) => {
|
||||
const redisClient = createClient({
|
||||
url,
|
||||
socket: {
|
||||
tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS
|
||||
cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
reconnectStrategy(retries: number): number | Error {
|
||||
if (retries > 10) {
|
||||
return new Error('Retries exhausted')
|
||||
export const createRedisClient = (url?: string, cert?: string) => {
|
||||
return new Redis(url || 'redis://localhost:6379', {
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
tls: cert
|
||||
? {
|
||||
cert,
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
}
|
||||
return 1000
|
||||
},
|
||||
: undefined,
|
||||
reconnectOnError: (err) => {
|
||||
const targetErrors = [/READONLY/, /ETIMEDOUT/]
|
||||
|
||||
targetErrors.forEach((targetError) => {
|
||||
if (targetError.test(err.message)) {
|
||||
// Only reconnect when the error contains the keyword
|
||||
return true
|
||||
}
|
||||
})
|
||||
|
||||
return false
|
||||
},
|
||||
retryStrategy: (times) => {
|
||||
if (times > 10) {
|
||||
// End reconnecting after a specific number of tries and flush all commands with a individual error
|
||||
return null
|
||||
}
|
||||
|
||||
// reconnect after
|
||||
return Math.min(times * 50, 2000)
|
||||
},
|
||||
})
|
||||
|
||||
redisClient.on('error', (err) => console.error('Redis Client Error', err))
|
||||
|
||||
await redisClient.connect()
|
||||
console.log('Redis Client Connected:', url)
|
||||
|
||||
// load script to redis
|
||||
lua.sha = await redisClient.scriptLoad(lua.script)
|
||||
console.log('Redis Lua Script Loaded', lua.sha)
|
||||
|
||||
return redisClient
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user