Merge pull request #293 from omnivore-app/feature/priority-by-rate-limit
Use lower priority queue for large imports
This commit is contained in:
@ -171,7 +171,6 @@ export const createPage = async (
|
||||
body: {
|
||||
...page,
|
||||
updatedAt: new Date(),
|
||||
createdAt: new Date(),
|
||||
savedAt: new Date(),
|
||||
},
|
||||
refresh: ctx.refresh,
|
||||
@ -493,6 +492,44 @@ export const searchPages = async (
|
||||
}
|
||||
}
|
||||
|
||||
export const countByCreatedAt = async (
|
||||
userId: string,
|
||||
from?: number,
|
||||
to?: number
|
||||
): Promise<number> => {
|
||||
try {
|
||||
const { body } = await client.count({
|
||||
index: INDEX_ALIAS,
|
||||
body: {
|
||||
query: {
|
||||
bool: {
|
||||
filter: [
|
||||
{
|
||||
term: {
|
||||
userId,
|
||||
},
|
||||
},
|
||||
{
|
||||
range: {
|
||||
createdAt: {
|
||||
gte: from,
|
||||
lte: to,
|
||||
},
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
return body.count as number
|
||||
} catch (e) {
|
||||
console.error('failed to count pages in elastic', e)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
export const initElasticsearch = async (): Promise<void> => {
|
||||
try {
|
||||
const response = await client.info()
|
||||
|
||||
@ -47,13 +47,7 @@ export function articleRouter() {
|
||||
|
||||
const requestId = uuidv4()
|
||||
const models = initModels(kx, false)
|
||||
const result = await createPageSaveRequest(
|
||||
uid,
|
||||
url,
|
||||
models,
|
||||
'high',
|
||||
requestId
|
||||
)
|
||||
const result = await createPageSaveRequest(uid, url, models, requestId)
|
||||
|
||||
if (isSiteBlockedForParse(url)) {
|
||||
return res
|
||||
|
||||
@ -42,12 +42,7 @@ export function linkServiceRouter() {
|
||||
const models = initModels(kx, false)
|
||||
|
||||
try {
|
||||
const request = await createPageSaveRequest(
|
||||
msg.userId,
|
||||
msg.url,
|
||||
models,
|
||||
'low'
|
||||
)
|
||||
const request = await createPageSaveRequest(msg.userId, msg.url, models)
|
||||
console.log('create link request', request)
|
||||
|
||||
res.status(200).send(request)
|
||||
|
||||
@ -9,9 +9,19 @@ import {
|
||||
} from '../generated/graphql'
|
||||
import { articleSavingRequestDataToArticleSavingRequest } from '../utils/helpers'
|
||||
import * as privateIpLib from 'private-ip'
|
||||
import { countByCreatedAt } from '../elastic'
|
||||
|
||||
const isPrivateIP = privateIpLib.default
|
||||
|
||||
// 5 articles added in the last minute: use low queue
|
||||
// default: use normal queue
|
||||
const getPriorityByRateLimit = async (
|
||||
userId: string
|
||||
): Promise<'low' | 'high'> => {
|
||||
const count = await countByCreatedAt(userId, Date.now() - 60 * 1000)
|
||||
return count >= 5 ? 'low' : 'high'
|
||||
}
|
||||
|
||||
export const validateUrl = (url: string): URL => {
|
||||
const u = new URL(url)
|
||||
// Make sure the URL is http or https
|
||||
@ -43,8 +53,8 @@ export const createPageSaveRequest = async (
|
||||
userId: string,
|
||||
url: string,
|
||||
models: DataModels,
|
||||
priority: 'low' | 'high' = 'high',
|
||||
articleSavingRequestId = uuidv4()
|
||||
articleSavingRequestId = uuidv4(),
|
||||
priority?: 'low' | 'high'
|
||||
): Promise<ArticleSavingRequest> => {
|
||||
try {
|
||||
validateUrl(url)
|
||||
@ -63,6 +73,9 @@ export const createPageSaveRequest = async (
|
||||
})
|
||||
}
|
||||
|
||||
// get priority by checking rate limit if not specified
|
||||
priority = priority || (await getPriorityByRateLimit(userId))
|
||||
|
||||
const createdTaskName = await enqueueParseRequest(
|
||||
url,
|
||||
userId,
|
||||
|
||||
@ -20,7 +20,6 @@ export const saveUrl = async (
|
||||
saver.id,
|
||||
input.url,
|
||||
ctx.models,
|
||||
'high',
|
||||
input.clientRequestId
|
||||
)
|
||||
} catch (error) {
|
||||
|
||||
@ -204,6 +204,7 @@ export const deleteTask = async (
|
||||
* @param url - URL address of the article to parse
|
||||
* @param userId - Id of the user authorized
|
||||
* @param saveRequestId - Id of the article_saving_request table record
|
||||
* @param priority - Priority of the task
|
||||
* @returns Name of the task created
|
||||
*/
|
||||
export const enqueueParseRequest = async (
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import 'mocha'
|
||||
import {
|
||||
addLabelInPage,
|
||||
countByCreatedAt,
|
||||
createPage,
|
||||
deletePage,
|
||||
getPageById,
|
||||
@ -178,4 +179,31 @@ describe('elastic api', () => {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('countByCreatedAt', () => {
|
||||
const createdAt = Date.now() - 60 * 60 * 24 * 1000
|
||||
|
||||
before(async () => {
|
||||
const newPageData: Page = {
|
||||
id: '',
|
||||
hash: 'hash',
|
||||
userId: userId,
|
||||
pageType: PageType.Article,
|
||||
title: 'test',
|
||||
content: 'test',
|
||||
slug: 'test',
|
||||
createdAt: new Date(createdAt),
|
||||
readingProgressPercent: 0,
|
||||
readingProgressAnchorIndex: 0,
|
||||
url: 'https://blog.omnivore.app/testCount',
|
||||
}
|
||||
|
||||
await createPage(newPageData, ctx)
|
||||
})
|
||||
|
||||
it('counts pages by createdAt', async () => {
|
||||
const count = await countByCreatedAt(userId, createdAt, createdAt)
|
||||
expect(count).to.eq(1)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user