From f50d1c9cb92fb8ffecf2a3031d5595c6982fa6ef Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 19 Oct 2023 13:41:09 +0800 Subject: [PATCH] Group RSS subscriptions by feed URL and create cloud task for the groups --- .../api/src/resolvers/subscriptions/index.ts | 12 ++- packages/api/src/routers/svc/rss_feed.ts | 40 ++++--- packages/api/src/services/subscriptions.ts | 8 +- packages/api/src/utils/createTask.ts | 47 ++++---- packages/api/test/routers/rss_feed.test.ts | 101 ++++++++++++++++++ 5 files changed, 173 insertions(+), 35 deletions(-) create mode 100644 packages/api/test/routers/rss_feed.test.ts diff --git a/packages/api/src/resolvers/subscriptions/index.ts b/packages/api/src/resolvers/subscriptions/index.ts index 430347654..7bf145452 100644 --- a/packages/api/src/resolvers/subscriptions/index.ts +++ b/packages/api/src/resolvers/subscriptions/index.ts @@ -235,11 +235,19 @@ export const subscribeResolver = authorized< } } + const newSubscription = newSubscriptions[0] + // create a cloud task to fetch rss feed item for the new subscription - await enqueueRssFeedFetch(uid, newSubscriptions[0]) + await enqueueRssFeedFetch({ + user_ids: [uid], + url: input.url, + subscription_ids: [newSubscription.id], + scheduled_timestamps: [null], + last_fetched_timestamps: [null], + }) return { - subscriptions: newSubscriptions, + subscriptions: [newSubscription], } } diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts index c06ec9f71..93077f6e4 100644 --- a/packages/api/src/routers/svc/rss_feed.ts +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -4,7 +4,10 @@ import { Subscription } from '../../entity/subscription' import { SubscriptionStatus, SubscriptionType } from '../../generated/graphql' import { readPushSubscription } from '../../pubsub' import { getRepository } from '../../repository' -import { enqueueRssFeedFetch } from '../../utils/createTask' +import { + enqueueRssFeedFetch, + RssSubscriptionGroup, +} from '../../utils/createTask' import { logger } from '../../utils/logger' export function rssFeedRouter() { @@ -22,21 +25,34 @@ export function rssFeedRouter() { return res.status(200).send('Expired') } - // get all active rss feed subscriptions - const subscriptions = await getRepository(Subscription).find({ - select: ['id', 'url', 'user', 'lastFetchedAt', 'lastFetchedChecksum'], - where: { - type: SubscriptionType.Rss, - status: SubscriptionStatus.Active, - }, - relations: ['user'], - }) + // get active rss feed subscriptions scheduled for fetch and group by feed url + const subscriptionGroups = (await getRepository(Subscription).query( + ` + SELECT + url, + ARRAY_AGG(id) AS subscription_ids, + ARRAY_AGG(user_id) AS user_ids, + ARRAY_AGG(last_fetched_at) AS last_fetched_timestamps, + ARRAY_AGG(scheduled_at) AS scheduled_timestamps + FROM + omnivore.subscriptions + WHERE + type = $1 + AND status = $2 + AND (scheduled_at <= NOW() OR scheduled_at IS NULL) + GROUP BY + url + `, + [SubscriptionType.Rss, SubscriptionStatus.Active] + )) as RssSubscriptionGroup[] + + logger.info('scheduledSubscriptions', subscriptionGroups) // create a cloud taks to fetch rss feed item for each subscription await Promise.all( - subscriptions.map((subscription) => { + subscriptionGroups.map((subscriptionGroup) => { try { - return enqueueRssFeedFetch(subscription.user.id, subscription) + return enqueueRssFeedFetch(subscriptionGroup) } catch (error) { logger.info('error creating rss feed fetch task', error) } diff --git a/packages/api/src/services/subscriptions.ts b/packages/api/src/services/subscriptions.ts index 3a30469a8..9e6e78b37 100644 --- a/packages/api/src/services/subscriptions.ts +++ b/packages/api/src/services/subscriptions.ts @@ -1,5 +1,5 @@ import axios from 'axios' -import { DeleteResult } from 'typeorm' +import { DeepPartial, DeleteResult } from 'typeorm' import { appDataSource } from '../data_source' import { NewsletterEmail } from '../entity/newsletter_email' import { Subscription } from '../entity/subscription' @@ -215,3 +215,9 @@ export const deleteSubscription = async ( userId ) } + +export const createRssSubscriptions = async ( + subscriptions: DeepPartial[] +) => { + return getRepository(Subscription).save(subscriptions) +} diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index f7cc4c829..7e09968ea 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -7,7 +7,6 @@ import axios from 'axios' import { nanoid } from 'nanoid' import { DeepPartial } from 'typeorm' import { Recommendation } from '../entity/recommendation' -import { Subscription } from '../entity/subscription' import { env } from '../env' import { ArticleSavingRequestStatus, @@ -592,21 +591,29 @@ export const enqueueThumbnailTask = async ( return createdTasks[0].name } -export const enqueueRssFeedFetch = async ( - userId: string, - rssFeedSubscription: Subscription -): Promise => { - const { GOOGLE_CLOUD_PROJECT } = process.env - const payload = { - subscriptionId: rssFeedSubscription.id, - feedUrl: rssFeedSubscription.url, - lastFetchedAt: rssFeedSubscription.lastFetchedAt?.getTime() || 0, // unix timestamp in milliseconds - lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null, - scheduledAt: new Date().getTime(), // unix timestamp in milliseconds - } +export interface RssSubscriptionGroup { + url: string + subscription_ids: string[] + user_ids: string[] + last_fetched_timestamps: (Date | null)[] + scheduled_timestamps: (Date | null)[] +} - const headers = { - [OmnivoreAuthorizationHeader]: generateVerificationToken({ id: userId }), +export const enqueueRssFeedFetch = async ( + subscriptionGroup: RssSubscriptionGroup +): Promise => { + const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env + const payload = { + subscriptionIds: subscriptionGroup.subscription_ids, + feedUrl: subscriptionGroup.url, + lastFetchedTimestamps: subscriptionGroup.last_fetched_timestamps.map( + (timestamp) => timestamp?.getTime() || 0 + ), // unix timestamp in milliseconds + lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null, + scheduledTimestamps: subscriptionGroup.scheduled_timestamps.map( + (timestamp) => timestamp?.getTime() || 0 + ), // unix timestamp in milliseconds + userIds: subscriptionGroup.user_ids, } // If there is no Google Cloud Project Id exposed, it means that we are in local environment @@ -615,9 +622,10 @@ export const enqueueRssFeedFetch = async ( // Calling the handler function directly. setTimeout(() => { axios - .post(env.queue.rssFeedTaskHandlerUrl, payload, { - headers, - }) + .post( + `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`, + payload + ) .catch((error) => { logError(error) }) @@ -630,8 +638,7 @@ export const enqueueRssFeedFetch = async ( project: GOOGLE_CLOUD_PROJECT, queue: 'omnivore-rss-queue', payload, - taskHandlerUrl: env.queue.rssFeedTaskHandlerUrl, - requestHeaders: headers, + taskHandlerUrl: `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`, }) if (!createdTasks || !createdTasks[0].name) { diff --git a/packages/api/test/routers/rss_feed.test.ts b/packages/api/test/routers/rss_feed.test.ts new file mode 100644 index 000000000..a8e35a345 --- /dev/null +++ b/packages/api/test/routers/rss_feed.test.ts @@ -0,0 +1,101 @@ +import chai, { expect } from 'chai' +import 'mocha' +import sinon from 'sinon' +import sinonChai from 'sinon-chai' +import { User } from '../../src/entity/user' +import { SubscriptionType } from '../../src/generated/graphql' +import { createRssSubscriptions } from '../../src/services/subscriptions' +import { deleteUser } from '../../src/services/user' +import * as createTask from '../../src/utils/createTask' +import { createTestUser } from '../db' +import { request } from '../util' + +chai.use(sinonChai) + +describe('Rss feeds Router', () => { + const token = process.env.PUBSUB_VERIFICATION_TOKEN || '' + + let user: User + let user1: User + let user2: User + + before(async () => { + // create test user and login + user = await createTestUser('fakeUser') + await request + .post('/local/debug/fake-user-login') + .send({ fakeEmail: user.email }) + + user1 = await createTestUser('fakeUser1') + user2 = await createTestUser('fakeUser2') + // create test subscriptions + const name1 = 'NPR' + const url1 = 'https://www.npr.org/rss/rss.php?id=1001' + const name2 = 'BBC' + const url2 = 'http://feeds.bbci.co.uk/news/rss.xml' + await createRssSubscriptions([ + { + name: name1, + user: { id: user1.id }, + scheduledAt: new Date(), + url: url1, + type: SubscriptionType.Rss, + }, + { + name: name1, + user: { id: user2.id }, + scheduledAt: new Date(), + url: url1, + type: SubscriptionType.Rss, + }, + { + name: name2, + user: { id: user1.id }, + url: url2, + type: SubscriptionType.Rss, + }, + { + name: name2, + user: { id: user2.id }, + // 1 hour in the future + scheduledAt: new Date(Date.now() + 60 * 60 * 1000), + url: url2, + type: SubscriptionType.Rss, + }, + ]) + }) + + after(async () => { + // clean up + await deleteUser(user.id) + await deleteUser(user1.id) + await deleteUser(user2.id) + }) + + it('fetches all scheduled RSS feeds', async () => { + const data = { + message: { + data: Buffer.from('').toString('base64'), + publishTime: new Date().toISOString(), + }, + } + + // fake enqueueRssFeedFetch function + const fake = sinon.replace( + createTask, + 'enqueueRssFeedFetch', + sinon.fake.resolves('task name') + ) + + const res = await request + .post('/svc/pubsub/rss-feed/fetchAll?token=' + token) + .send(data) + .expect(200) + expect(res.text).to.eql('OK') + + // check if enqueueRssFeedFetch is called + expect(fake).to.have.been.called + + sinon.restore() + }) +})