dedupe repeated jobs

Author: Hongbo Wu
Date: 2024-04-17 15:36:16 +08:00
parent 6222d567a4
commit af184ded22
4 changed files with 40 additions and 34 deletions


@@ -14,6 +14,7 @@ import { env } from '../env'
 import {
   ArticleSavingRequestStatus,
   CreateLabelInput,
+  TaskState,
 } from '../generated/graphql'
 import { AISummarizeJobData, AI_SUMMARIZE_JOB_NAME } from '../jobs/ai-summarize'
 import {
@@ -50,7 +51,7 @@ import {
   UPDATE_HIGHLIGHT_JOB,
   UPDATE_LABELS_JOB,
 } from '../jobs/update_db'
-import { createJobId, getBackendQueue, JOB_VERSION } from '../queue-processor'
+import { getBackendQueue, JOB_VERSION } from '../queue-processor'
 import { redisDataSource } from '../redis_data_source'
 import { signFeatureToken } from '../services/features'
 import { OmnivoreAuthorizationHeader } from './auth'
@@ -58,6 +59,7 @@ import { CreateTaskError } from './errors'
 import { stringToHash } from './helpers'
 import { logger } from './logger'
 import View = google.cloud.tasks.v2.Task.View
+import { writeDigest } from '../services/digest'

 // Instantiates a client.
 const client = new CloudTasksClient()
@@ -868,9 +870,8 @@ export const enqueueCreateDigest = async (
     throw new Error('No queue found')
   }

-  const jobId = createJobId(CREATE_DIGEST_JOB, data.userId)
   const job = await queue.add(CREATE_DIGEST_JOB, data, {
-    jobId, // dedupe by userId
+    jobId: data.id, // dedupe by job id
     removeOnComplete: true,
     removeOnFail: true,
     attempts: 3,
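The dedupe itself comes from BullMQ: when queue.add is called with an explicit jobId and a job with that id still exists in the queue, the add is effectively a no-op. Keying on data.id (the digest id) rather than a userId-derived id collapses repeated requests for the same digest. A minimal sketch of the pattern, assuming a local Redis instance; the queue name and enqueueDigestOnce helper are illustrative, not from the commit:

import { Queue } from 'bullmq'

const queue = new Queue('example-queue', {
  connection: { host: 'localhost', port: 6379 }, // assumed Redis connection
})

const enqueueDigestOnce = async (data: { id: string; userId: string }) => {
  // A second call with the same jobId while the first job still exists is
  // ignored, so concurrent duplicate requests produce a single job.
  return queue.add('CREATE_DIGEST_JOB', data, {
    jobId: data.id,
    removeOnComplete: true, // once the job is removed, the id can be reused
    removeOnFail: true,
    attempts: 3,
  })
}

removeOnComplete and removeOnFail matter here: they free the jobId after the run, so a later, legitimate re-enqueue of the same digest id is not silently dropped.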
@@ -886,8 +887,17 @@ export const enqueueCreateDigest = async (
   logger.info('create digest job enqueued', { jobId: job.id })

+  const digest = {
+    id: data.id,
+    jobState: TaskState.Running,
+  }
+  // update digest job state in redis
+  await writeDigest(data.userId, digest)
+
   return {
-    jobId,
+    jobId: digest.id,
+    jobState: digest.jobState,
   }
 }
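Alongside the queue, the job's state is now mirrored into Redis via writeDigest, so callers can report jobState (TaskState.Running) together with the job id. The body of writeDigest is not part of this diff; a hedged sketch of one plausible implementation, assuming an ioredis client and a hypothetical digest:<userId> key scheme:

import { Redis } from 'ioredis'

const redis = new Redis() // assumed default localhost connection

interface Digest {
  id: string
  jobState: string // TaskState.Running at enqueue time in the commit
}

export const writeDigest = async (userId: string, digest: Digest) => {
  // Key scheme and TTL are assumptions, not taken from the Omnivore source.
  await redis.set(`digest:${userId}`, JSON.stringify(digest), 'EX', 60 * 60 * 24)
}

A status endpoint can then read back the same key to answer "is my digest ready?" without touching the queue.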