diff --git a/packages/rss-handler/package.json b/packages/rss-handler/package.json index 49452e22e..728cd9b82 100644 --- a/packages/rss-handler/package.json +++ b/packages/rss-handler/package.json @@ -29,6 +29,7 @@ "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1", "linkedom": "^0.16.4", + "redis": "^4.3.1", "rss-parser": "^3.13.0" }, "volta": { diff --git a/packages/rss-handler/src/index.ts b/packages/rss-handler/src/index.ts index a615fa675..2da93f3f7 100644 --- a/packages/rss-handler/src/index.ts +++ b/packages/rss-handler/src/index.ts @@ -4,11 +4,15 @@ import crypto from 'crypto' import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import import * as jwt from 'jsonwebtoken' import { parseHTML } from 'linkedom' +import { createClient } from 'redis' import Parser, { Item } from 'rss-parser' import { promisify } from 'util' +import { createRedisClient } from './redis' import { CONTENT_FETCH_URL, createCloudTask } from './task' type FolderType = 'following' | 'inbox' +// explicitly create the return type of RedisClient +type RedisClient = ReturnType interface RssFeedRequest { subscriptionIds: string[] @@ -42,6 +46,7 @@ type RssFeedItemMedia = { type RssFeedItem = Item & { 'media:thumbnail'?: RssFeedItemMedia 'media:content'?: RssFeedItemMedia[] + link: string } export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { @@ -214,13 +219,34 @@ const sendUpdateSubscriptionMutation = async ( } } +const isItemRecentlySaved = async ( + redisClient: RedisClient, + userId: string, + url: string +) => { + const key = `rss-recent-save:${userId}:${url}` + const result = await redisClient.get(key) + return !!result +} + const createTask = async ( userId: string, feedUrl: string, item: RssFeedItem, fetchContent: boolean, - folder: FolderType + folder: FolderType, + redisClient: RedisClient ) => { + const isRecentlySaved = await isItemRecentlySaved( + redisClient, + userId, + item.link + ) + if (isRecentlySaved) { + console.log('Item recently saved', item.link) + return true + } + if (folder === 'following' && !fetchContent) { return createItemWithPreviewContent(userId, feedUrl, item) } @@ -363,7 +389,7 @@ const getUpdatePeriodInHours = (feed: RssFeed) => { } // get link following the order of preference: via, alternate, self -const getLink = (links: RssFeedItemLink[]) => { +const getLink = (links: RssFeedItemLink[]): string | undefined => { // sort links by preference const sortedLinks: string[] = [] @@ -398,10 +424,11 @@ const processSubscription = async ( lastFetchedChecksum: string, fetchContent: boolean, folder: FolderType, - feed: RssFeed + feed: RssFeed, + redisClient: RedisClient ) => { let lastItemFetchedAt: Date | null = null - let lastValidItem: Item | null = null + let lastValidItem: RssFeedItem | null = null if (fetchResult.checksum === lastFetchedChecksum) { console.log('feed has not been updated', feedUrl, lastFetchedChecksum) @@ -425,7 +452,7 @@ const processSubscription = async ( // save each item in the feed for (const item of feed.items) { // use published or updated if isoDate is not available for atom feeds - item.isoDate = + const isoDate = item.isoDate || item.published || item.updated || item.created console.log('Processing feed item', item.links, item.isoDate, feed.feedUrl) @@ -434,21 +461,28 @@ const processSubscription = async ( continue } - item.link = getLink(item.links) - if (!item.link) { + const link = getLink(item.links) + if (!link) { console.log('Invalid feed item links', item.links) continue } - console.log('Fetching feed item', item.link) + console.log('Fetching feed item', link) + const feedItem = { + ...item, + isoDate, + link, + } - const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date() + const publishedAt = feedItem.isoDate + ? new Date(feedItem.isoDate) + : new Date() // remember the last valid item if ( !lastValidItem || (lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate)) ) { - lastValidItem = item + lastValidItem = feedItem } // Max limit per-feed update @@ -457,20 +491,21 @@ const processSubscription = async ( } // skip old items - if (isOldItem(item, lastFetchedAt)) { - console.log('Skipping old feed item', item.link) + if (isOldItem(feedItem, lastFetchedAt)) { + console.log('Skipping old feed item', feedItem.link) continue } const created = await createTask( userId, feedUrl, - item, + feedItem, fetchContent, - folder + folder, + redisClient ) if (!created) { - console.error('Failed to create task for feed item', item.link) + console.error('Failed to create task for feed item', feedItem.link) continue } @@ -496,7 +531,8 @@ const processSubscription = async ( feedUrl, lastValidItem, fetchContent, - folder + folder, + redisClient ) if (!created) { console.error('Failed to create task for feed item', lastValidItem.link) @@ -536,6 +572,12 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('INVALID_REQUEST_BODY') } + // create redis client + const redisClient = await createRedisClient( + process.env.REDIS_URL, + process.env.REDIS_CERT + ) + const { feedUrl, subscriptionIds, @@ -563,22 +605,22 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( console.log('Fetched feed', feed.title, new Date()) - await Promise.all( - subscriptionIds.map((_, i) => - processSubscription( - subscriptionIds[i], - userIds[i], - feedUrl, - fetchResult, - lastFetchedTimestamps[i], - scheduledTimestamps[i], - lastFetchedChecksums[i], - fetchContents[i] && allowFetchContent, - folders[i], - feed - ) + // process each subscription sequentially + for (let i = 0; i < subscriptionIds.length; i++) { + await processSubscription( + subscriptionIds[i], + userIds[i], + feedUrl, + fetchResult, + lastFetchedTimestamps[i], + scheduledTimestamps[i], + lastFetchedChecksums[i], + fetchContents[i] && allowFetchContent, + folders[i], + feed, + redisClient ) - ) + } res.send('ok') } catch (e) { diff --git a/packages/rss-handler/src/redis.ts b/packages/rss-handler/src/redis.ts new file mode 100644 index 000000000..350cb09f6 --- /dev/null +++ b/packages/rss-handler/src/redis.ts @@ -0,0 +1,26 @@ +import { createClient } from 'redis' + +export const createRedisClient = async (url?: string, cert?: string) => { + const redisClient = createClient({ + url, + socket: { + tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS + cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line + rejectUnauthorized: false, // for self-signed certs + connectTimeout: 10000, // 10 seconds + reconnectStrategy(retries: number): number | Error { + if (retries > 10) { + return new Error('Retries exhausted') + } + return 1000 + }, + }, + }) + + redisClient.on('error', (err) => console.error('Redis Client Error', err)) + + await redisClient.connect() + console.log('Redis Client Connected:', url) + + return redisClient +}