From cf101c6d18e39f252d7b2b0355803201ca0b96be Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Wed, 18 Oct 2023 16:26:03 +0800 Subject: [PATCH] Cache and check feed checksums to reduce fetching --- packages/api/src/entity/subscription.ts | 3 + packages/api/src/generated/graphql.ts | 1 + packages/api/src/generated/schema.graphql | 1 + .../api/src/resolvers/subscriptions/index.ts | 1 + packages/api/src/routers/svc/rss_feed.ts | 2 +- packages/api/src/schema.ts | 1 + packages/api/src/utils/createTask.ts | 1 + ...do.add_checksum_to_subscriptions_table.sql | 7 ++ ...do.add_checksum_to_subscriptions_table.sql | 9 ++ packages/rss-handler/src/index.ts | 84 +++++++++++++------ 10 files changed, 85 insertions(+), 25 deletions(-) create mode 100755 packages/db/migrations/0138.do.add_checksum_to_subscriptions_table.sql create mode 100755 packages/db/migrations/0138.undo.add_checksum_to_subscriptions_table.sql diff --git a/packages/api/src/entity/subscription.ts b/packages/api/src/entity/subscription.ts index 90f7d593b..b63400d56 100644 --- a/packages/api/src/entity/subscription.ts +++ b/packages/api/src/entity/subscription.ts @@ -59,6 +59,9 @@ export class Subscription { @Column('timestamp', { nullable: true }) lastFetchedAt?: Date | null + @Column('text', { nullable: true }) + lastFetchedChecksum?: string | null + @CreateDateColumn({ default: () => 'CURRENT_TIMESTAMP' }) createdAt!: Date diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index f0f41ff68..8ceb89481 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -2978,6 +2978,7 @@ export type UpdateSubscriptionInput = { description?: InputMaybe; id: Scalars['ID']; lastFetchedAt?: InputMaybe; + lastfetchedChecksum?: InputMaybe; name?: InputMaybe; status?: InputMaybe; }; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index 3ef33f886..39a6c00e7 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -2391,6 +2391,7 @@ input UpdateSubscriptionInput { description: String id: ID! lastFetchedAt: Date + lastfetchedChecksum: String name: String status: SubscriptionStatus } diff --git a/packages/api/src/resolvers/subscriptions/index.ts b/packages/api/src/resolvers/subscriptions/index.ts index 74375441d..a1069423b 100644 --- a/packages/api/src/resolvers/subscriptions/index.ts +++ b/packages/api/src/resolvers/subscriptions/index.ts @@ -290,6 +290,7 @@ export const updateSubscriptionResolver = authorized< lastFetchedAt: input.lastFetchedAt ? new Date(input.lastFetchedAt) : undefined, + lastFetchedChecksum: input.lastfetchedChecksum, status: input.status || undefined, }) diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts index 0834e5025..c06ec9f71 100644 --- a/packages/api/src/routers/svc/rss_feed.ts +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -24,7 +24,7 @@ export function rssFeedRouter() { // get all active rss feed subscriptions const subscriptions = await getRepository(Subscription).find({ - select: ['id', 'url', 'user', 'lastFetchedAt'], + select: ['id', 'url', 'user', 'lastFetchedAt', 'lastFetchedChecksum'], where: { type: SubscriptionType.Rss, status: SubscriptionStatus.Active, diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index e3b2b3442..6d78471cb 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -2548,6 +2548,7 @@ const schema = gql` name: String description: String lastFetchedAt: Date + lastfetchedChecksum: String status: SubscriptionStatus } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index bb3457e00..483921bc9 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -601,6 +601,7 @@ export const enqueueRssFeedFetch = async ( subscriptionId: rssFeedSubscription.id, feedUrl: rssFeedSubscription.url, lastFetchedAt: rssFeedSubscription.lastFetchedAt?.getTime() || 0, // unix timestamp in milliseconds + lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null, } const headers = { diff --git a/packages/db/migrations/0138.do.add_checksum_to_subscriptions_table.sql b/packages/db/migrations/0138.do.add_checksum_to_subscriptions_table.sql new file mode 100755 index 000000000..5f704a666 --- /dev/null +++ b/packages/db/migrations/0138.do.add_checksum_to_subscriptions_table.sql @@ -0,0 +1,7 @@ +-- Type: DO +-- Name: add_checksum_to_subscriptions_table +-- Description: Add a last fetched checksum field to the subscriptions table + +BEGIN; + +COMMIT; diff --git a/packages/db/migrations/0138.undo.add_checksum_to_subscriptions_table.sql b/packages/db/migrations/0138.undo.add_checksum_to_subscriptions_table.sql new file mode 100755 index 000000000..e138d501a --- /dev/null +++ b/packages/db/migrations/0138.undo.add_checksum_to_subscriptions_table.sql @@ -0,0 +1,9 @@ +-- Type: UNDO +-- Name: add_checksum_to_subscriptions_table +-- Description: Add a last fetched checksum field to the subscriptions table + +BEGIN; + +ALTER TABLE omnivore.subscriptions DROP COLUMN last_fetched_checksum ; + +COMMIT; diff --git a/packages/rss-handler/src/index.ts b/packages/rss-handler/src/index.ts index 60e789e73..4edd58709 100644 --- a/packages/rss-handler/src/index.ts +++ b/packages/rss-handler/src/index.ts @@ -1,5 +1,6 @@ import * as Sentry from '@sentry/serverless' import axios from 'axios' +import crypto from 'crypto' import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import import * as jwt from 'jsonwebtoken' import Parser, { Item } from 'rss-parser' @@ -10,6 +11,7 @@ interface RssFeedRequest { subscriptionId: string feedUrl: string lastFetchedAt: number // unix timestamp in milliseconds + lastFetchedChecksum: string | undefined } // link can be a string or an object @@ -21,10 +23,42 @@ function isRssFeedRequest(body: any): body is RssFeedRequest { ) } +type FeedFetchResult = { + url: string + content: string + checksum: string +} + +async function fetchAndChecksum(url: string): Promise { + try { + // Fetch the content from the URL + const response = await axios.get(url, { + responseType: 'arraybuffer', + headers: { + 'User-Agent': + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36', + Accept: + 'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4', + }, + }) + + // Create a sha256 hash of the content + const hash = crypto.createHash('sha256') + hash.update(response.data) + + return { url, content: response.data, checksum: hash.digest('hex') } + } catch (error) { + throw new Error( + `Failed to fetch or hash content from ${url}. Error: ${error}` + ) + } +} + const sendUpdateSubscriptionMutation = async ( userId: string, subscriptionId: string, - lastFetchedAt: Date + lastFetchedAt: Date, + lastFetchedChecksum: string ) => { const JWT_SECRET = process.env.JWT_SECRET const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT @@ -51,6 +85,7 @@ const sendUpdateSubscriptionMutation = async ( input: { id: subscriptionId, lastFetchedAt, + lastFetchedChecksum, }, }, }) @@ -121,15 +156,12 @@ const parser = new Parser({ timeout: 60000, // 60 seconds maxRedirects: 10, customFields: { - item: [['link', 'links', { keepArray: true }], 'published', 'updated'], - feed: ['dc:date', 'lastBuildDate', 'pubDate'], - }, - headers: { - // some rss feeds require user agent - 'User-Agent': - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36', - Accept: - 'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4', + item: [ + ['link', 'links', { keepArray: true }], + 'published', + 'updated', + 'created', + ], }, }) @@ -190,31 +222,34 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('INVALID_REQUEST_BODY') } - const { feedUrl, subscriptionId, lastFetchedAt } = req.body + const { feedUrl, subscriptionId, lastFetchedAt, lastFetchedChecksum } = + req.body console.log('Processing feed', feedUrl, lastFetchedAt) let lastItemFetchedAt: Date | null = null let lastValidItem: Item | null = null + let updatedLastFetchedChecksum: string | null + + let fetchResult = await fetchAndChecksum(feedUrl) + if (fetchResult.checksum === lastFetchedChecksum) { + console.log('feed has not been updated', feedUrl, lastFetchedChecksum) + return res.status(200) + } + updatedLastFetchedChecksum = fetchResult.checksum // fetch feed let itemCount = 0 - const feed = await parser.parseURL(feedUrl) - console.log('Fetched feed', feed, new Date()) - - const feedPubDate = (feed['dc:date'] || - feed.pubDate || - feed.lastBuildDate) as string | undefined - console.log('Feed pub date', feedPubDate) - if (feedPubDate && new Date(feedPubDate) < new Date(lastFetchedAt)) { - console.log('Skipping old feed', feedPubDate) - return res.send('ok') - } + const feed = await parser.parseString(fetchResult.content) + console.log('Fetched feed', feed.title, new Date()) // save each item in the feed for (const item of feed.items) { // use published or updated if isoDate is not available for atom feeds item.isoDate = - item.isoDate || (item.published as string) || (item.updated as string) + item.isoDate || + (item.published as string) || + (item.updated as string) || + (item.created as string) console.log('Processing feed item', item.links, item.isoDate) if (!item.links || item.links.length === 0) { @@ -299,7 +334,8 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( const updatedSubscription = await sendUpdateSubscriptionMutation( userId, subscriptionId, - lastItemFetchedAt + lastItemFetchedAt, + updatedLastFetchedChecksum ) console.log('Updated subscription', updatedSubscription)