diff --git a/packages/puppeteer-parse/index.js b/packages/puppeteer-parse/index.js index 5ef409f8d..355d3f907 100644 --- a/packages/puppeteer-parse/index.js +++ b/packages/puppeteer-parse/index.js @@ -335,6 +335,7 @@ async function fetchContent(req, res) { const urlStr = (req.query ? req.query.url : undefined) || (req.body ? req.body.url : undefined); const locale = (req.query ? req.query.locale : undefined) || (req.body ? req.body.locale : undefined); const timezone = (req.query ? req.query.timezone : undefined) || (req.body ? req.body.timezone : undefined); + const rssFeedUrl = (req.query ? req.query.rssFeedUrl : undefined) || (req.body ? req.body.rssFeedUrl : undefined); let logRecord = { url: urlStr, @@ -348,6 +349,7 @@ async function fetchContent(req, res) { taskId: taskId, locale, timezone, + rssFeedUrl, }; console.info(`Article parsing request`, logRecord); @@ -453,6 +455,7 @@ async function fetchContent(req, res) { parseResult: readabilityResult, state, labels, + rssFeedUrl, }); if (!apiResponse) { logRecord.error = 'error while saving page'; diff --git a/packages/rss-handler/package.json b/packages/rss-handler/package.json index e24080ae1..82b87db8c 100644 --- a/packages/rss-handler/package.json +++ b/packages/rss-handler/package.json @@ -21,6 +21,7 @@ }, "dependencies": { "@google-cloud/functions-framework": "3.1.2", + "@google-cloud/tasks": "^3.0.5", "@sentry/serverless": "^6.16.1", "axios": "^1.4.0", "dotenv": "^16.0.1", diff --git a/packages/rss-handler/src/index.ts b/packages/rss-handler/src/index.ts index d51bf7215..4e1ffdd2b 100644 --- a/packages/rss-handler/src/index.ts +++ b/packages/rss-handler/src/index.ts @@ -4,6 +4,7 @@ import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-d import * as jwt from 'jsonwebtoken' import Parser from 'rss-parser' import { promisify } from 'util' +import { CONTENT_FETCH_URL, createCloudTask } from './task' interface RssFeedRequest { subscriptionId: string @@ -21,57 +22,6 @@ function isRssFeedRequest(body: any): body is RssFeedRequest { ) } -const sendSavePageMutation = async (userId: string, input: unknown) => { - const JWT_SECRET = process.env.JWT_SECRET - const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT - - if (!JWT_SECRET || !REST_BACKEND_ENDPOINT) { - throw 'Environment not configured correctly' - } - - const data = JSON.stringify({ - query: `mutation SavePage ($input: SavePageInput!){ - savePage(input:$input){ - ... on SaveSuccess{ - url - clientRequestId - } - ... on SaveError{ - errorCodes - } - } - }`, - variables: { - input: Object.assign({}, input, { source: 'puppeteer-parse' }), - }, - }) - - const auth = (await signToken({ uid: userId }, JWT_SECRET)) as string - try { - const response = await axios.post( - `${REST_BACKEND_ENDPOINT}/graphql`, - data, - { - headers: { - Cookie: `auth=${auth};`, - 'Content-Type': 'application/json', - }, - timeout: 30000, // 30s - } - ) - - /* eslint-disable @typescript-eslint/no-unsafe-member-access */ - return !!response.data.data.savePage - } catch (error) { - if (axios.isAxiosError(error)) { - console.error('save page mutation error', error.message) - } else { - console.error(error) - } - return false - } -} - const sendUpdateSubscriptionMutation = async ( userId: string, subscriptionId: string, @@ -155,40 +105,44 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction( } const { userId, feedUrl, subscriptionId, lastFetchedAt } = req.body + console.log('Processing feed', feedUrl, lastFetchedAt) + // fetch feed const feed = await parser.parseURL(feedUrl) const newFetchedAt = new Date() console.log('Fetched feed', feed.title, newFetchedAt) // save each item in the feed - for (const item of feed.items) { - if (!item.link || !item.title || !item.content || !item.isoDate) { + for await (const item of feed.items) { + const publishedAt = item.pubDate || item.isoDate + console.log('Processing feed item', item.link, publishedAt) + + if (!item.link || !publishedAt) { console.log('Invalid feed item', item) continue } - if (new Date(item.isoDate) <= lastFetchedAt) { - console.log('Skipping old feed item', item.title) + if (new Date(publishedAt) <= lastFetchedAt) { + console.log('Skipping old feed item', item.link) continue } const input = { + userId, source: 'rss-feeder', url: item.link, - clientRequestId: '', + saveRequestId: '', labels: [{ name: 'RSS' }], - title: item.title, - originalContent: item.content, rssFeedUrl: feedUrl, } try { - console.log('Saving page', input.title) + console.log('Creating task', input.url) // save page - const result = await sendSavePageMutation(userId, input) - console.log('Saved page', result) + const task = await createCloudTask(CONTENT_FETCH_URL, input) + console.log('Created task', task) } catch (error) { - console.error('Error while saving page', error) + console.error('Error while creating task', error) } } diff --git a/packages/rss-handler/src/task.ts b/packages/rss-handler/src/task.ts new file mode 100644 index 000000000..8a89fbd43 --- /dev/null +++ b/packages/rss-handler/src/task.ts @@ -0,0 +1,48 @@ +/* eslint-disable @typescript-eslint/restrict-template-expressions */ +import { CloudTasksClient, protos } from '@google-cloud/tasks' + +const cloudTask = new CloudTasksClient() + +export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL + +export const createCloudTask = async ( + taskHandlerUrl: string | undefined, + payload: unknown, + requestHeaders?: Record, + queue = 'omnivore-rss-feed-queue' +) => { + const location = process.env.GCP_LOCATION + const project = process.env.GCP_PROJECT_ID + + if (!project || !location || !queue || !taskHandlerUrl) { + throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}` + } + + const serviceAccountEmail = `${project}@appspot.gserviceaccount.com` + + const parent = cloudTask.queuePath(project, location, queue) + const convertedPayload = JSON.stringify(payload) + const body = Buffer.from(convertedPayload).toString('base64') + const task: protos.google.cloud.tasks.v2.ITask = { + httpRequest: { + httpMethod: 'POST', + url: taskHandlerUrl, + headers: { + 'Content-Type': 'application/json', + ...requestHeaders, + }, + body, + ...(serviceAccountEmail + ? { + oidcToken: { + serviceAccountEmail, + }, + } + : null), + }, + } + + return cloudTask.createTask({ parent, task }).then((result) => { + return result[0].name ?? undefined + }) +}