Merge pull request #4052 from omnivore-app/feature/prune-trash-items

feature/prune trash items
This commit is contained in:
Hongbo Wu
2024-06-13 17:58:18 +08:00
committed by GitHub
12 changed files with 136 additions and 64 deletions

View File

@ -0,0 +1,14 @@
import { appDataSource } from '../data_source'
export const PRUNE_TRASH_JOB = 'prune_trash'
interface PruneTrashJobData {
  // age threshold: trashed items deleted more than this many days ago are purged
  numDays: number
}
/**
 * Permanently removes library items that have sat in the trash for more than
 * `jobData.numDays` days by invoking the batch-delete stored procedure.
 *
 * @param jobData - carries the retention window in days
 * @throws Error if `numDays` is not a positive integer, or if the query fails
 */
export const pruneTrashJob = async (jobData: PruneTrashJobData) => {
  const { numDays } = jobData
  // the value ends up in a database call — reject NaN/fractions/negatives early
  if (!Number.isInteger(numDays) || numDays <= 0) {
    throw new Error(`pruneTrashJob: invalid numDays: ${numDays}`)
  }
  // bind parameter instead of template-literal interpolation: avoids SQL
  // injection and lets the driver handle quoting
  await appDataSource.query('CALL omnivore.batch_delete_trash_items($1);', [
    numDays,
  ])
}

View File

