add a REST API to trigger the expire folder job

This commit is contained in:
Hongbo Wu
2024-06-13 17:57:29 +08:00
parent 6f496b9336
commit 88e3d648c8
15 changed files with 115 additions and 160 deletions

View File

@ -363,7 +363,6 @@ export type CreateFolderPolicyInput = {
action: FolderPolicyAction;
afterDays: Scalars['Int'];
folder: Scalars['String'];
minimumItems?: InputMaybe<Scalars['Int']>;
};
export type CreateFolderPolicyResult = CreateFolderPolicyError | CreateFolderPolicySuccess;
@ -1145,7 +1144,6 @@ export type FolderPolicy = {
createdAt: Scalars['Date'];
folder: Scalars['String'];
id: Scalars['ID'];
minimumItems: Scalars['Int'];
updatedAt: Scalars['Date'];
};
@ -3661,7 +3659,6 @@ export type UpdateFolderPolicyInput = {
action?: InputMaybe<FolderPolicyAction>;
afterDays?: InputMaybe<Scalars['Int']>;
id: Scalars['ID'];
minimumItems?: InputMaybe<Scalars['Int']>;
};
export type UpdateFolderPolicyResult = UpdateFolderPolicyError | UpdateFolderPolicySuccess;
@ -6191,7 +6188,6 @@ export type FolderPolicyResolvers<ContextType = ResolverContext, ParentType exte
createdAt?: Resolver<ResolversTypes['Date'], ParentType, ContextType>;
folder?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
id?: Resolver<ResolversTypes['ID'], ParentType, ContextType>;
minimumItems?: Resolver<ResolversTypes['Int'], ParentType, ContextType>;
updatedAt?: Resolver<ResolversTypes['Date'], ParentType, ContextType>;
__isTypeOf?: IsTypeOfResolverFn<ParentType, ContextType>;
};

View File

@ -320,7 +320,6 @@ input CreateFolderPolicyInput {
action: FolderPolicyAction!
afterDays: Int!
folder: String!
minimumItems: Int
}
union CreateFolderPolicyResult = CreateFolderPolicyError | CreateFolderPolicySuccess
@ -1029,7 +1028,6 @@ type FolderPolicy {
createdAt: Date!
folder: String!
id: ID!
minimumItems: Int!
updatedAt: Date!
}
@ -2918,7 +2916,6 @@ input UpdateFolderPolicyInput {
action: FolderPolicyAction
afterDays: Int
id: ID!
minimumItems: Int
}
union UpdateFolderPolicyResult = UpdateFolderPolicyError | UpdateFolderPolicySuccess

View File

@ -0,0 +1,7 @@
import { appDataSource } from '../data_source'
export const EXPIRE_FOLDERS_JOB_NAME = 'expire-folders'
export const expireFoldersJob = async () => {
await appDataSource.query('CALL omnivore.expire_folders()')
}

View File

@ -1,50 +0,0 @@
import { FolderPolicyAction } from '../../entity/folder_policy'
import { BulkActionType } from '../../generated/graphql'
import { findFolderPolicyById } from '../../services/folder_policy'
import { batchUpdateLibraryItems } from '../../services/library_item'
import { logger } from '../../utils/logger'
export const EXPIRE_FOLDER_JOB_NAME = 'EXPIRE_FOLDER_JOB'
interface ExpireFolderJobData {
userId: string
folderPolicyId: string
}
export const expireFolderJob = async (data: ExpireFolderJobData) => {
const { userId, folderPolicyId } = data
const policy = await findFolderPolicyById(userId, folderPolicyId)
if (!policy) {
logger.error('Policy not found')
return
}
logger.info(`Expiring items for policy ${policy.id}`)
const getBulkActionType = (action: FolderPolicyAction) => {
switch (action) {
case FolderPolicyAction.Archive:
return BulkActionType.Archive
case FolderPolicyAction.Delete:
return BulkActionType.Delete
default:
logger.error('Unsupported action')
throw new Error('Unsupported action')
}
}
const action = getBulkActionType(policy.action)
const savedAfter = new Date(
Date.now() - policy.afterDays * 24 * 60 * 60 * 1000
)
await batchUpdateLibraryItems(
action,
{
useFolders: true,
query: `in:${policy.folder} saved:<${savedAfter.toISOString()}`,
},
userId
)
}

