diff --git a/packages/api/src/jobs/batch_update.ts b/packages/api/src/jobs/batch_update.ts new file mode 100644 index 000000000..b7961a2a2 --- /dev/null +++ b/packages/api/src/jobs/batch_update.ts @@ -0,0 +1,26 @@ +import { BulkActionType } from '../generated/graphql' +import { batchUpdateLibraryItems } from '../services/library_item' + +export interface BatchUpdateData { + userId: string + labelIds: string[] + libraryItemIds: string[] + action: BulkActionType + size: number + args?: unknown +} + +export const BATCH_UPDATE_JOB_NAME = 'batch-update' + +export const batchUpdate = async (data: BatchUpdateData) => { + const { userId, action, labelIds, libraryItemIds, args, size } = data + + const searchArgs = { + size, + query: `in:all includes:${libraryItemIds.join()}`, + } + + await batchUpdateLibraryItems(action, searchArgs, userId, labelIds, args) + + return true +} diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts new file mode 100644 index 000000000..07e9a35f6 --- /dev/null +++ b/packages/api/src/jobs/bulk_action.ts @@ -0,0 +1,61 @@ +import { BulkActionType } from '../generated/graphql' +import { getBackendQueue } from '../queue-processor' +import { searchLibraryItems } from '../services/library_item' +import { logger } from '../utils/logger' +import { BATCH_UPDATE_JOB_NAME } from './batch_update' + +export interface BulkActionData { + count: number + userId: string + action: BulkActionType + query: string + labelIds: string[] + batchSize: number + args?: unknown +} + +export const BULK_ACTION_JOB_NAME = 'bulk-action' + +export const bulkAction = async (data: BulkActionData) => { + const { userId, action, query, labelIds, count, args, batchSize } = data + + const queue = await getBackendQueue() + if (!queue) { + throw new Error('Queue not initialized') + } + + let offset = 0 + + do { + const searchArgs = { + size: batchSize, + from: offset, + query, + } + + const searchResult = await searchLibraryItems(searchArgs, userId) + const libraryItemIds = searchResult.libraryItems.map((item) => item.id) + const data = { + userId, + action, + labelIds, + libraryItemIds, + args, + size: batchSize, + } + + // enqueue job for each batch + try { + await queue.add(BATCH_UPDATE_JOB_NAME, data, { + attempts: 1, + priority: 10, + }) + } catch (error) { + logger.error('Error enqueuing batch update job', error) + } + + offset += batchSize + } while (offset < count) + + return true +} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 241ef809f..70f7d7359 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -14,6 +14,8 @@ import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' import { env } from './env' +import { batchUpdate, BATCH_UPDATE_JOB_NAME } from './jobs/batch_update' +import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' @@ -84,6 +86,10 @@ export const createWorker = (connection: ConnectionOptions) => return updateHighlight(job.data) case SYNC_READ_POSITIONS_JOB_NAME: return syncReadPositionsJob(job.data) + case BULK_ACTION_JOB_NAME: + return bulkAction(job.data) + case BATCH_UPDATE_JOB_NAME: + return batchUpdate(job.data) } }, { diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index b68bf7024..1b746d03d 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -16,7 +16,6 @@ import { BulkActionError, BulkActionErrorCode, BulkActionSuccess, - BulkActionType, ContentReader, CreateArticleError, CreateArticleErrorCode, @@ -60,6 +59,8 @@ import { UpdatesSinceError, UpdatesSinceSuccess, } from '../../generated/graphql' +import { BULK_ACTION_JOB_NAME } from '../../jobs/bulk_action' +import { getBackendQueue } from '../../queue-processor' import { authTrx, getColumns } from '../../repository' import { getInternalLabelWithColor } from '../../repository/label' import { libraryItemRepository } from '../../repository/library_item' @@ -69,7 +70,6 @@ import { findHighlightsByLibraryItemId } from '../../services/highlights' import { addLabelsToLibraryItem, createAndSaveLabelsInLibraryItem, - findLabelsByIds, findOrCreateLabels, } from '../../services/labels' import { @@ -879,35 +879,57 @@ export const bulkActionResolver = authorized< }, }) - // the query size is limited to 4000 characters to allow for 100 items - if (!query || query.length > 4000) { - log.error('bulkActionResolver error', { - error: 'QueryTooLong', - query, - }) - return { errorCodes: [BulkActionErrorCode.BadRequest] } - } - - // get labels if needed - let labels = undefined - if (action === BulkActionType.AddLabels) { - if (!labelIds || labelIds.length === 0) { - return { errorCodes: [BulkActionErrorCode.BadRequest] } - } - - labels = await findLabelsByIds(labelIds, uid) - } - - await batchUpdateLibraryItems( - action, + const searchResult = await searchLibraryItems( { query, + includePending: true, + includeDeleted: true, useFolders: query.includes('use:folders'), }, - uid, - labels, - args + uid ) + const count = searchResult.count + if (count === 0) { + log.info('No items found for bulk action') + return { success: true } + } + + const batchSize = 100 + if (count <= batchSize) { + // if there are less than 100 items, update them synchronously + await batchUpdateLibraryItems( + action, + { + query, + useFolders: query.includes('use:folders'), + }, + uid, + labelIds, + args + ) + + return { success: true } + } + + // if there are more than 100 items, update them asynchronously + const queue = await getBackendQueue() + if (!queue) { + throw new Error('Queue not initialized') + } + + const data = { + userId: uid, + action, + labelIds, + query, + count, + args, + batchSize, + } + await queue.add(BULK_ACTION_JOB_NAME, data, { + attempts: 1, + priority: 10, + }) return { success: true } } catch (error) { diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 6aa52eeb1..a695e0cd5 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -13,7 +13,6 @@ import { getColumns, isUniqueViolation, queryBuilderToRawSql, - valuesToRawSql, } from '../repository' import { libraryItemRepository } from '../repository/library_item' import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers' @@ -927,7 +926,7 @@ export const batchUpdateLibraryItems = async ( action: BulkActionType, searchArgs: SearchArgs, userId: string, - labels?: Label[], + labelIds?: string[] | null, args?: unknown ) => { interface FolderArguments { @@ -945,14 +944,14 @@ export const batchUpdateLibraryItems = async ( switch (action) { case BulkActionType.Archive: values = { - archived_at: now, + archivedAt: now, state: LibraryItemState.Archived, } break case BulkActionType.Delete: values = { state: LibraryItemState.Deleted, - deleted_at: now, + deletedAt: now, } break case BulkActionType.AddLabels: @@ -960,9 +959,9 @@ export const batchUpdateLibraryItems = async ( break case BulkActionType.MarkAsRead: values = { - read_at: now, - reading_progress_top_percent: 100, - reading_progress_bottom_percent: 100, + readAt: now, + readingProgressTopPercent: 100, + readingProgressBottomPercent: 100, } break case BulkActionType.MoveToFolder: @@ -972,7 +971,7 @@ export const batchUpdateLibraryItems = async ( values = { folder: args.folder, - saved_at: now, + savedAt: now, } break @@ -988,63 +987,37 @@ export const batchUpdateLibraryItems = async ( const parameters: ObjectLiteral[] = [] const query = buildQuery(searchQuery, parameters) - await authTrx(async (tx) => { - const queryBuilder = tx - .createQueryBuilder(LibraryItem, 'library_item') - .where('library_item.user_id = :userId', { userId }) + await authTrx( + async (tx) => { + const queryBuilder = tx + .createQueryBuilder(LibraryItem, 'library_item') + .where('library_item.user_id = :userId', { userId }) - if (query) { - queryBuilder - .andWhere(`(${query})`) - .setParameters(paramtersToObject(parameters)) - } - - if (addLabels) { - if (!labels) { - throw new Error('Labels are required for this action') + if (query) { + queryBuilder + .andWhere(`(${query})`) + .setParameters(paramtersToObject(parameters)) } - const labelIds = labels.map((label) => label.id) - const libraryItems = await queryBuilder.getMany() - // add labels in library items - for (const libraryItem of libraryItems) { - await addLabelsToLibraryItem(labelIds, libraryItem.id, userId) + if (addLabels) { + if (!labelIds) { + throw new Error('Labels are required for this action') + } - libraryItem.labels = labels + const libraryItems = await queryBuilder.getMany() + // add labels to library items + for (const libraryItem of libraryItems) { + await addLabelsToLibraryItem(labelIds, libraryItem.id, userId) + } + + return } - return - } - - // generate raw sql because postgres doesn't support prepared statements in DO blocks - const countSql = queryBuilderToRawSql(queryBuilder.select('COUNT(1)')) - const subQuery = queryBuilderToRawSql(queryBuilder.select('id')) - const valuesSql = valuesToRawSql(values) - - const start = new Date().toISOString() - const batchSize = 1000 - const sql = ` - -- Set batch size - DO $$ - DECLARE - batch_size INT := ${batchSize}; - BEGIN - -- Loop through batches - FOR i IN 0..CEIL((${countSql}) * 1.0 / batch_size) - 1 LOOP - -- Update the batch - UPDATE omnivore.library_item - SET ${valuesSql} - WHERE id = ANY( - ${subQuery} - AND updated_at < '${start}' - LIMIT batch_size - ); - END LOOP; - END $$ - ` - - return tx.query(sql) - }) + await queryBuilder.update(LibraryItem).set(values).execute() + }, + undefined, + userId + ) } export const deleteLibraryItemById = async (id: string, userId?: string) => {