diff --git a/packages/api/src/jobs/bulk_action.ts b/packages/api/src/jobs/bulk_action.ts index cea355d6e..28ee8461c 100644 --- a/packages/api/src/jobs/bulk_action.ts +++ b/packages/api/src/jobs/bulk_action.ts @@ -27,6 +27,7 @@ export const bulkAction = async (data: BulkActionData) => { for (let offset = 0; offset < count; offset += batchSize) { const searchArgs = { size: batchSize, + includePending: true, query: `(${query}) AND updated:*..${now}`, // only process items that have not been updated } diff --git a/packages/api/src/repository/library_item.ts b/packages/api/src/repository/library_item.ts index 7b1470922..c37e56f82 100644 --- a/packages/api/src/repository/library_item.ts +++ b/packages/api/src/repository/library_item.ts @@ -25,7 +25,7 @@ export const libraryItemRepository = appDataSource .andWhere('md5(original_url) = md5(:url)', { url }) if (forUpdate) { - qb.setLock('pessimistic_write') + qb.setLock('pessimistic_read') } return qb.getOne() diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index 73f17a2c3..b28ca00ad 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -760,9 +760,10 @@ export const bulkActionResolver = authorized< }, }) - const batchSize = 100 + const batchSize = 20 const searchArgs = { query, + includePending: true, size: 0, } const count = await countLibraryItems(searchArgs, uid) @@ -778,13 +779,13 @@ export const bulkActionResolver = authorized< action, count, }) - // if there are less than 100 items, update them synchronously + // if there are less than batchSize items, update them synchronously await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args) return { success: true } } - // if there are more than 100 items, update them asynchronously + // if there are more than batchSize items, update them asynchronously const data = { userId: uid, action, diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 37df57fcc..d44c80e32 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -46,7 +46,7 @@ import { analytics } from './utils/analytics' import { corsConfig } from './utils/corsConfig' import { getClientFromUserAgent } from './utils/helpers' import { buildLogger, buildLoggerTransport, logger } from './utils/logger' -import { apiLimiter, authLimiter } from './utils/rate_limit' +import { apiHourLimiter, apiLimiter, authLimiter } from './utils/rate_limit' import { shortcutsRouter } from './routers/shortcuts_router' const PORT = process.env.PORT || 4000 @@ -68,7 +68,7 @@ export const createApp = (): Express => { app.set('trust proxy', env.server.trustProxy) // Apply the rate limiting middleware to API calls only - app.use('/api/', apiLimiter) + app.use('/api/', apiLimiter, apiHourLimiter) // set client info in the request context app.use(httpContext.middleware) diff --git a/packages/api/src/services/create_page_save_request.ts b/packages/api/src/services/create_page_save_request.ts index 8e2470e2c..86d45765d 100644 --- a/packages/api/src/services/create_page_save_request.ts +++ b/packages/api/src/services/create_page_save_request.ts @@ -8,11 +8,11 @@ import { PageType, } from '../generated/graphql' import { createPubSubClient, PubsubClient } from '../pubsub' -import { Merge } from '../util' +import { redisDataSource } from '../redis_data_source' import { enqueueParseRequest } from '../utils/createTask' import { cleanUrl, generateSlug } from '../utils/helpers' import { logger } from '../utils/logger' -import { countBySavedAt, createOrUpdateLibraryItem } from './library_item' +import { createOrUpdateLibraryItem } from './library_item' interface PageSaveRequest { user: User @@ -34,13 +34,47 @@ const SAVING_CONTENT = 'Your link is being saved...' const isPrivateIP = privateIpLib.default +const recentSavedItemKey = (userId: string) => `recent-saved-item:${userId}` + +const addRecentSavedItem = async (userId: string) => { + const redisClient = redisDataSource.redisClient + + if (redisClient) { + const key = recentSavedItemKey(userId) + try { + // add now to the sorted set for rate limiting + await redisClient.zadd(key, Date.now(), Date.now()) + } catch (error) { + logger.error('error adding recently saved item in redis', { + key, + error, + }) + } + } +} + // 5 items saved in the last minute: use low queue // default: use normal queue const getPriorityByRateLimit = async ( userId: string -): Promise<'low' | 'high'> => { - const count = await countBySavedAt(userId, new Date(Date.now() - 60 * 1000)) - return count >= 5 ? 'low' : 'high' +): Promise<'low' | 'high' | undefined> => { + const redisClient = redisDataSource.redisClient + if (redisClient) { + const oneMinuteAgo = Date.now() - 60 * 1000 + const key = recentSavedItemKey(userId) + + try { + // Remove items older than one minute + await redisClient.zremrangebyscore(key, '-inf', oneMinuteAgo) + + // Count items in the last minute + const count = await redisClient.zcard(key) + + return count >= 5 ? 'low' : 'high' + } catch (error) { + logger.error('Failed to get priority by rate limit', { userId, error }) + } + } } export const validateUrl = (url: string): URL => { @@ -118,8 +152,12 @@ export const createPageSaveRequest = async ({ pubsub ) + // add to recent saved item + await addRecentSavedItem(userId) + // get priority by checking rate limit if not specified priority = priority || (await getPriorityByRateLimit(userId)) + logger.debug('priority', { priority }) // enqueue task to parse item await enqueueParseRequest({ diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index b7c838202..ed133eaff 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -1192,7 +1192,7 @@ export const findLibraryItemsByPrefix = async ( ) } -export const countBySavedAt = async ( +export const countByCreatedAt = async ( userId: string, startDate = new Date(0), endDate = new Date() @@ -1202,7 +1202,7 @@ export const countBySavedAt = async ( tx .createQueryBuilder(LibraryItem, 'library_item') .where('library_item.user_id = :userId', { userId }) - .andWhere('library_item.saved_at between :startDate and :endDate', { + .andWhere('library_item.created_at between :startDate and :endDate', { startDate, endDate, }) @@ -1256,7 +1256,7 @@ export const batchUpdateLibraryItems = async ( const queryBuilder = getQueryBuilder(userId, em) if (forUpdate) { - queryBuilder.setLock('pessimistic_write') + queryBuilder.setLock('pessimistic_read') } const libraryItems = await queryBuilder diff --git a/packages/api/src/utils/rate_limit.ts b/packages/api/src/utils/rate_limit.ts index 4f6e8c11f..27675dc6e 100644 --- a/packages/api/src/utils/rate_limit.ts +++ b/packages/api/src/utils/rate_limit.ts @@ -27,7 +27,7 @@ const configs: Partial = { export const apiLimiter = rateLimit({ ...configs, max: async (req) => { - // 100 RPM for an authenticated request, 15 for a non-authenticated request + // 60 RPM for authenticated request, 15 for non-authenticated request const token = getTokenByRequest(req) try { const claims = await getClaimsByToken(token) @@ -43,6 +43,26 @@ export const apiLimiter = rateLimit({ store: getStore('api-rate-limit'), }) +export const apiHourLimiter = rateLimit({ + ...configs, + windowMs: 60 * 60 * 1000, // 1 hour + max: async (req) => { + // 600 for authenticated request, 150 for non-authenticated request + const token = getTokenByRequest(req) + try { + const claims = await getClaimsByToken(token) + return claims ? 600 : 150 + } catch (e) { + console.log('non-authenticated request') + return 150 + } + }, + keyGenerator: (req) => { + return getTokenByRequest(req) || req.ip + }, + store: getStore('api-hour-rate-limit'), +}) + // 5 RPM for auth requests export const authLimiter = rateLimit({ ...configs, diff --git a/packages/content-fetch/Dockerfile b/packages/content-fetch/Dockerfile index 52824edf6..dc43e8bb8 100644 --- a/packages/content-fetch/Dockerfile +++ b/packages/content-fetch/Dockerfile @@ -23,7 +23,6 @@ COPY .prettierrc . COPY .eslintrc . COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json -COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json COPY /packages/utils/package.json ./packages/utils/package.json @@ -33,7 +32,6 @@ RUN yarn install --pure-lockfile ADD /packages/content-fetch ./packages/content-fetch ADD /packages/content-handler ./packages/content-handler ADD /packages/puppeteer-parse ./packages/puppeteer-parse -ADD /packages/readabilityjs ./packages/readabilityjs ADD /packages/utils ./packages/utils RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/content-handler build diff --git a/packages/content-fetch/Dockerfile-gcf b/packages/content-fetch/Dockerfile-gcf index 2cbabd931..7691ab1ae 100644 --- a/packages/content-fetch/Dockerfile-gcf +++ b/packages/content-fetch/Dockerfile-gcf @@ -27,7 +27,6 @@ COPY .prettierrc . COPY .eslintrc . COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json -COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/content-handler/package.json ./packages/content-handler/package.json COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json COPY /packages/utils/package.json ./packages/utils/package.json @@ -37,7 +36,6 @@ RUN yarn install --pure-lockfile ADD /packages/content-handler ./packages/content-handler ADD /packages/puppeteer-parse ./packages/puppeteer-parse ADD /packages/content-fetch ./packages/content-fetch -ADD /packages/readabilityjs ./packages/readabilityjs ADD /packages/utils ./packages/utils RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/content-handler build diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index 68e1d3cd1..ae282bfb7 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -77,7 +77,10 @@ export const queueSavePageJob = async ( data: job.data, opts: getOpts(job), })) - console.log('queue save page jobs:', JSON.stringify(jobs, null, 2)) + console.log( + 'queue save page jobs:', + jobs.map((job) => job.data.finalUrl) + ) const queue = new Queue(QUEUE_NAME, { connection: redisDataSource.queueRedisClient, diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 81fc04de2..fafe53ead 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -129,6 +129,52 @@ const getCachedFetchResult = async ( return fetchResult } +const failureRedisKey = (domain: string) => `fetch-failure:${domain}` + +const isDomainBlocked = async ( + redisDataSource: RedisDataSource, + domain: string +) => { + const blockedDomains = ['localhost', 'weibo.com'] + if (blockedDomains.includes(domain)) { + return true + } + + const key = failureRedisKey(domain) + const redisClient = redisDataSource.cacheClient + try { + const result = await redisClient.get(key) + // if the domain has failed to fetch more than certain times, block it + const maxFailures = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? '10') + if (result && parseInt(result) > maxFailures) { + console.info(`domain is blocked: ${domain}`) + return true + } + } catch (error) { + console.error('Failed to check domain block status', { domain, error }) + } + + return false +} + +const incrementContentFetchFailure = async ( + redisDataSource: RedisDataSource, + domain: string +) => { + const redisClient = redisDataSource.cacheClient + const key = failureRedisKey(domain) + try { + const result = await redisClient.incr(key) + // expire the key in 1 day + await redisClient.expire(key, 24 * 60 * 60) + + return result + } catch (error) { + console.error('Failed to increment failure in redis', { domain, error }) + return null + } +} + export const contentFetchRequestHandler: RequestHandler = async (req, res) => { const functionStartTime = Date.now() @@ -200,8 +246,22 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { url ) - fetchResult = await fetchContent(url, locale, timezone) - console.log('content has been fetched') + const domain = new URL(url).hostname + const isBlocked = await isDomainBlocked(redisDataSource, domain) + if (isBlocked) { + console.log('domain is blocked', domain) + + return res.sendStatus(200) + } + + try { + fetchResult = await fetchContent(url, locale, timezone) + console.log('content has been fetched') + } catch (error) { + await incrementContentFetchFailure(redisDataSource, domain) + + throw error + } if (fetchResult.content && !NO_CACHE_URLS.includes(url)) { const cacheResult = await cacheFetchResult( diff --git a/packages/puppeteer-parse/package.json b/packages/puppeteer-parse/package.json index 14637c232..0683a93ee 100644 --- a/packages/puppeteer-parse/package.json +++ b/packages/puppeteer-parse/package.json @@ -9,20 +9,12 @@ ], "dependencies": { "@omnivore/content-handler": "1.0.0", - "@omnivore/readability": "1.0.0", - "axios": "^1.4.0", - "crypto": "^1.0.1", - "dompurify": "^2.4.1", - "linkedom": "^0.14.9", "puppeteer-core": "^22.12.1", "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-adblocker": "^2.13.6", - "puppeteer-extra-plugin-stealth": "^2.11.2", - "urlsafe-base64": "^1.0.0" + "puppeteer-extra-plugin-stealth": "^2.11.2" }, "devDependencies": { - "@types/dompurify": "^3.0.5", - "@types/urlsafe-base64": "^1.0.31", "chai": "^4.3.6", "mocha": "^10.0.0" }, diff --git a/packages/puppeteer-parse/src/index.ts b/packages/puppeteer-parse/src/index.ts index 78031e58c..853c9a6ef 100644 --- a/packages/puppeteer-parse/src/index.ts +++ b/packages/puppeteer-parse/src/index.ts @@ -1,8 +1,5 @@ -/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { preHandleContent } from '@omnivore/content-handler' -import axios from 'axios' -import { parseHTML } from 'linkedom' import path from 'path' import { BrowserContext, Page, Protocol } from 'puppeteer-core' import { getBrowser } from './browser' @@ -16,22 +13,6 @@ const ALLOWED_CONTENT_TYPES = [ 'application/pdf', ] -const fetchContentWithScrapingBee = async (url: string) => { - const response = await axios.get('https://app.scrapingbee.com/api/v1', { - params: { - api_key: process.env.SCRAPINGBEE_API_KEY, - url: url, - render_js: 'false', - premium_proxy: 'true', - country_code: 'us', - }, - timeout: 10_000, - }) - - const dom = parseHTML(response.data).document - return { title: dom.title, domContent: dom.documentElement.outerHTML, url } -} - const enableJavascriptForUrl = (url: string) => { try { const u = new URL(url) @@ -60,39 +41,28 @@ export const fetchContent = async ( } console.log(`content-fetch request`, logRecord) - let page: Page | undefined, - title: string | undefined, + let title: string | undefined, content: string | undefined, contentType: string | undefined, context: BrowserContext | undefined try { url = getUrl(url) - if (!url) { - throw new Error('Valid URL to parse not specified') - } // pre handle url with custom handlers try { const result = await preHandleContent(url) - if (result && result.url) { - validateUrlString(url) - url = result.url - } - if (result && result.title) { - title = result.title - } - if (result && result.content) { - content = result.content - } - if (result && result.contentType) { - contentType = result.contentType + if (result?.url) { + url = getUrl(result.url) } + title = result?.title + content = result?.content + contentType = result?.contentType } catch (e) { - console.info('error with handler: ', e) + console.error('error with handler: ', e) } - if ((!content || !title) && contentType !== 'application/pdf') { + if (contentType !== 'application/pdf' && (!content || !title)) { const result = await retrievePage( url, logRecord, @@ -100,59 +70,27 @@ export const fetchContent = async ( locale, timezone ) - if (result && result.context) { - context = result.context - } - if (result && result.page) { - page = result.page - } - if (result && result.finalUrl) { - url = result.finalUrl - } - if (result && result.contentType) { - contentType = result.contentType - } - } + context = result.context + url = result.finalUrl + contentType = result.contentType - if (contentType !== 'application/pdf') { - if (page && (!content || !title)) { + const page = result.page + if (page) { const result = await retrieveHtml(page, logRecord) - if (result.isBlocked) { - const sbResult = await fetchContentWithScrapingBee(url) - title = sbResult.title - content = sbResult.domContent - } else { - title = result.title - content = result.domContent - } - } else { - console.info('using prefetched content and title') + title = result.title + content = result.domContent } } } catch (e) { console.error(`Error while retrieving page ${url}`, e) - // fallback to scrapingbee for non pdf content - if (url && contentType !== 'application/pdf') { - console.info('fallback to scrapingbee', url) - - const sbResult = await fetchContentWithScrapingBee(url) - - return { - finalUrl: url, - title: sbResult.title, - content: sbResult.domContent, - contentType, - } - } - throw e } finally { // close browser context if it was created if (context) { - console.info('closing page...', url) + console.info('closing context...', url) await context.close() - console.info('page closed', url) + console.info('context closed', url) } console.info(`content-fetch result`, logRecord) @@ -519,8 +457,12 @@ async function retrieveHtml(page: Page, logRecord: Record) { throw e } + if (domContent === 'IS_BLOCKED') { - return { isBlocked: true } + logRecord.blockedByClient = true + + throw new Error('Page is blocked') } + return { domContent, title } } diff --git a/packages/puppeteer-parse/src/readability.d.ts b/packages/puppeteer-parse/src/readability.d.ts deleted file mode 100644 index 4722588cd..000000000 --- a/packages/puppeteer-parse/src/readability.d.ts +++ /dev/null @@ -1,173 +0,0 @@ -// Type definitions for non-npm package mozilla-readability 0.2 -// Project: https://github.com/mozilla/readability -// Definitions by: Charles Vandevoorde , Alex Wendland -// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped -// TypeScript Version: 2.2 - -declare module '@omnivore/readability' { - /** - * A standalone version of the readability library used for Firefox Reader View. - * - * Note that isProbablyReaderable() was moved into a separate file in https://github.com/mozilla/readability/commit/2620542dd1e8380220d82afa97a2c283ae636e40 - * and therefore is no longer part of the Readability class. - */ - class Readability { - /** - * ## Usage on the web - * - * To parse a document, you must create a new Readability object from a - * DOM document object, and then call parse(). Here's an example: - * - * ```js - * var article = new Readability(document).parse(); - * ``` - * - * If you're using Readability on the web, you will likely be able to - * use a document reference from elsewhere (e.g. fetched via XMLHttpRequest, - * in a same-origin