Run bulk actions asynchronously when the item count exceeds 100 by creating batch-update jobs

This commit is contained in:
Hongbo Wu
2024-01-31 23:11:42 +08:00
parent 7f4f7c6a80
commit 702bd6c3c6
5 changed files with 173 additions and 85 deletions

View File

@ -0,0 +1,26 @@
import { BulkActionType } from '../generated/graphql'
import { batchUpdateLibraryItems } from '../services/library_item'
/**
 * Payload for a single batch-update job: applies `action` to the library
 * items listed in `libraryItemIds` on behalf of `userId`.
 */
export interface BatchUpdateData {
userId: string
// label ids forwarded to the update service — presumably used when the
// action adds labels; verify against batchUpdateLibraryItems
labelIds: string[]
// ids of the items in this batch; joined into the search query
libraryItemIds: string[]
action: BulkActionType
// page size forwarded as the search `size` argument
size: number
// action-specific extra arguments (shape not visible in this file)
args?: unknown
}
export const BATCH_UPDATE_JOB_NAME = 'batch-update'

/**
 * Job handler for one batch of a bulk action: builds an `includes:` search
 * query from the batch's item ids and delegates the update to the library
 * item service.
 *
 * @param data batch payload (user, action, item ids, page size, extras)
 * @returns true when the batch has been processed
 */
export const batchUpdate = async (data: BatchUpdateData) => {
  const { userId, action, labelIds, libraryItemIds, args, size } = data

  // Guard: with no ids, the query would degenerate to `in:all includes:`,
  // which risks matching far more items than intended for destructive
  // actions (delete/archive). Treat an empty batch as a no-op.
  if (libraryItemIds.length === 0) {
    return true
  }

  const searchArgs = {
    size,
    // Array.prototype.join() defaults to a comma separator, producing
    // `includes:id1,id2,...` as the search syntax expects.
    query: `in:all includes:${libraryItemIds.join()}`,
  }

  await batchUpdateLibraryItems(action, searchArgs, userId, labelIds, args)

  return true
}

View File

@ -0,0 +1,61 @@
import { BulkActionType } from '../generated/graphql'
import { getBackendQueue } from '../queue-processor'
import { searchLibraryItems } from '../services/library_item'
import { logger } from '../utils/logger'
import { BATCH_UPDATE_JOB_NAME } from './batch_update'
/**
 * Payload for a bulk-action job: pages through all items matching `query`
 * and enqueues one batch-update job per page of `batchSize` items.
 */
export interface BulkActionData {
// total number of matching items, computed by the resolver before enqueuing
count: number
userId: string
action: BulkActionType
// library search query identifying the items to act on
query: string
labelIds: string[]
// page size used both for searching and for each batch-update job
batchSize: number
// action-specific extra arguments (shape not visible in this file)
args?: unknown
}
export const BULK_ACTION_JOB_NAME = 'bulk-action'

/**
 * Job handler for a large bulk action: pages through the items matching
 * `query` in chunks of `batchSize` and enqueues one batch-update job per
 * page. Enqueue failures are logged and do not abort the remaining pages.
 *
 * NOTE(review): offset-based paging while the enqueued jobs may already be
 * mutating the matching set (e.g. deleting/archiving items) can skip or
 * re-read items — confirm the desired semantics with the batch consumer.
 *
 * @param data bulk-action payload (user, action, query, total count, page size)
 * @returns true when all pages have been enqueued
 * @throws Error when the backend queue is not initialized
 */
export const bulkAction = async (data: BulkActionData) => {
  const { userId, action, query, labelIds, count, args, batchSize } = data

  const queue = await getBackendQueue()
  if (!queue) {
    throw new Error('Queue not initialized')
  }

  let offset = 0
  do {
    const searchArgs = {
      size: batchSize,
      from: offset,
      query,
    }
    const searchResult = await searchLibraryItems(searchArgs, userId)
    const libraryItemIds = searchResult.libraryItems.map((item) => item.id)

    // Stop early when a page comes back empty (items may have changed
    // since `count` was computed) instead of enqueuing empty batch jobs.
    if (libraryItemIds.length === 0) {
      break
    }

    // Named `jobData` (not `data`) to avoid shadowing the function parameter.
    const jobData = {
      userId,
      action,
      labelIds,
      libraryItemIds,
      args,
      size: batchSize,
    }

    // enqueue job for each batch; a failed enqueue is logged but does not
    // stop the remaining batches
    try {
      await queue.add(BATCH_UPDATE_JOB_NAME, jobData, {
        attempts: 1,
        priority: 10,
      })
    } catch (error) {
      logger.error('Error enqueuing batch update job', error)
    }

    offset += batchSize
  } while (offset < count)

  return true
}

