diff --git a/packages/api/src/jobs/rss/refreshFeed.ts b/packages/api/src/jobs/rss/refreshFeed.ts index c8cafa8bb..47df0084d 100644 --- a/packages/api/src/jobs/rss/refreshFeed.ts +++ b/packages/api/src/jobs/rss/refreshFeed.ts @@ -1,13 +1,12 @@ import axios from 'axios' import crypto from 'crypto' -import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import import * as jwt from 'jsonwebtoken' import { parseHTML } from 'linkedom' import Parser, { Item } from 'rss-parser' import { promisify } from 'util' -import createHttpTaskWithToken from '../../utils/createTask' import { env } from '../../env' import { redisDataSource } from '../../redis_data_source' +import createHttpTaskWithToken from '../../utils/createTask' type FolderType = 'following' | 'inbox' @@ -59,6 +58,18 @@ export type RssFeedItem = Item & { link: string } +interface User { + id: string + folder: FolderType +} + +interface FetchContentTask { + users: Map<string, User> // userId -> User + item: RssFeedItem +} + +const fetchContentTasks = new Map<string, FetchContentTask>() // url -> FetchContentTask + export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => { // existing items and items that were published before 24h const publishedAt = item.isoDate ? 
new Date(item.isoDate) : new Date() @@ -274,6 +285,25 @@ const isItemRecentlySaved = async (userId: string, url: string) => { return false } +const addFetchContentTask = ( + userId: string, + folder: FolderType, + item: RssFeedItem +) => { + const url = item.link + const task = fetchContentTasks.get(url) + if (!task) { + fetchContentTasks.set(url, { + users: new Map([[userId, { id: userId, folder }]]), + item, + }) + } else { + task.users.set(userId, { id: userId, folder }) + } + + return true +} + const createTask = async ( userId: string, feedUrl: string, @@ -291,17 +321,16 @@ const createTask = async ( return createItemWithPreviewContent(userId, feedUrl, item) } - return fetchContentAndCreateItem(userId, feedUrl, item, folder) + return addFetchContentTask(userId, folder, item) } const fetchContentAndCreateItem = async ( - userId: string, + users: User[], feedUrl: string, - item: RssFeedItem, - folder: string + item: RssFeedItem ) => { const payload = { - userId, + users, source: 'rss-feeder', url: item.link.trim(), saveRequestId: '', @@ -309,7 +338,6 @@ const fetchContentAndCreateItem = async ( rssFeedUrl: feedUrl, savedAt: item.isoDate, publishedAt: item.isoDate, - folder, } try { @@ -367,12 +395,6 @@ const createItemWithPreviewContent = async ( } } -dotenv.config() -// Sentry.GCPFunction.init({ -// dsn: process.env.SENTRY_DSN, -// tracesSampleRate: 0, -// }) - const signToken = promisify(jwt.sign) const parser = new Parser({ customFields: { @@ -663,6 +685,15 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => { feed ) } + + // create fetch content tasks + for (const task of fetchContentTasks.values()) { + await fetchContentAndCreateItem( + Array.from(task.users.values()), + feedUrl, + task.item + ) + } } catch (e) { console.error('Error while saving RSS feeds', e) } diff --git a/packages/api/src/jobs/save_page.ts b/packages/api/src/jobs/save_page.ts index e3a238636..6f7ef83ca 100644 --- a/packages/api/src/jobs/save_page.ts +++ 
b/packages/api/src/jobs/save_page.ts @@ -2,15 +2,16 @@ import { Readability } from '@omnivore/readability' import axios from 'axios' import jwt from 'jsonwebtoken' import { promisify } from 'util' +import { env } from '../env' const signToken = promisify(jwt.sign) const IMPORTER_METRICS_COLLECTOR_URL = process.env.IMPORTER_METRICS_COLLECTOR_URL -const JWT_SECRET = process.env.JWT_SECRET -const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT +const JWT_SECRET = env.server.jwtSecret +const REST_BACKEND_ENDPOINT = process.env.INTERNAL_API_URL -if (!IMPORTER_METRICS_COLLECTOR_URL || !JWT_SECRET || !REST_BACKEND_ENDPOINT) { +if (!IMPORTER_METRICS_COLLECTOR_URL || !REST_BACKEND_ENDPOINT) { throw new Error('Missing environment variables') } diff --git a/packages/content-fetch/src/job.ts b/packages/content-fetch/src/job.ts index 7e7681a73..6fd67ec60 100644 --- a/packages/content-fetch/src/job.ts +++ b/packages/content-fetch/src/job.ts @@ -1,4 +1,4 @@ -import { Queue } from 'bullmq' +import { BulkJobOptions, Queue } from 'bullmq' import { redis } from './redis' const QUEUE_NAME = 'omnivore-backend-queue' @@ -29,6 +29,25 @@ const getPriority = (job: savePageJob): number => { return 5 } +const getAttempts = (job: savePageJob): number => { + if (job.isRss || job.isImport) { + // we don't want to retry rss or import jobs + return 1 + } + + return 2 +} + +const getOpts = (job: savePageJob): BulkJobOptions => { + return { + // jobId: `${job.userId}-${job.url}`, + // removeOnComplete: true, + // removeOnFail: true, + attempts: getAttempts(job), + priority: getPriority(job), + } +} + const createQueue = (): Queue | undefined => { return new Queue(QUEUE_NAME, { connection: redis, @@ -44,12 +63,7 @@ export const queueSavePageJob = async (savePageJobs: savePageJob[]) => { const jobs = savePageJobs.map((job) => ({ name: JOB_NAME, data: job.data, - opts: { - jobId: `${job.userId}-${job.url}`, - removeOnComplete: true, - removeOnFail: true, - priority: getPriority(job), - }, + 
opts: getOpts(job), })) return queue.addBulk(jobs) diff --git a/packages/content-fetch/src/request_handler.ts b/packages/content-fetch/src/request_handler.ts index 9319722f5..66acc1a37 100644 --- a/packages/content-fetch/src/request_handler.ts +++ b/packages/content-fetch/src/request_handler.ts @@ -108,6 +108,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => { url: finalUrl, userId: user.id, data: { + userId: user.id, url: finalUrl, title, content,