diff --git a/packages/api/src/jobs/batch_update.ts b/packages/api/src/jobs/batch_update.ts deleted file mode 100644 index 2657fedec..000000000 --- a/packages/api/src/jobs/batch_update.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { BulkActionType } from '../generated/graphql' -import { batchUpdateLibraryItems } from '../services/library_item' - -export interface BatchUpdateData { - userId: string - libraryItemIds: string[] - action: BulkActionType - labelIds?: string[] - args?: unknown -} - -export const BATCH_UPDATE_JOB_NAME = 'batch-update' - -export const batchUpdate = async (data: BatchUpdateData) => { - const { userId, action, labelIds, libraryItemIds, args } = data - - return batchUpdateLibraryItems(action, libraryItemIds, userId, labelIds, args) -} diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts index 5a3e6a6a7..abea14493 100644 --- a/packages/api/src/jobs/bulk_action.ts +++ b/packages/api/src/jobs/bulk_action.ts @@ -1,9 +1,7 @@ import { BulkActionType } from '../generated/graphql' import { getBackendQueue } from '../queue-processor' -import { searchLibraryItems } from '../services/library_item' -import { stringToHash } from '../utils/helpers' +import { batchUpdateLibraryItems } from '../services/library_item' import { logger } from '../utils/logger' -import { BATCH_UPDATE_JOB_NAME } from './batch_update' export interface BulkActionData { count: number @@ -17,49 +15,26 @@ export interface BulkActionData { export const BULK_ACTION_JOB_NAME = 'bulk-action' -export const bulkAction = async (data: BulkActionData, id?: string) => { - if (!id) { - throw new Error('Missing id') - } - - const { userId, action, query, labelIds, count, args, batchSize } = data +export const bulkAction = async (data: BulkActionData) => { + const { userId, action, query, labelIds, args, batchSize, count } = data const queue = await getBackendQueue() if (!queue) { throw new Error('Queue not initialized') } + const now = new Date().toISOString() let offset = 0 do { const searchArgs = { size: batchSize, - from: offset, - query, + query: `(${query}) AND updated:<${now}`, } - const searchResult = await searchLibraryItems(searchArgs, userId) - const libraryItemIds = searchResult.libraryItems.map((item) => item.id) - const data = { - userId, - action, - labelIds, - libraryItemIds, - args, - } - const libraryItemIdsStr = libraryItemIds.sort().join() - const jobId = `${BATCH_UPDATE_JOB_NAME}-${stringToHash(libraryItemIdsStr)}` - - // enqueue job for each batch try { - await queue.add(BATCH_UPDATE_JOB_NAME, data, { - attempts: 1, - priority: 10, - jobId, // deduplication - removeOnComplete: true, - removeOnFail: true, - }) + await batchUpdateLibraryItems(action, searchArgs, userId, labelIds, args) } catch (error) { - logger.error('Error enqueuing batch update job', error) + logger.error('batch update error', error) } offset += batchSize diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index ecb255e95..a5f1a38d1 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -14,12 +14,15 @@ import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' import { env } from './env' -import { batchUpdate, BATCH_UPDATE_JOB_NAME } from './jobs/batch_update' import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' +import { + syncReadPositionsJob, + SYNC_READ_POSITIONS_JOB_NAME, +} from './jobs/sync_read_positions' import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' import { updateHighlight, @@ -29,12 +32,8 @@ import { } from './jobs/update_db' import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' -import { logger, CustomTypeOrmLogger } from './utils/logger' -import { - SYNC_READ_POSITIONS_JOB_NAME, - syncReadPositionsJob, -} from './jobs/sync_read_positions' import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position' +import { CustomTypeOrmLogger, logger } from './utils/logger' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -87,9 +86,7 @@ export const createWorker = (connection: ConnectionOptions) => case SYNC_READ_POSITIONS_JOB_NAME: return syncReadPositionsJob(job.data) case BULK_ACTION_JOB_NAME: - return bulkAction(job.data, job.id) - case BATCH_UPDATE_JOB_NAME: - return batchUpdate(job.data) + return bulkAction(job.data) } }, { diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index 0ac442f62..04666c7dc 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -889,14 +889,7 @@ export const bulkActionResolver = authorized< if (count <= batchSize) { // if there are less than 100 items, update them synchronously - const libraryItemIds = searchResult.libraryItems.map((item) => item.id) - await batchUpdateLibraryItems( - action, - libraryItemIds, - uid, - labelIds, - args - ) + await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args) return { success: true } } diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index c2cc0857b..30649636c 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -3,6 +3,7 @@ import { DateTime } from 'luxon' import { DeepPartial, FindOptionsWhere, ObjectLiteral } from 'typeorm' import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity' import { ReadingProgressDataSource } from '../datasources/reading_progress_data_source' +import { appDataSource } from '../data_source' import { Highlight } from '../entity/highlight' import { Label } from '../entity/label' import { LibraryItem, LibraryItemState } from '../entity/library_item' @@ -939,15 +940,11 @@ export const countByCreatedAt = async ( export const batchUpdateLibraryItems = async ( action: BulkActionType, - libraryItemIds: string[], + searchArgs: SearchArgs, userId: string, labelIds?: string[] | null, args?: unknown ) => { - if (libraryItemIds.length === 0) { - return - } - interface FolderArguments { folder: string } @@ -956,6 +953,23 @@ export const batchUpdateLibraryItems = async ( return 'folder' in args } + if (!searchArgs.query) { + throw new Error('Search query is required') + } + + const searchQuery = parseSearchQuery(searchArgs.query) + const parameters: ObjectLiteral[] = [] + const query = buildQuery(searchQuery, parameters) + const queryBuilder = appDataSource + .createQueryBuilder(LibraryItem, 'library_item') + .where('library_item.user_id = :userId', { userId }) + + if (query) { + queryBuilder + .andWhere(`(${query})`) + .setParameters(paramtersToObject(parameters)) + } + const now = new Date().toISOString() // build the script let values: Record = {} @@ -972,24 +986,28 @@ export const batchUpdateLibraryItems = async ( deletedAt: now, } break - case BulkActionType.AddLabels: - if (!labelIds || labelIds.length === 0) { + case BulkActionType.AddLabels: { + if (!labelIds) { throw new Error('Labels are required for this action') } + const libraryItems = await queryBuilder.getMany() // add labels to library items - for (const libraryItemId of libraryItemIds) { - await addLabelsToLibraryItem(labelIds, libraryItemId, userId) + for (const libraryItem of libraryItems) { + await addLabelsToLibraryItem(labelIds, libraryItem.id, userId) } return - case BulkActionType.MarkAsRead: + } + case BulkActionType.MarkAsRead: { + const libraryItems = await queryBuilder.getMany() // update reading progress for library items - for (const libraryItemId of libraryItemIds) { - await markItemAsRead(libraryItemId, userId) + for (const libraryItem of libraryItems) { + await markItemAsRead(libraryItem.id, userId) } return + } case BulkActionType.MoveToFolder: if (!args || !isFolderArguments(args)) { throw new Error('Invalid arguments') @@ -1005,11 +1023,7 @@ export const batchUpdateLibraryItems = async ( throw new Error('Invalid bulk action') } - return authTrx( - async (tx) => tx.getRepository(LibraryItem).update(libraryItemIds, values), - undefined, - userId - ) + await queryBuilder.update(LibraryItem).set(values).execute() } export const deleteLibraryItemById = async (id: string, userId?: string) => { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index b08ce1083..da1115200 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -734,7 +734,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => { removeOnFail: true, }) } catch (error) { - logger.error('error enqueuing update highlight job', error) + logger.error('error enqueuing bulk action job', error) } }