remove stub job

This commit is contained in:
Hongbo Wu
2024-04-12 18:51:42 +08:00
parent 111dcd1562
commit bc955fbeeb
2 changed files with 32 additions and 34 deletions

View File

@ -3,12 +3,11 @@ import express from 'express'
import { TaskState } from '../generated/graphql'
import { CREATE_DIGEST_JOB } from '../jobs/create_digest'
import { createJobId, getJob, jobStateToTaskState } from '../queue-processor'
import { getDigest } from '../services/digest'
import { getDigest, setDigest } from '../services/digest'
import { findActiveUser } from '../services/user'
import { analytics } from '../utils/analytics'
import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { enqueueCreateDigest } from '../utils/createTask'
import { logger } from '../utils/logger'
interface Feedback {
@ -62,23 +61,24 @@ export function digestRouter() {
return res.sendStatus(401)
}
// check if job is already in queue
// check if digest job is running
// if yes then return 202 accepted
// else enqueue job
const jobId = createJobId(CREATE_DIGEST_JOB, userId)
const existingJob = await getJob(jobId)
if (existingJob) {
logger.info(`Job already in queue: ${jobId}`)
const digest = await getDigest(userId)
if (digest?.jobState === TaskState.Running) {
logger.info(`job is running: ${userId}`)
return res.sendStatus(202)
}
// enqueue job and return job id
const result = await enqueueCreateDigest({
userId,
const jobId = await setDigest(userId, {
jobState: TaskState.Running,
})
// return job id
return res.status(201).send(result)
return res.status(201).send({
jobId,
})
} catch (error) {
logger.error('Error while enqueuing create digest task', error)
return res.sendStatus(500)
@ -112,31 +112,22 @@ export function digestRouter() {
return res.sendStatus(401)
}
// get job by user id
const jobId = createJobId(CREATE_DIGEST_JOB, userId)
const job = await getJob(jobId)
if (job) {
// if job is in queue then return job state
const jobState = await job.getState()
return res.send({
jobId: job.id,
jobState: jobStateToTaskState(jobState),
})
}
// if job is done and removed then get the digest from redis
// get digest from redis
const digest = await getDigest(userId)
if (!digest) {
logger.info(`Digest not found: ${userId}`)
return res.sendStatus(404)
}
if (digest.jobState === TaskState.Running) {
// if job is in queue then return job state
return res.status(202).send({
jobState: TaskState.Running,
})
}
// return digest
return res.send({
...digest,
jobId,
jobState: TaskState.Succeeded,
})
return res.send(digest)
} catch (error) {
logger.error('Error while getting digest', error)
return res.sendStatus(500)

View File

@ -1,11 +1,12 @@
import { redisDataSource } from '../redis_data_source'
export interface Digest {
url: string
title: string
content: string
chapters: Chapter[]
urlsToAudio: string[]
url?: string
title?: string
content?: string
chapters?: Chapter[]
urlsToAudio?: string[]
jobState: string
}
interface Chapter {
@ -19,7 +20,11 @@ export const getDigest = async (userId: string): Promise<Digest | null> => {
return digest ? (JSON.parse(digest) as Digest) : null
}
export const setDigest = async (userId: string, digest: Digest) => {
export const setDigest = async (
userId: string,
digest: Digest
): Promise<string> => {
const key = digestKey(userId)
const result = await redisDataSource.redisClient?.set(
digestKey(userId),
JSON.stringify(digest),
@ -30,4 +35,6 @@ export const setDigest = async (userId: string, digest: Digest) => {
if (result != 'OK') {
throw new Error('Failed to set digest')
}
return key
}