Cache and check feed checksums to reduce fetching

This commit is contained in:
Jackson Harper
2023-10-18 16:26:03 +08:00
parent 14349d8257
commit cf101c6d18
10 changed files with 85 additions and 25 deletions

View File

@ -59,6 +59,9 @@ export class Subscription {
@Column('timestamp', { nullable: true })
lastFetchedAt?: Date | null
@Column('text', { nullable: true })
lastFetchedChecksum?: string | null
@CreateDateColumn({ default: () => 'CURRENT_TIMESTAMP' })
createdAt!: Date

View File

@ -2978,6 +2978,7 @@ export type UpdateSubscriptionInput = {
description?: InputMaybe<Scalars['String']>;
id: Scalars['ID'];
lastFetchedAt?: InputMaybe<Scalars['Date']>;
lastfetchedChecksum?: InputMaybe<Scalars['String']>;
name?: InputMaybe<Scalars['String']>;
status?: InputMaybe<SubscriptionStatus>;
};

View File

@ -2391,6 +2391,7 @@ input UpdateSubscriptionInput {
description: String
id: ID!
lastFetchedAt: Date
lastfetchedChecksum: String
name: String
status: SubscriptionStatus
}

View File

@ -290,6 +290,7 @@ export const updateSubscriptionResolver = authorized<
lastFetchedAt: input.lastFetchedAt
? new Date(input.lastFetchedAt)
: undefined,
lastFetchedChecksum: input.lastfetchedChecksum,
status: input.status || undefined,
})

View File