View File

@ -1,20 +0,0 @@
import { findFolderPolicies } from '../../services/folder_policy'
import { enqueueExpireFolderJob } from '../../utils/createTask'
import { logError } from '../../utils/logger'
export const EXPIRE_ALL_FOLDERS_JOB_NAME = 'EXPIRE_ALL_FOLDERS_JOB'
export const expireAllFoldersJob = async () => {
const policies = await findFolderPolicies()
// sequentially enqueues a job to expire items for each policy
for (const policy of policies) {
try {
await enqueueExpireFolderJob(policy.userId, policy.id)
} catch (error) {
logError(error)
continue
}
}
}

View File

@ -31,12 +31,11 @@ import {
SAVE_NEWSLETTER_JOB,
} from './jobs/email/inbound_emails'
import { sendEmailJob, SEND_EMAIL_JOB } from './jobs/email/send_email'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import { expireFolderJob, EXPIRE_FOLDER_JOB_NAME } from './jobs/folder/expire'
import {
expireAllFoldersJob,
EXPIRE_ALL_FOLDERS_JOB_NAME,
} from './jobs/folder/expire_all'
expireFoldersJob,
EXPIRE_FOLDERS_JOB_NAME,
} from './jobs/expire_folders'
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
import {
generatePreviewContent,
GENERATE_PREVIEW_CONTENT_JOB,
@ -222,10 +221,8 @@ export const createWorker = (connection: ConnectionOptions) =>
return generatePreviewContent(job.data)
case PRUNE_TRASH_JOB:
return pruneTrashJob(job.data)
case EXPIRE_ALL_FOLDERS_JOB_NAME:
return expireAllFoldersJob()
case EXPIRE_FOLDER_JOB_NAME:
return expireFolderJob(job.data)
case EXPIRE_FOLDERS_JOB_NAME:
return expireFoldersJob()
default:
logger.warning(`[queue-processor] unhandled job: ${job.name}`)
}

View File

