From 68667053c868e002a98af76c780dfcdba28d5d9b Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 7 Jul 2023 21:15:42 +0800 Subject: [PATCH] create an endpoint to fetch all rss feeds --- packages/api/src/routers/svc/rss_feed.ts | 52 ++++++++++++++++++++++++ packages/api/src/server.ts | 2 + packages/api/src/util.ts | 5 ++- packages/api/src/utils/createTask.ts | 28 +++++++++++++ 4 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 packages/api/src/routers/svc/rss_feed.ts diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts new file mode 100644 index 000000000..a23f1acf0 --- /dev/null +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -0,0 +1,52 @@ +/* eslint-disable @typescript-eslint/no-misused-promises */ +import express from 'express' +import { readPushSubscription } from '../../datalayer/pubsub' +import { Subscription } from '../../entity/subscription' +import { getRepository } from '../../entity/utils' +import { SubscriptionStatus, SubscriptionType } from '../../generated/graphql' +import { enqueueRssFeedFetch } from '../../utils/createTask' + +export function rssFeedRouter() { + const router = express.Router() + + router.post('/fetchAll', async (req, res) => { + console.log('fetch all rss feeds') + + const { message: msgStr, expired } = readPushSubscription(req) + console.log('read pubsub message', msgStr, 'has expired', expired) + + if (expired) { + console.log('discarding expired message') + return res.status(200).send('Expired') + } + + try { + // get all active rss feed subscriptions + const subscriptions = await getRepository(Subscription).find({ + where: { + type: SubscriptionType.Rss, + status: SubscriptionStatus.Active, + }, + relations: ['user'], + }) + + // create a cloud taks to fetch rss feed item for each subscription + await Promise.all( + subscriptions.map((subscription) => { + try { + return enqueueRssFeedFetch(subscription) + } catch (error) { + console.log('error creating rss feed fetch task', error) + } + }) + ) + + res.send('OK') + } catch (error) { + console.log('error fetching rss feeds', error) + res.status(500).send('Internal Server Error') + } + }) + + return router +} diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 31789b6ec..bc87360ae 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -43,6 +43,7 @@ import { integrationsServiceRouter } from './routers/svc/integrations' import { linkServiceRouter } from './routers/svc/links' import { newsletterServiceRouter } from './routers/svc/newsletters' import { remindersServiceRouter } from './routers/svc/reminders' +import { rssFeedRouter } from './routers/svc/rss_feed' import { uploadServiceRouter } from './routers/svc/upload' import { webhooksServiceRouter } from './routers/svc/webhooks' import { textToSpeechRouter } from './routers/text_to_speech' @@ -159,6 +160,7 @@ export const createApp = (): { app.use('/svc/pubsub/integrations', integrationsServiceRouter()) app.use('/svc/reminders', remindersServiceRouter()) app.use('/svc/email-attachment', emailAttachmentRouter()) + app.use('/svc/rss-feed', rssFeedRouter()) if (env.dev.isLocal) { app.use('/local/debug', localDebugRouter()) diff --git a/packages/api/src/util.ts b/packages/api/src/util.ts index 2f2c9a5ef..039785d9e 100755 --- a/packages/api/src/util.ts +++ b/packages/api/src/util.ts @@ -1,8 +1,8 @@ /* eslint-disable @typescript-eslint/no-unsafe-return */ /* eslint-disable @typescript-eslint/naming-convention */ /* eslint-disable @typescript-eslint/no-explicit-any */ -import os from 'os' import * as dotenv from 'dotenv' +import os from 'os' interface BackendEnv { pg: { @@ -67,6 +67,7 @@ interface BackendEnv { textToSpeechTaskHandlerUrl: string recommendationTaskHandlerUrl: string thumbnailTaskHandlerUrl: string + rssFeedTaskHandlerUrl: string } fileUpload: { gcsUploadBucket: string @@ -161,6 +162,7 @@ const nullableEnvVars = [ 'RECOMMENDATION_TASK_HANDLER_URL', 'POCKET_CONSUMER_KEY', 'THUMBNAIL_TASK_HANDLER_URL', + 'RSS_FEED_TASK_HANDLER_URL', ] // Allow some vars to be null/empty /* If not in GAE and Prod/QA/Demo env (f.e. on localhost/dev env), allow following env vars to be null */ @@ -248,6 +250,7 @@ export function getEnv(): BackendEnv { textToSpeechTaskHandlerUrl: parse('TEXT_TO_SPEECH_TASK_HANDLER_URL'), recommendationTaskHandlerUrl: parse('RECOMMENDATION_TASK_HANDLER_URL'), thumbnailTaskHandlerUrl: parse('THUMBNAIL_TASK_HANDLER_URL'), + rssFeedTaskHandlerUrl: parse('RSS_FEED_TASK_HANDLER_URL'), } const imageProxy = { url: parse('IMAGE_PROXY_URL'), diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 03752913e..f928cf1c0 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -6,6 +6,7 @@ import { google } from '@google-cloud/tasks/build/protos/protos' import axios from 'axios' import { nanoid } from 'nanoid' import { Recommendation } from '../elastic/types' +import { Subscription } from '../entity/subscription' import { env } from '../env' import { ArticleSavingRequestStatus, @@ -559,4 +560,31 @@ export const enqueueThumbnailTask = async ( return createdTasks[0].name } +export const enqueueRssFeedFetch = async ( + rssFeedSubscription: Subscription +): Promise => { + const { GOOGLE_CLOUD_PROJECT } = process.env + const payload = { + subscriptionId: rssFeedSubscription.id, + userId: rssFeedSubscription.user.id, + feedUrl: rssFeedSubscription.url, + } + + const createdTasks = await createHttpTaskWithToken({ + project: GOOGLE_CLOUD_PROJECT, + queue: 'omnivore-rss-feed-queue', + payload, + taskHandlerUrl: env.queue.rssFeedTaskHandlerUrl, + }) + + if (!createdTasks || !createdTasks[0].name) { + logger.error(`Unable to get the name of the task`, { + payload, + createdTasks, + }) + throw new CreateTaskError(`Unable to get the name of the task`) + } + return createdTasks[0].name +} + export default createHttpTaskWithToken