diff --git a/packages/api/src/jobs/rss/refreshAllFeeds.ts b/packages/api/src/jobs/rss/refreshAllFeeds.ts index d45b1d98e..4deca190c 100644 --- a/packages/api/src/jobs/rss/refreshAllFeeds.ts +++ b/packages/api/src/jobs/rss/refreshAllFeeds.ts @@ -4,11 +4,9 @@ import { stringToHash } from '../../utils/helpers' import { RssSubscriptionGroup } from '../../utils/createTask' import { Job, Queue } from 'bullmq' import { QUEUE_NAME } from '../../queue-processor' +import { redisDataSource } from '../../redis_data_source' -export const refreshAllFeeds = async ( - db: DataSource, - redis: Redis -): Promise => { +export const refreshAllFeeds = async (db: DataSource): Promise => { const subscriptionGroups = (await db.createEntityManager().query( ` SELECT @@ -51,20 +49,23 @@ export const refreshAllFeeds = async ( folders: group.folders, } - await queueRSSRefreshFeedJob(redis, jobid, payload) + await queueRSSRefreshFeedJob(jobid, payload) } return true } -const createBackendQueue = (redis: Redis): Queue | undefined => { +const createBackendQueue = (): Queue | undefined => { + if (!redisDataSource.workerRedisClient) { + throw new Error('Can not create queues, redis is not initialized') + } return new Queue(QUEUE_NAME, { - connection: redis, + connection: redisDataSource.workerRedisClient, }) } -export const queueRSSRefreshAllFeedsJob = async (redis: Redis) => { - const queue = createBackendQueue(redis) +export const queueRSSRefreshAllFeedsJob = async () => { + const queue = createBackendQueue() if (!queue) { return false } @@ -72,11 +73,10 @@ export const queueRSSRefreshAllFeedsJob = async (redis: Redis) => { } export const queueRSSRefreshFeedJob = async ( - redis: Redis, jobid: string, payload: any ): Promise => { - const queue = createBackendQueue(redis) + const queue = createBackendQueue() if (!queue) { return undefined } diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index 8aeca24a2..198cf056b 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -7,7 +7,7 @@ import Parser, { Item } from 'rss-parser' import { promisify } from 'util' import createHttpTaskWithToken from '../../utils/createTask' import { env } from '../../env' -import Redis from 'ioredis' +import { redisDataSource } from '../../redis_data_source' type FolderType = 'following' | 'inbox' @@ -72,14 +72,15 @@ export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { const feedFetchFailedRedisKey = (feedUrl: string) => `feed-fetch-failure:${feedUrl}` -const isFeedBlocked = async (feedUrl: string, redisClient: Redis) => { +const isFeedBlocked = async (feedUrl: string) => { const key = feedFetchFailedRedisKey(feedUrl) + const redisClient = redisDataSource.redisClient try { console.log('checking if feed is blocked:', { - redisClient, - status: redisClient.status, + redisClient: redisClient, + status: redisClient?.status, }) - const result = await redisClient.get(key) + const result = await redisClient?.get(key) // if the feed has failed to fetch more than certain times, block it const maxFailures = parseInt(process.env.MAX_FEED_FETCH_FAILURES ?? '10') if (result && parseInt(result) > maxFailures) { @@ -93,12 +94,13 @@ const isFeedBlocked = async (feedUrl: string, redisClient: Redis) => { return false } -const incrementFeedFailure = async (feedUrl: string, redisClient: Redis) => { +const incrementFeedFailure = async (feedUrl: string) => { + const redisClient = redisDataSource.redisClient const key = feedFetchFailedRedisKey(feedUrl) try { - const result = await redisClient.incr(key) + const result = await redisClient?.incr(key) // expire the key in 1 day - await redisClient.expire(key, 24 * 60 * 60) + await redisClient?.expire(key, 24 * 60 * 60) return result } catch (error) { @@ -260,14 +262,10 @@ const sendUpdateSubscriptionMutation = async ( } } -const isItemRecentlySaved = async ( - redisClient: Redis, - userId: string, - url: string -) => { +const isItemRecentlySaved = async (userId: string, url: string) => { const key = `recent-saved-item:${userId}:${url}` try { - const result = await redisClient.get(key) + const result = await redisDataSource.redisClient?.get(key) return !!result } catch (err) { console.error('error checking if item is old', err) @@ -281,15 +279,10 @@ const createTask = async ( feedUrl: string, item: RssFeedItem, fetchContent: boolean, - folder: FolderType, - redisClient: Redis + folder: FolderType ) => { console.log('creating task to fetch', feedUrl) - const isRecentlySaved = await isItemRecentlySaved( - redisClient, - userId, - item.link - ) + const isRecentlySaved = await isItemRecentlySaved(userId, item.link) if (isRecentlySaved) { console.log('Item recently saved', item.link) return true @@ -484,8 +477,7 @@ const processSubscription = async ( lastFetchedChecksum: string, fetchContent: boolean, folder: FolderType, - feed: RssFeed, - redisClient: Redis + feed: RssFeed ) => { let lastItemFetchedAt: Date | null = null let lastValidItem: RssFeedItem | null = null @@ -561,8 +553,7 @@ const processSubscription = async ( feedUrl, feedItem, fetchContent, - folder, - redisClient + folder ) if (!created) { console.error('Failed to create task for feed item', feedItem.link) @@ -591,8 +582,7 @@ const processSubscription = async ( feedUrl, lastValidItem, fetchContent, - folder, - redisClient + folder ) if (!created) { console.error('Failed to create task for feed item', lastValidItem.link) @@ -619,18 +609,15 @@ const processSubscription = async ( console.log('Updated subscription', updatedSubscription) } -export const refreshFeed = async (redisClient: Redis, request: any) => { +export const refreshFeed = async (request: any) => { if (isRefreshFeedRequest(request)) { - return _refreshFeed(redisClient, request) + return _refreshFeed(request) } console.log('not a feed to refresh') return false } -export const _refreshFeed = async ( - redisClient: Redis, - request: RefreshFeedRequest -) => { +export const _refreshFeed = async (request: RefreshFeedRequest) => { try { const { feedUrl, @@ -644,7 +631,7 @@ export const _refreshFeed = async ( } = request console.log('Processing feed', feedUrl) - const isBlocked = await isFeedBlocked(feedUrl, redisClient) + const isBlocked = await isFeedBlocked(feedUrl) if (isBlocked) { console.log('feed is blocked: ', feedUrl) return @@ -653,14 +640,14 @@ export const _refreshFeed = async ( const fetchResult = await fetchAndChecksum(feedUrl) if (!fetchResult) { console.error('Failed to fetch RSS feed', feedUrl) - await incrementFeedFailure(feedUrl, redisClient) + await incrementFeedFailure(feedUrl) return } const feed = await parseFeed(feedUrl, fetchResult.content) if (!feed) { console.error('Failed to parse RSS feed', feedUrl) - await incrementFeedFailure(feedUrl, redisClient) + await incrementFeedFailure(feedUrl) return } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index 0593d7881..1b8974984 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -67,10 +67,10 @@ const main = async () => { async (job: Job) => { switch (job.name) { case 'refresh-all-feeds': { - return await refreshAllFeeds(appDataSource, workerRedisClient) + return await refreshAllFeeds(appDataSource) } case 'refresh-feed': { - return await refreshFeed(redisClient, job.data) + return await refreshFeed(job.data) } } return true