diff --git a/packages/api/src/jobs/prune_trash.ts b/packages/api/src/jobs/prune_trash.ts
new file mode 100644
index 000000000..55f54694c
--- /dev/null
+++ b/packages/api/src/jobs/prune_trash.ts
@@ -0,0 +1,26 @@
+import { appDataSource } from '../data_source'
+
+export const PRUNE_TRASH_JOB = 'prune_trash'
+
+interface PruneTrashJobData {
+  numDays: number
+}
+
+/**
+ * Permanently deletes trash items older than `numDays` days by calling the
+ * omnivore.batch_delete_trash_items stored procedure.
+ */
+export const pruneTrashJob = async (jobData: PruneTrashJobData) => {
+  const { numDays } = jobData
+
+  // job data is deserialized JSON, so the declared type is not enforced at
+  // runtime; refuse anything that is not a non-negative whole number of days
+  if (!Number.isInteger(numDays) || numDays < 0) {
+    throw new Error(`invalid numDays: ${String(numDays)}`)
+  }
+
+  // bind numDays as a query parameter instead of interpolating it into the SQL
+  await appDataSource.query('CALL omnivore.batch_delete_trash_items($1)', [
+    numDays,
+  ])
+}
diff --git a/packages/api/src/jobs/score_library_item.ts b/packages/api/src/jobs/score_library_item.ts
index 0dc3152dc..702bba53c 100644
--- a/packages/api/src/jobs/score_library_item.ts
+++ b/packages/api/src/jobs/score_library_item.ts
@@ -7,7 +7,7 @@ import { enqueueUpdateHomeJob } from '../utils/createTask'
 import { lanaugeToCode } from '../utils/helpers'
 import { logger } from '../utils/logger'
 
