From acec3b1acb40ad7777b09a46642966cfffea5792 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 22 Mar 2022 18:08:08 +0800 Subject: [PATCH] Use lower priority queue for large imports * add a count query in elastic * if priority is not set, check the rate limit of user's libary - if 5 articles added in the last minute: use low queue - default: use normal queue --- packages/api/src/elastic/index.ts | 39 ++++++++++++++++++- packages/api/src/routers/article_router.ts | 8 +--- packages/api/src/routers/svc/links.ts | 7 +--- .../src/services/create_page_save_request.ts | 17 +++++++- packages/api/src/services/save_url.ts | 1 - packages/api/src/utils/createTask.ts | 1 + packages/api/test/elastic/index.test.ts | 28 +++++++++++++ 7 files changed, 84 insertions(+), 17 deletions(-) diff --git a/packages/api/src/elastic/index.ts b/packages/api/src/elastic/index.ts index 5844270fb..1f61398f3 100644 --- a/packages/api/src/elastic/index.ts +++ b/packages/api/src/elastic/index.ts @@ -170,7 +170,6 @@ export const createPage = async ( body: { ...page, updatedAt: new Date(), - createdAt: new Date(), savedAt: new Date(), }, refresh: ctx.refresh, @@ -485,6 +484,44 @@ export const searchPages = async ( } } +export const countByCreatedAt = async ( + userId: string, + from?: number, + to?: number +): Promise => { + try { + const { body } = await client.count({ + index: INDEX_ALIAS, + body: { + query: { + bool: { + filter: [ + { + term: { + userId, + }, + }, + { + range: { + createdAt: { + gte: from, + lte: to, + }, + }, + }, + ], + }, + }, + }, + }) + + return body.count as number + } catch (e) { + console.error('failed to count pages in elastic', e) + return 0 + } +} + export const initElasticsearch = async (): Promise => { try { const response = await client.info() diff --git a/packages/api/src/routers/article_router.ts b/packages/api/src/routers/article_router.ts index b66672ae4..396319222 100644 --- a/packages/api/src/routers/article_router.ts +++ b/packages/api/src/routers/article_router.ts @@ -47,13 +47,7 @@ export function articleRouter() { const requestId = uuidv4() const models = initModels(kx, false) - const result = await createPageSaveRequest( - uid, - url, - models, - 'high', - requestId - ) + const result = await createPageSaveRequest(uid, url, models, requestId) if (isSiteBlockedForParse(url)) { return res diff --git a/packages/api/src/routers/svc/links.ts b/packages/api/src/routers/svc/links.ts index 93ffd189a..b656203c0 100644 --- a/packages/api/src/routers/svc/links.ts +++ b/packages/api/src/routers/svc/links.ts @@ -42,12 +42,7 @@ export function linkServiceRouter() { const models = initModels(kx, false) try { - const request = await createPageSaveRequest( - msg.userId, - msg.url, - models, - 'low' - ) + const request = await createPageSaveRequest(msg.userId, msg.url, models) console.log('create link request', request) res.status(200).send(request) diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 0120155bf..be4a81b16 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -9,9 +9,19 @@ import { } from '../generated/graphql' import { articleSavingRequestDataToArticleSavingRequest } from '../utils/helpers' import * as privateIpLib from 'private-ip' +import { countByCreatedAt } from '../elastic' const isPrivateIP = privateIpLib.default +// 5 articles added in the last minute: use low queue +// default: use normal queue +const getPriorityByRateLimit = async ( + userId: string +): Promise<'low' | 'high'> => { + const count = await countByCreatedAt(userId, Date.now() - 60 * 1000) + return count >= 5 ? 'low' : 'high' +} + export const validateUrl = (url: string): URL => { const u = new URL(url) // Make sure the URL is http or https @@ -43,8 +53,8 @@ export const createPageSaveRequest = async ( userId: string, url: string, models: DataModels, - priority: 'low' | 'high' = 'high', - articleSavingRequestId = uuidv4() + articleSavingRequestId = uuidv4(), + priority?: 'low' | 'high' ): Promise => { try { validateUrl(url) @@ -63,6 +73,9 @@ export const createPageSaveRequest = async ( }) } + // get priority by checking rate limit if not specified + priority = priority || (await getPriorityByRateLimit(userId)) + const createdTaskName = await enqueueParseRequest( url, userId, diff --git a/packages/api/src/services/save_url.ts b/packages/api/src/services/save_url.ts index cf382f187..2df37cc97 100644 --- a/packages/api/src/services/save_url.ts +++ b/packages/api/src/services/save_url.ts @@ -20,7 +20,6 @@ export const saveUrl = async ( saver.id, input.url, ctx.models, - 'high', input.clientRequestId ) } catch (error) { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 75eed7395..5b37f604b 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -204,6 +204,7 @@ export const deleteTask = async ( * @param url - URL address of the article to parse * @param userId - Id of the user authorized * @param saveRequestId - Id of the article_saving_request table record + * @param priority - Priority of the task * @returns Name of the task created */ export const enqueueParseRequest = async ( diff --git a/packages/api/test/elastic/index.test.ts b/packages/api/test/elastic/index.test.ts index 22ac48787..2d6983114 100644 --- a/packages/api/test/elastic/index.test.ts +++ b/packages/api/test/elastic/index.test.ts @@ -1,6 +1,7 @@ import 'mocha' import { addLabelInPage, + countByCreatedAt, createPage, deletePage, getPageById, @@ -178,4 +179,31 @@ describe('elastic api', () => { }) }) }) + + describe('countByCreatedAt', () => { + const createdAt = Date.now() - 60 * 60 * 24 * 1000 + + before(async () => { + const newPageData: Page = { + id: '', + hash: 'hash', + userId: userId, + pageType: PageType.Article, + title: 'test', + content: 'test', + slug: 'test', + createdAt: new Date(createdAt), + readingProgressPercent: 0, + readingProgressAnchorIndex: 0, + url: 'https://blog.omnivore.app/testCount', + } + + await createPage(newPageData, ctx) + }) + + it('counts pages by createdAt', async () => { + const count = await countByCreatedAt(userId, createdAt, createdAt) + expect(count).to.eq(1) + }) + }) })