@ -7,7 +7,7 @@ import { enqueueUpdateHomeJob } from '../utils/createTask'
import { lanaugeToCode } from '../utils/helpers'
import { logger } from '../utils/logger'
export const SCORE_LIBRARY_ITEM_JOB = 'SCORE_LIBRARY_ITEM_JOB'
export const SCORE_LIBRARY_ITEM_JOB = 'score-library-item'
export interface ScoreLibraryItemJobData {
userId: string

View File

@ -6,8 +6,8 @@ import {
fetchCachedReadingPositionsAndMembers,
reduceCachedReadingPositionMembers,
} from '../services/cached_reading_position'
import { logger } from '../utils/logger'
import { updateLibraryItemReadingProgress } from '../services/library_item'
import { logger } from '../utils/logger'
export const SYNC_READ_POSITIONS_JOB_NAME = 'sync-read-positions'
@ -86,6 +86,10 @@ export const syncReadPositionsJob = async (_data: any) => {
const updates = getSyncUpdatesIterator(redis)
for await (const value of updates) {
await syncReadPosition(value)
try {
await syncReadPosition(value)
} catch (error) {
logger.error('error syncing reading position', { error, value })
}
}
}

View File

@ -13,7 +13,7 @@ import { findActiveUser } from '../services/user'
import { lanaugeToCode } from '../utils/helpers'
import { logError, logger } from '../utils/logger'
export const UPDATE_HOME_JOB = 'UPDATE_HOME_JOB'
export const UPDATE_HOME_JOB = 'update-home'
export interface UpdateHomeJobData {
userId: string
@ -438,7 +438,7 @@ const mixHomeItems = (
// use prometheus to monitor the latency of each step
const latency = new client.Histogram({
name: 'update_home_latency',
name: 'omnivore_update_home_latency',
help: 'Latency of update home job',
labelNames: ['step'],
buckets: [0.1, 0.5, 1, 2, 5, 10],

View File

@ -4,7 +4,7 @@ import { logger } from '../utils/logger'
import { htmlToHighlightedMarkdown, htmlToMarkdown } from '../utils/parser'
import { isFileExists, uploadToBucket } from '../utils/uploads'
export const UPLOAD_CONTENT_JOB = 'UPLOAD_CONTENT_JOB'
export const UPLOAD_CONTENT_JOB = 'upload-content'
export type ContentFormat =
| 'markdown'

View File

@ -50,6 +50,7 @@ import {
PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME,
PROCESS_YOUTUBE_VIDEO_JOB_NAME,
} from './jobs/process-youtube-video'
import { pruneTrashJob, PRUNE_TRASH_JOB } from './jobs/prune_trash'
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
import { refreshFeed } from './jobs/rss/refreshFeed'
import { savePageJob } from './jobs/save_page'
@ -214,6 +215,8 @@ export const createWorker = (connection: ConnectionOptions) =>
return scoreLibraryItem(job.data)
case GENERATE_PREVIEW_CONTENT_JOB:
return generatePreviewContent(job.data)
case PRUNE_TRASH_JOB:
return pruneTrashJob(job.data)
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
}
@ -227,6 +230,7 @@ export const createWorker = (connection: ConnectionOptions) =>
connection,
autorun: true, // start processing jobs immediately
lockDuration: 60_000, // 1 minute
concurrency: 10,
}
)

View File

@ -2,12 +2,10 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
import express from 'express'
import { LessThan } from 'typeorm'
import { LibraryItemState } from '../../entity/library_item'
import { readPushSubscription } from '../../pubsub'
import { userRepository } from '../../repository/user'
import { createPageSaveRequest } from '../../services/create_page_save_request'
import { deleteLibraryItemsByAdmin } from '../../services/library_item'
import { enqueuePruneTrashJob } from '../../utils/createTask'
import { logger } from '../../utils/logger'
interface CreateLinkRequestMessage {
@ -16,28 +14,7 @@ interface CreateLinkRequestMessage {
}
type PruneMessage = {
expireInDays: number
folder?: string
state?: LibraryItemState
}
const isPruneMessage = (obj: any): obj is PruneMessage => 'expireInDays' in obj
const getPruneMessage = (msgStr: string): PruneMessage => {
try {
const obj = JSON.parse(msgStr) as unknown
if (isPruneMessage(obj)) {
return obj
}
} catch (err) {
logger.error('error deserializing event: ', { msgStr, err })
}
// default to prune following folder items older than 30 days
return {
folder: 'following',
expireInDays: 30,
}
ttlInDays?: number
}
export function linkServiceRouter() {
@ -84,28 +61,28 @@ export function linkServiceRouter() {
}
})
router.post('/prune', async (req, res) => {
router.post('/pruneTrash', async (req, res) => {
const { message: msgStr, expired } = readPushSubscription(req)
if (!msgStr) {
return res.status(200).send('Bad Request')
}
if (expired) {
logger.info('discarding expired message')
return res.status(200).send('Expired')
}
const pruneMessage = getPruneMessage(msgStr)
const expireTime = pruneMessage.expireInDays * 1000 * 60 * 60 * 24 // convert days to milliseconds
// default to prune trash items older than 14 days
let ttlInDays = 14
if (msgStr) {
const pruneMessage = JSON.parse(msgStr) as PruneMessage
if (pruneMessage.ttlInDays) {
ttlInDays = pruneMessage.ttlInDays
}
}
try {
const result = await deleteLibraryItemsByAdmin({
folder: pruneMessage.folder,
state: pruneMessage.state,
updatedAt: LessThan(new Date(Date.now() - expireTime)),
})
logger.info('prune result', result)
const job = await enqueuePruneTrashJob(ttlInDays)
logger.info('enqueue prune trash job', { id: job?.id })
return res.sendStatus(200)
} catch (error) {

View File

@ -1354,17 +1354,6 @@ export const deleteLibraryItemsByUserId = async (userId: string) => {
)
}
export const deleteLibraryItemsByAdmin = async (
criteria: FindOptionsWhere<LibraryItem>
) => {
return authTrx(
async (tx) => tx.withRepository(libraryItemRepository).delete(criteria),
undefined,
undefined,
'admin'
)
}
export const batchDelete = async (criteria: FindOptionsWhere<LibraryItem>) => {
const batchSize = 1000
@ -1392,6 +1381,30 @@ export const batchDelete = async (criteria: FindOptionsWhere<LibraryItem>) => {
return authTrx(async (t) => t.query(sql))
}
/**
 * Permanently deletes every trashed ('DELETED') library item older than the
 * hard-coded cutoff, iterating ACTIVE users one at a time so each DELETE
 * stays bounded per user.
 *
 * NOTE(review): the cutoff date is fixed at 2023-01-01 — presumably a one-off
 * cleanup; confirm before reusing.
 *
 * @returns the raw driver result of executing the DO block via `authTrx`
 */
export const batchDeleteAllTrash = async () => {
  // Bug fix: the original body contained "RETURN NEXT;" after the DELETE.
  // RETURN NEXT is only legal in set-returning functions; inside a DO block
  // (an anonymous void function) it raises an error at runtime, so it has
  // been removed.
  const sql = `
  DO $$
  DECLARE
      user_record RECORD;
      user_cursor CURSOR FOR SELECT id FROM omnivore.user WHERE status = 'ACTIVE'; -- Adjust the condition as needed
  BEGIN
      OPEN user_cursor;
      LOOP
          FETCH NEXT FROM user_cursor INTO user_record;
          EXIT WHEN NOT FOUND;
          DELETE FROM omnivore.library_item WHERE user_id = user_record.id AND state = 'DELETED' AND deleted_at < '2023-01-01';
      END LOOP;
      CLOSE user_cursor;
  END $$;`

  return authTrx(async (t) => t.query(sql))
}
export const findLibraryItemIdsByLabelId = async (
labelId: string,
userId: string

View File

@ -41,6 +41,7 @@ import {
PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME,
PROCESS_YOUTUBE_VIDEO_JOB_NAME,
} from '../jobs/process-youtube-video'
import { PRUNE_TRASH_JOB } from '../jobs/prune_trash'
import {
queueRSSRefreshFeedJob,
REFRESH_ALL_FEEDS_JOB_NAME,
@ -112,6 +113,7 @@ export const getJobPriority = (jobName: string): number => {
case REFRESH_ALL_FEEDS_JOB_NAME:
case THUMBNAIL_JOB:
case GENERATE_PREVIEW_CONTENT_JOB:
case PRUNE_TRASH_JOB:
return 100
default:
@ -1051,4 +1053,23 @@ export const enqueueGeneratePreviewContentJob = async (
)
}
/**
 * Schedules a background job that purges trash items older than `numDays`
 * days.
 *
 * @param numDays - retention window passed through to the job payload
 * @returns the enqueued job, or undefined when the backend queue is unavailable
 */
export const enqueuePruneTrashJob = async (numDays: number) => {
  const queue = await getBackendQueue()
  if (!queue) {
    return undefined
  }

  // deterministic id de-duplicates concurrent enqueue attempts for the same
  // retention window and job version
  const jobId = `${PRUNE_TRASH_JOB}_${numDays}_${JOB_VERSION}`

  return queue.add(
    PRUNE_TRASH_JOB,
    { numDays },
    {
      jobId,
      attempts: 3,
      priority: getJobPriority(PRUNE_TRASH_JOB),
      removeOnComplete: true,
      removeOnFail: true,
    }
  )
}
export default createHttpTaskWithToken

View File

@ -49,8 +49,6 @@ type FillNodeResponse = {
}
function getTextNodesBetween(rootNode: Node, startNode: Node, endNode: Node) {
const maxTime = 10_000 // 10 seconds
const start = Date.now()
let textNodeStartingPoint = 0
let articleText = ''
let newParagraph = false
@ -70,13 +68,6 @@ function getTextNodesBetween(rootNode: Node, startNode: Node, endNode: Node) {
}
function getTextNodes(node: Node) {
// If the function takes too long, throw an error
if (Date.now() - start > maxTime) {
const error = new Error('getTextNodes Timeout')
logger.error(error)
throw error
}
if (!node) return
if (node == startNode) {

View File

@ -0,0 +1,39 @@
-- Type: DO
-- Name: batch_delete_trash_items
-- Description: Create a function to batch delete library items in trash
BEGIN;
CREATE OR REPLACE PROCEDURE omnivore.batch_delete_trash_items(
num_days INT
)
LANGUAGE plpgsql AS $$
DECLARE
user_record RECORD;
user_cursor CURSOR FOR
SELECT
id
FROM
omnivore.user
WHERE
status = 'ACTIVE';
BEGIN
FOR user_record IN user_cursor LOOP
BEGIN
-- For Row Level Security
PERFORM omnivore.set_claims(user_record.id, 'omnivore_user');
DELETE FROM omnivore.library_item
WHERE
user_id = user_record.id
AND state = 'DELETED'
AND deleted_at < NOW() - INTERVAL '1 day' * num_days;
COMMIT;
END;
END LOOP;
END
$$;
COMMIT;

View File

@ -0,0 +1,9 @@
-- Type: UNDO
-- Name: batch_delete_trash_items
-- Description: Create a function to batch delete library items in trash
BEGIN;
DROP PROCEDURE IF EXISTS omnivore.batch_delete_trash_items();
COMMIT;