View File

@ -14,6 +14,8 @@ import express, { Express } from 'express'
import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
import { appDataSource } from './data_source' import { appDataSource } from './data_source'
import { env } from './env' import { env } from './env'
import { batchUpdate, BATCH_UPDATE_JOB_NAME } from './jobs/batch_update'
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
import { refreshFeed } from './jobs/rss/refreshFeed' import { refreshFeed } from './jobs/rss/refreshFeed'
@ -84,6 +86,10 @@ export const createWorker = (connection: ConnectionOptions) =>
return updateHighlight(job.data) return updateHighlight(job.data)
case SYNC_READ_POSITIONS_JOB_NAME: case SYNC_READ_POSITIONS_JOB_NAME:
return syncReadPositionsJob(job.data) return syncReadPositionsJob(job.data)
case BULK_ACTION_JOB_NAME:
return bulkAction(job.data)
case BATCH_UPDATE_JOB_NAME:
return batchUpdate(job.data)
} }
}, },
{ {

View File

@ -16,7 +16,6 @@ import {
BulkActionError, BulkActionError,
BulkActionErrorCode, BulkActionErrorCode,
BulkActionSuccess, BulkActionSuccess,
BulkActionType,
ContentReader, ContentReader,
CreateArticleError, CreateArticleError,
CreateArticleErrorCode, CreateArticleErrorCode,
@ -60,6 +59,8 @@ import {
UpdatesSinceError, UpdatesSinceError,
UpdatesSinceSuccess, UpdatesSinceSuccess,
} from '../../generated/graphql' } from '../../generated/graphql'
import { BULK_ACTION_JOB_NAME } from '../../jobs/bulk_action'
import { getBackendQueue } from '../../queue-processor'
import { authTrx, getColumns } from '../../repository' import { authTrx, getColumns } from '../../repository'
import { getInternalLabelWithColor } from '../../repository/label' import { getInternalLabelWithColor } from '../../repository/label'
import { libraryItemRepository } from '../../repository/library_item' import { libraryItemRepository } from '../../repository/library_item'
@ -69,7 +70,6 @@ import { findHighlightsByLibraryItemId } from '../../services/highlights'
import { import {
addLabelsToLibraryItem, addLabelsToLibraryItem,
createAndSaveLabelsInLibraryItem, createAndSaveLabelsInLibraryItem,
findLabelsByIds,
findOrCreateLabels, findOrCreateLabels,
} from '../../services/labels' } from '../../services/labels'
import { import {
@ -879,35 +879,57 @@ export const bulkActionResolver = authorized<
}, },
}) })
// the query size is limited to 4000 characters to allow for 100 items const searchResult = await searchLibraryItems(
if (!query || query.length > 4000) {
log.error('bulkActionResolver error', {
error: 'QueryTooLong',
query,
})
return { errorCodes: [BulkActionErrorCode.BadRequest] }
}
// get labels if needed
let labels = undefined
if (action === BulkActionType.AddLabels) {
if (!labelIds || labelIds.length === 0) {
return { errorCodes: [BulkActionErrorCode.BadRequest] }
}
labels = await findLabelsByIds(labelIds, uid)
}
await batchUpdateLibraryItems(
action,
{ {
query, query,
includePending: true,
includeDeleted: true,
useFolders: query.includes('use:folders'), useFolders: query.includes('use:folders'),
}, },
uid, uid
labels,
args
) )
const count = searchResult.count
if (count === 0) {
log.info('No items found for bulk action')
return { success: true }
}
const batchSize = 100
if (count <= batchSize) {
// if there are less than 100 items, update them synchronously
await batchUpdateLibraryItems(
action,
{
query,
useFolders: query.includes('use:folders'),
},
uid,
labelIds,
args
)
return { success: true }
}
// if there are more than 100 items, update them asynchronously
const queue = await getBackendQueue()
if (!queue) {
throw new Error('Queue not initialized')
}
const data = {
userId: uid,
action,
labelIds,
query,
count,
args,
batchSize,
}
await queue.add(BULK_ACTION_JOB_NAME, data, {
attempts: 1,
priority: 10,
})
return { success: true } return { success: true }
} catch (error) { } catch (error) {

View File

@ -13,7 +13,6 @@ import {
getColumns, getColumns,
isUniqueViolation, isUniqueViolation,
queryBuilderToRawSql, queryBuilderToRawSql,
valuesToRawSql,
} from '../repository' } from '../repository'
import { libraryItemRepository } from '../repository/library_item' import { libraryItemRepository } from '../repository/library_item'
import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers' import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers'
@ -927,7 +926,7 @@ export const batchUpdateLibraryItems = async (
action: BulkActionType, action: BulkActionType,
searchArgs: SearchArgs, searchArgs: SearchArgs,
userId: string, userId: string,
labels?: Label[], labelIds?: string[] | null,
args?: unknown args?: unknown
) => { ) => {
interface FolderArguments { interface FolderArguments {
@ -945,14 +944,14 @@ export const batchUpdateLibraryItems = async (
switch (action) { switch (action) {
case BulkActionType.Archive: case BulkActionType.Archive:
values = { values = {
archived_at: now, archivedAt: now,
state: LibraryItemState.Archived, state: LibraryItemState.Archived,
} }
break break
case BulkActionType.Delete: case BulkActionType.Delete:
values = { values = {
state: LibraryItemState.Deleted, state: LibraryItemState.Deleted,
deleted_at: now, deletedAt: now,
} }
break break
case BulkActionType.AddLabels: case BulkActionType.AddLabels:
@ -960,9 +959,9 @@ export const batchUpdateLibraryItems = async (
break break
case BulkActionType.MarkAsRead: case BulkActionType.MarkAsRead:
values = { values = {
read_at: now, readAt: now,
reading_progress_top_percent: 100, readingProgressTopPercent: 100,
reading_progress_bottom_percent: 100, readingProgressBottomPercent: 100,
} }
break break
case BulkActionType.MoveToFolder: case BulkActionType.MoveToFolder:
@ -972,7 +971,7 @@ export const batchUpdateLibraryItems = async (
values = { values = {
folder: args.folder, folder: args.folder,
saved_at: now, savedAt: now,
} }
break break
@ -988,63 +987,37 @@ export const batchUpdateLibraryItems = async (
const parameters: ObjectLiteral[] = [] const parameters: ObjectLiteral[] = []
const query = buildQuery(searchQuery, parameters) const query = buildQuery(searchQuery, parameters)
await authTrx(async (tx) => { await authTrx(
const queryBuilder = tx async (tx) => {
.createQueryBuilder(LibraryItem, 'library_item') const queryBuilder = tx
.where('library_item.user_id = :userId', { userId }) .createQueryBuilder(LibraryItem, 'library_item')
.where('library_item.user_id = :userId', { userId })
if (query) { if (query) {
queryBuilder queryBuilder
.andWhere(`(${query})`) .andWhere(`(${query})`)
.setParameters(paramtersToObject(parameters)) .setParameters(paramtersToObject(parameters))
}
if (addLabels) {
if (!labels) {
throw new Error('Labels are required for this action')
} }
const labelIds = labels.map((label) => label.id) if (addLabels) {
const libraryItems = await queryBuilder.getMany() if (!labelIds) {
// add labels in library items throw new Error('Labels are required for this action')
for (const libraryItem of libraryItems) { }
await addLabelsToLibraryItem(labelIds, libraryItem.id, userId)
libraryItem.labels = labels const libraryItems = await queryBuilder.getMany()
// add labels to library items
for (const libraryItem of libraryItems) {
await addLabelsToLibraryItem(labelIds, libraryItem.id, userId)
}
return
} }
return await queryBuilder.update(LibraryItem).set(values).execute()
} },
undefined,
// generate raw sql because postgres doesn't support prepared statements in DO blocks userId
const countSql = queryBuilderToRawSql(queryBuilder.select('COUNT(1)')) )
const subQuery = queryBuilderToRawSql(queryBuilder.select('id'))
const valuesSql = valuesToRawSql(values)
const start = new Date().toISOString()
const batchSize = 1000
const sql = `
-- Set batch size
DO $$
DECLARE
batch_size INT := ${batchSize};
BEGIN
-- Loop through batches
FOR i IN 0..CEIL((${countSql}) * 1.0 / batch_size) - 1 LOOP
-- Update the batch
UPDATE omnivore.library_item
SET ${valuesSql}
WHERE id = ANY(
${subQuery}
AND updated_at < '${start}'
LIMIT batch_size
);
END LOOP;
END $$
`
return tx.query(sql)
})
} }
export const deleteLibraryItemById = async (id: string, userId?: string) => { export const deleteLibraryItemById = async (id: string, userId?: string) => {