Mark subscription as error if an item failed
This commit is contained in:
@ -1,12 +1,14 @@
|
||||
import axios from 'axios'
|
||||
import crypto from 'crypto'
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import { parseHTML } from 'linkedom'
|
||||
import Parser, { Item } from 'rss-parser'
|
||||
import { promisify } from 'util'
|
||||
import { SubscriptionStatus } from '../../entity/subscription'
|
||||
import { env } from '../../env'
|
||||
import { redisDataSource } from '../../redis_data_source'
|
||||
import { updateSubscription } from '../../services/update_subscription'
|
||||
import {
|
||||
updateSubscription,
|
||||
updateSubscriptions,
|
||||
} from '../../services/update_subscription'
|
||||
import createHttpTaskWithToken from '../../utils/createTask'
|
||||
import { RSSRefreshContext } from './refreshAllFeeds'
|
||||
|
||||
@ -333,7 +335,6 @@ const createItemWithPreviewContent = async (
|
||||
}
|
||||
}
|
||||
|
||||
const signToken = promisify(jwt.sign)
|
||||
const parser = new Parser({
|
||||
customFields: {
|
||||
item: [
|
||||
@ -441,7 +442,8 @@ const processSubscription = async (
|
||||
const updatedLastFetchedChecksum = fetchResult.checksum
|
||||
|
||||
// fetch feed
|
||||
let itemCount = 0
|
||||
let itemCount = 0,
|
||||
errorCount = 0
|
||||
|
||||
const feedLastBuildDate = feed.lastBuildDate
|
||||
console.log('Feed last build date', feedLastBuildDate)
|
||||
@ -455,69 +457,71 @@ const processSubscription = async (
|
||||
|
||||
// save each item in the feed
|
||||
for (const item of feed.items) {
|
||||
// use published or updated if isoDate is not available for atom feeds
|
||||
const isoDate =
|
||||
item.isoDate || item.published || item.updated || item.created
|
||||
console.log('Processing feed item', item.links, item.isoDate, feedUrl)
|
||||
try {
|
||||
// use published or updated if isoDate is not available for atom feeds
|
||||
const isoDate =
|
||||
item.isoDate || item.published || item.updated || item.created
|
||||
console.log('Processing feed item', item.links, item.isoDate, feedUrl)
|
||||
|
||||
if (!item.links || item.links.length === 0) {
|
||||
console.log('Invalid feed item', item)
|
||||
continue
|
||||
if (!item.links || item.links.length === 0) {
|
||||
throw new Error('Invalid feed item')
|
||||
}
|
||||
|
||||
const link = getLink(item.links)
|
||||
if (!link) {
|
||||
throw new Error('Invalid feed item link')
|
||||
}
|
||||
|
||||
const feedItem = {
|
||||
...item,
|
||||
isoDate,
|
||||
link,
|
||||
}
|
||||
|
||||
const publishedAt = feedItem.isoDate
|
||||
? new Date(feedItem.isoDate)
|
||||
: new Date()
|
||||
// remember the last valid item
|
||||
if (
|
||||
!lastValidItem ||
|
||||
(lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate))
|
||||
) {
|
||||
lastValidItem = feedItem
|
||||
}
|
||||
|
||||
// Max limit per-feed update
|
||||
if (itemCount > 99) {
|
||||
continue
|
||||
}
|
||||
|
||||
// skip old items
|
||||
if (isOldItem(feedItem, mostRecentItemDate)) {
|
||||
console.log('Skipping old feed item', feedItem.link)
|
||||
continue
|
||||
}
|
||||
|
||||
const created = await createTask(
|
||||
fetchContentTasks,
|
||||
userId,
|
||||
feedUrl,
|
||||
feedItem,
|
||||
fetchContent,
|
||||
folder
|
||||
)
|
||||
if (!created) {
|
||||
throw new Error('Failed to create task for feed item')
|
||||
}
|
||||
|
||||
// remember the last item fetched at
|
||||
if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) {
|
||||
lastItemFetchedAt = publishedAt
|
||||
}
|
||||
|
||||
itemCount = itemCount + 1
|
||||
} catch (error) {
|
||||
console.error('Error while saving RSS feed item', error, item)
|
||||
errorCount = errorCount + 1
|
||||
}
|
||||
|
||||
const link = getLink(item.links)
|
||||
if (!link) {
|
||||
console.log('Invalid feed item links', item.links)
|
||||
continue
|
||||
}
|
||||
|
||||
const feedItem = {
|
||||
...item,
|
||||
isoDate,
|
||||
link,
|
||||
}
|
||||
|
||||
const publishedAt = feedItem.isoDate
|
||||
? new Date(feedItem.isoDate)
|
||||
: new Date()
|
||||
// remember the last valid item
|
||||
if (
|
||||
!lastValidItem ||
|
||||
(lastValidItem.isoDate && publishedAt > new Date(lastValidItem.isoDate))
|
||||
) {
|
||||
lastValidItem = feedItem
|
||||
}
|
||||
|
||||
// Max limit per-feed update
|
||||
if (itemCount > 99) {
|
||||
continue
|
||||
}
|
||||
|
||||
// skip old items
|
||||
if (isOldItem(feedItem, mostRecentItemDate)) {
|
||||
console.log('Skipping old feed item', feedItem.link)
|
||||
continue
|
||||
}
|
||||
|
||||
const created = await createTask(
|
||||
fetchContentTasks,
|
||||
userId,
|
||||
feedUrl,
|
||||
feedItem,
|
||||
fetchContent,
|
||||
folder
|
||||
)
|
||||
if (!created) {
|
||||
console.error('Failed to create task for feed item', feedItem.link)
|
||||
continue
|
||||
}
|
||||
|
||||
// remember the last item fetched at
|
||||
if (!lastItemFetchedAt || publishedAt > lastItemFetchedAt) {
|
||||
lastItemFetchedAt = publishedAt
|
||||
}
|
||||
|
||||
itemCount = itemCount + 1
|
||||
}
|
||||
|
||||
// no items saved
|
||||
@ -539,17 +543,18 @@ const processSubscription = async (
|
||||
)
|
||||
if (!created) {
|
||||
console.error('Failed to create task for feed item', lastValidItem.link)
|
||||
throw new Error('Failed to create task for feed item')
|
||||
errorCount = errorCount + 1
|
||||
}
|
||||
|
||||
lastItemFetchedAt = lastValidItem.isoDate
|
||||
? new Date(lastValidItem.isoDate)
|
||||
: new Date()
|
||||
: refreshedAt
|
||||
}
|
||||
|
||||
const updateFrequency = getUpdateFrequency(feed)
|
||||
const updatePeriodInMs = getUpdatePeriodInHours(feed) * 60 * 60 * 1000
|
||||
const nextScheduledAt = scheduledAt + updatePeriodInMs * updateFrequency
|
||||
const status = errorCount > 0 ? SubscriptionStatus.RefreshError : undefined
|
||||
|
||||
// update subscription mostRecentItemDate and refreshedAt
|
||||
const updatedSubscription = await updateSubscription(userId, subscriptionId, {
|
||||
@ -557,6 +562,7 @@ const processSubscription = async (
|
||||
lastFetchedChecksum: updatedLastFetchedChecksum,
|
||||
scheduledAt: new Date(nextScheduledAt),
|
||||
refreshedAt,
|
||||
status,
|
||||
})
|
||||
console.log('Updated subscription', updatedSubscription)
|
||||
}
|
||||
@ -570,38 +576,39 @@ export const refreshFeed = async (request: any) => {
|
||||
}
|
||||
|
||||
export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
try {
|
||||
const {
|
||||
feedUrl,
|
||||
subscriptionIds,
|
||||
mostRecentItemDates,
|
||||
scheduledTimestamps,
|
||||
userIds,
|
||||
lastFetchedChecksums,
|
||||
fetchContents,
|
||||
folders,
|
||||
refreshContext,
|
||||
} = request
|
||||
console.log('Processing feed', feedUrl, { refreshContext: refreshContext })
|
||||
const {
|
||||
feedUrl,
|
||||
subscriptionIds,
|
||||
mostRecentItemDates,
|
||||
scheduledTimestamps,
|
||||
userIds,
|
||||
lastFetchedChecksums,
|
||||
fetchContents,
|
||||
folders,
|
||||
refreshContext,
|
||||
} = request
|
||||
|
||||
console.log('Processing feed', feedUrl, { refreshContext: refreshContext })
|
||||
|
||||
try {
|
||||
const isBlocked = await isFeedBlocked(feedUrl)
|
||||
if (isBlocked) {
|
||||
console.log('feed is blocked: ', feedUrl)
|
||||
return
|
||||
throw new Error('feed is blocked')
|
||||
}
|
||||
|
||||
const fetchResult = await fetchAndChecksum(feedUrl)
|
||||
if (!fetchResult) {
|
||||
console.error('Failed to fetch RSS feed', feedUrl)
|
||||
await incrementFeedFailure(feedUrl)
|
||||
return
|
||||
throw new Error('Failed to fetch RSS feed')
|
||||
}
|
||||
|
||||
const feed = await parseFeed(feedUrl, fetchResult.content)
|
||||
if (!feed) {
|
||||
console.error('Failed to parse RSS feed', feedUrl)
|
||||
await incrementFeedFailure(feedUrl)
|
||||
return
|
||||
throw new Error('Failed to parse RSS feed')
|
||||
}
|
||||
|
||||
let allowFetchContent = true
|
||||
@ -615,19 +622,23 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
const fetchContentTasks = new Map<string, FetchContentTask>() // url -> FetchContentTask
|
||||
// process each subscription sequentially
|
||||
for (let i = 0; i < subscriptionIds.length; i++) {
|
||||
await processSubscription(
|
||||
fetchContentTasks,
|
||||
subscriptionIds[i],
|
||||
userIds[i],
|
||||
feedUrl,
|
||||
fetchResult,
|
||||
mostRecentItemDates[i],
|
||||
scheduledTimestamps[i],
|
||||
lastFetchedChecksums[i],
|
||||
fetchContents[i] && allowFetchContent,
|
||||
folders[i],
|
||||
feed
|
||||
)
|
||||
try {
|
||||
await processSubscription(
|
||||
fetchContentTasks,
|
||||
subscriptionIds[i],
|
||||
userIds[i],
|
||||
feedUrl,
|
||||
fetchResult,
|
||||
mostRecentItemDates[i],
|
||||
scheduledTimestamps[i],
|
||||
lastFetchedChecksums[i],
|
||||
fetchContents[i] && allowFetchContent,
|
||||
folders[i],
|
||||
feed
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error while processing subscription', error)
|
||||
}
|
||||
}
|
||||
|
||||
// create fetch content tasks
|
||||
@ -638,7 +649,17 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
|
||||
task.item
|
||||
)
|
||||
}
|
||||
|
||||
return true
|
||||
} catch (e) {
|
||||
console.error('Error while saving RSS feeds', e)
|
||||
|
||||
// mark subscriptions as error if we failed to get the feed
|
||||
await updateSubscriptions(subscriptionIds, {
|
||||
status: SubscriptionStatus.RefreshError,
|
||||
refreshedAt: new Date(),
|
||||
})
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,9 +43,7 @@ export const updateSubscription = async (
|
||||
refreshedAt: newData.refreshedAt || undefined,
|
||||
lastFetchedChecksum: newData.lastFetchedChecksum || undefined,
|
||||
status: newData.status || undefined,
|
||||
scheduledAt: newData.scheduledAt
|
||||
? new Date(newData.scheduledAt)
|
||||
: undefined,
|
||||
scheduledAt: newData.scheduledAt || undefined,
|
||||
autoAddToLibrary: newData.autoAddToLibrary ?? undefined,
|
||||
isPrivate: newData.isPrivate ?? undefined,
|
||||
fetchContent: newData.fetchContent ?? undefined,
|
||||
@ -57,3 +55,25 @@ export const updateSubscription = async (
|
||||
user: { id: userId },
|
||||
})
|
||||
}
|
||||
|
||||
/**
 * Applies one set of field values to several subscriptions in a single
 * repository `save` call.
 *
 * @param subscriptionIds - ids of the subscriptions to update; each id is
 *   paired with the same field values.
 * @param newData - values to write. Fields that are not supplied are passed
 *   to the repository as `undefined`.
 * @returns whatever `getRepository(Subscription).save(...)` resolves to.
 *
 * NOTE(review): presumably the repository leaves `undefined` properties
 * untouched when saving a partial entity — confirm against the ORM in use.
 */
export const updateSubscriptions = async (
  subscriptionIds: string[],
  newData: UpdateSubscriptionData
) => {
  // Builds the partial entity for one subscription id.
  const toPartialEntity = (id: string) => {
    const {
      name,
      description,
      mostRecentItemDate,
      refreshedAt,
      lastFetchedChecksum,
      status,
      scheduledAt,
      autoAddToLibrary,
      isPrivate,
      fetchContent,
      folder,
    } = newData

    return {
      id,
      // `||`: any falsy value (including an empty string) collapses to undefined
      name: name || undefined,
      description: description || undefined,
      mostRecentItemDate: mostRecentItemDate || undefined,
      refreshedAt: refreshedAt || undefined,
      lastFetchedChecksum: lastFetchedChecksum || undefined,
      status: status || undefined,
      scheduledAt: scheduledAt || undefined,
      // `??`: only null/undefined collapse, so an explicit `false` is kept
      autoAddToLibrary: autoAddToLibrary ?? undefined,
      isPrivate: isPrivate ?? undefined,
      fetchContent: fetchContent ?? undefined,
      folder: folder ?? undefined,
    }
  }

  const repository = getRepository(Subscription)
  const partialEntities = subscriptionIds.map((id) => toPartialEntity(id))

  return repository.save(partialEntities)
}
|
||||
|
||||
Reference in New Issue
Block a user