Merge pull request #2962 from omnivore-app/feat/api-cache-rss-checksums
Cache and check feed checksums to reduce fetching
This commit is contained in:
@ -59,6 +59,9 @@ export class Subscription {
|
||||
@Column('timestamp', { nullable: true })
|
||||
lastFetchedAt?: Date | null
|
||||
|
||||
@Column('text', { nullable: true })
|
||||
lastFetchedChecksum?: string | null
|
||||
|
||||
@CreateDateColumn({ default: () => 'CURRENT_TIMESTAMP' })
|
||||
createdAt!: Date
|
||||
|
||||
|
||||
@ -2978,6 +2978,7 @@ export type UpdateSubscriptionInput = {
|
||||
description?: InputMaybe<Scalars['String']>;
|
||||
id: Scalars['ID'];
|
||||
lastFetchedAt?: InputMaybe<Scalars['Date']>;
|
||||
lastFetchedChecksum?: InputMaybe<Scalars['String']>;
|
||||
name?: InputMaybe<Scalars['String']>;
|
||||
status?: InputMaybe<SubscriptionStatus>;
|
||||
};
|
||||
|
||||
@ -2391,6 +2391,7 @@ input UpdateSubscriptionInput {
|
||||
description: String
|
||||
id: ID!
|
||||
lastFetchedAt: Date
|
||||
lastFetchedChecksum: String
|
||||
name: String
|
||||
status: SubscriptionStatus
|
||||
}
|
||||
|
||||
@ -290,6 +290,7 @@ export const updateSubscriptionResolver = authorized<
|
||||
lastFetchedAt: input.lastFetchedAt
|
||||
? new Date(input.lastFetchedAt)
|
||||
: undefined,
|
||||
lastFetchedChecksum: input.lastFetchedChecksum || undefined,
|
||||
status: input.status || undefined,
|
||||
})
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ export function rssFeedRouter() {
|
||||
|
||||
// get all active rss feed subscriptions
|
||||
const subscriptions = await getRepository(Subscription).find({
|
||||
select: ['id', 'url', 'user', 'lastFetchedAt'],
|
||||
select: ['id', 'url', 'user', 'lastFetchedAt', 'lastFetchedChecksum'],
|
||||
where: {
|
||||
type: SubscriptionType.Rss,
|
||||
status: SubscriptionStatus.Active,
|
||||
|
||||
@ -2548,6 +2548,7 @@ const schema = gql`
|
||||
name: String
|
||||
description: String
|
||||
lastFetchedAt: Date
|
||||
lastFetchedChecksum: String
|
||||
status: SubscriptionStatus
|
||||
}
|
||||
|
||||
|
||||
@ -601,6 +601,7 @@ export const enqueueRssFeedFetch = async (
|
||||
subscriptionId: rssFeedSubscription.id,
|
||||
feedUrl: rssFeedSubscription.url,
|
||||
lastFetchedAt: rssFeedSubscription.lastFetchedAt?.getTime() || 0, // unix timestamp in milliseconds
|
||||
lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null,
|
||||
}
|
||||
|
||||
const headers = {
|
||||
|
||||
9
packages/db/migrations/0138.do.add_checksum_to_subscriptions_table.sql
Executable file
9
packages/db/migrations/0138.do.add_checksum_to_subscriptions_table.sql
Executable file
@ -0,0 +1,9 @@
|
||||
-- Type: DO
-- Name: add_checksum_to_subscriptions_table
-- Description: Add a last fetched checksum field to the subscriptions table

BEGIN;

-- Nullable on purpose: existing subscriptions have no recorded checksum
-- until their next successful feed fetch populates it.
ALTER TABLE omnivore.subscriptions ADD COLUMN last_fetched_checksum TEXT ;

COMMIT;
|
||||
9
packages/db/migrations/0138.undo.add_checksum_to_subscriptions_table.sql
Executable file
9
packages/db/migrations/0138.undo.add_checksum_to_subscriptions_table.sql
Executable file
@ -0,0 +1,9 @@
|
||||
-- Type: UNDO
-- Name: add_checksum_to_subscriptions_table
-- Description: Remove the last fetched checksum field from the subscriptions table

BEGIN;

ALTER TABLE omnivore.subscriptions DROP COLUMN last_fetched_checksum ;

COMMIT;
|
||||
@ -17,7 +17,8 @@
|
||||
"devDependencies": {
|
||||
"chai": "^4.3.6",
|
||||
"eslint-plugin-prettier": "^4.0.0",
|
||||
"mocha": "^10.0.0"
|
||||
"mocha": "^10.0.0",
|
||||
"nock": "^13.3.4"
|
||||
},
|
||||
"dependencies": {
|
||||
"@google-cloud/functions-framework": "3.1.2",
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import * as Sentry from '@sentry/serverless'
|
||||
import axios from 'axios'
|
||||
import crypto from 'crypto'
|
||||
import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import Parser, { Item } from 'rss-parser'
|
||||
@ -10,6 +11,7 @@ interface RssFeedRequest {
|
||||
subscriptionId: string
|
||||
feedUrl: string
|
||||
lastFetchedAt: number // unix timestamp in milliseconds
|
||||
lastFetchedChecksum: string | undefined
|
||||
}
|
||||
|
||||
// link can be a string or an object
|
||||
@ -21,10 +23,37 @@ function isRssFeedRequest(body: any): body is RssFeedRequest {
|
||||
)
|
||||
}
|
||||
|
||||
export const fetchAndChecksum = async (url: string) => {
|
||||
try {
|
||||
const response = await axios.get(url, {
|
||||
responseType: 'arraybuffer',
|
||||
timeout: 60_000,
|
||||
maxRedirects: 10,
|
||||
headers: {
|
||||
'User-Agent':
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
|
||||
Accept:
|
||||
'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4',
|
||||
},
|
||||
})
|
||||
|
||||
const hash = crypto.createHash('sha256')
|
||||
hash.update(response.data as Buffer)
|
||||
|
||||
const dataStr = (response.data as Buffer).toString()
|
||||
|
||||
return { url, content: dataStr, checksum: hash.digest('hex') }
|
||||
} catch (error) {
|
||||
console.log(error)
|
||||
throw new Error(`Failed to fetch or hash content from ${url}.`)
|
||||
}
|
||||
}
|
||||
|
||||
const sendUpdateSubscriptionMutation = async (
|
||||
userId: string,
|
||||
subscriptionId: string,
|
||||
lastFetchedAt: Date
|
||||
lastFetchedAt: Date,
|
||||
lastFetchedChecksum: string
|
||||
) => {
|
||||
const JWT_SECRET = process.env.JWT_SECRET
|
||||
const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT
|
||||
@ -51,6 +80,7 @@ const sendUpdateSubscriptionMutation = async (
|
||||
input: {
|
||||
id: subscriptionId,
|
||||
lastFetchedAt,
|
||||
lastFetchedChecksum,
|
||||
},
|
||||
},
|
||||
})
|
||||
@ -118,19 +148,15 @@ Sentry.GCPFunction.init({
|
||||
|
||||
const signToken = promisify(jwt.sign)
|
||||
const parser = new Parser({
|
||||
timeout: 60000, // 60 seconds
|
||||
maxRedirects: 10,
|
||||
customFields: {
|
||||
item: [['link', 'links', { keepArray: true }], 'published', 'updated'],
|
||||
item: [
|
||||
['link', 'links', { keepArray: true }],
|
||||
'published',
|
||||
'updated',
|
||||
'created',
|
||||
],
|
||||
feed: ['dc:date', 'lastBuildDate', 'pubDate'],
|
||||
},
|
||||
headers: {
|
||||
// some rss feeds require user agent
|
||||
'User-Agent':
|
||||
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
|
||||
Accept:
|
||||
'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml;q=0.4',
|
||||
},
|
||||
})
|
||||
|
||||
// get link following the order of preference: via, alternate, self
|
||||
@ -190,16 +216,24 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
return res.status(400).send('INVALID_REQUEST_BODY')
|
||||
}
|
||||
|
||||
const { feedUrl, subscriptionId, lastFetchedAt } = req.body
|
||||
const { feedUrl, subscriptionId, lastFetchedAt, lastFetchedChecksum } =
|
||||
req.body
|
||||
console.log('Processing feed', feedUrl, lastFetchedAt)
|
||||
|
||||
let lastItemFetchedAt: Date | null = null
|
||||
let lastValidItem: Item | null = null
|
||||
|
||||
const fetchResult = await fetchAndChecksum(feedUrl)
|
||||
if (fetchResult.checksum === lastFetchedChecksum) {
|
||||
console.log('feed has not been updated', feedUrl, lastFetchedChecksum)
|
||||
return res.status(200)
|
||||
}
|
||||
const updatedLastFetchedChecksum = fetchResult.checksum
|
||||
|
||||
// fetch feed
|
||||
let itemCount = 0
|
||||
const feed = await parser.parseURL(feedUrl)
|
||||
console.log('Fetched feed', feed, new Date())
|
||||
const feed = await parser.parseString(fetchResult.content)
|
||||
console.log('Fetched feed', feed.title, new Date())
|
||||
|
||||
const feedPubDate = (feed['dc:date'] ||
|
||||
feed.pubDate ||
|
||||
@ -214,7 +248,10 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
for (const item of feed.items) {
|
||||
// use published or updated if isoDate is not available for atom feeds
|
||||
item.isoDate =
|
||||
item.isoDate || (item.published as string) || (item.updated as string)
|
||||
item.isoDate ||
|
||||
(item.published as string) ||
|
||||
(item.updated as string) ||
|
||||
(item.created as string)
|
||||
console.log('Processing feed item', item.links, item.isoDate)
|
||||
|
||||
if (!item.links || item.links.length === 0) {
|
||||
@ -299,7 +336,8 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
const updatedSubscription = await sendUpdateSubscriptionMutation(
|
||||
userId,
|
||||
subscriptionId,
|
||||
lastItemFetchedAt
|
||||
lastItemFetchedAt,
|
||||
updatedLastFetchedChecksum
|
||||
)
|
||||
console.log('Updated subscription', updatedSubscription)
|
||||
|
||||
|
||||
14
packages/rss-handler/test/checksum.test.ts
Normal file
14
packages/rss-handler/test/checksum.test.ts
Normal file
@ -0,0 +1,14 @@
|
||||
import 'mocha'
|
||||
import nock from 'nock'
|
||||
import { expect } from 'chai'
|
||||
import { fetchAndChecksum } from '../src/index'
|
||||
|
||||
describe('fetchAndChecksum', () => {
|
||||
it('should hash the content available', async () => {
|
||||
nock('https://fake.com', {}).get('/rss.xml').reply(200, 'i am some content')
|
||||
const result = await fetchAndChecksum('https://fake.com/rss.xml')
|
||||
expect(result.checksum).to.eq(
|
||||
'd6bc10faec048d999d0cf4b2f7103d84557fb9cd94c3bccd17884b1288949375'
|
||||
)
|
||||
})
|
||||
})
|
||||
@ -1,8 +0,0 @@
|
||||
import 'mocha'
|
||||
import { expect } from 'chai'
|
||||
|
||||
describe('stub test', () => {
|
||||
it('should pass', () => {
|
||||
expect(true).to.be.true
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user