diff --git a/packages/api/src/jobs/batch_update.ts b/packages/api/src/jobs/batch_update.ts index b7961a2a2..2657fedec 100644 --- a/packages/api/src/jobs/batch_update.ts +++ b/packages/api/src/jobs/batch_update.ts @@ -3,24 +3,16 @@ import { batchUpdateLibraryItems } from '../services/library_item' export interface BatchUpdateData { userId: string - labelIds: string[] libraryItemIds: string[] action: BulkActionType - size: number + labelIds?: string[] args?: unknown } export const BATCH_UPDATE_JOB_NAME = 'batch-update' export const batchUpdate = async (data: BatchUpdateData) => { - const { userId, action, labelIds, libraryItemIds, args, size } = data + const { userId, action, labelIds, libraryItemIds, args } = data - const searchArgs = { - size, - query: `in:all includes:${libraryItemIds.join()}`, - } - - await batchUpdateLibraryItems(action, searchArgs, userId, labelIds, args) - - return true + return batchUpdateLibraryItems(action, libraryItemIds, userId, labelIds, args) } diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts index b8f3623cc..fbef5b2b9 100644 --- a/packages/api/src/jobs/bulk_action.ts +++ b/packages/api/src/jobs/bulk_action.ts @@ -1,6 +1,7 @@ import { BulkActionType } from '../generated/graphql' import { getBackendQueue } from '../queue-processor' import { searchLibraryItems } from '../services/library_item' +import { stringToHash } from '../utils/helpers' import { logger } from '../utils/logger' import { BATCH_UPDATE_JOB_NAME } from './batch_update' @@ -9,15 +10,19 @@ export interface BulkActionData { userId: string action: BulkActionType query: string - labelIds: string[] batchSize: number + labelIds?: string[] args?: unknown useFolders?: boolean } export const BULK_ACTION_JOB_NAME = 'bulk-action' -export const bulkAction = async (data: BulkActionData) => { +export const bulkAction = async (data: BulkActionData, id?: string) => { + if (!id) { + throw new Error('Missing id') + } + const { userId, action, @@ -33,7 
+38,7 @@ export const bulkAction = async (data: BulkActionData) => { if (!queue) { throw new Error('Queue not initialized') } - + const parent = { id, queue: queue.name } let offset = 0 do { @@ -54,12 +59,18 @@ export const bulkAction = async (data: BulkActionData) => { args, size: batchSize, } + const libraryItemIdsStr = libraryItemIds.sort().join() + const jobId = `${BATCH_UPDATE_JOB_NAME}-${stringToHash(libraryItemIdsStr)}` // enqueue job for each batch try { await queue.add(BATCH_UPDATE_JOB_NAME, data, { attempts: 1, priority: 10, + jobId, // deduplication + removeOnComplete: true, + removeOnFail: true, + parent, // for tracking }) } catch (error) { logger.error('Error enqueuing batch update job', error) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 70f7d7359..ecb255e95 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -87,7 +87,7 @@ export const createWorker = (connection: ConnectionOptions) => case SYNC_READ_POSITIONS_JOB_NAME: return syncReadPositionsJob(job.data) case BULK_ACTION_JOB_NAME: - return bulkAction(job.data) + return bulkAction(job.data, job.id) case BATCH_UPDATE_JOB_NAME: return batchUpdate(job.data) } diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index c60f821d2..1441acbd3 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -59,12 +59,11 @@ import { UpdatesSinceError, UpdatesSinceSuccess, } from '../../generated/graphql' -import { BULK_ACTION_JOB_NAME } from '../../jobs/bulk_action' -import { getBackendQueue } from '../../queue-processor' import { authTrx, getColumns } from '../../repository' import { getInternalLabelWithColor } from '../../repository/label' import { libraryItemRepository } from '../../repository/library_item' import { userRepository } from '../../repository/user' +import { clearCachedReadingPosition } from 
'../../services/cached_reading_position' import { createPageSaveRequest } from '../../services/create_page_save_request' import { findHighlightsByLibraryItemId } from '../../services/highlights' import { @@ -93,6 +92,7 @@ import { import { traceAs } from '../../tracing' import { analytics } from '../../utils/analytics' import { isSiteBlockedForParse } from '../../utils/blocked' +import { enqueueBulkAction } from '../../utils/createTask' import { authorized } from '../../utils/gql-utils' import { cleanUrl, @@ -112,10 +112,6 @@ import { parsePreparedContent, } from '../../utils/parser' import { getStorageFileDetails } from '../../utils/uploads' -import { - clearCachedReadingPosition, - fetchCachedReadingPosition, -} from '../../services/cached_reading_position' export enum ArticleFormat { Markdown = 'markdown', @@ -879,10 +875,11 @@ export const bulkActionResolver = authorized< }, }) + const batchSize = 100 const useFolders = query.includes('use:folders') const searchArgs = { query, - size: 0, + size: batchSize, useFolders, } const searchResult = await searchLibraryItems(searchArgs, uid) @@ -892,34 +889,35 @@ export const bulkActionResolver = authorized< return { success: true } } - const batchSize = 100 if (count <= batchSize) { // if there are less than 100 items, update them synchronously - await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args) + const libraryItemIds = searchResult.libraryItems.map((item) => item.id) + await batchUpdateLibraryItems( + action, + libraryItemIds, + uid, + labelIds, + args + ) return { success: true } } // if there are more than 100 items, update them asynchronously - const queue = await getBackendQueue() - if (!queue) { - throw new Error('Queue not initialized') - } - const data = { userId: uid, action, - labelIds, + labelIds: labelIds || undefined, query, count, args, batchSize, useFolders, } - await queue.add(BULK_ACTION_JOB_NAME, data, { - attempts: 1, - priority: 10, - }) + const job = await 
enqueueBulkAction(data) + if (!job) { + return { errorCodes: [BulkActionErrorCode.BadRequest] } + } return { success: true } } catch (error) { diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index a695e0cd5..c2cc0857b 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -2,6 +2,7 @@ import { ExpressionToken, LiqeQuery } from '@omnivore/liqe' import { DateTime } from 'luxon' import { DeepPartial, FindOptionsWhere, ObjectLiteral } from 'typeorm' import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity' +import { ReadingProgressDataSource } from '../datasources/reading_progress_data_source' import { Highlight } from '../entity/highlight' import { Label } from '../entity/label' import { LibraryItem, LibraryItemState } from '../entity/library_item' @@ -102,6 +103,20 @@ interface Select { alias?: string } +const readingProgressDataSource = new ReadingProgressDataSource() + +const markItemAsRead = async (libraryItemId: string, userId: string) => { + return await readingProgressDataSource.updateReadingProgress( + userId, + libraryItemId, + { + readingProgressPercent: 100, + readingProgressTopPercent: 100, + readingProgressAnchorIndex: undefined, + } + ) +} + const handleNoCase = (value: string) => { const keywordRegexMap: Record<string, RegExp> = { highlight: /^highlight(s)?$/i, @@ -924,11 +939,15 @@ export const countByCreatedAt = async ( export const batchUpdateLibraryItems = async ( action: BulkActionType, - searchArgs: SearchArgs, + libraryItemIds: string[], userId: string, labelIds?: string[] | null, args?: unknown ) => { + if (libraryItemIds.length === 0) { + return + } + interface FolderArguments { folder: string } @@ -940,7 +959,6 @@ const now = new Date().toISOString() // build the script let values: Record<string, unknown> = {} - let addLabels = false switch (action) { case BulkActionType.Archive: values = { @@ -955,15
+973,23 @@ export const batchUpdateLibraryItems = async ( } break case BulkActionType.AddLabels: - addLabels = true - break - case BulkActionType.MarkAsRead: - values = { - readAt: now, - readingProgressTopPercent: 100, - readingProgressBottomPercent: 100, + if (!labelIds || labelIds.length === 0) { + throw new Error('Labels are required for this action') } - break + + // add labels to library items + for (const libraryItemId of libraryItemIds) { + await addLabelsToLibraryItem(labelIds, libraryItemId, userId) + } + + return + case BulkActionType.MarkAsRead: + // update reading progress for library items + for (const libraryItemId of libraryItemIds) { + await markItemAsRead(libraryItemId, userId) + } + + return case BulkActionType.MoveToFolder: if (!args || !isFolderArguments(args)) { throw new Error('Invalid arguments') @@ -979,42 +1005,8 @@ export const batchUpdateLibraryItems = async ( throw new Error('Invalid bulk action') } - if (!searchArgs.query) { - throw new Error('Search query is required') - } - - const searchQuery = parseSearchQuery(searchArgs.query) - const parameters: ObjectLiteral[] = [] - const query = buildQuery(searchQuery, parameters) - - await authTrx( - async (tx) => { - const queryBuilder = tx - .createQueryBuilder(LibraryItem, 'library_item') - .where('library_item.user_id = :userId', { userId }) - - if (query) { - queryBuilder - .andWhere(`(${query})`) - .setParameters(paramtersToObject(parameters)) - } - - if (addLabels) { - if (!labelIds) { - throw new Error('Labels are required for this action') - } - - const libraryItems = await queryBuilder.getMany() - // add labels to library items - for (const libraryItem of libraryItems) { - await addLabelsToLibraryItem(labelIds, libraryItem.id, userId) - } - - return - } - - await queryBuilder.update(LibraryItem).set(values).execute() - }, + return authTrx( + async (tx) => tx.getRepository(LibraryItem).update(libraryItemIds, values), undefined, userId ) diff --git 
a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 8b4d57d1a..b08ce1083 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -14,6 +14,7 @@ import { ArticleSavingRequestStatus, CreateLabelInput, } from '../generated/graphql' +import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds' import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule' @@ -716,4 +717,25 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => { } } +export const enqueueBulkAction = async (data: BulkActionData) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + const jobId = `${BULK_ACTION_JOB_NAME}-${data.userId}` + + try { + return await queue.add(BULK_ACTION_JOB_NAME, data, { + attempts: 1, + priority: 10, + jobId, // deduplication + removeOnComplete: true, + removeOnFail: true, + }) + } catch (error) { + logger.error('error enqueuing bulk action job', error) + } +} + export default createHttpTaskWithToken