skip recently saved rss item
This commit is contained in:
@ -29,6 +29,7 @@
|
||||
"dotenv": "^16.0.1",
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"linkedom": "^0.16.4",
|
||||
"redis": "^4.3.1",
|
||||
"rss-parser": "^3.13.0"
|
||||
},
|
||||
"volta": {
|
||||
|
||||
@ -4,11 +4,15 @@ import crypto from 'crypto'
|
||||
import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import { parseHTML } from 'linkedom'
|
||||
import { createClient } from 'redis'
|
||||
import Parser, { Item } from 'rss-parser'
|
||||
import { promisify } from 'util'
|
||||
import { createRedisClient } from './redis'
|
||||
import { CONTENT_FETCH_URL, createCloudTask } from './task'
|
||||
|
||||
type FolderType = 'following' | 'inbox'
|
||||
// explicitly create the return type of RedisClient
|
||||
type RedisClient = ReturnType<typeof createClient>
|
||||
|
||||
interface RssFeedRequest {
|
||||
subscriptionIds: string[]
|
||||
@ -42,6 +46,7 @@ type RssFeedItemMedia = {
|
||||
type RssFeedItem = Item & {
|
||||
'media:thumbnail'?: RssFeedItemMedia
|
||||
'media:content'?: RssFeedItemMedia[]
|
||||
link: string
|
||||
}
|
||||
|
||||
export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => {
|
||||
@ -214,13 +219,34 @@ const sendUpdateSubscriptionMutation = async (
|
||||
}
|
||||
}
|
||||
|
||||
const isItemRecentlySaved = async (
|
||||
redisClient: RedisClient,
|
||||
userId: string,
|
||||
url: string
|
||||
) => {
|
||||
const key = `rss-recent-save:${userId}:${url}`
|
||||
const result = await redisClient.get(key)
|
||||
return !!result
|
||||
}
|
||||
|
||||
const createTask = async (
|
||||
userId: string,
|
||||
feedUrl: string,
|
||||
item: RssFeedItem,
|
||||
fetchContent: boolean,
|
||||
folder: FolderType
|
||||
folder: FolderType,
|
||||
redisClient: RedisClient
|
||||
) => {
|
||||
const isRecentlySaved = await isItemRecentlySaved(
|
||||
redisClient,
|
||||
userId,
|
||||
item.link
|
||||
)
|
||||
if (isRecentlySaved) {
|
||||
console.log('Item recently saved', item.link)
|
||||
return true
|
||||
}
|
||||
|
||||
if (folder === 'following' && !fetchContent) {
|
||||
return createItemWithPreviewContent(userId, feedUrl, item)
|
||||
}
|
||||
@ -363,7 +389,7 @@ const getUpdatePeriodInHours = (feed: RssFeed) => {
|
||||
}
|
||||
|
||||
// get link following the order of preference: via, alternate, self
|
||||
const getLink = (links: RssFeedItemLink[]) => {
|
||||
const getLink = (links: RssFeedItemLink[]): string | undefined => {
|
||||
// sort links by preference
|
||||
const sortedLinks: string[] = []
|
||||
|
||||
@ -398,10 +424,11 @@ const processSubscription = async (
|
||||
lastFetchedChecksum: string,
|
||||
fetchContent: boolean,
|
||||
folder: FolderType,
|
||||
feed: RssFeed
|
||||
feed: RssFeed,
|
||||
redisClient: RedisClient
|
||||
) => {
|
||||
let lastItemFetchedAt: Date | null = null
|
||||
let lastValidItem: Item | null = null
|
||||
let lastValidItem: RssFeedItem | null = null
|
||||
|
||||
if (fetchResult.checksum === lastFetchedChecksum) {
|
||||
console.log('feed has not been updated', feedUrl, lastFetchedChecksum)
|
||||
@ -425,7 +452,7 @@ const processSubscription = async (
|
||||
// save each item in the feed
|
||||
for (const item of feed.items) {
|
||||
// use published or updated if isoDate is not available for atom feeds
|
||||
item.isoDate =
|
||||
const isoDate =
|
||||
item.isoDate || item.published || item.updated || item.created
|
||||
console.log('Processing feed item', item.links, item.isoDate, feed.feedUrl)
|
||||
|
||||
@ -434,21 +461,28 @@ const processSubscription = async (
|
||||
continue
|
||||
}
|
||||
|
||||
item.link = getLink(item.links)
|
||||
if (!item.link) {
|
||||
const link = getLink(item.links)
|
||||
if (!link) {
|
||||
console.log('Invalid feed item links', item.links)
|
||||
continue
|
||||
}
|
||||
|
||||
console.log('Fetching feed item', item.link)
|
||||
console.log('Fetching feed item', link)
|
||||
const feedItem = {
|
||||
...item,
|
||||
isoDate,
|
||||
link,
|
||||
}
|
||||
|
||||
const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date()
|
||||
const publishedAt = feedItem.isoDate
|
||||
? new Date(feedItem.isoDate)
|
||||
: new Date()
|
||||
// remember the last valid item
|
||||
if (
|
||||
!lastValidItem ||
|
||||
(lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate))
|
||||
) {
|
||||
lastValidItem = item
|
||||
lastValidItem = feedItem
|
||||
}
|
||||
|
||||
// Max limit per-feed update
|
||||
@ -457,20 +491,21 @@ const processSubscription = async (
|
||||
}
|
||||
|
||||
// skip old items
|
||||
if (isOldItem(item, lastFetchedAt)) {
|
||||
console.log('Skipping old feed item', item.link)
|
||||
if (isOldItem(feedItem, lastFetchedAt)) {
|
||||
console.log('Skipping old feed item', feedItem.link)
|
||||
continue
|
||||
}
|
||||
|
||||
const created = await createTask(
|
||||
userId,
|
||||
feedUrl,
|
||||
item,
|
||||
feedItem,
|
||||
fetchContent,
|
||||
folder
|
||||
folder,
|
||||
redisClient
|
||||
)
|
||||
if (!created) {
|
||||
console.error('Failed to create task for feed item', item.link)
|
||||
console.error('Failed to create task for feed item', feedItem.link)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -496,7 +531,8 @@ const processSubscription = async (
|
||||
feedUrl,
|
||||
lastValidItem,
|
||||
fetchContent,
|
||||
folder
|
||||
folder,
|
||||
redisClient
|
||||
)
|
||||
if (!created) {
|
||||
console.error('Failed to create task for feed item', lastValidItem.link)
|
||||
@ -536,6 +572,12 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
return res.status(400).send('INVALID_REQUEST_BODY')
|
||||
}
|
||||
|
||||
// create redis client
|
||||
const redisClient = await createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
|
||||
const {
|
||||
feedUrl,
|
||||
subscriptionIds,
|
||||
@ -563,22 +605,22 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
|
||||
console.log('Fetched feed', feed.title, new Date())
|
||||
|
||||
await Promise.all(
|
||||
subscriptionIds.map((_, i) =>
|
||||
processSubscription(
|
||||
subscriptionIds[i],
|
||||
userIds[i],
|
||||
feedUrl,
|
||||
fetchResult,
|
||||
lastFetchedTimestamps[i],
|
||||
scheduledTimestamps[i],
|
||||
lastFetchedChecksums[i],
|
||||
fetchContents[i] && allowFetchContent,
|
||||
folders[i],
|
||||
feed
|
||||
)
|
||||
// process each subscription sequentially
|
||||
for (let i = 0; i < subscriptionIds.length; i++) {
|
||||
await processSubscription(
|
||||
subscriptionIds[i],
|
||||
userIds[i],
|
||||
feedUrl,
|
||||
fetchResult,
|
||||
lastFetchedTimestamps[i],
|
||||
scheduledTimestamps[i],
|
||||
lastFetchedChecksums[i],
|
||||
fetchContents[i] && allowFetchContent,
|
||||
folders[i],
|
||||
feed,
|
||||
redisClient
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
res.send('ok')
|
||||
} catch (e) {
|
||||
|
||||
26
packages/rss-handler/src/redis.ts
Normal file
26
packages/rss-handler/src/redis.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import { createClient } from 'redis'
|
||||
|
||||
export const createRedisClient = async (url?: string, cert?: string) => {
|
||||
const redisClient = createClient({
|
||||
url,
|
||||
socket: {
|
||||
tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS
|
||||
cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
reconnectStrategy(retries: number): number | Error {
|
||||
if (retries > 10) {
|
||||
return new Error('Retries exhausted')
|
||||
}
|
||||
return 1000
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
redisClient.on('error', (err) => console.error('Redis Client Error', err))
|
||||
|
||||
await redisClient.connect()
|
||||
console.log('Redis Client Connected:', url)
|
||||
|
||||
return redisClient
|
||||
}
|
||||
Reference in New Issue
Block a user