@ -44,9 +44,9 @@ export const createFolderPolicyResolver = authorized<
CreateFolderPolicyError,
MutationCreateFolderPolicyArgs
>(async (_, { input }, { uid, log }) => {
const { folder, action, afterDays, minimumItems } = input
const { folder, action, afterDays } = input
if (afterDays < 0 || (minimumItems && minimumItems < 0)) {
if (afterDays < 0) {
log.error('Invalid values')
return {
@ -59,7 +59,6 @@ export const createFolderPolicyResolver = authorized<
folder,
action: action as unknown as FolderPolicyAction,
afterDays,
minimumItems: minimumItems ?? 0,
})
return {
@ -72,9 +71,9 @@ export const updateFolderPolicyResolver = authorized<
UpdateFolderPolicyError,
MutationUpdateFolderPolicyArgs
>(async (_, { input }, { log, uid }) => {
const { id, action, afterDays, minimumItems } = input
const { id, action, afterDays } = input
if (!action && !afterDays && !minimumItems) {
if (!action && !afterDays) {
log.error('No fields to update')
return {
@ -82,7 +81,7 @@ export const updateFolderPolicyResolver = authorized<
}
}
if ((afterDays && afterDays < 0) || (minimumItems && minimumItems < 0)) {
if (afterDays && afterDays < 0) {
log.error('Invalid values')
return {
@ -93,7 +92,6 @@ export const updateFolderPolicyResolver = authorized<
const result = await updateFolderPolicy(uid, id, {
action: action ? (action as unknown as FolderPolicyAction) : undefined,
afterDays: afterDays ?? undefined,
minimumItems: minimumItems ?? undefined,
})
if (!result.affected) {

View File

@ -6,6 +6,7 @@ import { readPushSubscription } from '../../pubsub'
import { userRepository } from '../../repository/user'
import { createPageSaveRequest } from '../../services/create_page_save_request'
import { enqueuePruneTrashJob } from '../../utils/createTask'
import { enqueueExpireFoldersJob } from '../../utils/createTask'
import { logger } from '../../utils/logger'
interface CreateLinkRequestMessage {
@ -92,5 +93,25 @@ export function linkServiceRouter() {
}
})
router.post('/expireFolders', async (req, res) => {
const { expired } = readPushSubscription(req)
if (expired) {
logger.info('discarding expired message')
return res.status(200).send('Expired')
}
try {
const job = await enqueueExpireFoldersJob()
logger.info('enqueue job', { id: job?.id })
return res.sendStatus(200)
} catch (error) {
logger.error('error expire folders', error)
return res.sendStatus(500)
}
})
return router
}

View File

@ -3252,7 +3252,6 @@ const schema = gql`
folder: String!
action: FolderPolicyAction!
afterDays: Int!
minimumItems: Int!
createdAt: Date!
updatedAt: Date!
}
@ -3281,7 +3280,6 @@ const schema = gql`
folder: String! @sanitize(minLength: 1, maxLength: 255)
action: FolderPolicyAction!
afterDays: Int!
minimumItems: Int
}
union CreateFolderPolicyResult =
@ -3338,7 +3336,6 @@ const schema = gql`
id: ID!
action: FolderPolicyAction
afterDays: Int
minimumItems: Int
}
# Mutations

View File

@ -6,7 +6,6 @@ export const createFolderPolicy = async (folderPolicy: {
folder: string
action: FolderPolicyAction
afterDays: number
minimumItems: number
}) => {
return getRepository(FolderPolicy).save(folderPolicy)
}
@ -39,10 +38,6 @@ export const deleteFolderPolicy = async (
})
}
export const findFolderPolicies = async () => {
return getRepository(FolderPolicy).find()
}
export const findFolderPolicyById = async (
userId: string,
folderPolicyId: string

View File

@ -28,9 +28,8 @@ import {
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email'
import { EXPIRE_FOLDERS_JOB_NAME } from '../jobs/expire_folders'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { EXPIRE_FOLDER_JOB_NAME } from '../jobs/folder/expire'
import { EXPIRE_ALL_FOLDERS_JOB_NAME } from '../jobs/folder/expire_all'
import { GENERATE_PREVIEW_CONTENT_JOB } from '../jobs/generate_preview_content'
import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items'
import {
@ -116,8 +115,7 @@ export const getJobPriority = (jobName: string): number => {
case THUMBNAIL_JOB:
case GENERATE_PREVIEW_CONTENT_JOB:
case PRUNE_TRASH_JOB:
case EXPIRE_ALL_FOLDERS_JOB_NAME:
case EXPIRE_FOLDER_JOB_NAME:
case EXPIRE_FOLDERS_JOB_NAME:
return 100
default:
@ -1076,42 +1074,20 @@ export const enqueuePruneTrashJob = async (numDays: number) => {
)
}
export const enqueueExpireAllFoldersJob = async () => {
export const enqueueExpireFoldersJob = async () => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
}
return queue.add(
EXPIRE_ALL_FOLDERS_JOB_NAME,
EXPIRE_FOLDERS_JOB_NAME,
{},
{
jobId: `${EXPIRE_ALL_FOLDERS_JOB_NAME}_${JOB_VERSION}`,
jobId: `${EXPIRE_FOLDERS_JOB_NAME}_${JOB_VERSION}`,
removeOnComplete: true,
removeOnFail: true,
priority: getJobPriority(EXPIRE_ALL_FOLDERS_JOB_NAME),
attempts: 1,
}
)
}
export const enqueueExpireFolderJob = async (
userId: string,
folderPolicyId: string
) => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
}
return queue.add(
EXPIRE_FOLDER_JOB_NAME,
{ userId, folderPolicyId },
{
jobId: `${EXPIRE_FOLDER_JOB_NAME}_${folderPolicyId}_${JOB_VERSION}`,
removeOnComplete: true,
removeOnFail: true,
priority: getJobPriority(EXPIRE_FOLDER_JOB_NAME),
priority: getJobPriority(EXPIRE_FOLDERS_JOB_NAME),
attempts: 3,
}
)

View File

@ -57,14 +57,12 @@ describe('Folder Policy API', () => {
folder: 'inbox',
action: FolderPolicyAction.Archive,
afterDays: 30,
minimumItems: 10,
})
const existingPolicy1 = await createFolderPolicy({
userId: loginUser.id,
folder: 'following',
action: FolderPolicyAction.Archive,
afterDays: 30,
minimumItems: 10,
})
const res = await graphqlRequest(query, authToken).expect(200)
@ -131,7 +129,6 @@ describe('Folder Policy API', () => {
folder: 'test-folder',
action: FolderPolicyAction.Archive,
afterDays: 30,
minimumItems: 10,
})
})
@ -188,7 +185,6 @@ describe('Folder Policy API', () => {
folder: 'test-folder',
action: FolderPolicyAction.Archive,
afterDays: 30,
minimumItems: 10,
})
})

View File

@ -1,25 +0,0 @@
-- Type: DO
-- Name: folder_policy
-- Description: Create a folder_policy table to contain the folder expiration policies for user and folder
BEGIN;
CREATE TYPE folder_action AS ENUM ('DELETE', 'ARCHIVE');
CREATE TABLE omnivore.folder_policy (
id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(),
user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE,
folder TEXT NOT NULL, -- folder name in lowercase
action folder_action NOT NULL, -- delete or archive
after_days INT NOT NULL, -- number of days after which the action should be taken
minimum_items INT NOT NULL DEFAULT 0, -- minimum number of items to keep in the folder
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (user_id, folder) -- only one policy per folder per user
);
CREATE TRIGGER update_folder_policy_modtime BEFORE UPDATE ON omnivore.folder_policy FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
GRANT SELECT, INSERT, UPDATE, DELETE ON omnivore.folder_policy TO omnivore_user;
COMMIT;

View File

@ -0,0 +1,68 @@
-- Type: DO
-- Name: folder_policy
-- Description: Create a folder_policy table to contain the folder expiration policies for user and folder
BEGIN;
CREATE TYPE folder_action AS ENUM ('DELETE', 'ARCHIVE');
CREATE TABLE omnivore.folder_policy (
id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(),
user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE,
folder TEXT NOT NULL, -- folder name in lowercase
action folder_action NOT NULL, -- delete or archive
after_days INT NOT NULL, -- number of days after which the action should be taken
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (user_id, folder, action) -- only one policy per user and folder action
);
CREATE TRIGGER update_folder_policy_modtime BEFORE UPDATE ON omnivore.folder_policy FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
GRANT SELECT, INSERT, UPDATE, DELETE ON omnivore.folder_policy TO omnivore_user;
CREATE PROCEDURE omnivore.expire_folders()
LANGUAGE plpgsql
AS $$
DECLARE
folder_record RECORD;
folder_name TEXT;
folder_action folder_action;
folder_user_id UUID;
folder_after_days INT;
old_states library_item_state[];
new_state library_item_state;
column_name TEXT;
folder_policy_cursor CURSOR FOR SELECT id, user_id, folder, action, after_days FROM omnivore.folder_policy;
BEGIN
FOR folder_record IN folder_policy_cursor LOOP
folder_user_id := folder_record.user_id;
folder_name := folder_record.folder;
folder_action := folder_record.action;
folder_after_days := folder_record.after_days;
IF folder_action = 'DELETE' THEN
old_states := ARRAY['SUCCEEDED', 'FAILED', 'ARCHIVED', 'PROCESSING', 'CONTENT_NOT_FETCHED'::library_item_state];
new_state := 'DELETED';
column_name := 'deleted_at';
ELSIF folder_action = 'ARCHIVE' THEN
old_states := ARRAY['SUCCEEDED', 'FAILED', 'PROCESSING', 'CONTENT_NOT_FETCHED'::library_item_state];
new_state := 'ARCHIVED';
column_name := 'archived_at';
END IF;
BEGIN
PERFORM omnivore.set_claims(folder_user_id, 'omnivore_user');
EXECUTE format('UPDATE omnivore.library_item '
'SET state = $1, %I = CURRENT_TIMESTAMP '
'WHERE user_id = $2 AND state = ANY ($3) AND folder = $4 AND created_at < CURRENT_TIMESTAMP - INTERVAL ''$5 days''', column_name)
USING new_state, folder_user_id, old_states, folder_name, folder_after_days;
COMMIT;
END;
END LOOP;
END;
$$;
COMMIT;

View File

@ -8,4 +8,6 @@ DROP TABLE omnivore.folder_policy;
DROP TYPE folder_action;
DROP PROCEDURE omnivore.expire_folders();
COMMIT;