remove job id

This commit is contained in:
Hongbo Wu
2024-01-18 18:48:28 +08:00
parent a6f0e2f2d9
commit d4f49dceb6
4 changed files with 71 additions and 24 deletions

View File

@ -1,13 +1,12 @@
import axios from 'axios'
import crypto from 'crypto'
import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import
import * as jwt from 'jsonwebtoken'
import { parseHTML } from 'linkedom'
import Parser, { Item } from 'rss-parser'
import { promisify } from 'util'
import createHttpTaskWithToken from '../../utils/createTask'
import { env } from '../../env'
import { redisDataSource } from '../../redis_data_source'
import createHttpTaskWithToken from '../../utils/createTask'
type FolderType = 'following' | 'inbox'
@ -59,6 +58,18 @@ export type RssFeedItem = Item & {
link: string
}
// One subscriber of a feed item: the user id plus the destination
// folder ('following' | 'inbox') the saved item should be filed into.
interface User {
id: string
folder: FolderType
}
// A pending "fetch full content" unit of work for a single RSS item.
// All users who want the same URL are merged into one task so the
// content is fetched only once per URL.
interface FetchContentTask {
users: Map<string, User> // userId -> User
item: RssFeedItem
}
const fetchContentTasks = new Map<string, FetchContentTask>() // url -> FetchContentTask
export const isOldItem = (item: RssFeedItem, lastFetchedAt: number) => {
// existing items and items that were published before 24h
const publishedAt = item.isoDate ? new Date(item.isoDate) : new Date()
@ -274,6 +285,25 @@ const isItemRecentlySaved = async (userId: string, url: string) => {
return false
}
const addFetchContentTask = (
userId: string,
folder: FolderType,
item: RssFeedItem
) => {
const url = item.link
const task = fetchContentTasks.get(url)
if (!task) {
fetchContentTasks.set(url, {
users: new Map([[userId, { id: userId, folder }]]),
item,
})
} else {
task.users.set(userId, { id: userId, folder })
}
return true
}
const createTask = async (
userId: string,
feedUrl: string,
@ -291,17 +321,16 @@ const createTask = async (
return createItemWithPreviewContent(userId, feedUrl, item)
}
return fetchContentAndCreateItem(userId, feedUrl, item, folder)
return addFetchContentTask(userId, folder, item)
}
const fetchContentAndCreateItem = async (
userId: string,
users: User[],
feedUrl: string,
item: RssFeedItem,
folder: string
item: RssFeedItem
) => {
const payload = {
userId,
users,
source: 'rss-feeder',
url: item.link.trim(),
saveRequestId: '',
@ -309,7 +338,6 @@ const fetchContentAndCreateItem = async (
rssFeedUrl: feedUrl,
savedAt: item.isoDate,
publishedAt: item.isoDate,
folder,
}
try {
@ -367,12 +395,6 @@ const createItemWithPreviewContent = async (
}
}
dotenv.config()
// Sentry.GCPFunction.init({
// dsn: process.env.SENTRY_DSN,
// tracesSampleRate: 0,
// })
const signToken = promisify(jwt.sign)
const parser = new Parser({
customFields: {
@ -663,6 +685,15 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
feed
)
}
// create fetch content tasks
for (const task of fetchContentTasks.values()) {
await fetchContentAndCreateItem(
Array.from(task.users.values()),
feedUrl,
task.item
)
}
} catch (e) {
console.error('Error while saving RSS feeds', e)
}

View File

@ -2,15 +2,16 @@ import { Readability } from '@omnivore/readability'
import axios from 'axios'
import jwt from 'jsonwebtoken'
import { promisify } from 'util'
import { env } from '../env'
const signToken = promisify(jwt.sign)
const IMPORTER_METRICS_COLLECTOR_URL =
process.env.IMPORTER_METRICS_COLLECTOR_URL
const JWT_SECRET = process.env.JWT_SECRET
const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT
const JWT_SECRET = env.server.jwtSecret
const REST_BACKEND_ENDPOINT = process.env.INTERNAL_API_URL
if (!IMPORTER_METRICS_COLLECTOR_URL || !JWT_SECRET || !REST_BACKEND_ENDPOINT) {
if (!IMPORTER_METRICS_COLLECTOR_URL || !REST_BACKEND_ENDPOINT) {
throw new Error('Missing environment variables')
}

View File

@ -1,4 +1,4 @@
import { Queue } from 'bullmq'
import { BulkJobOptions, Queue } from 'bullmq'
import { redis } from './redis'
const QUEUE_NAME = 'omnivore-backend-queue'
@ -29,6 +29,25 @@ const getPriority = (job: savePageJob): number => {
return 5
}
// Number of BullMQ attempts for a save-page job. RSS and import jobs
// are one-shot (a retry would re-save the same page); everything else
// gets exactly one retry.
const getAttempts = (job: savePageJob): number =>
  job.isRss || job.isImport ? 1 : 2
// Build the per-job BullMQ options for addBulk. Deliberately does NOT
// set jobId / removeOnComplete / removeOnFail (removed in the
// "remove job id" change) — only retry attempts and queue priority.
const getOpts = (job: savePageJob): BulkJobOptions => {
  const opts: BulkJobOptions = {
    attempts: getAttempts(job),
    priority: getPriority(job),
  }
  return opts
}
const createQueue = (): Queue | undefined => {
return new Queue(QUEUE_NAME, {
connection: redis,
@ -44,12 +63,7 @@ export const queueSavePageJob = async (savePageJobs: savePageJob[]) => {
const jobs = savePageJobs.map((job) => ({
name: JOB_NAME,
data: job.data,
opts: {
jobId: `${job.userId}-${job.url}`,
removeOnComplete: true,
removeOnFail: true,
priority: getPriority(job),
},
opts: getOpts(job),
}))
return queue.addBulk(jobs)

View File

@ -108,6 +108,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
url: finalUrl,
userId: user.id,
data: {
userId: user.id,
url: finalUrl,
title,
content,