@ -24,7 +24,7 @@ export function rssFeedRouter() {
// get all active rss feed subscriptions
const subscriptions = await getRepository(Subscription).find({
select: ['id', 'url', 'user', 'lastFetchedAt'],
select: ['id', 'url', 'user', 'lastFetchedAt', 'lastFetchedChecksum'],
where: {
type: SubscriptionType.Rss,
status: SubscriptionStatus.Active,

View File

@ -2548,6 +2548,7 @@ const schema = gql`
name: String
description: String
lastFetchedAt: Date
lastfetchedChecksum: String
status: SubscriptionStatus
}

View File

@ -601,6 +601,7 @@ export const enqueueRssFeedFetch = async (
subscriptionId: rssFeedSubscription.id,
feedUrl: rssFeedSubscription.url,
lastFetchedAt: rssFeedSubscription.lastFetchedAt?.getTime() || 0, // unix timestamp in milliseconds
lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null,
}
const headers = {

View File

@ -0,0 +1,7 @@
-- Type: DO
-- Name: add_checksum_to_subscriptions_table
-- Description: Add a last fetched checksum field to the subscriptions table

BEGIN;

-- Nullable text column holding the checksum of the most recently fetched feed body.
ALTER TABLE omnivore.subscriptions ADD COLUMN last_fetched_checksum text;

COMMIT;

View File

@ -0,0 +1,9 @@
-- Type: UNDO
-- Name: add_checksum_to_subscriptions_table
-- Description: Remove the last fetched checksum field from the subscriptions table

BEGIN;

-- IF EXISTS keeps the rollback from failing when the column was never created.
ALTER TABLE omnivore.subscriptions DROP COLUMN IF EXISTS last_fetched_checksum;

COMMIT;

View File

@ -1,5 +1,6 @@
import * as Sentry from '@sentry/serverless'
import axios from 'axios'
import crypto from 'crypto'
import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
import * as jwt from 'jsonwebtoken'
import Parser, { Item } from 'rss-parser'
@ -10,6 +11,7 @@ interface RssFeedRequest {
subscriptionId: string
feedUrl: string
lastFetchedAt: number // unix timestamp in milliseconds
lastFetchedChecksum: string | undefined
}
// link can be a string or an object
@ -21,10 +23,42 @@ function isRssFeedRequest(body: any): body is RssFeedRequest {
)
}
/** Outcome of downloading a feed: where it came from, its body, and a digest of that body. */
interface FeedFetchResult {
  /** Address the feed was requested from. */
  url: string
  /** Body of the feed response. */
  content: string
  /** Hex-encoded digest of the feed body. */
  checksum: string
}
/**
 * Downloads a feed and computes a SHA-256 checksum over its raw bytes.
 *
 * The body is requested as raw bytes so the checksum covers exactly what the
 * server sent; the bytes are then decoded to a string so `content` matches the
 * declared `string` type.
 *
 * @param url - address of the feed to download
 * @returns the requested url, the decoded body, and the hex-encoded SHA-256
 *   digest of the raw body
 * @throws Error when the request or the hashing fails; the underlying error is
 *   embedded in the message
 */
async function fetchAndChecksum(url: string): Promise<FeedFetchResult> {
  try {
    // Fetch the content from the URL
    const response = await axios.get(url, {
      responseType: 'arraybuffer',
      // Bound the request so an unresponsive feed server cannot stall the handler.
      timeout: 60000, // 60 seconds
      maxRedirects: 10,
      headers: {
        // some rss feeds require user agent
        'User-Agent':
          'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
        Accept:
          'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4',
      },
    })

    // Normalize to a Buffer whether the adapter handed back a Buffer or an ArrayBuffer.
    const rawBody = Buffer.from(response.data)

    // Create a sha256 hash of the raw bytes
    const checksum = crypto.createHash('sha256').update(rawBody).digest('hex')

    // Decode to text so callers receive an actual string.
    // NOTE(review): assumes the feed is UTF-8 encoded — confirm for legacy feeds.
    return { url, content: rawBody.toString('utf8'), checksum }
  } catch (error) {
    throw new Error(
      `Failed to fetch or hash content from ${url}. Error: ${error}`
    )
  }
}
const sendUpdateSubscriptionMutation = async (
userId: string,
subscriptionId: string,
lastFetchedAt: Date
lastFetchedAt: Date,
lastFetchedChecksum: string
) => {
const JWT_SECRET = process.env.JWT_SECRET
const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT
@ -51,6 +85,7 @@ const sendUpdateSubscriptionMutation = async (
input: {
id: subscriptionId,
lastFetchedAt,
lastFetchedChecksum,
},
},
})
@ -121,15 +156,12 @@ const parser = new Parser({
timeout: 60000, // 60 seconds
maxRedirects: 10,
customFields: {
item: [['link', 'links', { keepArray: true }], 'published', 'updated'],
feed: ['dc:date', 'lastBuildDate', 'pubDate'],
},
headers: {
// some rss feeds require user agent
'User-Agent':
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
Accept:
'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4',
item: [
['link', 'links', { keepArray: true }],
'published',
'updated',
'created',
],
},
})
@ -190,31 +222,34 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
return res.status(400).send('INVALID_REQUEST_BODY')
}
const { feedUrl, subscriptionId, lastFetchedAt } = req.body
const { feedUrl, subscriptionId, lastFetchedAt, lastFetchedChecksum } =
req.body
console.log('Processing feed', feedUrl, lastFetchedAt)
let lastItemFetchedAt: Date | null = null
let lastValidItem: Item | null = null
let updatedLastFetchedChecksum: string | null
let fetchResult = await fetchAndChecksum(feedUrl)
if (fetchResult.checksum === lastFetchedChecksum) {
console.log('feed has not been updated', feedUrl, lastFetchedChecksum)
return res.status(200)
}
updatedLastFetchedChecksum = fetchResult.checksum
// fetch feed
let itemCount = 0
const feed = await parser.parseURL(feedUrl)
console.log('Fetched feed', feed, new Date())
const feedPubDate = (feed['dc:date'] ||
feed.pubDate ||
feed.lastBuildDate) as string | undefined
console.log('Feed pub date', feedPubDate)
if (feedPubDate && new Date(feedPubDate) < new Date(lastFetchedAt)) {
console.log('Skipping old feed', feedPubDate)
return res.send('ok')
}
const feed = await parser.parseString(fetchResult.content)
console.log('Fetched feed', feed.title, new Date())
// save each item in the feed
for (const item of feed.items) {
// use published or updated if isoDate is not available for atom feeds
item.isoDate =
item.isoDate || (item.published as string) || (item.updated as string)
item.isoDate ||
(item.published as string) ||
(item.updated as string) ||
(item.created as string)
console.log('Processing feed item', item.links, item.isoDate)
if (!item.links || item.links.length === 0) {
@ -299,7 +334,8 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
const updatedSubscription = await sendUpdateSubscriptionMutation(
userId,
subscriptionId,
lastItemFetchedAt
lastItemFetchedAt,
updatedLastFetchedChecksum
)
console.log('Updated subscription', updatedSubscription)