diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts index e5a96af71..e19436bee 100644 --- a/packages/api/src/routers/svc/rss_feed.ts +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -1,12 +1,10 @@ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { stringify } from 'csv-stringify/.' import express from 'express' -import { DateTime } from 'luxon' import { readPushSubscription } from '../../datalayer/pubsub' import { Subscription } from '../../entity/subscription' import { getRepository } from '../../entity/utils' import { SubscriptionStatus, SubscriptionType } from '../../generated/graphql' -import { createGCSFile } from '../../utils/uploads' +import { enqueueRssFeedFetch } from '../../utils/createTask' export function rssFeedRouter() { const router = express.Router() @@ -22,7 +20,6 @@ export function rssFeedRouter() { return res.status(200).send('Expired') } - let writeStream: NodeJS.WritableStream | undefined try { // get all active rss feed subscriptions const subscriptions = await getRepository(Subscription).find({ @@ -34,33 +31,22 @@ export function rssFeedRouter() { relations: ['user'], }) - // write the list of subscriptions to a csv file and upload it to gcs - // path style: rss/.csv - const dateStr = DateTime.now().toISODate() - const fullPath = `rss/${dateStr}.csv` - // open a write_stream to the file - const file = createGCSFile(fullPath) - writeStream = file.createWriteStream({ - contentType: 'text/csv', - }) - // stringify the data and pipe it to the write_stream - const stringifier = stringify({ - header: false, - columns: ['subscriptionId', 'userId', 'feedUrl'], - }) - stringifier.pipe(writeStream) + // create a cloud taks to fetch rss feed item for each subscription + await Promise.all( + subscriptions.map((subscription) => { + try { + return enqueueRssFeedFetch(subscription) + } catch (error) { + console.log('error creating rss feed fetch task', error) + } + }) + ) - subscriptions.forEach((sub) => { - stringifier.write([sub.id, sub.user.id, sub.url]) - }) + res.send('OK') } catch (error) { console.log('error fetching rss feeds', error) - return res.status(500).send('Internal Server Error') - } finally { - writeStream?.end() + res.status(500).send('Internal Server Error') } - - res.send('OK') }) return router diff --git a/packages/rss-handler/package.json b/packages/rss-handler/package.json index 82b87db8c..e24080ae1 100644 --- a/packages/rss-handler/package.json +++ b/packages/rss-handler/package.json @@ -21,7 +21,6 @@ }, "dependencies": { "@google-cloud/functions-framework": "3.1.2", - "@google-cloud/tasks": "^3.0.5", "@sentry/serverless": "^6.16.1", "axios": "^1.4.0", "dotenv": "^16.0.1", diff --git a/packages/rss-handler/src/index.ts b/packages/rss-handler/src/index.ts index 933a9703b..c4e36fbb0 100644 --- a/packages/rss-handler/src/index.ts +++ b/packages/rss-handler/src/index.ts @@ -9,10 +9,16 @@ interface RssFeedRequest { subscriptionId: string userId: string feedUrl: string + lastFetchedAt: Date } function isRssFeedRequest(body: any): body is RssFeedRequest { - return 'subscriptionId' in body && 'userId' in body && 'feedUrl' in body + return ( + 'subscriptionId' in body && + 'userId' in body && + 'feedUrl' in body && + 'lastFetchedAt' in body + ) } const sendSavePageMutation = async (userId: string, input: unknown) => { @@ -148,19 +154,24 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('INVALID_REQUEST_BODY') } - const { userId, feedUrl, subscriptionId } = req.body + const { userId, feedUrl, subscriptionId, lastFetchedAt } = req.body // fetch feed const feed = await parser.parseURL(feedUrl) - const lastFetchedAt = new Date() - console.log('Fetched feed', feed.title, lastFetchedAt) + const newFetchedAt = new Date() + console.log('Fetched feed', feed.title, newFetchedAt) // save each item in the feed for (const item of feed.items) { - if (!item.link || !item.title || !item.content) { + if (!item.link || !item.title || !item.content || !item.isoDate) { console.log('Invalid feed item', item) continue } + if (new Date(item.isoDate) <= lastFetchedAt) { + console.log('Skipping old feed item', item.title) + continue + } + const input = { source: 'rss-feeder', url: item.link, @@ -184,7 +195,7 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( const updatedSubscription = await sendUpdateSubscriptionMutation( userId, subscriptionId, - lastFetchedAt + newFetchedAt ) console.log('Updated subscription', updatedSubscription) diff --git a/packages/rss-handler/src/task.ts b/packages/rss-handler/src/task.ts deleted file mode 100644 index 94b3352cc..000000000 --- a/packages/rss-handler/src/task.ts +++ /dev/null @@ -1,56 +0,0 @@ -/* eslint-disable @typescript-eslint/restrict-template-expressions */ -import { CloudTasksClient, protos } from '@google-cloud/tasks' - -const cloudTask = new CloudTasksClient() - -export const emailUserUrl = () => { - const envar = process.env.INTERNAL_SVC_ENDPOINT - if (envar) { - return envar + 'api/user/email' - } - throw 'INTERNAL_SVC_ENDPOINT not set' -} - -export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL - -export const createCloudTask = async ( - taskHandlerUrl: string | undefined, - payload: unknown, - requestHeaders?: Record, - queue = 'omnivore-import-queue' -) => { - const location = process.env.GCP_LOCATION - const project = process.env.GCP_PROJECT_ID - - if (!project || !location || !queue || !taskHandlerUrl) { - throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}` - } - - const serviceAccountEmail = `${project}@appspot.gserviceaccount.com` - - const parent = cloudTask.queuePath(project, location, queue) - const convertedPayload = JSON.stringify(payload) - const body = Buffer.from(convertedPayload).toString('base64') - const task: protos.google.cloud.tasks.v2.ITask = { - httpRequest: { - httpMethod: 'POST', - url: taskHandlerUrl, - headers: { - 'Content-Type': 'application/json', - ...requestHeaders, - }, - body, - ...(serviceAccountEmail - ? { - oidcToken: { - serviceAccountEmail, - }, - } - : null), - }, - } - - return cloudTask.createTask({ parent, task }).then((result) => { - return result[0].name ?? undefined - }) -}