-export const SCORE_LIBRARY_ITEM_JOB = 'SCORE_LIBRARY_ITEM_JOB'
+export const SCORE_LIBRARY_ITEM_JOB = 'score-library-item'
 
 export interface ScoreLibraryItemJobData {
   userId: string
diff --git a/packages/api/src/jobs/sync_read_positions.ts b/packages/api/src/jobs/sync_read_positions.ts
index 26ec0b639..756d6429d 100644
--- a/packages/api/src/jobs/sync_read_positions.ts
+++ b/packages/api/src/jobs/sync_read_positions.ts
@@ -6,8 +6,8 @@ import {
   fetchCachedReadingPositionsAndMembers,
   reduceCachedReadingPositionMembers,
 } from '../services/cached_reading_position'
-import { logger } from '../utils/logger'
 import { updateLibraryItemReadingProgress } from '../services/library_item'
+import { logger } from '../utils/logger'
 
 export const SYNC_READ_POSITIONS_JOB_NAME = 'sync-read-positions'
 
@@ -86,6 +86,10 @@ export const syncReadPositionsJob = async (_data: any) => {
   const updates = getSyncUpdatesIterator(redis)
 
   for await (const value of updates) {
-    await syncReadPosition(value)
+    try {
+      await syncReadPosition(value)
+    } catch (error) {
+      logger.error('error syncing reading position', { error, value })
+    }
   }
 }
diff --git a/packages/api/src/jobs/update_home.ts b/packages/api/src/jobs/update_home.ts
index c80e18656..8994fee20 100644
--- a/packages/api/src/jobs/update_home.ts
+++ b/packages/api/src/jobs/update_home.ts
@@ -13,7 +13,7 @@ import { findActiveUser } from '../services/user'
 import { lanaugeToCode } from '../utils/helpers'
 import { logError, logger } from '../utils/logger'
 
-export const UPDATE_HOME_JOB = 'UPDATE_HOME_JOB'
+export const UPDATE_HOME_JOB = 'update-home'
 
 export interface UpdateHomeJobData {
   userId: string
@@ -438,7 +438,7 @@ const mixHomeItems = (
 
 // use prometheus to monitor the latency of each step
 const latency = new client.Histogram({
-  name: 'update_home_latency',
+  name: 'omnivore_update_home_latency',
   help: 'Latency of update home job',
   labelNames: ['step'],
   buckets: [0.1, 0.5, 1, 2, 5, 10],
diff --git a/packages/api/src/jobs/upload_content.ts b/packages/api/src/jobs/upload_content.ts
index 2389aad4e..4ffb5bdb9 100644
--- a/packages/api/src/jobs/upload_content.ts
+++ b/packages/api/src/jobs/upload_content.ts
@@ -4,7 +4,7 @@ import { logger } from '../utils/logger'
 import { htmlToHighlightedMarkdown, htmlToMarkdown } from '../utils/parser'
 import { isFileExists, uploadToBucket } from '../utils/uploads'
 
-export const UPLOAD_CONTENT_JOB = 'UPLOAD_CONTENT_JOB'
+export const UPLOAD_CONTENT_JOB = 'upload-content'
 
 export type ContentFormat =
   | 'markdown'
diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts
index 4c4643070..00440da53 100644
--- a/packages/api/src/queue-processor.ts
+++ b/packages/api/src/queue-processor.ts
@@ -50,6 +50,7 @@ import {
   PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME,
   PROCESS_YOUTUBE_VIDEO_JOB_NAME,
 } from './jobs/process-youtube-video'
+import { pruneTrashJob, PRUNE_TRASH_JOB } from './jobs/prune_trash'
 import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
 import { refreshFeed } from './jobs/rss/refreshFeed'
 import { savePageJob } from './jobs/save_page'
@@ -214,6 +215,8 @@ export const createWorker = (connection: ConnectionOptions) =>
           return scoreLibraryItem(job.data)
         case GENERATE_PREVIEW_CONTENT_JOB:
           return generatePreviewContent(job.data)
+        case PRUNE_TRASH_JOB:
+          return pruneTrashJob(job.data)
         default:
           logger.warning(`[queue-processor] unhandled job: ${job.name}`)
       }
@@ -227,6 +230,7 @@ export const createWorker = (connection: ConnectionOptions) =>
       connection,
       autorun: true, // start processing jobs immediately
       lockDuration: 60_000, // 1 minute
+      concurrency: 10,
     }
   )
 
diff --git a/packages/api/src/routers/svc/links.ts b/packages/api/src/routers/svc/links.ts
index f247b6ffa..bd35351cc 100644
--- a/packages/api/src/routers/svc/links.ts
+++ b/packages/api/src/routers/svc/links.ts
@@ -2,12 +2,10 @@
 /* eslint-disable @typescript-eslint/no-unsafe-assignment */
 /* eslint-disable @typescript-eslint/explicit-module-boundary-types */
 import express from 'express'
-import { LessThan } from 'typeorm'
-import { LibraryItemState } from '../../entity/library_item'
 import { readPushSubscription } from '../../pubsub'
 import { userRepository } from '../../repository/user'
 import { createPageSaveRequest } from '../../services/create_page_save_request'
-import { deleteLibraryItemsByAdmin } from '../../services/library_item'
+import { enqueuePruneTrashJob } from '../../utils/createTask'
 import { logger } from '../../utils/logger'
 
 interface CreateLinkRequestMessage {
@@ -16,28 +14,7 @@ interface CreateLinkRequestMessage {
 }
 
 type PruneMessage = {
-  expireInDays: number
-  folder?: string
-  state?: LibraryItemState
-}
-
-const isPruneMessage = (obj: any): obj is PruneMessage => 'expireInDays' in obj
-
-const getPruneMessage = (msgStr: string): PruneMessage => {
-  try {
-    const obj = JSON.parse(msgStr) as unknown
-    if (isPruneMessage(obj)) {
-      return obj
-    }
-  } catch (err) {
-    logger.error('error deserializing event: ', { msgStr, err })
-  }
-
-  // default to prune following folder items older than 30 days
-  return {
-    folder: 'following',
-    expireInDays: 30,
-  }
+  ttlInDays?: number
 }
 
 export function linkServiceRouter() {
@@ -84,28 +61,35 @@ export function linkServiceRouter() {
     }
   })
 
-  router.post('/prune', async (req, res) => {
+  router.post('/pruneTrash', async (req, res) => {
     const { message: msgStr, expired } = readPushSubscription(req)
 
-    if (!msgStr) {
-      return res.status(200).send('Bad Request')
-    }
-
     if (expired) {
       logger.info('discarding expired message')
       return res.status(200).send('Expired')
     }
 
-    const pruneMessage = getPruneMessage(msgStr)
-    const expireTime = pruneMessage.expireInDays * 1000 * 60 * 60 * 24 // convert days to milliseconds
+    // default to prune trash items older than 14 days
+    let ttlInDays = 14
+
+    if (msgStr) {
+      try {
+        const pruneMessage = JSON.parse(msgStr) as PruneMessage | null
+        const ttl = pruneMessage?.ttlInDays
+
+        // only a positive whole number of days overrides the default
+        if (typeof ttl === 'number' && Number.isInteger(ttl) && ttl > 0) {
+          ttlInDays = ttl
+        }
+      } catch (err) {
+        // a malformed message must not crash the handler; keep the default
+        logger.error('error deserializing event: ', { msgStr, err })
+      }
+    }
 
     try {
-      const result = await deleteLibraryItemsByAdmin({
-        folder: pruneMessage.folder,
-        state: pruneMessage.state,
-        updatedAt: LessThan(new Date(Date.now() - expireTime)),
-      })
-      logger.info('prune result', result)
+      const job = await enqueuePruneTrashJob(ttlInDays)
+      logger.info('enqueue prune trash job', { id: job?.id })
 
       return res.sendStatus(200)
     } catch (error) {
diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts
index fb5a5c2cf..2727cedf5 100644
--- a/packages/api/src/services/library_item.ts
+++ b/packages/api/src/services/library_item.ts
@@ -1354,17 +1354,6 @@ export const deleteLibraryItemsByUserId = async (userId: string) => {
   )
 }
 
-export const deleteLibraryItemsByAdmin = async (
-  criteria: FindOptionsWhere
-) => {
-  return authTrx(
-    async (tx) => tx.withRepository(libraryItemRepository).delete(criteria),
-    undefined,
-    undefined,
-    'admin'
-  )
-}
-
 export const batchDelete = async (criteria: FindOptionsWhere) => {
   const batchSize = 1000
 
@@ -1392,6 +1381,32 @@ export const batchDelete = async (criteria: FindOptionsWhere) => {
   return authTrx(async (t) => t.query(sql))
 }
 
+/**
+ * Deletes trash items (state = 'DELETED') of every ACTIVE user, one user at a
+ * time. Note the cutoff date in the SQL below is fixed at 2023-01-01.
+ */
+export const batchDeleteAllTrash = async () => {
+  const sql = `
+  DO $$
+  DECLARE
+      user_record RECORD;
+      user_cursor CURSOR FOR SELECT id FROM omnivore.user WHERE status = 'ACTIVE'; -- Adjust the condition as needed
+  BEGIN
+      OPEN user_cursor;
+
+      LOOP
+          FETCH NEXT FROM user_cursor INTO user_record;
+          EXIT WHEN NOT FOUND;
+
+          DELETE FROM omnivore.library_item WHERE user_id = user_record.id AND state = 'DELETED' AND deleted_at < '2023-01-01';
+      END LOOP;
+
+      CLOSE user_cursor;
+  END $$;`
+
+  return authTrx(async (t) => t.query(sql))
+}
+
 export const findLibraryItemIdsByLabelId = async (
   labelId: string,
   userId: string
diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts
index 212628d38..2957926fd 100644
--- a/packages/api/src/utils/createTask.ts
+++ b/packages/api/src/utils/createTask.ts
@@ -41,6 +41,7 @@ import {
   PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME,
   PROCESS_YOUTUBE_VIDEO_JOB_NAME,
 } from '../jobs/process-youtube-video'
+import { PRUNE_TRASH_JOB } from '../jobs/prune_trash'
 import {
   queueRSSRefreshFeedJob,
   REFRESH_ALL_FEEDS_JOB_NAME,
@@ -112,6 +113,7 @@ export const getJobPriority = (jobName: string): number => {
     case REFRESH_ALL_FEEDS_JOB_NAME:
     case THUMBNAIL_JOB:
     case GENERATE_PREVIEW_CONTENT_JOB:
+    case PRUNE_TRASH_JOB:
       return 100
 
     default:
@@ -1051,4 +1053,23 @@ export const enqueueGeneratePreviewContentJob = async (
   )
 }
 
+export const enqueuePruneTrashJob = async (numDays: number) => {
+  const queue = await getBackendQueue()
+  if (!queue) {
+    return undefined
+  }
+
+  return queue.add(
+    PRUNE_TRASH_JOB,
+    { numDays },
+    {
+      jobId: `${PRUNE_TRASH_JOB}_${numDays}_${JOB_VERSION}`,
+      removeOnComplete: true,
+      removeOnFail: true,
+      priority: getJobPriority(PRUNE_TRASH_JOB),
+      attempts: 3,
+    }
+  )
+}
+
 export default createHttpTaskWithToken
diff --git a/packages/api/src/utils/highlightGenerator.ts b/packages/api/src/utils/highlightGenerator.ts
index 8f9da4e1f..4cc891f33 100644
--- a/packages/api/src/utils/highlightGenerator.ts
+++ b/packages/api/src/utils/highlightGenerator.ts
@@ -49,8 +49,6 @@ type FillNodeResponse = {
 }
 
 function getTextNodesBetween(rootNode: Node, startNode: Node, endNode: Node) {
-  const maxTime = 10_000 // 10 seconds
-  const start = Date.now()
   let textNodeStartingPoint = 0
   let articleText = ''
   let newParagraph = false
@@ -70,13 +68,6 @@ function getTextNodesBetween(rootNode: Node, startNode: Node, endNode: Node) {
   }
 
   function getTextNodes(node: Node) {
-    // If the function takes too long, throw an error
-    if (Date.now() - start > maxTime) {
-      const error = new Error('getTextNodes Timeout')
-      logger.error(error)
-      throw error
-    }
-
     if (!node) return
 
     if (node == startNode) {
diff --git a/packages/db/migrations/0181.do.batch_delete_trash_items.sql b/packages/db/migrations/0181.do.batch_delete_trash_items.sql
new file mode 100755
index 000000000..afcdcc880
--- /dev/null
+++ b/packages/db/migrations/0181.do.batch_delete_trash_items.sql
@@ -0,0 +1,39 @@
+-- Type: DO
+-- Name: batch_delete_trash_items
+-- Description: Create a function to batch delete library items in trash
+
+BEGIN;
+
+CREATE OR REPLACE PROCEDURE omnivore.batch_delete_trash_items(
+    num_days INT
+)
+LANGUAGE plpgsql AS $$
+DECLARE
+    user_record RECORD;
+    user_cursor CURSOR FOR
+        SELECT
+            id
+        FROM
+            omnivore.user
+        WHERE
+            status = 'ACTIVE';
+BEGIN
+    FOR user_record IN user_cursor LOOP
+        BEGIN
+
+            -- For Row Level Security
+            PERFORM omnivore.set_claims(user_record.id, 'omnivore_user');
+
+            DELETE FROM omnivore.library_item
+            WHERE
+                user_id = user_record.id
+                AND state = 'DELETED'
+                AND deleted_at < NOW() - INTERVAL '1 day' * num_days;
+
+            COMMIT;
+        END;
+    END LOOP;
+END
+$$;
+
+COMMIT;
diff --git a/packages/db/migrations/0181.undo.batch_delete_trash_items.sql b/packages/db/migrations/0181.undo.batch_delete_trash_items.sql
new file mode 100755
index 000000000..3734a1db1
--- /dev/null
+++ b/packages/db/migrations/0181.undo.batch_delete_trash_items.sql
@@ -0,0 +1,9 @@
+-- Type: UNDO
+-- Name: batch_delete_trash_items
+-- Description: Drop the procedure that batch deletes library items in trash
+
+BEGIN;
+
+DROP PROCEDURE IF EXISTS omnivore.batch_delete_trash_items(INT);
+
+COMMIT;