fetch rss item by url
This commit is contained in:
@ -335,6 +335,7 @@ async function fetchContent(req, res) {
|
||||
const urlStr = (req.query ? req.query.url : undefined) || (req.body ? req.body.url : undefined);
|
||||
const locale = (req.query ? req.query.locale : undefined) || (req.body ? req.body.locale : undefined);
|
||||
const timezone = (req.query ? req.query.timezone : undefined) || (req.body ? req.body.timezone : undefined);
|
||||
const rssFeedUrl = (req.query ? req.query.rssFeedUrl : undefined) || (req.body ? req.body.rssFeedUrl : undefined);
|
||||
|
||||
let logRecord = {
|
||||
url: urlStr,
|
||||
@ -348,6 +349,7 @@ async function fetchContent(req, res) {
|
||||
taskId: taskId,
|
||||
locale,
|
||||
timezone,
|
||||
rssFeedUrl,
|
||||
};
|
||||
|
||||
console.info(`Article parsing request`, logRecord);
|
||||
@ -453,6 +455,7 @@ async function fetchContent(req, res) {
|
||||
parseResult: readabilityResult,
|
||||
state,
|
||||
labels,
|
||||
rssFeedUrl,
|
||||
});
|
||||
if (!apiResponse) {
|
||||
logRecord.error = 'error while saving page';
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@google-cloud/functions-framework": "3.1.2",
|
||||
"@google-cloud/tasks": "^3.0.5",
|
||||
"@sentry/serverless": "^6.16.1",
|
||||
"axios": "^1.4.0",
|
||||
"dotenv": "^16.0.1",
|
||||
|
||||
@ -4,6 +4,7 @@ import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-d
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import Parser from 'rss-parser'
|
||||
import { promisify } from 'util'
|
||||
import { CONTENT_FETCH_URL, createCloudTask } from './task'
|
||||
|
||||
interface RssFeedRequest {
|
||||
subscriptionId: string
|
||||
@ -21,57 +22,6 @@ function isRssFeedRequest(body: any): body is RssFeedRequest {
|
||||
)
|
||||
}
|
||||
|
||||
/**
 * Sends a `SavePage` GraphQL mutation to the backend on behalf of a user.
 *
 * The request is authenticated with a JWT signed for `userId` and passed in
 * the `auth` cookie. `input` is shallow-copied and its `source` field is set
 * to `'puppeteer-parse'` before being sent as the mutation variable.
 *
 * @param userId - id of the user the page is saved for; becomes the `uid` claim of the token
 * @param input - the `SavePageInput` fields for the mutation
 * @returns `true` when the response contains a `savePage` payload; `false` when the
 *   request fails or the response has no such payload (the failure is logged)
 * @throws Error when `JWT_SECRET` or `REST_BACKEND_ENDPOINT` is not set in the environment
 */
const sendSavePageMutation = async (
  userId: string,
  input: unknown
): Promise<boolean> => {
  const JWT_SECRET = process.env.JWT_SECRET
  const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT

  if (!JWT_SECRET || !REST_BACKEND_ENDPOINT) {
    // Throw a real Error (not a string literal) so callers get a stack trace.
    throw new Error('Environment not configured correctly')
  }

  const data = JSON.stringify({
    query: `mutation SavePage ($input: SavePageInput!){
      savePage(input:$input){
        ... on SaveSuccess{
          url
          clientRequestId
        }
        ... on SaveError{
          errorCodes
        }
      }
    }`,
    variables: {
      input: Object.assign({}, input, { source: 'puppeteer-parse' }),
    },
  })

  // NOTE(review): signToken is defined outside this block — presumably a
  // promisified jwt.sign; confirm against the file's helpers.
  const auth = (await signToken({ uid: userId }, JWT_SECRET)) as string
  try {
    const response = await axios.post(
      `${REST_BACKEND_ENDPOINT}/graphql`,
      data,
      {
        headers: {
          Cookie: `auth=${auth};`,
          'Content-Type': 'application/json',
        },
        timeout: 30000, // 30s
      }
    )

    // A malformed response (e.g. missing `data.data`) throws here and is
    // handled by the catch below, so it is reported as a failed save.
    /* eslint-disable @typescript-eslint/no-unsafe-member-access */
    return !!response.data.data.savePage
  } catch (error) {
    if (axios.isAxiosError(error)) {
      console.error('save page mutation error', error.message)
    } else {
      console.error(error)
    }
    return false
  }
}
|
||||
|
||||
const sendUpdateSubscriptionMutation = async (
|
||||
userId: string,
|
||||
subscriptionId: string,
|
||||
@ -155,40 +105,44 @@ export const rssHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
}
|
||||
|
||||
const { userId, feedUrl, subscriptionId, lastFetchedAt } = req.body
|
||||
console.log('Processing feed', feedUrl, lastFetchedAt)
|
||||
|
||||
// fetch feed
|
||||
const feed = await parser.parseURL(feedUrl)
|
||||
const newFetchedAt = new Date()
|
||||
console.log('Fetched feed', feed.title, newFetchedAt)
|
||||
|
||||
// save each item in the feed
|
||||
for (const item of feed.items) {
|
||||
if (!item.link || !item.title || !item.content || !item.isoDate) {
|
||||
for await (const item of feed.items) {
|
||||
const publishedAt = item.pubDate || item.isoDate
|
||||
console.log('Processing feed item', item.link, publishedAt)
|
||||
|
||||
if (!item.link || !publishedAt) {
|
||||
console.log('Invalid feed item', item)
|
||||
continue
|
||||
}
|
||||
|
||||
if (new Date(item.isoDate) <= lastFetchedAt) {
|
||||
console.log('Skipping old feed item', item.title)
|
||||
if (new Date(publishedAt) <= lastFetchedAt) {
|
||||
console.log('Skipping old feed item', item.link)
|
||||
continue
|
||||
}
|
||||
|
||||
const input = {
|
||||
userId,
|
||||
source: 'rss-feeder',
|
||||
url: item.link,
|
||||
clientRequestId: '',
|
||||
saveRequestId: '',
|
||||
labels: [{ name: 'RSS' }],
|
||||
title: item.title,
|
||||
originalContent: item.content,
|
||||
rssFeedUrl: feedUrl,
|
||||
}
|
||||
|
||||
try {
|
||||
console.log('Saving page', input.title)
|
||||
console.log('Creating task', input.url)
|
||||
// save page
|
||||
const result = await sendSavePageMutation(userId, input)
|
||||
console.log('Saved page', result)
|
||||
const task = await createCloudTask(CONTENT_FETCH_URL, input)
|
||||
console.log('Created task', task)
|
||||
} catch (error) {
|
||||
console.error('Error while saving page', error)
|
||||
console.error('Error while creating task', error)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
48
packages/rss-handler/src/task.ts
Normal file
48
packages/rss-handler/src/task.ts
Normal file
@ -0,0 +1,48 @@
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
import { CloudTasksClient, protos } from '@google-cloud/tasks'
|
||||
|
||||
// Module-level Cloud Tasks client, constructed with no explicit options and
// shared by every createCloudTask call in this module.
const cloudTask = new CloudTasksClient()
|
||||
|
||||
// Task handler URL read from the CONTENT_FETCH_GCF_URL environment variable.
// It is undefined when the variable is not set; createCloudTask rejects an
// undefined handler URL.
export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL
|
||||
|
||||
/**
 * Enqueues an HTTP POST task on a Google Cloud Tasks queue.
 *
 * The payload is JSON-serialized and base64-encoded into the request body.
 * The request carries an OIDC token for the service account
 * `<GCP_PROJECT_ID>@appspot.gserviceaccount.com`.
 *
 * @param taskHandlerUrl - URL the task will POST to; `undefined` is rejected
 * @param payload - value serialized with `JSON.stringify` as the request body
 * @param requestHeaders - extra HTTP headers; merged after the default
 *   `Content-Type: application/json`, so they can override it
 * @param queue - name of the Cloud Tasks queue to enqueue on
 * @returns the name of the created task, or `undefined` when the API returns none
 * @throws Error when `GCP_PROJECT_ID`, `GCP_LOCATION`, `queue` or
 *   `taskHandlerUrl` is missing or empty
 */
export const createCloudTask = async (
  taskHandlerUrl: string | undefined,
  payload: unknown,
  requestHeaders?: Record<string, string>,
  queue = 'omnivore-rss-feed-queue'
): Promise<string | undefined> => {
  const location = process.env.GCP_LOCATION
  const project = process.env.GCP_PROJECT_ID

  if (!project || !location || !queue || !taskHandlerUrl) {
    // Throw a real Error (not a string literal) so callers get a stack trace.
    throw new Error(
      `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}`
    )
  }

  const serviceAccountEmail = `${project}@appspot.gserviceaccount.com`

  const parent = cloudTask.queuePath(project, location, queue)
  // Cloud Tasks expects the HTTP body as base64-encoded bytes.
  const body = Buffer.from(JSON.stringify(payload)).toString('base64')

  const task: protos.google.cloud.tasks.v2.ITask = {
    httpRequest: {
      httpMethod: 'POST',
      url: taskHandlerUrl,
      headers: {
        'Content-Type': 'application/json',
        ...requestHeaders,
      },
      body,
      // The service account email is always a non-empty string at this point,
      // so the token is attached unconditionally.
      oidcToken: {
        serviceAccountEmail,
      },
    },
  }

  const [createdTask] = await cloudTask.createTask({ parent, task })
  return createdTask.name ?? undefined
}
|
||||
Reference in New Issue
Block a user