diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts new file mode 100644 index 000000000..6f1824c2d --- /dev/null +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -0,0 +1,88 @@ +import Redis from 'ioredis' +import { DataSource } from 'typeorm' +import { stringToHash } from '../../utils/helpers' +import { RssSubscriptionGroup } from '../../utils/createTask' +import { Queue } from 'bullmq' +import { QUEUE_NAME } from '../../queue-processor' + +export const refreshAllFeeds = async ( + db: DataSource, + redis: Redis +): Promise<boolean> => { + const subscriptionGroups = (await db.createEntityManager().query( + ` + SELECT + url, + ARRAY_AGG(id) AS "subscriptionIds", + ARRAY_AGG(user_id) AS "userIds", + ARRAY_AGG(last_fetched_at) AS "fetchedDates", + ARRAY_AGG(coalesce(scheduled_at, NOW())) AS "scheduledDates", + ARRAY_AGG(last_fetched_checksum) AS checksums, + ARRAY_AGG(fetch_content) AS "fetchContents", + ARRAY_AGG(coalesce(folder, $3)) AS folders + FROM + omnivore.subscriptions + WHERE + type = $1 + AND status = $2 + AND (scheduled_at <= NOW() OR scheduled_at IS NULL) + GROUP BY + url + `, + ['RSS', 'ACTIVE', 'following'] + )) as RssSubscriptionGroup[] + + for (let group of subscriptionGroups) { + let jobid = `refresh-feed_${stringToHash(group.url)}_${stringToHash( + JSON.stringify(group.userIds.sort()) + )}` + const payload = { + subscriptionIds: group.subscriptionIds, + feedUrl: group.url, + lastFetchedTimestamps: group.fetchedDates.map( + (timestamp) => timestamp?.getTime() || 0 + ), // unix timestamp in milliseconds + lastFetchedChecksums: group.checksums, + scheduledTimestamps: group.scheduledDates.map((timestamp) => + timestamp.getTime() + ), // unix timestamp in milliseconds + userIds: group.userIds, + fetchContents: group.fetchContents, + folders: group.folders, + } + + await queueRSSRefreshFeedJob(redis, jobid, payload) + } + + return true +} + +const createBackendQueue = (redis: Redis): Queue | undefined => { 
+ return new Queue(QUEUE_NAME, { + connection: redis, + }) +} + +export const queueRSSRefreshAllFeedsJob = async (redis: Redis) => { + const queue = createBackendQueue(redis) + if (!queue) { + return false + } + return queue.add('refresh-all-feeds', {}) +} + +export const queueRSSRefreshFeedJob = async ( + redis: Redis, + jobid: string, + payload: any +) => { + const queue = createBackendQueue(redis) + if (!queue) { + return false + } + return queue.add('refresh-feed', payload, { + jobId: jobid, + removeOnComplete: true, + removeOnFail: true, + }) +} diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts new file mode 100644 index 000000000..95bfdae35 --- /dev/null +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -0,0 +1,674 @@ +import axios from 'axios' +import crypto from 'crypto' +import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import +import * as jwt from 'jsonwebtoken' +import { parseHTML } from 'linkedom' +import { createClient } from 'redis' +import Parser, { Item } from 'rss-parser' +import { promisify } from 'util' +import { CONTENT_FETCH_URL, createCloudTask } from './task' + +type FolderType = 'following' | 'inbox' +// explicitly create the return type of RedisClient +type RedisClient = ReturnType<typeof createClient> + +interface RefreshFeedRequest { + subscriptionIds: string[] + feedUrl: string + lastFetchedTimestamps: number[] // unix timestamp in milliseconds + scheduledTimestamps: number[] // unix timestamp in milliseconds + lastFetchedChecksums: string[] + userIds: string[] + fetchContents: boolean[] + folders: FolderType[] +} + +export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => { + console.log('body: ', data) + return ( + 'subscriptionIds' in data && + 'feedUrl' in data && + 'lastFetchedTimestamps' in data && + 'scheduledTimestamps' in data && + 'userIds' in data && + 'lastFetchedChecksums' in data && + 'fetchContents' in data && + 'folders' in 
data + ) +} + +// link can be a string or an object +type RssFeedItemLink = string | { $: { rel?: string; href: string } } +type RssFeed = Parser.Output<{ + published?: string + updated?: string + created?: string + link?: RssFeedItemLink + links?: RssFeedItemLink[] +}> & { + lastBuildDate?: string + 'syn:updatePeriod'?: string + 'syn:updateFrequency'?: string + 'sy:updatePeriod'?: string + 'sy:updateFrequency'?: string +} +type RssFeedItemMedia = { + $: { url: string; width?: string; height?: string; medium?: string } +} +export type RssFeedItem = Item & { + 'media:thumbnail'?: RssFeedItemMedia + 'media:content'?: RssFeedItemMedia[] + link: string +} + +export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { + // existing items and items that were published before 24h + const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date() + return ( + publishedAt <= new Date(lastFetchedAt) || + publishedAt < new Date(Date.now() - 24 * 60 * 60 * 1000) + ) +} + +const feedFetchFailedRedisKey = (feedUrl: string) => + `feed-fetch-failure:${feedUrl}` + +const isFeedBlocked = async (feedUrl: string, redisClient: RedisClient) => { + const key = feedFetchFailedRedisKey(feedUrl) + try { + const result = await redisClient.get(key) + // if the feed has failed to fetch more than certain times, block it + const maxFailures = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? 
'10') + if (result && parseInt(result) > maxFailures) { + console.log('feed is blocked: ', feedUrl) + return true + } + } catch (error) { + console.error('Failed to check feed block status', feedUrl, error) + } + + return false +} + +const incrementFeedFailure = async ( + feedUrl: string, + redisClient: RedisClient +) => { + const key = feedFetchFailedRedisKey(feedUrl) + try { + const result = await redisClient.incr(key) + // expire the key in 1 day + await redisClient.expire(key, 24 * 60 * 60) + + return result + } catch (error) { + console.error('Failed to block feed', feedUrl, error) + return null + } +} + +export const isContentFetchBlocked = (feedUrl: string) => { + if (feedUrl.startsWith('https://arxiv.org/')) { + return true + } + return false +} + +const getThumbnail = (item: RssFeedItem) => { + if (item['media:thumbnail']) { + return item['media:thumbnail'].$.url + } + + return item['media:content']?.find((media) => media.$.medium === 'image')?.$ + .url +} + +export const fetchAndChecksum = async (url: string) => { + try { + const response = await axios.get(url, { + responseType: 'arraybuffer', + timeout: 60_000, + maxRedirects: 10, + headers: { + 'User-Agent': + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36', + Accept: + 'application/rss+xml, application/rdf+xml;q=0.8, application/atom+xml;q=0.6, application/xml;q=0.4, text/xml, text/html;q=0.4', + }, + }) + + const hash = crypto.createHash('sha256') + hash.update(response.data as Buffer) + + const dataStr = (response.data as Buffer).toString() + + return { url, content: dataStr, checksum: hash.digest('hex') } + } catch (error) { + console.log(`Failed to fetch or hash content from ${url}.`, error) + return null + } +} + +const parseFeed = async (url: string, content: string) => { + try { + // check if url is a telegram channel + const telegramRegex = /https:\/\/t\.me\/([a-zA-Z0-9_]+)/ + const telegramMatch = url.match(telegramRegex) 
+ if (telegramMatch) { + const dom = parseHTML(content).document + const title = dom.querySelector('meta[property="og:title"]') + // post has attribute data-post + const posts = dom.querySelectorAll('[data-post]') + const items = Array.from(posts) + .map((post) => { + const id = post.getAttribute('data-post') + if (!id) { + return null + } + + const url = `https://t.me/${telegramMatch[1]}/${id}` + // find the