Change around redis initialization so we can pull in secrets from secret manager async on startup
This commit is contained in:
@ -4,40 +4,63 @@
|
||||
/* eslint-disable @typescript-eslint/no-misused-promises */
|
||||
import express, { Express } from 'express'
|
||||
import { appDataSource } from './data_source'
|
||||
import { env } from './env'
|
||||
import { redisClient, mqRedisClient } from './redis'
|
||||
|
||||
import { Worker, Job, QueueEvents } from 'bullmq'
|
||||
import { loadEnvFromGCPSecrets } from './gcp-utils'
|
||||
import { getEnv } from './util'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { Job, Worker, QueueEvents } from 'bullmq'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
|
||||
const main = async () => {
|
||||
console.log('calling queue-processor start')
|
||||
console.log('[queue-processor]: starting queue processor')
|
||||
|
||||
let env = (await loadEnvFromGCPSecrets()) ?? getEnv()
|
||||
const app: Express = express()
|
||||
const port = process.env.PORT || 3002
|
||||
|
||||
await appDataSource.initialize()
|
||||
redisDataSource.setOptions({
|
||||
REDIS_URL: env.redis.url,
|
||||
REDIS_CERT: env.redis.cert,
|
||||
})
|
||||
|
||||
appDataSource.setOptions({
|
||||
type: 'postgres',
|
||||
host: env.pg.host,
|
||||
port: env.pg.port,
|
||||
schema: 'omnivore',
|
||||
username: env.pg.userName,
|
||||
password: env.pg.password,
|
||||
database: env.pg.dbName,
|
||||
logging: ['query', 'info'],
|
||||
entities: [__dirname + '/entity/**/*{.js,.ts}'],
|
||||
subscribers: [__dirname + '/events/**/*{.js,.ts}'],
|
||||
namingStrategy: new SnakeNamingStrategy(),
|
||||
logger: new CustomTypeOrmLogger(['query', 'info']),
|
||||
connectTimeoutMS: 40000, // 40 seconds
|
||||
maxQueryExecutionTime: 10000, // 10 seconds
|
||||
})
|
||||
|
||||
// respond healthy to auto-scaler.
|
||||
app.get('/_ah/health', (req, res) => res.sendStatus(200))
|
||||
|
||||
// redis is optional
|
||||
if (env.redis.url) {
|
||||
redisClient.on('error', (err) => {
|
||||
console.error('Redis Client Error', err)
|
||||
})
|
||||
const server = app.listen(port, () => {
|
||||
console.log(`[queue-processor]: started`)
|
||||
})
|
||||
|
||||
await redisClient.connect()
|
||||
console.log('Redis Client Connected:', env.redis.url)
|
||||
}
|
||||
// This is done after all the setup so it can access the
|
||||
// environment that was loaded from GCP
|
||||
await appDataSource.initialize()
|
||||
await redisDataSource.initialize()
|
||||
|
||||
// redis for message queue
|
||||
if (env.redis.url) {
|
||||
mqRedisClient?.on('error', (err) => {
|
||||
console.error('Redis Client Error', err)
|
||||
})
|
||||
const redisClient = redisDataSource.redisClient
|
||||
const ioRedisClient = redisDataSource.ioRedisClient
|
||||
|
||||
if (!redisClient || !ioRedisClient) {
|
||||
throw '[queue-processor] error redis is not initialized'
|
||||
}
|
||||
|
||||
const worker = new Worker(
|
||||
@ -45,7 +68,7 @@ const main = async () => {
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
return await refreshAllFeeds(appDataSource, mqRedisClient)
|
||||
return await refreshAllFeeds(appDataSource, ioRedisClient)
|
||||
}
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(redisClient, job.data)
|
||||
@ -54,12 +77,12 @@ const main = async () => {
|
||||
return true
|
||||
},
|
||||
{
|
||||
connection: mqRedisClient,
|
||||
connection: ioRedisClient,
|
||||
}
|
||||
)
|
||||
|
||||
const queueEvents = new QueueEvents(QUEUE_NAME, {
|
||||
connection: mqRedisClient,
|
||||
connection: ioRedisClient,
|
||||
})
|
||||
|
||||
queueEvents.on('added', async (job) => {
|
||||
@ -76,18 +99,18 @@ const main = async () => {
|
||||
|
||||
const gracefulShutdown = async (signal: string) => {
|
||||
console.log(`[queue-processor]: Received ${signal}, closing server...`)
|
||||
await worker.close()
|
||||
await redisClient.disconnect()
|
||||
mqRedisClient.disconnect()
|
||||
process.exit(0)
|
||||
server.close(async () => {
|
||||
console.log(
|
||||
'[queue-processor]: Server closed. shuting down and exiting process...'
|
||||
)
|
||||
await worker.close()
|
||||
await redisDataSource.shutdown()
|
||||
process.exit(0)
|
||||
})
|
||||
}
|
||||
|
||||
process.on('SIGINT', () => gracefulShutdown('SIGINT'))
|
||||
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
|
||||
|
||||
app.listen(port, () => {
|
||||
console.log(`[queue-processor]: started`)
|
||||
})
|
||||
}
|
||||
|
||||
// only call main if the file was called from the CLI and wasn't required from another module
|
||||
|
||||
Reference in New Issue
Block a user