Group RSS subscriptions by feed URL and create cloud task for the groups

This commit is contained in:
Hongbo Wu
2023-10-19 13:41:09 +08:00
parent 5283a84908
commit f50d1c9cb9
5 changed files with 173 additions and 35 deletions

View File

@ -235,11 +235,19 @@ export const subscribeResolver = authorized<
}
}
const newSubscription = newSubscriptions[0]
// create a cloud task to fetch rss feed item for the new subscription
await enqueueRssFeedFetch(uid, newSubscriptions[0])
await enqueueRssFeedFetch({
user_ids: [uid],
url: input.url,
subscription_ids: [newSubscription.id],
scheduled_timestamps: [null],
last_fetched_timestamps: [null],
})
return {
subscriptions: newSubscriptions,
subscriptions: [newSubscription],
}
}

View File

@ -4,7 +4,10 @@ import { Subscription } from '../../entity/subscription'
import { SubscriptionStatus, SubscriptionType } from '../../generated/graphql'
import { readPushSubscription } from '../../pubsub'
import { getRepository } from '../../repository'
import { enqueueRssFeedFetch } from '../../utils/createTask'
import {
enqueueRssFeedFetch,
RssSubscriptionGroup,
} from '../../utils/createTask'
import { logger } from '../../utils/logger'
export function rssFeedRouter() {
@ -22,21 +25,34 @@ export function rssFeedRouter() {
return res.status(200).send('Expired')
}
// get all active rss feed subscriptions
const subscriptions = await getRepository(Subscription).find({
select: ['id', 'url', 'user', 'lastFetchedAt', 'lastFetchedChecksum'],
where: {
type: SubscriptionType.Rss,
status: SubscriptionStatus.Active,
},
relations: ['user'],
})
// get active rss feed subscriptions scheduled for fetch and group by feed url
const subscriptionGroups = (await getRepository(Subscription).query(
`
SELECT
url,
ARRAY_AGG(id) AS subscription_ids,
ARRAY_AGG(user_id) AS user_ids,
ARRAY_AGG(last_fetched_at) AS last_fetched_timestamps,
ARRAY_AGG(scheduled_at) AS scheduled_timestamps
FROM
omnivore.subscriptions
WHERE
type = $1
AND status = $2
AND (scheduled_at <= NOW() OR scheduled_at IS NULL)
GROUP BY
url
`,
[SubscriptionType.Rss, SubscriptionStatus.Active]
)) as RssSubscriptionGroup[]
logger.info('scheduledSubscriptions', subscriptionGroups)
// create a cloud task to fetch rss feed items for each subscription group
await Promise.all(
subscriptions.map((subscription) => {
subscriptionGroups.map((subscriptionGroup) => {
try {
return enqueueRssFeedFetch(subscription.user.id, subscription)
return enqueueRssFeedFetch(subscriptionGroup)
} catch (error) {
logger.info('error creating rss feed fetch task', error)
}

View File

@ -1,5 +1,5 @@
import axios from 'axios'
import { DeleteResult } from 'typeorm'
import { DeepPartial, DeleteResult } from 'typeorm'
import { appDataSource } from '../data_source'
import { NewsletterEmail } from '../entity/newsletter_email'
import { Subscription } from '../entity/subscription'
@ -215,3 +215,9 @@ export const deleteSubscription = async (
userId
)
}
export const createRssSubscriptions = async (
subscriptions: DeepPartial<Subscription>[]
) => {
return getRepository(Subscription).save(subscriptions)
}

View File

@ -7,7 +7,6 @@ import axios from 'axios'
import { nanoid } from 'nanoid'
import { DeepPartial } from 'typeorm'
import { Recommendation } from '../entity/recommendation'
import { Subscription } from '../entity/subscription'
import { env } from '../env'
import {
ArticleSavingRequestStatus,
@ -592,21 +591,29 @@ export const enqueueThumbnailTask = async (
return createdTasks[0].name
}
export const enqueueRssFeedFetch = async (
userId: string,
rssFeedSubscription: Subscription
): Promise<string> => {
const { GOOGLE_CLOUD_PROJECT } = process.env
const payload = {
subscriptionId: rssFeedSubscription.id,
feedUrl: rssFeedSubscription.url,
lastFetchedAt: rssFeedSubscription.lastFetchedAt?.getTime() || 0, // unix timestamp in milliseconds
lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null,
scheduledAt: new Date().getTime(), // unix timestamp in milliseconds
}
// One row of the GROUP BY feed-url query in rssFeedRouter: all subscriptions
// sharing a feed `url`, with per-subscription columns collected into parallel
// arrays via ARRAY_AGG — the snake_case names match the SQL column aliases.
// Index i of each array refers to the same subscription.
export interface RssSubscriptionGroup {
url: string
// aggregated subscription ids for this feed url
subscription_ids: string[]
// owning user id per subscription, parallel to subscription_ids
user_ids: string[]
// last_fetched_at per subscription; null when unset
last_fetched_timestamps: (Date | null)[]
// scheduled_at per subscription; null when no fetch is scheduled
scheduled_timestamps: (Date | null)[]
}
const headers = {
[OmnivoreAuthorizationHeader]: generateVerificationToken({ id: userId }),
export const enqueueRssFeedFetch = async (
subscriptionGroup: RssSubscriptionGroup
): Promise<string> => {
const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env
const payload = {
subscriptionIds: subscriptionGroup.subscription_ids,
feedUrl: subscriptionGroup.url,
lastFetchedTimestamps: subscriptionGroup.last_fetched_timestamps.map(
(timestamp) => timestamp?.getTime() || 0
), // unix timestamp in milliseconds
lastFetchedChecksum: rssFeedSubscription.lastFetchedChecksum || null,
scheduledTimestamps: subscriptionGroup.scheduled_timestamps.map(
(timestamp) => timestamp?.getTime() || 0
), // unix timestamp in milliseconds
userIds: subscriptionGroup.user_ids,
}
// If there is no Google Cloud Project Id exposed, it means that we are in local environment
@ -615,9 +622,10 @@ export const enqueueRssFeedFetch = async (
// Calling the handler function directly.
setTimeout(() => {
axios
.post(env.queue.rssFeedTaskHandlerUrl, payload, {
headers,
})
.post(
`${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`,
payload
)
.catch((error) => {
logError(error)
})
@ -630,8 +638,7 @@ export const enqueueRssFeedFetch = async (
project: GOOGLE_CLOUD_PROJECT,
queue: 'omnivore-rss-queue',
payload,
taskHandlerUrl: env.queue.rssFeedTaskHandlerUrl,
requestHeaders: headers,
taskHandlerUrl: `${env.queue.rssFeedTaskHandlerUrl}?token=${PUBSUB_VERIFICATION_TOKEN}`,
})
if (!createdTasks || !createdTasks[0].name) {

View File

@ -0,0 +1,101 @@
import chai, { expect } from 'chai'
import 'mocha'
import sinon from 'sinon'
import sinonChai from 'sinon-chai'
import { User } from '../../src/entity/user'
import { SubscriptionType } from '../../src/generated/graphql'
import { createRssSubscriptions } from '../../src/services/subscriptions'
import { deleteUser } from '../../src/services/user'
import * as createTask from '../../src/utils/createTask'
import { createTestUser } from '../db'
import { request } from '../util'
chai.use(sinonChai)
// Exercises POST /svc/pubsub/rss-feed/fetchAll end to end against the test DB,
// with the cloud-task enqueue faked out via sinon.
describe('Rss feeds Router', () => {
// pubsub endpoints authenticate via a ?token= query parameter
const token = process.env.PUBSUB_VERIFICATION_TOKEN || ''
let user: User
let user1: User
let user2: User
before(async () => {
// create test user and login
user = await createTestUser('fakeUser')
await request
.post('/local/debug/fake-user-login')
.send({ fakeEmail: user.email })
user1 = await createTestUser('fakeUser1')
user2 = await createTestUser('fakeUser2')
// create test subscriptions
const name1 = 'NPR'
const url1 = 'https://www.npr.org/rss/rss.php?id=1001'
const name2 = 'BBC'
const url2 = 'http://feeds.bbci.co.uk/news/rss.xml'
await createRssSubscriptions([
// two users share url1 and are due now — the fetchAll query groups them
// into a single RssSubscriptionGroup for this feed url
{
name: name1,
user: { id: user1.id },
scheduledAt: new Date(),
url: url1,
type: SubscriptionType.Rss,
},
{
name: name1,
user: { id: user2.id },
scheduledAt: new Date(),
url: url1,
type: SubscriptionType.Rss,
},
// no scheduledAt: NULL scheduled_at counts as due in the fetchAll query
// (scheduled_at <= NOW() OR scheduled_at IS NULL)
{
name: name2,
user: { id: user1.id },
url: url2,
type: SubscriptionType.Rss,
},
// scheduled in the future — should be excluded from this fetch run
{
name: name2,
user: { id: user2.id },
// 1 hour in the future
scheduledAt: new Date(Date.now() + 60 * 60 * 1000),
url: url2,
type: SubscriptionType.Rss,
},
])
})
after(async () => {
// clean up
await deleteUser(user.id)
await deleteUser(user1.id)
await deleteUser(user2.id)
})
it('fetches all scheduled RSS feeds', async () => {
// minimal pubsub push envelope; an empty payload is enough for fetchAll
const data = {
message: {
data: Buffer.from('').toString('base64'),
publishTime: new Date().toISOString(),
},
}
// fake enqueueRssFeedFetch function so no real cloud task is created
const fake = sinon.replace(
createTask,
'enqueueRssFeedFetch',
sinon.fake.resolves('task name')
)
const res = await request
.post('/svc/pubsub/rss-feed/fetchAll?token=' + token)
.send(data)
.expect(200)
expect(res.text).to.eql('OK')
// check if enqueueRssFeedFetch is called
expect(fake).to.have.been.called
sinon.restore()
})
})