diff --git a/packages/api/package.json b/packages/api/package.json index 34d9b24fb..75cecca83 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -82,6 +82,7 @@ "pg": "^8.3.3", "postgrator": "^4.2.0", "private-ip": "^2.3.3", + "redis": "^4.3.1", "rss-parser": "^3.13.0", "sanitize-html": "^2.3.2", "sax": "^1.3.0", diff --git a/packages/api/src/redis.ts b/packages/api/src/redis.ts new file mode 100644 index 000000000..14559c0be --- /dev/null +++ b/packages/api/src/redis.ts @@ -0,0 +1,18 @@ +import { createClient } from 'redis' +import { env } from './env' + +export const redisClient = createClient({ + url: env.redis.url, + socket: { + tls: env.redis.url?.startsWith('rediss://'), // rediss:// is the protocol for TLS + cert: env.redis.cert?.replace(/\\n/g, '\n'), // replace \n with new line + rejectUnauthorized: false, // for self-signed certs + connectTimeout: 10000, // 10 seconds + reconnectStrategy(retries: number): number | Error { + if (retries > 10) { + return new Error('Retries exhausted') + } + return 1000 + }, + }, +}) diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 1bb75a085..a6458730f 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -15,6 +15,7 @@ import { config, loggers } from 'winston' import { makeApolloServer } from './apollo' import { appDataSource } from './data_source' import { env } from './env' +import { redisClient } from './redis' import { articleRouter } from './routers/article_router' import { authRouter } from './routers/auth/auth_router' import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router' @@ -157,6 +158,16 @@ const main = async (): Promise => { // as healthy. 
await appDataSource.initialize() + // redis is optional + if (env.redis.url) { + redisClient.on('error', (err) => { + console.error('Redis Client Error', err) + }) + + await redisClient.connect() + console.log('Redis Client Connected:', env.redis.url) + } + const { app, apollo, httpServer } = createApp() await apollo.start() diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index d6944af12..8478f33f3 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -6,6 +6,7 @@ import { EntityLabel } from '../entity/entity_label' import { Highlight } from '../entity/highlight' import { Label } from '../entity/label' import { LibraryItem, LibraryItemState } from '../entity/library_item' +import { env } from '../env' import { BulkActionType, InputMaybe, SortParams } from '../generated/graphql' import { createPubSubClient, EntityType } from '../pubsub' import { @@ -15,7 +16,7 @@ import { valuesToRawSql, } from '../repository' import { libraryItemRepository } from '../repository/library_item' -import { wordsCount } from '../utils/helpers' +import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers' import { parseSearchQuery } from '../utils/search' enum ReadFilter { @@ -818,6 +819,11 @@ export const createLibraryItem = async ( userId ) + // set recently saved item in redis if redis is enabled + if (env.redis.url) { + await setRecentlySavedItemInRedis(userId, newLibraryItem.originalUrl) + } + if (skipPubSub) { return newLibraryItem } diff --git a/packages/api/src/services/save_page.ts b/packages/api/src/services/save_page.ts index 442dcea93..0cded9a52 100644 --- a/packages/api/src/services/save_page.ts +++ b/packages/api/src/services/save_page.ts @@ -13,6 +13,7 @@ import { SavePageInput, SaveResult, } from '../generated/graphql' +import { redisClient } from '../redis' import { authTrx } from '../repository' import { enqueueThumbnailTask } from '../utils/createTask' 
import { diff --git a/packages/api/src/util.ts b/packages/api/src/util.ts index c056d17cd..f37b51587 100755 --- a/packages/api/src/util.ts +++ b/packages/api/src/util.ts @@ -109,6 +109,10 @@ interface BackendEnv { max: number } } + redis: { + url?: string + cert?: string + } } /*** @@ -173,6 +177,8 @@ const nullableEnvVars = [ 'INTEGRATION_EXPORTER_URL', 'INTEGRATION_IMPORTER_URL', 'SUBSCRIPTION_FEED_MAX', + 'REDIS_URL', + 'REDIS_CERT', ] // Allow some vars to be null/empty /* If not in GAE and Prod/QA/Demo env (f.e. on localhost/dev env), allow following env vars to be null */ @@ -316,6 +322,10 @@ export function getEnv(): BackendEnv { : 256, // default to 256 }, } + const redis = { + url: parse('REDIS_URL'), + cert: parse('REDIS_CERT'), + } return { pg, @@ -338,6 +348,7 @@ export function getEnv(): BackendEnv { gcp, pocket, subscription, + redis, } } diff --git a/packages/api/src/utils/helpers.ts b/packages/api/src/utils/helpers.ts index df6537996..cf179fd35 100644 --- a/packages/api/src/utils/helpers.ts +++ b/packages/api/src/utils/helpers.ts @@ -25,6 +25,7 @@ import { SearchItem, } from '../generated/graphql' import { createPubSubClient } from '../pubsub' +import { redisClient } from '../redis' import { Claims, WithDataSourcesContext } from '../resolvers/types' import { validateUrl } from '../services/create_page_save_request' import { updateLibraryItem } from '../services/library_item' @@ -407,3 +408,23 @@ export const isRelativeUrl = (url: string): boolean => { export const getAbsoluteUrl = (url: string, baseUrl: string): string => { return new URL(url, baseUrl).href } + +export const setRecentlySavedItemInRedis = async ( + userId: string, + url: string +) => { + // save the url in redis for 8 hours so rss-feeder won't try to re-save it + const redisKey = `recent-saved-item:${userId}:${url}` + const ttlInSeconds = 60 * 60 * 8 + try { + return await redisClient.set(redisKey, 1, { + EX: ttlInSeconds, + NX: true, + }) + } catch (error) { + logger.error('error setting
recently saved item in redis', { + redisKey, + error, + }) + } +} diff --git a/packages/api/test/global-setup.ts b/packages/api/test/global-setup.ts index d4a56e120..b36cfa15b 100644 --- a/packages/api/test/global-setup.ts +++ b/packages/api/test/global-setup.ts @@ -1,3 +1,5 @@ +import { env } from '../src/env' +import { redisClient } from '../src/redis' import { createTestConnection } from './db' import { startApolloServer } from './util' @@ -5,6 +7,11 @@ export const mochaGlobalSetup = async () => { await createTestConnection() console.log('db connection created') + if (env.redis.url) { + await redisClient.connect() + console.log('redis connection created') + } + await startApolloServer() console.log('apollo server started') } diff --git a/packages/api/test/global-teardown.ts b/packages/api/test/global-teardown.ts index ec201d2db..2d11b2a62 100644 --- a/packages/api/test/global-teardown.ts +++ b/packages/api/test/global-teardown.ts @@ -1,4 +1,6 @@ import { appDataSource } from '../src/data_source' +import { env } from '../src/env' +import { redisClient } from '../src/redis' import { stopApolloServer } from './util' export const mochaGlobalTeardown = async () => { @@ -7,4 +9,9 @@ export const mochaGlobalTeardown = async () => { await appDataSource.destroy() console.log('db connection closed') + + if (env.redis.url) { + await redisClient.disconnect() + console.log('redis connection closed') + } } diff --git a/packages/rss-handler/package.json b/packages/rss-handler/package.json index 49452e22e..728cd9b82 100644 --- a/packages/rss-handler/package.json +++ b/packages/rss-handler/package.json @@ -29,6 +29,7 @@ "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1", "linkedom": "^0.16.4", + "redis": "^4.3.1", "rss-parser": "^3.13.0" }, "volta": { diff --git a/packages/rss-handler/src/index.ts b/packages/rss-handler/src/index.ts index a615fa675..b3636e41c 100644 --- a/packages/rss-handler/src/index.ts +++ b/packages/rss-handler/src/index.ts @@ -4,11 +4,15 @@ import crypto from 
'crypto' import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import import * as jwt from 'jsonwebtoken' import { parseHTML } from 'linkedom' +import { createClient } from 'redis' import Parser, { Item } from 'rss-parser' import { promisify } from 'util' +import { createRedisClient } from './redis' import { CONTENT_FETCH_URL, createCloudTask } from './task' type FolderType = 'following' | 'inbox' +// explicitly create the return type of RedisClient +type RedisClient = ReturnType interface RssFeedRequest { subscriptionIds: string[] @@ -39,9 +43,10 @@ type RssFeed = Parser.Output<{ type RssFeedItemMedia = { $: { url: string; width?: string; height?: string; medium?: string } } -type RssFeedItem = Item & { +export type RssFeedItem = Item & { 'media:thumbnail'?: RssFeedItemMedia 'media:content'?: RssFeedItemMedia[] + link: string } export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { @@ -53,6 +58,40 @@ export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { ) } +const feedFetchFailedRedisKey = (feedUrl: string) => + `feed-fetch-failure:${feedUrl}` + +const isFeedBlocked = async (feedUrl: string, redisClient: RedisClient) => { + const key = feedFetchFailedRedisKey(feedUrl) + try { + const result = await redisClient.get(key) + // if the feed has failed to fetch more than certain times, block it + const maxFailures = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? 
'10') + if (result && parseInt(result) > maxFailures) { + console.log('feed is blocked: ', feedUrl) + return true + } + } catch (error) { + console.error('Failed to check feed block status', feedUrl, error) + } + + return false +} + +const blockFeed = async (feedUrl: string, redisClient: RedisClient) => { + const key = feedFetchFailedRedisKey(feedUrl) + try { + const result = await redisClient.incr(key) + // expire the key in 1 day + await redisClient.expire(key, 24 * 60 * 60, 'NX') + + return result + } catch (error) { + console.error('Failed to block feed', feedUrl, error) + return null + } +} + export const isContentFetchBlocked = (feedUrl: string) => { if (feedUrl.startsWith('https://arxiv.org/')) { return true @@ -214,13 +253,34 @@ const sendUpdateSubscriptionMutation = async ( } } +const isItemRecentlySaved = async ( + redisClient: RedisClient, + userId: string, + url: string +) => { + const key = `recent-saved-item:${userId}:${url}` + const result = await redisClient.get(key) + return !!result +} + const createTask = async ( userId: string, feedUrl: string, item: RssFeedItem, fetchContent: boolean, - folder: FolderType + folder: FolderType, + redisClient: RedisClient ) => { + const isRecentlySaved = await isItemRecentlySaved( + redisClient, + userId, + item.link + ) + if (isRecentlySaved) { + console.log('Item recently saved', item.link) + return true + } + if (folder === 'following' && !fetchContent) { return createItemWithPreviewContent(userId, feedUrl, item) } @@ -363,7 +423,7 @@ const getUpdatePeriodInHours = (feed: RssFeed) => { } // get link following the order of preference: via, alternate, self -const getLink = (links: RssFeedItemLink[]) => { +const getLink = (links: RssFeedItemLink[]): string | undefined => { // sort links by preference const sortedLinks: string[] = [] @@ -398,10 +458,11 @@ const processSubscription = async ( lastFetchedChecksum: string, fetchContent: boolean, folder: FolderType, - feed: RssFeed + feed: RssFeed, + redisClient: 
RedisClient ) => { let lastItemFetchedAt: Date | null = null - let lastValidItem: Item | null = null + let lastValidItem: RssFeedItem | null = null if (fetchResult.checksum === lastFetchedChecksum) { console.log('feed has not been updated', feedUrl, lastFetchedChecksum) @@ -425,7 +486,7 @@ const processSubscription = async ( // save each item in the feed for (const item of feed.items) { // use published or updated if isoDate is not available for atom feeds - item.isoDate = + const isoDate = item.isoDate || item.published || item.updated || item.created console.log('Processing feed item', item.links, item.isoDate, feed.feedUrl) @@ -434,21 +495,28 @@ const processSubscription = async ( continue } - item.link = getLink(item.links) - if (!item.link) { + const link = getLink(item.links) + if (!link) { console.log('Invalid feed item links', item.links) continue } - console.log('Fetching feed item', item.link) + console.log('Fetching feed item', link) + const feedItem = { + ...item, + isoDate, + link, + } - const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date() + const publishedAt = feedItem.isoDate + ? 
new Date(feedItem.isoDate) + : new Date() // remember the last valid item if ( !lastValidItem || (lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate)) ) { - lastValidItem = item + lastValidItem = feedItem } // Max limit per-feed update @@ -457,20 +525,21 @@ const processSubscription = async ( } // skip old items - if (isOldItem(item, lastFetchedAt)) { - console.log('Skipping old feed item', item.link) + if (isOldItem(feedItem, lastFetchedAt)) { + console.log('Skipping old feed item', feedItem.link) continue } const created = await createTask( userId, feedUrl, - item, + feedItem, fetchContent, - folder + folder, + redisClient ) if (!created) { - console.error('Failed to create task for feed item', item.link) + console.error('Failed to create task for feed item', feedItem.link) continue } @@ -496,7 +565,8 @@ const processSubscription = async ( feedUrl, lastValidItem, fetchContent, - folder + folder, + redisClient ) if (!created) { console.error('Failed to create task for feed item', lastValidItem.link) @@ -536,6 +606,12 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('INVALID_REQUEST_BODY') } + // create redis client + const redisClient = await createRedisClient( + process.env.REDIS_URL, + process.env.REDIS_CERT + ) + const { feedUrl, subscriptionIds, @@ -548,10 +624,17 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( } = req.body console.log('Processing feed', feedUrl) + const isBlocked = await isFeedBlocked(feedUrl, redisClient) + if (isBlocked) { + console.log('feed is blocked: ', feedUrl) + return res.sendStatus(200) + } + const fetchResult = await fetchAndChecksum(feedUrl) const feed = await parseFeed(feedUrl, fetchResult.content) if (!feed) { console.error('Failed to parse RSS feed', feedUrl) + await blockFeed(feedUrl, redisClient) return res.status(500).send('INVALID_RSS_FEED') } @@ -563,22 +646,22 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( 
console.log('Fetched feed', feed.title, new Date()) - await Promise.all( - subscriptionIds.map((_, i) => - processSubscription( - subscriptionIds[i], - userIds[i], - feedUrl, - fetchResult, - lastFetchedTimestamps[i], - scheduledTimestamps[i], - lastFetchedChecksums[i], - fetchContents[i] && allowFetchContent, - folders[i], - feed - ) + // process each subscription sequentially + for (let i = 0; i < subscriptionIds.length; i++) { + await processSubscription( + subscriptionIds[i], + userIds[i], + feedUrl, + fetchResult, + lastFetchedTimestamps[i], + scheduledTimestamps[i], + lastFetchedChecksums[i], + fetchContents[i] && allowFetchContent, + folders[i], + feed, + redisClient ) - ) + } res.send('ok') } catch (e) { diff --git a/packages/rss-handler/src/redis.ts b/packages/rss-handler/src/redis.ts new file mode 100644 index 000000000..350cb09f6 --- /dev/null +++ b/packages/rss-handler/src/redis.ts @@ -0,0 +1,26 @@ +import { createClient } from 'redis' + +export const createRedisClient = async (url?: string, cert?: string) => { + const redisClient = createClient({ + url, + socket: { + tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS + cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line + rejectUnauthorized: false, // for self-signed certs + connectTimeout: 10000, // 10 seconds + reconnectStrategy(retries: number): number | Error { + if (retries > 10) { + return new Error('Retries exhausted') + } + return 1000 + }, + }, + }) + + redisClient.on('error', (err) => console.error('Redis Client Error', err)) + + await redisClient.connect() + console.log('Redis Client Connected:', url) + + return redisClient +} diff --git a/packages/rss-handler/test/index.test.ts b/packages/rss-handler/test/index.test.ts index 5abf0b77c..b43cf7973 100644 --- a/packages/rss-handler/test/index.test.ts +++ b/packages/rss-handler/test/index.test.ts @@ -1,13 +1,12 @@ import { expect } from 'chai' import 'mocha' -import { Item } from 'rss-parser' -import { isOldItem 
} from '../src' +import { isOldItem, RssFeedItem } from '../src' describe('isOldItem', () => { it('returns true if item is older than 1 day', () => { const item = { pubDate: '2020-01-01', - } as Item + } as RssFeedItem const lastFetchedAt = Date.now() expect(isOldItem(item, lastFetchedAt)).to.be.true @@ -17,7 +16,7 @@ describe('isOldItem', () => { const lastFetchedAt = Date.now() const item = { pubDate: new Date(lastFetchedAt).toISOString(), - } as Item + } as RssFeedItem expect(isOldItem(item, lastFetchedAt)).to.be.true })