diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index 0ad412b4f..6d04536da 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -2980,6 +2980,7 @@ export type UpdateSubscriptionInput = { lastFetchedAt?: InputMaybe; lastFetchedChecksum?: InputMaybe; name?: InputMaybe; + scheduledAt?: InputMaybe; status?: InputMaybe; }; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index af9064bee..f50844aee 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -2393,6 +2393,7 @@ input UpdateSubscriptionInput { lastFetchedAt: Date lastFetchedChecksum: String name: String + scheduledAt: Date status: SubscriptionStatus } diff --git a/packages/api/src/resolvers/subscriptions/index.ts b/packages/api/src/resolvers/subscriptions/index.ts index 7bf145452..dd0b8a7c7 100644 --- a/packages/api/src/resolvers/subscriptions/index.ts +++ b/packages/api/src/resolvers/subscriptions/index.ts @@ -239,11 +239,12 @@ export const subscribeResolver = authorized< // create a cloud task to fetch rss feed item for the new subscription await enqueueRssFeedFetch({ - user_ids: [uid], + userIds: [uid], url: input.url, - subscription_ids: [newSubscription.id], - scheduled_timestamps: [null], - last_fetched_timestamps: [null], + subscriptionIds: [newSubscription.id], + scheduledDates: [new Date()], // fetch immediately + fetchedDates: [null], + checksums: [null], }) return { @@ -300,6 +301,9 @@ export const updateSubscriptionResolver = authorized< : undefined, lastFetchedChecksum: input.lastFetchedChecksum || undefined, status: input.status || undefined, + scheduledAt: input.scheduledAt + ? new Date(input.scheduledAt) + : undefined, }) return repo.findOneByOrFail({ diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts index 93077f6e4..053eb7d6f 100644 --- a/packages/api/src/routers/svc/rss_feed.ts +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -30,10 +30,11 @@ export function rssFeedRouter() { ` SELECT url, - ARRAY_AGG(id) AS subscription_ids, - ARRAY_AGG(user_id) AS user_ids, - ARRAY_AGG(last_fetched_at) AS last_fetched_timestamps, - ARRAY_AGG(scheduled_at) AS scheduled_timestamps + ARRAY_AGG(id) AS "subscriptionIds", + ARRAY_AGG(user_id) AS "userIds", + ARRAY_AGG(last_fetched_at) AS "fetchedDates", + ARRAY_AGG(IFNULL(scheduled_at, NOW())) AS "scheduledDates", + ARRAY_AGG(last_fetched_checksum) AS "checksums" FROM omnivore.subscriptions WHERE diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index 2cc4e8b58..4d125b10a 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -2550,6 +2550,7 @@ const schema = gql` lastFetchedAt: Date lastFetchedChecksum: String status: SubscriptionStatus + scheduledAt: Date } union UpdateSubscriptionResult = diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 7e09968ea..f6f2ac6dc 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -593,10 +593,11 @@ export const enqueueThumbnailTask = async ( export interface RssSubscriptionGroup { url: string - subscription_ids: string[] - user_ids: string[] - last_fetched_timestamps: (Date | null)[] - scheduled_timestamps: (Date | null)[] + subscriptionIds: string[] + userIds: string[] + fetchedDates: (Date | null)[] + scheduledDates: Date[] + checksums: (string | null)[] } export const enqueueRssFeedFetch = async ( @@ -604,16 +605,16 @@ export const enqueueRssFeedFetch = async ( ): Promise => { const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env const payload = { - subscriptionIds: subscriptionGroup.subscription_ids, + subscriptionIds: subscriptionGroup.subscriptionIds, feedUrl: subscriptionGroup.url, - lastFetchedTimestamps: subscriptionGroup.last_fetched_timestamps.map( + lastFetchedTimestamps: subscriptionGroup.fetchedDates.map( (timestamp) => timestamp?.getTime() || 0 ), // unix timestamp in milliseconds - lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null, - scheduledTimestamps: subscriptionGroup.scheduled_timestamps.map( - (timestamp) => timestamp?.getTime() || 0 + lastFetchedChecksums: subscriptionGroup.checksums, + scheduledTimestamps: subscriptionGroup.scheduledDates.map((timestamp) => + timestamp.getTime() ), // unix timestamp in milliseconds - userIds: subscriptionGroup.user_ids, + userIds: subscriptionGroup.userIds, } // If there is no Google Cloud Project Id exposed, it means that we are in local environment diff --git a/packages/rss-handler/src/index.ts b/packages/rss-handler/src/index.ts index 8b3021659..2c874f3c1 100644 --- a/packages/rss-handler/src/index.ts +++ b/packages/rss-handler/src/index.ts @@ -8,10 +8,12 @@ import { promisify } from 'util' import { CONTENT_FETCH_URL, createCloudTask } from './task' interface RssFeedRequest { - subscriptionId: string + subscriptionIds: string[] feedUrl: string - lastFetchedAt: number // unix timestamp in milliseconds - lastFetchedChecksum: string | undefined + lastFetchedTimestamps: number[] // unix timestamp in milliseconds + scheduledTimestamps: number[] // unix timestamp in milliseconds + lastFetchedChecksums: string[] + userIds: string[] } // link can be a string or an object @@ -19,7 +21,12 @@ type RssFeedItemLink = string | { $: { rel?: string; href: string } } function isRssFeedRequest(body: any): body is RssFeedRequest { return ( - 'subscriptionId' in body && 'feedUrl' in body && 'lastFetchedAt' in body + 'subscriptionIds' in body && + 'feedUrl' in body && + 'lastFetchedTimestamps' in body && + 'scheduledTimestamps' in body && + 'userIds' in body && + 'lastFetchedChecksums' in body ) } @@ -53,7 +60,8 @@ const sendUpdateSubscriptionMutation = async ( userId: string, subscriptionId: string, lastFetchedAt: Date, - lastFetchedChecksum: string + lastFetchedChecksum: string, + scheduledAt: Date ) => { const JWT_SECRET = process.env.JWT_SECRET const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT @@ -81,6 +89,7 @@ const sendUpdateSubscriptionMutation = async ( id: subscriptionId, lastFetchedAt, lastFetchedChecksum, + scheduledAt, }, }, }) @@ -155,10 +164,36 @@ const parser = new Parser({ 'updated', 'created', ], - feed: ['dc:date', 'lastBuildDate', 'pubDate'], + feed: [ + 'dc:date', + 'lastBuildDate', + 'pubDate', + 'syn:updatePeriod', + 'syn:updateFrequency', + 'sy:updatePeriod', + 'sy:updateFrequency', + ], }, }) +const getUpdatePeriodInHours = (updatePeriod: string) => { + switch (updatePeriod) { + case 'hourly': + return 1 + case 'daily': + return 24 + case 'weekly': + return 7 * 24 + case 'monthly': + return 30 * 24 + case 'yearly': + return 365 * 24 + default: + // default to hourly + return 1 + } +} + // get link following the order of preference: via, alternate, self const getLink = (links: RssFeedItemLink[]) => { // sort links by preference @@ -185,162 +220,182 @@ const getLink = (links: RssFeedItemLink[]) => { return sortedLinks.find((link) => !!link) } -export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( - async (req, res) => { - if (!process.env.JWT_SECRET) { - console.error('Missing JWT_SECRET in environment') - return res.status(500).send('INTERNAL_SERVER_ERROR') +const processSubscription = async ( + feedUrl: string, + userId: string, + subscriptionId: string, + fetchResult: { content: string; checksum: string }, + lastFetchedAt: number, + scheduledAt: number, + lastFetchedChecksum: string +) => { + let lastItemFetchedAt: Date | null = null + let lastValidItem: Item | null = null + + if (fetchResult.checksum === lastFetchedChecksum) { + console.log('feed has not been updated', feedUrl, lastFetchedChecksum) + return + } + const updatedLastFetchedChecksum = fetchResult.checksum + + // fetch feed + let itemCount = 0 + const feed = await parser.parseString(fetchResult.content) + console.log('Fetched feed', feed.title, new Date()) + + const feedPubDate = (feed['dc:date'] || + feed.pubDate || + feed.lastBuildDate) as string | undefined + console.log('Feed pub date', feedPubDate) + if (feedPubDate && new Date(feedPubDate) < new Date(lastFetchedAt)) { + console.log('Skipping old feed', feedPubDate) + return + } + + // save each item in the feed + for (const item of feed.items) { + // use published or updated if isoDate is not available for atom feeds + item.isoDate = + item.isoDate || + (item.published as string) || + (item.updated as string) || + (item.created as string) + console.log('Processing feed item', item.links, item.isoDate) + + if (!item.links || item.links.length === 0) { + console.log('Invalid feed item', item) + continue } - const token = req.header('Omnivore-Authorization') - if (!token) { - console.error('Missing authorization header') - return res.status(401).send('UNAUTHORIZED') + item.link = getLink(item.links as RssFeedItemLink[]) + if (!item.link) { + console.log('Invalid feed item links', item.links) + continue + } + + console.log('Fetching feed item', item.link) + + const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date() + // remember the last valid item + if ( + !lastValidItem || + (lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate)) + ) { + lastValidItem = item + } + + // Max limit per-feed update + if (itemCount > 99) { + continue + } + + // skip old items and items that were published before 24h + if ( + publishedAt < new Date(lastFetchedAt) || + publishedAt < new Date(Date.now() - 24 * 60 * 60 * 1000) + ) { + console.log('Skipping old feed item', item.link) + continue + } + + const created = await createSavingItemTask(userId, feedUrl, item) + if (!created) { + console.error('Failed to create task for feed item', item.link) + continue + } + + // remember the last item fetched at + if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) { + lastItemFetchedAt = publishedAt + } + + itemCount = itemCount + 1 + } + + // no items saved + if (!lastItemFetchedAt) { + // the feed has been fetched before, no new valid items found + if (lastFetchedAt || !lastValidItem) { + console.log('No new valid items found') + return + } + + // the feed has never been fetched, save at least the last valid item + const created = await createSavingItemTask(userId, feedUrl, lastValidItem) + if (!created) { + console.error('Failed to create task for feed item', lastValidItem.link) + throw new Error('Failed to create task for feed item') + } + + lastItemFetchedAt = lastValidItem.isoDate + ? new Date(lastValidItem.isoDate) + : new Date() + } + + const updateFrequency = (feed['syn:updateFrequency'] || + feed['sy:updateFrequency'] || + 1) as number + const updatePeriod = (feed['syn:updatePeriod'] || + feed['sy:updatePeriod'] || + 'hourly') as string + const nextScheduledAt = + scheduledAt + + getUpdatePeriodInHours(updatePeriod) * 60 * 60 * 1000 * updateFrequency + + // update subscription lastFetchedAt + const updatedSubscription = await sendUpdateSubscriptionMutation( + userId, + subscriptionId, + lastItemFetchedAt, + updatedLastFetchedChecksum, + new Date(nextScheduledAt) + ) + console.log('Updated subscription', updatedSubscription) +} + +export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + if (req.query.token !== process.env.PUBSUB_VERIFICATION_TOKEN) { + console.log('query does not include valid token') + return res.sendStatus(403) } try { - let userId: string - - try { - const decoded = jwt.verify(token, process.env.JWT_SECRET) as { - uid: string - } - userId = decoded.uid - } catch (e) { - console.error('Authorization error', e) - return res.status(401).send('UNAUTHORIZED') - } - if (!isRssFeedRequest(req.body)) { console.error('Invalid request body', req.body) return res.status(400).send('INVALID_REQUEST_BODY') } - const { feedUrl, subscriptionId, lastFetchedAt, lastFetchedChecksum } = - req.body - console.log('Processing feed', feedUrl, lastFetchedAt) - - let lastItemFetchedAt: Date | null = null - let lastValidItem: Item | null = null + const { + feedUrl, + subscriptionIds, + lastFetchedTimestamps, + scheduledTimestamps, + userIds, + lastFetchedChecksums, + } = req.body + console.log('Processing feed', feedUrl) const fetchResult = await fetchAndChecksum(feedUrl) - if (fetchResult.checksum === lastFetchedChecksum) { - console.log('feed has not been updated', feedUrl, lastFetchedChecksum) - return res.send('ok') - } - const updatedLastFetchedChecksum = fetchResult.checksum - // fetch feed - let itemCount = 0 - const feed = await parser.parseString(fetchResult.content) - console.log('Fetched feed', feed.title, new Date()) + for (let i = 0; i < subscriptionIds.length; i++) { + const subscriptionId = subscriptionIds[i] + const lastFetchedAt = lastFetchedTimestamps[i] + const scheduledAt = scheduledTimestamps[i] + const userId = userIds[i] + const lastFetchedChecksum = lastFetchedChecksums[i] - const feedPubDate = (feed['dc:date'] || - feed.pubDate || - feed.lastBuildDate) as string | undefined - console.log('Feed pub date', feedPubDate) - if (feedPubDate && new Date(feedPubDate) < new Date(lastFetchedAt)) { - console.log('Skipping old feed', feedPubDate) - return res.send('ok') - } - - // save each item in the feed - for (const item of feed.items) { - // use published or updated if isoDate is not available for atom feeds - item.isoDate = - item.isoDate || - (item.published as string) || - (item.updated as string) || - (item.created as string) - console.log('Processing feed item', item.links, item.isoDate) - - if (!item.links || item.links.length === 0) { - console.log('Invalid feed item', item) - continue - } - - item.link = getLink(item.links as RssFeedItemLink[]) - if (!item.link) { - console.log('Invalid feed item links', item.links) - continue - } - - console.log('Fetching feed item', item.link) - - const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date() - // remember the last valid item - if ( - !lastValidItem || - (lastValidItem.isoDate && - publishedAt > new Date(lastValidItem.isoDate)) - ) { - lastValidItem = item - } - - // Max limit per-feed update - if (itemCount > 99) { - continue - } - - // skip old items and items that were published before 24h - if ( - publishedAt < new Date(lastFetchedAt) || - publishedAt < new Date(Date.now() - 24 * 60 * 60 * 1000) - ) { - console.log('Skipping old feed item', item.link) - continue - } - - const created = await createSavingItemTask(userId, feedUrl, item) - if (!created) { - console.error('Failed to create task for feed item', item.link) - continue - } - - // remember the last item fetched at - if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) { - lastItemFetchedAt = publishedAt - } - - itemCount = itemCount + 1 - } - - // no items saved - if (!lastItemFetchedAt) { - // the feed has been fetched before, no new valid items found - if (lastFetchedAt || !lastValidItem) { - console.log('No new valid items found') - return res.send('ok') - } - - // the feed has never been fetched, save at least the last valid item - const created = await createSavingItemTask( + await processSubscription( + subscriptionId, userId, feedUrl, - lastValidItem + fetchResult, + lastFetchedAt, + scheduledAt, + lastFetchedChecksum ) - if (!created) { - console.error( - 'Failed to create task for feed item', - lastValidItem.link - ) - return res.status(500).send('INTERNAL_SERVER_ERROR') - } - - lastItemFetchedAt = lastValidItem.isoDate - ? new Date(lastValidItem.isoDate) - : new Date() } - // update subscription lastFetchedAt - const updatedSubscription = await sendUpdateSubscriptionMutation( - userId, - subscriptionId, - lastItemFetchedAt, - updatedLastFetchedChecksum - ) - console.log('Updated subscription', updatedSubscription) - res.send('ok') } catch (e) { console.error('Error while parsing RSS feed', e)