Merge pull request #4221 from omnivore-app/fix/content-fetch-error
improve content-fetch
This commit is contained in:
@ -27,6 +27,7 @@ export const bulkAction = async (data: BulkActionData) => {
|
||||
for (let offset = 0; offset < count; offset += batchSize) {
|
||||
const searchArgs = {
|
||||
size: batchSize,
|
||||
includePending: true,
|
||||
query: `(${query}) AND updated:*..${now}`, // only process items that have not been updated
|
||||
}
|
||||
|
||||
|
||||
@ -25,7 +25,7 @@ export const libraryItemRepository = appDataSource
|
||||
.andWhere('md5(original_url) = md5(:url)', { url })
|
||||
|
||||
if (forUpdate) {
|
||||
qb.setLock('pessimistic_write')
|
||||
qb.setLock('pessimistic_read')
|
||||
}
|
||||
|
||||
return qb.getOne()
|
||||
|
||||
@ -760,9 +760,10 @@ export const bulkActionResolver = authorized<
|
||||
},
|
||||
})
|
||||
|
||||
const batchSize = 100
|
||||
const batchSize = 20
|
||||
const searchArgs = {
|
||||
query,
|
||||
includePending: true,
|
||||
size: 0,
|
||||
}
|
||||
const count = await countLibraryItems(searchArgs, uid)
|
||||
@ -778,13 +779,13 @@ export const bulkActionResolver = authorized<
|
||||
action,
|
||||
count,
|
||||
})
|
||||
// if there are less than 100 items, update them synchronously
|
||||
// if there are less than batchSize items, update them synchronously
|
||||
await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args)
|
||||
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
// if there are more than 100 items, update them asynchronously
|
||||
// if there are more than batchSize items, update them asynchronously
|
||||
const data = {
|
||||
userId: uid,
|
||||
action,
|
||||
|
||||
@ -46,7 +46,7 @@ import { analytics } from './utils/analytics'
|
||||
import { corsConfig } from './utils/corsConfig'
|
||||
import { getClientFromUserAgent } from './utils/helpers'
|
||||
import { buildLogger, buildLoggerTransport, logger } from './utils/logger'
|
||||
import { apiLimiter, authLimiter } from './utils/rate_limit'
|
||||
import { apiHourLimiter, apiLimiter, authLimiter } from './utils/rate_limit'
|
||||
import { shortcutsRouter } from './routers/shortcuts_router'
|
||||
|
||||
const PORT = process.env.PORT || 4000
|
||||
@ -68,7 +68,7 @@ export const createApp = (): Express => {
|
||||
app.set('trust proxy', env.server.trustProxy)
|
||||
|
||||
// Apply the rate limiting middleware to API calls only
|
||||
app.use('/api/', apiLimiter)
|
||||
app.use('/api/', apiLimiter, apiHourLimiter)
|
||||
|
||||
// set client info in the request context
|
||||
app.use(httpContext.middleware)
|
||||
|
||||
@ -8,11 +8,11 @@ import {
|
||||
PageType,
|
||||
} from '../generated/graphql'
|
||||
import { createPubSubClient, PubsubClient } from '../pubsub'
|
||||
import { Merge } from '../util'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { enqueueParseRequest } from '../utils/createTask'
|
||||
import { cleanUrl, generateSlug } from '../utils/helpers'
|
||||
import { logger } from '../utils/logger'
|
||||
import { countBySavedAt, createOrUpdateLibraryItem } from './library_item'
|
||||
import { createOrUpdateLibraryItem } from './library_item'
|
||||
|
||||
interface PageSaveRequest {
|
||||
user: User
|
||||
@ -34,13 +34,47 @@ const SAVING_CONTENT = 'Your link is being saved...'
|
||||
|
||||
const isPrivateIP = privateIpLib.default
|
||||
|
||||
// Builds the Redis key under which a user's recent save timestamps are stored.
const recentSavedItemKey = (userId: string): string => {
  return 'recent-saved-item:' + userId
}
|
||||
|
||||
/**
 * Records one save event for `userId` in the per-user Redis sorted set that
 * backs save-rate limiting.
 *
 * Does nothing when no Redis client is available. Redis failures are logged
 * and swallowed so that this bookkeeping never makes the save itself fail.
 *
 * @param userId - id of the user who just saved an item
 */
const addRecentSavedItem = async (userId: string): Promise<void> => {
  const redisClient = redisDataSource.redisClient
  if (!redisClient) {
    return
  }

  const key = recentSavedItemKey(userId)
  // read the clock once so the score and the member always agree
  const now = Date.now()
  // ZADD only updates the score of an existing member, so a bare timestamp
  // would merge two saves made in the same millisecond into one entry; the
  // random suffix keeps each save as its own member
  const member = `${now}:${Math.random().toString(36).slice(2)}`

  try {
    // score = save time in ms, used by the reader to trim old entries
    await redisClient.zadd(key, now, member)
    // entries older than one minute are trimmed before counting, so the key
    // is useless 60 seconds after the latest save; let Redis drop it
    await redisClient.expire(key, 60)
  } catch (error) {
    logger.error('error adding recently saved item in redis', {
      key,
      error,
    })
  }
}
|
||||
|
||||
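// 5 or more items saved in the last minute: use the low-priority queue
// otherwise: use the high-priority queue
// (no priority is returned when Redis is unavailable or the lookup fails)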
|
||||
const getPriorityByRateLimit = async (
|
||||
userId: string
|
||||
): Promise<'low' | 'high'> => {
|
||||
const count = await countBySavedAt(userId, new Date(Date.now() - 60 * 1000))
|
||||
return count >= 5 ? 'low' : 'high'
|
||||
): Promise<'low' | 'high' | undefined> => {
|
||||
const redisClient = redisDataSource.redisClient
|
||||
if (redisClient) {
|
||||
const oneMinuteAgo = Date.now() - 60 * 1000
|
||||
const key = recentSavedItemKey(userId)
|
||||
|
||||
try {
|
||||
// Remove items older than one minute
|
||||
await redisClient.zremrangebyscore(key, '-inf', oneMinuteAgo)
|
||||
|
||||
// Count items in the last minute
|
||||
const count = await redisClient.zcard(key)
|
||||
|
||||
return count >= 5 ? 'low' : 'high'
|
||||
} catch (error) {
|
||||
logger.error('Failed to get priority by rate limit', { userId, error })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const validateUrl = (url: string): URL => {
|
||||
@ -118,8 +152,12 @@ export const createPageSaveRequest = async ({
|
||||
pubsub
|
||||
)
|
||||
|
||||
// add to recent saved item
|
||||
await addRecentSavedItem(userId)
|
||||
|
||||
// get priority by checking rate limit if not specified
|
||||
priority = priority || (await getPriorityByRateLimit(userId))
|
||||
logger.debug('priority', { priority })
|
||||
|
||||
// enqueue task to parse item
|
||||
await enqueueParseRequest({
|
||||
|
||||
@ -1192,7 +1192,7 @@ export const findLibraryItemsByPrefix = async (
|
||||
)
|
||||
}
|
||||
|
||||
export const countBySavedAt = async (
|
||||
export const countByCreatedAt = async (
|
||||
userId: string,
|
||||
startDate = new Date(0),
|
||||
endDate = new Date()
|
||||
@ -1202,7 +1202,7 @@ export const countBySavedAt = async (
|
||||
tx
|
||||
.createQueryBuilder(LibraryItem, 'library_item')
|
||||
.where('library_item.user_id = :userId', { userId })
|
||||
.andWhere('library_item.saved_at between :startDate and :endDate', {
|
||||
.andWhere('library_item.created_at between :startDate and :endDate', {
|
||||
startDate,
|
||||
endDate,
|
||||
})
|
||||
@ -1256,7 +1256,7 @@ export const batchUpdateLibraryItems = async (
|
||||
const queryBuilder = getQueryBuilder(userId, em)
|
||||
|
||||
if (forUpdate) {
|
||||
queryBuilder.setLock('pessimistic_write')
|
||||
queryBuilder.setLock('pessimistic_read')
|
||||
}
|
||||
|
||||
const libraryItems = await queryBuilder
|
||||
|
||||
@ -27,7 +27,7 @@ const configs: Partial<Options> = {
|
||||
export const apiLimiter = rateLimit({
|
||||
...configs,
|
||||
max: async (req) => {
|
||||
// 100 RPM for an authenticated request, 15 for a non-authenticated request
|
||||
// 60 RPM for authenticated request, 15 for non-authenticated request
|
||||
const token = getTokenByRequest(req)
|
||||
try {
|
||||
const claims = await getClaimsByToken(token)
|
||||
@ -43,6 +43,26 @@ export const apiLimiter = rateLimit({
|
||||
store: getStore('api-rate-limit'),
|
||||
})
|
||||
|
||||
/**
 * Hourly rate limiter built on the shared `configs`.
 *
 * Allows 600 requests per hour when the request token resolves to claims and
 * 150 otherwise. Requests are keyed by token, falling back to the client IP.
 */
export const apiHourLimiter = rateLimit({
  ...configs,
  // one-hour window, expressed in milliseconds
  windowMs: 60 * 60 * 1000,
  max: async (req) => {
    const token = getTokenByRequest(req)
    let authenticated = false
    try {
      authenticated = Boolean(await getClaimsByToken(token))
    } catch (e) {
      // a failed claims lookup is treated as a non-authenticated request
      console.log('non-authenticated request')
    }
    return authenticated ? 600 : 150
  },
  keyGenerator: (req) => getTokenByRequest(req) || req.ip,
  store: getStore('api-hour-rate-limit'),
})
|
||||
|
||||
// 5 RPM for auth requests
|
||||
export const authLimiter = rateLimit({
|
||||
...configs,
|
||||
|
||||
Reference in New Issue
Block a user