Update RSS handler to fetch a group of RSS feeds in one task

This commit is contained in:
Hongbo Wu
2023-10-19 17:25:07 +08:00
parent f50d1c9cb9
commit 1880f2ace4
7 changed files with 225 additions and 161 deletions

View File

@ -2980,6 +2980,7 @@ export type UpdateSubscriptionInput = {
lastFetchedAt?: InputMaybe<Scalars['Date']>;
lastFetchedChecksum?: InputMaybe<Scalars['String']>;
name?: InputMaybe<Scalars['String']>;
scheduledAt?: InputMaybe<Scalars['Date']>;
status?: InputMaybe<SubscriptionStatus>;
};

View File

@ -2393,6 +2393,7 @@ input UpdateSubscriptionInput {
lastFetchedAt: Date
lastFetchedChecksum: String
name: String
scheduledAt: Date
status: SubscriptionStatus
}

View File

@ -239,11 +239,12 @@ export const subscribeResolver = authorized<
// create a cloud task to fetch rss feed item for the new subscription
await enqueueRssFeedFetch({
user_ids: [uid],
userIds: [uid],
url: input.url,
subscription_ids: [newSubscription.id],
scheduled_timestamps: [null],
last_fetched_timestamps: [null],
subscriptionIds: [newSubscription.id],
scheduledDates: [new Date()], // fetch immediately
fetchedDates: [null],
checksums: [null],
})
return {
@ -300,6 +301,9 @@ export const updateSubscriptionResolver = authorized<
: undefined,
lastFetchedChecksum: input.lastFetchedChecksum || undefined,
status: input.status || undefined,
scheduledAt: input.scheduledAt
? new Date(input.scheduledAt)
: undefined,
})
return repo.findOneByOrFail({

View File

@ -30,10 +30,11 @@ export function rssFeedRouter() {
`
SELECT
url,
ARRAY_AGG(id) AS subscription_ids,
ARRAY_AGG(user_id) AS user_ids,
ARRAY_AGG(last_fetched_at) AS last_fetched_timestamps,
ARRAY_AGG(scheduled_at) AS scheduled_timestamps
ARRAY_AGG(id) AS "subscriptionIds",
ARRAY_AGG(user_id) AS "userIds",
ARRAY_AGG(last_fetched_at) AS "fetchedDates",
ARRAY_AGG(IFNULL(scheduled_at, NOW())) AS "scheduledDates",
ARRAY_AGG(last_fetched_checksum) AS "checksums"
FROM
omnivore.subscriptions
WHERE

View File

@ -2550,6 +2550,7 @@ const schema = gql`
lastFetchedAt: Date
lastFetchedChecksum: String
status: SubscriptionStatus
scheduledAt: Date
}
union UpdateSubscriptionResult =

View File

@ -593,10 +593,11 @@ export const enqueueThumbnailTask = async (
export interface RssSubscriptionGroup {
url: string
subscription_ids: string[]
user_ids: string[]
last_fetched_timestamps: (Date | null)[]
scheduled_timestamps: (Date | null)[]
subscriptionIds: string[]
userIds: string[]
fetchedDates: (Date | null)[]
scheduledDates: Date[]
checksums: (string | null)[]
}
export const enqueueRssFeedFetch = async (
@ -604,16 +605,16 @@ export const enqueueRssFeedFetch = async (
): Promise<string> => {
const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env
const payload = {
subscriptionIds: subscriptionGroup.subscription_ids,
subscriptionIds: subscriptionGroup.subscriptionIds,
feedUrl: subscriptionGroup.url,
lastFetchedTimestamps: subscriptionGroup.last_fetched_timestamps.map(
lastFetchedTimestamps: subscriptionGroup.fetchedDates.map(
(timestamp) => timestamp?.getTime() || 0
), // unix timestamp in milliseconds
lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null,
scheduledTimestamps: subscriptionGroup.scheduled_timestamps.map(
(timestamp) => timestamp?.getTime() || 0
lastFetchedChecksums: subscriptionGroup.checksums,
scheduledTimestamps: subscriptionGroup.scheduledDates.map((timestamp) =>
timestamp.getTime()
), // unix timestamp in milliseconds
userIds: subscriptionGroup.user_ids,
userIds: subscriptionGroup.userIds,
}
// If there is no Google Cloud Project Id exposed, it means that we are in local environment

View File

@ -8,10 +8,12 @@ import { promisify } from 'util'
import { CONTENT_FETCH_URL, createCloudTask } from './task'
interface RssFeedRequest {
subscriptionId: string
subscriptionIds: string[]
feedUrl: string
lastFetchedAt: number // unix timestamp in milliseconds
lastFetchedChecksum: string | undefined
lastFetchedTimestamps: number[] // unix timestamp in milliseconds
scheduledTimestamps: number[] // unix timestamp in milliseconds
lastFetchedChecksums: string[]
userIds: string[]
}
// link can be a string or an object
@ -19,7 +21,12 @@ type RssFeedItemLink = string | { $: { rel?: string; href: string } }
function isRssFeedRequest(body: any): body is RssFeedRequest {
return (
'subscriptionId' in body && 'feedUrl' in body && 'lastFetchedAt' in body
'subscriptionIds' in body &&
'feedUrl' in body &&
'lastFetchedTimestamps' in body &&
'scheduledTimestamps' in body &&
'userIds' in body &&
'lastFetchedChecksums' in body
)
}
@ -53,7 +60,8 @@ const sendUpdateSubscriptionMutation = async (
userId: string,
subscriptionId: string,
lastFetchedAt: Date,
lastFetchedChecksum: string
lastFetchedChecksum: string,
scheduledAt: Date
) => {
const JWT_SECRET = process.env.JWT_SECRET
const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT
@ -81,6 +89,7 @@ const sendUpdateSubscriptionMutation = async (
id: subscriptionId,
lastFetchedAt,
lastFetchedChecksum,
scheduledAt,
},
},
})
@ -155,10 +164,36 @@ const parser = new Parser({
'updated',
'created',
],
feed: ['dc:date', 'lastBuildDate', 'pubDate'],
feed: [
'dc:date',
'lastBuildDate',
'pubDate',
'syn:updatePeriod',
'syn:updateFrequency',
'sy:updatePeriod',
'sy:updateFrequency',
],
},
})
// Map an RSS syndication-module update period (e.g. the value of
// `sy:updatePeriod` / `syn:updatePeriod`) to its duration in hours.
// Any unrecognized value falls back to hourly (1 hour).
const getUpdatePeriodInHours = (updatePeriod: string) => {
  // NOTE(review): month/year are approximated as 30/365 days here —
  // confirm that calendar precision is not required for scheduling.
  const hoursPerPeriod: Record<string, number> = {
    hourly: 1,
    daily: 24,
    weekly: 7 * 24,
    monthly: 30 * 24,
    yearly: 365 * 24,
  }
  return hoursPerPeriod[updatePeriod] ?? 1
}
// get link following the order of preference: via, alternate, self
const getLink = (links: RssFeedItemLink[]) => {
// sort links by preference
@ -185,162 +220,182 @@ const getLink = (links: RssFeedItemLink[]) => {
return sortedLinks.find((link) => !!link)
}
export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
async (req, res) => {
if (!process.env.JWT_SECRET) {
console.error('Missing JWT_SECRET in environment')
return res.status(500).send('INTERNAL_SERVER_ERROR')
const processSubscription = async (
feedUrl: string,
userId: string,
subscriptionId: string,
fetchResult: { content: string; checksum: string },
lastFetchedAt: number,
scheduledAt: number,
lastFetchedChecksum: string
) => {
let lastItemFetchedAt: Date | null = null
let lastValidItem: Item | null = null
if (fetchResult.checksum === lastFetchedChecksum) {
console.log('feed has not been updated', feedUrl, lastFetchedChecksum)
return
}
const updatedLastFetchedChecksum = fetchResult.checksum
// fetch feed
let itemCount = 0
const feed = await parser.parseString(fetchResult.content)
console.log('Fetched feed', feed.title, new Date())
const feedPubDate = (feed['dc:date'] ||
feed.pubDate ||
feed.lastBuildDate) as string | undefined
console.log('Feed pub date', feedPubDate)
if (feedPubDate && new Date(feedPubDate) < new Date(lastFetchedAt)) {
console.log('Skipping old feed', feedPubDate)
return
}
// save each item in the feed
for (const item of feed.items) {
// use published or updated if isoDate is not available for atom feeds
item.isoDate =
item.isoDate ||
(item.published as string) ||
(item.updated as string) ||
(item.created as string)
console.log('Processing feed item', item.links, item.isoDate)
if (!item.links || item.links.length === 0) {
console.log('Invalid feed item', item)
continue
}
const token = req.header('Omnivore-Authorization')
if (!token) {
console.error('Missing authorization header')
return res.status(401).send('UNAUTHORIZED')
item.link = getLink(item.links as RssFeedItemLink[])
if (!item.link) {
console.log('Invalid feed item links', item.links)
continue
}
console.log('Fetching feed item', item.link)
const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date()
// remember the last valid item
if (
!lastValidItem ||
(lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate))
) {
lastValidItem = item
}
// Max limit per-feed update
if (itemCount > 99) {
continue
}
// skip old items and items that were published before 24h
if (
publishedAt < new Date(lastFetchedAt) ||
publishedAt < new Date(Date.now() - 24 * 60 * 60 * 1000)
) {
console.log('Skipping old feed item', item.link)
continue
}
const created = await createSavingItemTask(userId, feedUrl, item)
if (!created) {
console.error('Failed to create task for feed item', item.link)
continue
}
// remember the last item fetched at
if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) {
lastItemFetchedAt = publishedAt
}
itemCount = itemCount + 1
}
// no items saved
if (!lastItemFetchedAt) {
// the feed has been fetched before, no new valid items found
if (lastFetchedAt || !lastValidItem) {
console.log('No new valid items found')
return
}
// the feed has never been fetched, save at least the last valid item
const created = await createSavingItemTask(userId, feedUrl, lastValidItem)
if (!created) {
console.error('Failed to create task for feed item', lastValidItem.link)
throw new Error('Failed to create task for feed item')
}
lastItemFetchedAt = lastValidItem.isoDate
? new Date(lastValidItem.isoDate)
: new Date()
}
const updateFrequency = (feed['syn:updateFrequency'] ||
feed['sy:updateFrequency'] ||
1) as number
const updatePeriod = (feed['syn:updatePeriod'] ||
feed['sy:updatePeriod'] ||
'hourly') as string
const nextScheduledAt =
scheduledAt +
getUpdatePeriodInHours(updatePeriod) * 60 * 60 * 1000 * updateFrequency
// update subscription lastFetchedAt
const updatedSubscription = await sendUpdateSubscriptionMutation(
userId,
subscriptionId,
lastItemFetchedAt,
updatedLastFetchedChecksum,
new Date(nextScheduledAt)
)
console.log('Updated subscription', updatedSubscription)
}
export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
async (req, res) => {
if (req.query.token !== process.env.PUBSUB_VERIFICATION_TOKEN) {
console.log('query does not include valid token')
return res.sendStatus(403)
}
try {
let userId: string
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET) as {
uid: string
}
userId = decoded.uid
} catch (e) {
console.error('Authorization error', e)
return res.status(401).send('UNAUTHORIZED')
}
if (!isRssFeedRequest(req.body)) {
console.error('Invalid request body', req.body)
return res.status(400).send('INVALID_REQUEST_BODY')
}
const { feedUrl, subscriptionId, lastFetchedAt, lastFetchedChecksum } =
req.body
console.log('Processing feed', feedUrl, lastFetchedAt)
let lastItemFetchedAt: Date | null = null
let lastValidItem: Item | null = null
const {
feedUrl,
subscriptionIds,
lastFetchedTimestamps,
scheduledTimestamps,
userIds,
lastFetchedChecksums,
} = req.body
console.log('Processing feed', feedUrl)
const fetchResult = await fetchAndChecksum(feedUrl)
if (fetchResult.checksum === lastFetchedChecksum) {
console.log('feed has not been updated', feedUrl, lastFetchedChecksum)
return res.send('ok')
}
const updatedLastFetchedChecksum = fetchResult.checksum
// fetch feed
let itemCount = 0
const feed = await parser.parseString(fetchResult.content)
console.log('Fetched feed', feed.title, new Date())
for (let i = 0; i < subscriptionIds.length; i++) {
const subscriptionId = subscriptionIds[i]
const lastFetchedAt = lastFetchedTimestamps[i]
const scheduledAt = scheduledTimestamps[i]
const userId = userIds[i]
const lastFetchedChecksum = lastFetchedChecksums[i]
const feedPubDate = (feed['dc:date'] ||
feed.pubDate ||
feed.lastBuildDate) as string | undefined
console.log('Feed pub date', feedPubDate)
if (feedPubDate && new Date(feedPubDate) < new Date(lastFetchedAt)) {
console.log('Skipping old feed', feedPubDate)
return res.send('ok')
}
// save each item in the feed
for (const item of feed.items) {
// use published or updated if isoDate is not available for atom feeds
item.isoDate =
item.isoDate ||
(item.published as string) ||
(item.updated as string) ||
(item.created as string)
console.log('Processing feed item', item.links, item.isoDate)
if (!item.links || item.links.length === 0) {
console.log('Invalid feed item', item)
continue
}
item.link = getLink(item.links as RssFeedItemLink[])
if (!item.link) {
console.log('Invalid feed item links', item.links)
continue
}
console.log('Fetching feed item', item.link)
const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date()
// remember the last valid item
if (
!lastValidItem ||
(lastValidItem.isoDate &&
publishedAt > new Date(lastValidItem.isoDate))
) {
lastValidItem = item
}
// Max limit per-feed update
if (itemCount > 99) {
continue
}
// skip old items and items that were published before 24h
if (
publishedAt < new Date(lastFetchedAt) ||
publishedAt < new Date(Date.now() - 24 * 60 * 60 * 1000)
) {
console.log('Skipping old feed item', item.link)
continue
}
const created = await createSavingItemTask(userId, feedUrl, item)
if (!created) {
console.error('Failed to create task for feed item', item.link)
continue
}
// remember the last item fetched at
if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) {
lastItemFetchedAt = publishedAt
}
itemCount = itemCount + 1
}
// no items saved
if (!lastItemFetchedAt) {
// the feed has been fetched before, no new valid items found
if (lastFetchedAt || !lastValidItem) {
console.log('No new valid items found')
return res.send('ok')
}
// the feed has never been fetched, save at least the last valid item
const created = await createSavingItemTask(
await processSubscription(
subscriptionId,
userId,
feedUrl,
lastValidItem
fetchResult,
lastFetchedAt,
scheduledAt,
lastFetchedChecksum
)
if (!created) {
console.error(
'Failed to create task for feed item',
lastValidItem.link
)
return res.status(500).send('INTERNAL_SERVER_ERROR')
}
lastItemFetchedAt = lastValidItem.isoDate
? new Date(lastValidItem.isoDate)
: new Date()
}
// update subscription lastFetchedAt
const updatedSubscription = await sendUpdateSubscriptionMutation(
userId,
subscriptionId,
lastItemFetchedAt,
updatedLastFetchedChecksum
)
console.log('Updated subscription', updatedSubscription)
res.send('ok')
} catch (e) {
console.error('Error while parsing RSS feed', e)