add job_id and parent for tracking

This commit is contained in:
Hongbo Wu
2024-02-02 10:50:07 +08:00
parent 2a0e6f8fa4
commit 1bccf33320
6 changed files with 95 additions and 80 deletions

View File

@ -3,24 +3,16 @@ import { batchUpdateLibraryItems } from '../services/library_item'
export interface BatchUpdateData {
userId: string
labelIds: string[]
libraryItemIds: string[]
action: BulkActionType
size: number
labelIds?: string[]
args?: unknown
}
export const BATCH_UPDATE_JOB_NAME = 'batch-update'
export const batchUpdate = async (data: BatchUpdateData) => {
const { userId, action, labelIds, libraryItemIds, args, size } = data
const { userId, action, labelIds, libraryItemIds, args } = data
const searchArgs = {
size,
query: `in:all includes:${libraryItemIds.join()}`,
}
await batchUpdateLibraryItems(action, searchArgs, userId, labelIds, args)
return true
return batchUpdateLibraryItems(action, libraryItemIds, userId, labelIds, args)
}

View File

@ -1,6 +1,7 @@
import { BulkActionType } from '../generated/graphql'
import { getBackendQueue } from '../queue-processor'
import { searchLibraryItems } from '../services/library_item'
import { stringToHash } from '../utils/helpers'
import { logger } from '../utils/logger'
import { BATCH_UPDATE_JOB_NAME } from './batch_update'
@ -9,15 +10,19 @@ export interface BulkActionData {
userId: string
action: BulkActionType
query: string
labelIds: string[]
batchSize: number
labelIds?: string[]
args?: unknown
useFolders?: boolean
}
export const BULK_ACTION_JOB_NAME = 'bulk-action'
export const bulkAction = async (data: BulkActionData) => {
export const bulkAction = async (data: BulkActionData, id?: string) => {
if (!id) {
throw new Error('Missing id')
}
const {
userId,
action,
@ -33,7 +38,7 @@ export const bulkAction = async (data: BulkActionData) => {
if (!queue) {
throw new Error('Queue not initialized')
}
const parent = { id, queue: queue.name }
let offset = 0
do {
@ -54,12 +59,18 @@ export const bulkAction = async (data: BulkActionData) => {
args,
size: batchSize,
}
const libraryItemIdsStr = libraryItemIds.sort().join()
const jobId = `${BATCH_UPDATE_JOB_NAME}-${stringToHash(libraryItemIdsStr)}`
// enqueue job for each batch
try {
await queue.add(BATCH_UPDATE_JOB_NAME, data, {
attempts: 1,
priority: 10,
jobId, // deduplication
removeOnComplete: true,
removeOnFail: true,
parent, // for tracking
})
} catch (error) {
logger.error('Error enqueuing batch update job', error)

View File

@ -87,7 +87,7 @@ export const createWorker = (connection: ConnectionOptions) =>
case SYNC_READ_POSITIONS_JOB_NAME:
return syncReadPositionsJob(job.data)
case BULK_ACTION_JOB_NAME:
return bulkAction(job.data)
return bulkAction(job.data, job.id)
case BATCH_UPDATE_JOB_NAME:
return batchUpdate(job.data)
}

View File

@ -59,12 +59,11 @@ import {
UpdatesSinceError,
UpdatesSinceSuccess,
} from '../../generated/graphql'
import { BULK_ACTION_JOB_NAME } from '../../jobs/bulk_action'
import { getBackendQueue } from '../../queue-processor'
import { authTrx, getColumns } from '../../repository'
import { getInternalLabelWithColor } from '../../repository/label'
import { libraryItemRepository } from '../../repository/library_item'
import { userRepository } from '../../repository/user'
import { clearCachedReadingPosition } from '../../services/cached_reading_position'
import { createPageSaveRequest } from '../../services/create_page_save_request'
import { findHighlightsByLibraryItemId } from '../../services/highlights'
import {
@ -93,6 +92,7 @@ import {
import { traceAs } from '../../tracing'
import { analytics } from '../../utils/analytics'
import { isSiteBlockedForParse } from '../../utils/blocked'
import { enqueueBulkAction } from '../../utils/createTask'
import { authorized } from '../../utils/gql-utils'
import {
cleanUrl,
@ -112,10 +112,6 @@ import {
parsePreparedContent,
} from '../../utils/parser'
import { getStorageFileDetails } from '../../utils/uploads'
import {
clearCachedReadingPosition,
fetchCachedReadingPosition,
} from '../../services/cached_reading_position'
export enum ArticleFormat {
Markdown = 'markdown',
@ -879,10 +875,11 @@ export const bulkActionResolver = authorized<
},
})
const batchSize = 100
const useFolders = query.includes('use:folders')
const searchArgs = {
query,
size: 0,
size: batchSize,
useFolders,
}
const searchResult = await searchLibraryItems(searchArgs, uid)
@ -892,34 +889,35 @@ export const bulkActionResolver = authorized<
return { success: true }
}
const batchSize = 100
if (count <= batchSize) {
// if there are less than 100 items, update them synchronously
await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args)
const libraryItemIds = searchResult.libraryItems.map((item) => item.id)
await batchUpdateLibraryItems(
action,
libraryItemIds,
uid,
labelIds,
args
)
return { success: true }
}
// if there are more than 100 items, update them asynchronously
const queue = await getBackendQueue()
if (!queue) {
throw new Error('Queue not initialized')
}
const data = {
userId: uid,
action,
labelIds,
labelIds: labelIds || undefined,
query,
count,
args,
batchSize,
useFolders,
}
await queue.add(BULK_ACTION_JOB_NAME, data, {
attempts: 1,
priority: 10,
})
const job = await enqueueBulkAction(data)
if (!job) {
return { errorCodes: [BulkActionErrorCode.BadRequest] }
}
return { success: true }
} catch (error) {

View File

@ -2,6 +2,7 @@ import { ExpressionToken, LiqeQuery } from '@omnivore/liqe'
import { DateTime } from 'luxon'
import { DeepPartial, FindOptionsWhere, ObjectLiteral } from 'typeorm'
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
import { ReadingProgressDataSource } from '../datasources/reading_progress_data_source'
import { Highlight } from '../entity/highlight'
import { Label } from '../entity/label'
import { LibraryItem, LibraryItemState } from '../entity/library_item'
@ -102,6 +103,20 @@ interface Select {
alias?: string
}
const readingProgressDataSource = new ReadingProgressDataSource()
const markItemAsRead = async (libraryItemId: string, userId: string) => {
return await readingProgressDataSource.updateReadingProgress(
userId,
libraryItemId,
{
readingProgressPercent: 100,
readingProgressTopPercent: 100,
readingProgressAnchorIndex: undefined,
}
)
}
const handleNoCase = (value: string) => {
const keywordRegexMap: Record<string, RegExp> = {
highlight: /^highlight(s)?$/i,
@ -924,11 +939,15 @@ export const countByCreatedAt = async (
export const batchUpdateLibraryItems = async (
action: BulkActionType,
searchArgs: SearchArgs,
libraryItemIds: string[],
userId: string,
labelIds?: string[] | null,
args?: unknown
) => {
if (libraryItemIds.length === 0) {
return
}
interface FolderArguments {
folder: string
}
@ -940,7 +959,6 @@ export const batchUpdateLibraryItems = async (
const now = new Date().toISOString()
// build the script
let values: Record<string, string | number> = {}
let addLabels = false
switch (action) {
case BulkActionType.Archive:
values = {
@ -955,15 +973,23 @@ export const batchUpdateLibraryItems = async (
}
break
case BulkActionType.AddLabels:
addLabels = true
break
case BulkActionType.MarkAsRead:
values = {
readAt: now,
readingProgressTopPercent: 100,
readingProgressBottomPercent: 100,
if (!labelIds || labelIds.length === 0) {
throw new Error('Labels are required for this action')
}
break
// add labels to library items
for (const libraryItemId of libraryItemIds) {
await addLabelsToLibraryItem(labelIds, libraryItemId, userId)
}
return
case BulkActionType.MarkAsRead:
// update reading progress for library items
for (const libraryItemId of libraryItemIds) {
await markItemAsRead(libraryItemId, userId)
}
return
case BulkActionType.MoveToFolder:
if (!args || !isFolderArguments(args)) {
throw new Error('Invalid arguments')
@ -979,42 +1005,8 @@ export const batchUpdateLibraryItems = async (
throw new Error('Invalid bulk action')
}
if (!searchArgs.query) {
throw new Error('Search query is required')
}
const searchQuery = parseSearchQuery(searchArgs.query)
const parameters: ObjectLiteral[] = []
const query = buildQuery(searchQuery, parameters)
await authTrx(
async (tx) => {
const queryBuilder = tx
.createQueryBuilder(LibraryItem, 'library_item')
.where('library_item.user_id = :userId', { userId })
if (query) {
queryBuilder
.andWhere(`(${query})`)
.setParameters(paramtersToObject(parameters))
}
if (addLabels) {
if (!labelIds) {
throw new Error('Labels are required for this action')
}
const libraryItems = await queryBuilder.getMany()
// add labels to library items
for (const libraryItem of libraryItems) {
await addLabelsToLibraryItem(labelIds, libraryItem.id, userId)
}
return
}
await queryBuilder.update(LibraryItem).set(values).execute()
},
return authTrx(
async (tx) => tx.getRepository(LibraryItem).update(libraryItemIds, values),
undefined,
userId
)

View File

@ -14,6 +14,7 @@ import {
ArticleSavingRequestStatus,
CreateLabelInput,
} from '../generated/graphql'
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
@ -716,4 +717,25 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
}
}
export const enqueueBulkAction = async (data: BulkActionData) => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
}
const jobId = `${BULK_ACTION_JOB_NAME}-${data.userId}`
try {
return queue.add(BULK_ACTION_JOB_NAME, data, {
attempts: 1,
priority: 10,
jobId, // deduplication
removeOnComplete: true,
removeOnFail: true,
})
} catch (error) {
logger.error('error enqueuing update highlight job', error)
}
}
export default createHttpTaskWithToken