Merge pull request #3316 from omnivore-app/perf/rss

improve rss feeder performance
This commit is contained in:
Hongbo Wu
2024-01-05 15:57:49 +08:00
committed by GitHub
13 changed files with 229 additions and 37 deletions

View File

@ -82,6 +82,7 @@
"pg": "^8.3.3",
"postgrator": "^4.2.0",
"private-ip": "^2.3.3",
"redis": "^4.3.1",
"rss-parser": "^3.13.0",
"sanitize-html": "^2.3.2",
"sax": "^1.3.0",

18
packages/api/src/redis.ts Normal file
View File

@ -0,0 +1,18 @@
import { createClient } from 'redis'
import { env } from './env'
// Shared Redis client for the API process. The connection itself is opened
// by the caller (server startup connects only when env.redis.url is set),
// so redis remains an optional dependency.
export const redisClient = createClient({
  url: env.redis.url,
  socket: {
    // the rediss:// URL scheme requests a TLS connection
    tls: env.redis.url?.startsWith('rediss://'),
    // the cert env var stores newlines as the literal two characters "\n"
    cert: env.redis.cert?.replace(/\\n/g, '\n'),
    rejectUnauthorized: false, // accept self-signed certificates
    connectTimeout: 10000, // 10 seconds
    // retry once a second, give up after ten attempts
    reconnectStrategy: (retries: number): number | Error =>
      retries > 10 ? new Error('Retries exhausted') : 1000,
  },
})

View File

