diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 496d1e4c3..2c7f5d143 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -1,12 +1,14 @@ import axios from 'axios' import crypto from 'crypto' -import * as jwt from 'jsonwebtoken' import { parseHTML } from 'linkedom' import Parser, { Item } from 'rss-parser' -import { promisify } from 'util' +import { SubscriptionStatus } from '../../entity/subscription' import { env } from '../../env' import { redisDataSource } from '../../redis_data_source' -import { updateSubscription } from '../../services/update_subscription' +import { + updateSubscription, + updateSubscriptions, +} from '../../services/update_subscription' import createHttpTaskWithToken from '../../utils/createTask' import { RSSRefreshContext } from './refreshAllFeeds' @@ -333,7 +335,6 @@ const createItemWithPreviewContent = async ( } } -const signToken = promisify(jwt.sign) const parser = new Parser({ customFields: { item: [ @@ -441,7 +442,8 @@ const processSubscription = async ( const updatedLastFetchedChecksum = fetchResult.checksum // fetch feed - let itemCount = 0 + let itemCount = 0, + errorCount = 0 const feedLastBuildDate = feed.lastBuildDate console.log('Feed last build date', feedLastBuildDate) @@ -455,69 +457,71 @@ const processSubscription = async ( // save each item in the feed for (const item of feed.items) { - // use published or updated if isoDate is not available for atom feeds - const isoDate = - item.isoDate || item.published || item.updated || item.created - console.log('Processing feed item', item.links, item.isoDate, feedUrl) + try { + // use published or updated if isoDate is not available for atom feeds + const isoDate = + item.isoDate || item.published || item.updated || item.created + console.log('Processing feed item', item.links, item.isoDate, feedUrl) - if (!item.links || item.links.length === 0) { - console.log('Invalid feed item', item) - continue + if (!item.links || item.links.length === 0) { + throw new Error('Invalid feed item') + } + + const link = getLink(item.links) + if (!link) { + throw new Error('Invalid feed item link') + } + + const feedItem = { + ...item, + isoDate, + link, + } + + const publishedAt = feedItem.isoDate + ? new Date(feedItem.isoDate) + : new Date() + // remember the last valid item + if ( + !lastValidItem || + (lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate)) + ) { + lastValidItem = feedItem + } + + // Max limit per-feed update + if (itemCount > 99) { + continue + } + + // skip old items + if (isOldItem(feedItem, mostRecentItemDate)) { + console.log('Skipping old feed item', feedItem.link) + continue + } + + const created = await createTask( + fetchContentTasks, + userId, + feedUrl, + feedItem, + fetchContent, + folder + ) + if (!created) { + throw new Error('Failed to create task for feed item') + } + + // remember the last item fetched at + if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) { + lastItemFetchedAt = publishedAt + } + + itemCount = itemCount + 1 + } catch (error) { + console.error('Error while saving RSS feed item', error, item) + errorCount = errorCount + 1 } - - const link = getLink(item.links) - if (!link) { - console.log('Invalid feed item links', item.links) - continue - } - - const feedItem = { - ...item, - isoDate, - link, - } - - const publishedAt = feedItem.isoDate - ? new Date(feedItem.isoDate) - : new Date() - // remember the last valid item - if ( - !lastValidItem || - (lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate)) - ) { - lastValidItem = feedItem - } - - // Max limit per-feed update - if (itemCount > 99) { - continue - } - - // skip old items - if (isOldItem(feedItem, mostRecentItemDate)) { - console.log('Skipping old feed item', feedItem.link) - continue - } - - const created = await createTask( - fetchContentTasks, - userId, - feedUrl, - feedItem, - fetchContent, - folder - ) - if (!created) { - console.error('Failed to create task for feed item', feedItem.link) - continue - } - - // remember the last item fetched at - if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) { - lastItemFetchedAt = publishedAt - } - - itemCount = itemCount + 1 } // no items saved @@ -539,17 +543,18 @@ const processSubscription = async ( ) if (!created) { console.error('Failed to create task for feed item', lastValidItem.link) - throw new Error('Failed to create task for feed item') + errorCount = errorCount + 1 } lastItemFetchedAt = lastValidItem.isoDate ? new Date(lastValidItem.isoDate) - : new Date() + : refreshedAt } const updateFrequency = getUpdateFrequency(feed) const updatePeriodInMs = getUpdatePeriodInHours(feed) * 60 * 60 * 1000 const nextScheduledAt = scheduledAt + updatePeriodInMs * updateFrequency + const status = errorCount > 0 ? SubscriptionStatus.RefreshError : undefined // update subscription mostRecentItemDate and refreshedAt const updatedSubscription = await updateSubscription(userId, subscriptionId, { @@ -557,6 +562,7 @@ const processSubscription = async ( lastFetchedChecksum: updatedLastFetchedChecksum, scheduledAt: new Date(nextScheduledAt), refreshedAt, + status, }) console.log('Updated subscription', updatedSubscription) } @@ -570,38 +576,39 @@ export const refreshFeed = async (request: any) => { } export const _refreshFeed = async (request: RefreshFeedRequest) => { - try { - const { - feedUrl, - subscriptionIds, - mostRecentItemDates, - scheduledTimestamps, - userIds, - lastFetchedChecksums, - fetchContents, - folders, - refreshContext, - } = request - console.log('Processing feed', feedUrl, { refreshContext: refreshContext }) + const { + feedUrl, + subscriptionIds, + mostRecentItemDates, + scheduledTimestamps, + userIds, + lastFetchedChecksums, + fetchContents, + folders, + refreshContext, + } = request + console.log('Processing feed', feedUrl, { refreshContext: refreshContext }) + + try { const isBlocked = await isFeedBlocked(feedUrl) if (isBlocked) { console.log('feed is blocked: ', feedUrl) - return + throw new Error('feed is blocked') } const fetchResult = await fetchAndChecksum(feedUrl) if (!fetchResult) { console.error('Failed to fetch RSS feed', feedUrl) await incrementFeedFailure(feedUrl) - return + throw new Error('Failed to fetch RSS feed') } const feed = await parseFeed(feedUrl, fetchResult.content) if (!feed) { console.error('Failed to parse RSS feed', feedUrl) await incrementFeedFailure(feedUrl) - return + throw new Error('Failed to parse RSS feed') } let allowFetchContent = true @@ -615,19 +622,23 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { const fetchContentTasks = new Map() // url -> FetchContentTask // process each subscription sequentially for (let i = 0; i < subscriptionIds.length; i++) { - await processSubscription( - fetchContentTasks, - subscriptionIds[i], - userIds[i], - feedUrl, - fetchResult, - mostRecentItemDates[i], - scheduledTimestamps[i], - lastFetchedChecksums[i], - fetchContents[i] && allowFetchContent, - folders[i], - feed - ) + try { + await processSubscription( + fetchContentTasks, + subscriptionIds[i], + userIds[i], + feedUrl, + fetchResult, + mostRecentItemDates[i], + scheduledTimestamps[i], + lastFetchedChecksums[i], + fetchContents[i] && allowFetchContent, + folders[i], + feed + ) + } catch (error) { + console.error('Error while processing subscription', error) + } } // create fetch content tasks @@ -638,7 +649,17 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { task.item ) } + + return true } catch (e) { console.error('Error while saving RSS feeds', e) + + // mark subscriptions as error if we failed to get the feed + await updateSubscriptions(subscriptionIds, { + status: SubscriptionStatus.RefreshError, + refreshedAt: new Date(), + }) + + return false } } diff --git a/packages/api/src/services/update_subscription.ts b/packages/api/src/services/update_subscription.ts index c74b43f48..b6d1a51da 100644 --- a/packages/api/src/services/update_subscription.ts +++ b/packages/api/src/services/update_subscription.ts @@ -43,9 +43,7 @@ export const updateSubscription = async ( refreshedAt: newData.refreshedAt || undefined, lastFetchedChecksum: newData.lastFetchedChecksum || undefined, status: newData.status || undefined, - scheduledAt: newData.scheduledAt - ? new Date(newData.scheduledAt) - : undefined, + scheduledAt: newData.scheduledAt || undefined, autoAddToLibrary: newData.autoAddToLibrary ?? undefined, isPrivate: newData.isPrivate ?? undefined, fetchContent: newData.fetchContent ?? undefined, @@ -57,3 +55,25 @@ export const updateSubscription = async ( user: { id: userId }, }) } + +export const updateSubscriptions = async ( + subscriptionIds: string[], + newData: UpdateSubscriptionData +) => { + return getRepository(Subscription).save( + subscriptionIds.map((id) => ({ + id, + name: newData.name || undefined, + description: newData.description || undefined, + mostRecentItemDate: newData.mostRecentItemDate || undefined, + refreshedAt: newData.refreshedAt || undefined, + lastFetchedChecksum: newData.lastFetchedChecksum || undefined, + status: newData.status || undefined, + scheduledAt: newData.scheduledAt || undefined, + autoAddToLibrary: newData.autoAddToLibrary ?? undefined, + isPrivate: newData.isPrivate ?? undefined, + fetchContent: newData.fetchContent ?? undefined, + folder: newData.folder ?? undefined, + })) + ) +}