@ -15,6 +15,7 @@ import { config, loggers } from 'winston'
import { makeApolloServer } from './apollo'
import { appDataSource } from './data_source'
import { env } from './env'
import { redisClient } from './redis'
import { articleRouter } from './routers/article_router'
import { authRouter } from './routers/auth/auth_router'
import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router'
@ -157,6 +158,16 @@ const main = async (): Promise<void> => {
// as healthy.
await appDataSource.initialize()
// redis is optional
if (env.redis.url) {
redisClient.on('error', (err) => {
console.error('Redis Client Error', err)
})
await redisClient.connect()
console.log('Redis Client Connected:', env.redis.url)
}
const { app, apollo, httpServer } = createApp()
await apollo.start()

View File

@ -6,6 +6,7 @@ import { EntityLabel } from '../entity/entity_label'
import { Highlight } from '../entity/highlight'
import { Label } from '../entity/label'
import { LibraryItem, LibraryItemState } from '../entity/library_item'
import { env } from '../env'
import { BulkActionType, InputMaybe, SortParams } from '../generated/graphql'
import { createPubSubClient, EntityType } from '../pubsub'
import {
@ -15,7 +16,7 @@ import {
valuesToRawSql,
} from '../repository'
import { libraryItemRepository } from '../repository/library_item'
import { wordsCount } from '../utils/helpers'
import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers'
import { parseSearchQuery } from '../utils/search'
enum ReadFilter {
@ -818,6 +819,11 @@ export const createLibraryItem = async (
userId
)
// set recently saved item in redis if redis is enabled
if (env.redis.url) {
await setRecentlySavedItemInRedis(userId, newLibraryItem.originalUrl)
}
if (skipPubSub) {
return newLibraryItem
}

View File

@ -13,6 +13,7 @@ import {
SavePageInput,
SaveResult,
} from '../generated/graphql'
import { redisClient } from '../redis'
import { authTrx } from '../repository'
import { enqueueThumbnailTask } from '../utils/createTask'
import {

View File

@ -109,6 +109,10 @@ interface BackendEnv {
max: number
}
}
redis: {
url?: string
cert?: string
}
}
/***
@ -173,6 +177,8 @@ const nullableEnvVars = [
'INTEGRATION_EXPORTER_URL',
'INTEGRATION_IMPORTER_URL',
'SUBSCRIPTION_FEED_MAX',
'REDIS_URL',
'REDIS_CERT',
] // Allow some vars to be null/empty
/* If not in GAE and Prod/QA/Demo env (f.e. on localhost/dev env), allow following env vars to be null */
@ -316,6 +322,10 @@ export function getEnv(): BackendEnv {
: 256, // default to 256
},
}
const redis = {
url: parse('REDIS_URL'),
cert: parse('REDIS_CERT'),
}
return {
pg,
@ -338,6 +348,7 @@ export function getEnv(): BackendEnv {
gcp,
pocket,
subscription,
redis,
}
}

View File

@ -25,6 +25,7 @@ import {
SearchItem,
} from '../generated/graphql'
import { createPubSubClient } from '../pubsub'
import { redisClient } from '../redis'
import { Claims, WithDataSourcesContext } from '../resolvers/types'
import { validateUrl } from '../services/create_page_save_request'
import { updateLibraryItem } from '../services/library_item'
@ -407,3 +408,23 @@ export const isRelativeUrl = (url: string): boolean => {
export const getAbsoluteUrl = (url: string, baseUrl: string): string => {
return new URL(url, baseUrl).href
}
/**
 * Remember that this user just saved this url so the rss-feeder can skip
 * re-saving it. The key lives for 8 hours; NX leaves an existing entry
 * (and its TTL) untouched if the url was already recorded.
 * Returns the redis reply, or undefined when the write failed.
 */
export const setRecentlySavedItemInRedis = async (
  userId: string,
  url: string
) => {
  // save the url in redis for 8 hours so rss-feeder won't try to re-save it
  const redisKey = `recent-saved-item:${userId}:${url}`
  const ttlInSeconds = 60 * 60 * 8
  try {
    // must await inside the try: returning the bare promise would let a
    // rejection escape this function and bypass the catch below entirely
    return await redisClient.set(redisKey, 1, {
      EX: ttlInSeconds,
      NX: true,
    })
  } catch (error) {
    logger.error('error setting recently saved item in redis', {
      redisKey,
      error,
    })
  }
}

View File

@ -1,3 +1,5 @@
import { env } from '../src/env'
import { redisClient } from '../src/redis'
import { createTestConnection } from './db'
import { startApolloServer } from './util'
@ -5,6 +7,11 @@ export const mochaGlobalSetup = async () => {
await createTestConnection()
console.log('db connection created')
if (env.redis.url) {
await redisClient.connect()
console.log('redis connection created')
}
await startApolloServer()
console.log('apollo server started')
}

View File

@ -1,4 +1,6 @@
import { appDataSource } from '../src/data_source'
import { env } from '../src/env'
import { redisClient } from '../src/redis'
import { stopApolloServer } from './util'
export const mochaGlobalTeardown = async () => {
@ -7,4 +9,9 @@ export const mochaGlobalTeardown = async () => {
await appDataSource.destroy()
console.log('db connection closed')
if (env.redis.url) {
await redisClient.disconnect()
console.log('redis connection closed')
}
}

View File

@ -29,6 +29,7 @@
"dotenv": "^16.0.1",
"jsonwebtoken": "^8.5.1",
"linkedom": "^0.16.4",
"redis": "^4.3.1",
"rss-parser": "^3.13.0"
},
"volta": {

View File

@ -4,11 +4,15 @@ import crypto from 'crypto'
import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
import * as jwt from 'jsonwebtoken'
import { parseHTML } from 'linkedom'
import { createClient } from 'redis'
import Parser, { Item } from 'rss-parser'
import { promisify } from 'util'
import { createRedisClient } from './redis'
import { CONTENT_FETCH_URL, createCloudTask } from './task'
type FolderType = 'following' | 'inbox'
// explicitly create the return type of RedisClient
type RedisClient = ReturnType<typeof createClient>
interface RssFeedRequest {
subscriptionIds: string[]
@ -39,9 +43,10 @@ type RssFeed = Parser.Output<{
type RssFeedItemMedia = {
$: { url: string; width?: string; height?: string; medium?: string }
}
type RssFeedItem = Item & {
export type RssFeedItem = Item & {
'media:thumbnail'?: RssFeedItemMedia
'media:content'?: RssFeedItemMedia[]
link: string
}
export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => {
@ -53,6 +58,40 @@ export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => {
)
}
// redis key holding the consecutive fetch-failure counter for a feed url
const feedFetchFailedRedisKey = (feedUrl: string) =>
  `feed-fetch-failure:${feedUrl}`

/**
 * Returns true when the feed has accumulated more than the allowed number
 * of fetch failures (counter written by blockFeed). Redis errors are logged
 * and treated as "not blocked" so a redis outage never stops feed delivery.
 */
const isFeedBlocked = async (feedUrl: string, redisClient: RedisClient) => {
  const key = feedFetchFailedRedisKey(feedUrl)
  try {
    const result = await redisClient.get(key)
    // if the feed has failed to fetch more than certain times, block it
    const configured = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? '10', 10)
    // guard against an unparsable env value: NaN comparisons are always
    // false, which would silently disable blocking forever
    const maxFailures = Number.isNaN(configured) ? 10 : configured
    if (result && parseInt(result, 10) > maxFailures) {
      console.log('feed is blocked: ', feedUrl)
      return true
    }
  } catch (error) {
    console.error('Failed to check feed block status', feedUrl, error)
  }
  return false
}
// Record one more fetch failure for the feed. The counter expires one day
// after it is first created ('NX' sets the TTL only when none exists yet).
// Returns the new failure count, or null when redis is unavailable.
const blockFeed = async (feedUrl: string, redisClient: RedisClient) => {
  const key = feedFetchFailedRedisKey(feedUrl)
  try {
    const failures = await redisClient.incr(key)
    const oneDayInSeconds = 24 * 60 * 60
    await redisClient.expire(key, oneDayInSeconds, 'NX')
    return failures
  } catch (error) {
    console.error('Failed to block feed', feedUrl, error)
    return null
  }
}
export const isContentFetchBlocked = (feedUrl: string) => {
if (feedUrl.startsWith('https://arxiv.org/')) {
return true
@ -214,13 +253,34 @@ const sendUpdateSubscriptionMutation = async (
}
}
// True when this user already saved the url recently (key written by the
// API server with an 8 hour TTL), meaning the feed item can be skipped.
const isItemRecentlySaved = async (
  redisClient: RedisClient,
  userId: string,
  url: string
) => {
  const value = await redisClient.get(`recent-saved-item:${userId}:${url}`)
  return Boolean(value)
}
const createTask = async (
userId: string,
feedUrl: string,
item: RssFeedItem,
fetchContent: boolean,
folder: FolderType
folder: FolderType,
redisClient: RedisClient
) => {
const isRecentlySaved = await isItemRecentlySaved(
redisClient,
userId,
item.link
)
if (isRecentlySaved) {
console.log('Item recently saved', item.link)
return true
}
if (folder === 'following' && !fetchContent) {
return createItemWithPreviewContent(userId, feedUrl, item)
}
@ -363,7 +423,7 @@ const getUpdatePeriodInHours = (feed: RssFeed) => {
}
// get link following the order of preference: via, alternate, self
const getLink = (links: RssFeedItemLink[]) => {
const getLink = (links: RssFeedItemLink[]): string | undefined => {
// sort links by preference
const sortedLinks: string[] = []
@ -398,10 +458,11 @@ const processSubscription = async (
lastFetchedChecksum: string,
fetchContent: boolean,
folder: FolderType,
feed: RssFeed
feed: RssFeed,
redisClient: RedisClient
) => {
let lastItemFetchedAt: Date | null = null
let lastValidItem: Item | null = null
let lastValidItem: RssFeedItem | null = null
if (fetchResult.checksum === lastFetchedChecksum) {
console.log('feed has not been updated', feedUrl, lastFetchedChecksum)
@ -425,7 +486,7 @@ const processSubscription = async (
// save each item in the feed
for (const item of feed.items) {
// use published or updated if isoDate is not available for atom feeds
item.isoDate =
const isoDate =
item.isoDate || item.published || item.updated || item.created
console.log('Processing feed item', item.links, item.isoDate, feed.feedUrl)
@ -434,21 +495,28 @@ const processSubscription = async (
continue
}
item.link = getLink(item.links)
if (!item.link) {
const link = getLink(item.links)
if (!link) {
console.log('Invalid feed item links', item.links)
continue
}
console.log('Fetching feed item', item.link)
console.log('Fetching feed item', link)
const feedItem = {
...item,
isoDate,
link,
}
const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date()
const publishedAt = feedItem.isoDate
? new Date(feedItem.isoDate)
: new Date()
// remember the last valid item
if (
!lastValidItem ||
(lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate))
) {
lastValidItem = item
lastValidItem = feedItem
}
// Max limit per-feed update
@ -457,20 +525,21 @@ const processSubscription = async (
}
// skip old items
if (isOldItem(item, lastFetchedAt)) {
console.log('Skipping old feed item', item.link)
if (isOldItem(feedItem, lastFetchedAt)) {
console.log('Skipping old feed item', feedItem.link)
continue
}
const created = await createTask(
userId,
feedUrl,
item,
feedItem,
fetchContent,
folder
folder,
redisClient
)
if (!created) {
console.error('Failed to create task for feed item', item.link)
console.error('Failed to create task for feed item', feedItem.link)
continue
}
@ -496,7 +565,8 @@ const processSubscription = async (
feedUrl,
lastValidItem,
fetchContent,
folder
folder,
redisClient
)
if (!created) {
console.error('Failed to create task for feed item', lastValidItem.link)
@ -536,6 +606,12 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
return res.status(400).send('INVALID_REQUEST_BODY')
}
// create redis client
const redisClient = await createRedisClient(
process.env.REDIS_URL,
process.env.REDIS_CERT
)
const {
feedUrl,
subscriptionIds,
@ -548,10 +624,17 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
} = req.body
console.log('Processing feed', feedUrl)
const isBlocked = await isFeedBlocked(feedUrl, redisClient)
if (isBlocked) {
console.log('feed is blocked: ', feedUrl)
return res.sendStatus(200)
}
const fetchResult = await fetchAndChecksum(feedUrl)
const feed = await parseFeed(feedUrl, fetchResult.content)
if (!feed) {
console.error('Failed to parse RSS feed', feedUrl)
await blockFeed(feedUrl, redisClient)
return res.status(500).send('INVALID_RSS_FEED')
}
@ -563,22 +646,22 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
console.log('Fetched feed', feed.title, new Date())
await Promise.all(
subscriptionIds.map((_, i) =>
processSubscription(
subscriptionIds[i],
userIds[i],
feedUrl,
fetchResult,
lastFetchedTimestamps[i],
scheduledTimestamps[i],
lastFetchedChecksums[i],
fetchContents[i] && allowFetchContent,
folders[i],
feed
)
// process each subscription sequentially
for (let i = 0; i < subscriptionIds.length; i++) {
await processSubscription(
subscriptionIds[i],
userIds[i],
feedUrl,
fetchResult,
lastFetchedTimestamps[i],
scheduledTimestamps[i],
lastFetchedChecksums[i],
fetchContents[i] && allowFetchContent,
folders[i],
feed,
redisClient
)
)
}
res.send('ok')
} catch (e) {

View File

@ -0,0 +1,26 @@
import { createClient } from 'redis'
/**
 * Create, configure and connect a redis client for the rss-feeder.
 * TLS is enabled when the url uses the rediss:// scheme; self-signed
 * certificates are accepted. Resolves once the connection is open.
 */
export const createRedisClient = async (url?: string, cert?: string) => {
  const client = createClient({
    url,
    socket: {
      tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS
      cert: cert?.replace(/\\n/g, '\n'), // env stores newlines as literal "\n"
      rejectUnauthorized: false, // for self-signed certs
      connectTimeout: 10000, // 10 seconds
      // retry once a second, give up after ten attempts
      reconnectStrategy: (retries: number): number | Error =>
        retries > 10 ? new Error('Retries exhausted') : 1000,
    },
  })

  client.on('error', (err) => console.error('Redis Client Error', err))
  await client.connect()
  console.log('Redis Client Connected:', url)
  return client
}

View File

@ -1,13 +1,12 @@
import { expect } from 'chai'
import 'mocha'
import { Item } from 'rss-parser'
import { isOldItem } from '../src'
import { isOldItem, RssFeedItem } from '../src'
describe('isOldItem', () => {
it('returns true if item is older than 1 day', () => {
const item = {
pubDate: '2020-01-01',
} as Item
} as RssFeedItem
const lastFetchedAt = Date.now()
expect(isOldItem(item, lastFetchedAt)).to.be.true
@ -17,7 +16,7 @@ describe('isOldItem', () => {
const lastFetchedAt = Date.now()
const item = {
pubDate: new Date(lastFetchedAt).toISOString(),
} as Item
} as RssFeedItem
expect(isOldItem(item, lastFetchedAt)).to.be.true
})