Merge pull request #3732 from omnivore-app/fix/redis-oom
remove content from job data
This commit is contained in:
@ -1,10 +1,13 @@
|
||||
import { LiqeQuery } from '@omnivore/liqe'
|
||||
import { ReadingProgressDataSource } from '../datasources/reading_progress_data_source'
|
||||
import { LibraryItemState } from '../entity/library_item'
|
||||
import { LibraryItem, LibraryItemState } from '../entity/library_item'
|
||||
import { Rule, RuleAction, RuleActionType, RuleEventType } from '../entity/rule'
|
||||
import { addLabelsToLibraryItem } from '../services/labels'
|
||||
import {
|
||||
filterItemEvents,
|
||||
ItemEvent,
|
||||
RequiresSearchQueryError,
|
||||
searchLibraryItems,
|
||||
softDeleteLibraryItem,
|
||||
updateLibraryItem,
|
||||
} from '../services/library_item'
|
||||
@ -24,7 +27,7 @@ interface RuleActionObj {
|
||||
libraryItemId: string
|
||||
userId: string
|
||||
action: RuleAction
|
||||
data: ItemEvent
|
||||
data: ItemEvent | LibraryItem
|
||||
}
|
||||
type RuleActionFunc = (obj: RuleActionObj) => Promise<unknown>
|
||||
|
||||
@ -107,33 +110,51 @@ const triggerActions = async (
|
||||
const actionPromises: Promise<unknown>[] = []
|
||||
|
||||
for (const rule of rules) {
|
||||
let filteredData: ItemEvent
|
||||
let ast: LiqeQuery
|
||||
let results: (ItemEvent | LibraryItem)[]
|
||||
|
||||
try {
|
||||
const ast = parseSearchQuery(rule.filter)
|
||||
// filter library item by rule filter
|
||||
const results = filterItemEvents(ast, [data])
|
||||
if (results.length === 0) {
|
||||
logger.info(`No items found for rule ${rule.id}`)
|
||||
continue
|
||||
}
|
||||
|
||||
filteredData = results[0]
|
||||
ast = parseSearchQuery(rule.filter)
|
||||
} catch (error) {
|
||||
// failed to search for library items, mark rule as failed
|
||||
logger.error('Error parsing filter in rules', error)
|
||||
await markRuleAsFailed(rule.id, userId)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// filter library item by metadata
|
||||
try {
|
||||
results = filterItemEvents(ast, [data])
|
||||
} catch (error) {
|
||||
if (error instanceof RequiresSearchQueryError) {
|
||||
logger.info('Failed to filter items by metadata, running search query')
|
||||
const searchResult = await searchLibraryItems(
|
||||
{
|
||||
query: `includes:${libraryItemId} AND (${rule.filter})`,
|
||||
size: 1,
|
||||
},
|
||||
userId
|
||||
)
|
||||
results = searchResult.libraryItems
|
||||
} else {
|
||||
logger.error('Error filtering item events', error)
|
||||
await markRuleAsFailed(rule.id, userId)
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
if (results.length === 0) {
|
||||
logger.info(`No items found for rule ${rule.id}`)
|
||||
continue
|
||||
}
|
||||
|
||||
for (const action of rule.actions) {
|
||||
const actionFunc = getRuleAction(action.type)
|
||||
const actionObj: RuleActionObj = {
|
||||
libraryItemId,
|
||||
userId,
|
||||
action,
|
||||
data: filteredData,
|
||||
data: results[0],
|
||||
}
|
||||
|
||||
actionPromises.push(actionFunc(actionObj))
|
||||
|
||||
@ -43,6 +43,12 @@ export type UpdateItemEvent = Omit<
|
||||
IgnoredFields
|
||||
>
|
||||
|
||||
export class RequiresSearchQueryError extends Error {
|
||||
constructor() {
|
||||
super('Requires a search query')
|
||||
}
|
||||
}
|
||||
|
||||
enum ReadFilter {
|
||||
ALL = 'all',
|
||||
READ = 'read',
|
||||
@ -869,7 +875,11 @@ export const updateLibraryItem = async (
|
||||
// send create event if the item was created
|
||||
await pubsub.entityCreated<CreateItemEvent>(
|
||||
EntityType.PAGE,
|
||||
updatedLibraryItem,
|
||||
{
|
||||
...updatedLibraryItem,
|
||||
originalContent: undefined,
|
||||
readableContent: undefined,
|
||||
},
|
||||
userId,
|
||||
id
|
||||
)
|
||||
@ -879,7 +889,11 @@ export const updateLibraryItem = async (
|
||||
|
||||
await pubsub.entityUpdated<UpdateItemEvent>(
|
||||
EntityType.PAGE,
|
||||
libraryItem,
|
||||
{
|
||||
...libraryItem,
|
||||
originalContent: undefined,
|
||||
readableContent: undefined,
|
||||
},
|
||||
userId,
|
||||
id
|
||||
)
|
||||
@ -1043,7 +1057,11 @@ export const createOrUpdateLibraryItem = async (
|
||||
|
||||
await pubsub.entityCreated<CreateItemEvent>(
|
||||
EntityType.PAGE,
|
||||
newLibraryItem,
|
||||
{
|
||||
...newLibraryItem,
|
||||
originalContent: undefined,
|
||||
readableContent: undefined,
|
||||
},
|
||||
userId,
|
||||
newLibraryItem.id
|
||||
)
|
||||
@ -1320,7 +1338,7 @@ export const findLibraryItemIdsByLabelId = async (
|
||||
export const filterItemEvents = (
|
||||
ast: LiqeQuery,
|
||||
events: readonly ItemEvent[]
|
||||
): readonly ItemEvent[] => {
|
||||
): ItemEvent[] => {
|
||||
const testNo = (value: string, event: ItemEvent) => {
|
||||
const keywordRegexMap: Record<string, RegExp> = {
|
||||
highlightAnnotations: /^highlight(s)?$/i,
|
||||
@ -1353,28 +1371,10 @@ export const filterItemEvents = (
|
||||
throw new Error('Expected a literal expression.')
|
||||
}
|
||||
|
||||
const lowercasedValue = expression.value?.toString().toLowerCase()
|
||||
const lowercasedValue = expression.value?.toString()?.toLowerCase()
|
||||
|
||||
if (field.type === 'ImplicitField') {
|
||||
if (!lowercasedValue) {
|
||||
return true
|
||||
}
|
||||
|
||||
const textFields = [
|
||||
'author',
|
||||
'title',
|
||||
'description',
|
||||
'note',
|
||||
'siteName',
|
||||
'readableContent',
|
||||
'originalUrl',
|
||||
]
|
||||
const text = textFields
|
||||
.map((field) => event[field as keyof ItemEvent])
|
||||
.join(' ')
|
||||
|
||||
// TODO: Implement full text search
|
||||
return text.match(new RegExp(lowercasedValue, 'i'))
|
||||
throw new RequiresSearchQueryError()
|
||||
}
|
||||
|
||||
if (!lowercasedValue) {
|
||||
@ -1422,7 +1422,7 @@ export const filterItemEvents = (
|
||||
}
|
||||
}
|
||||
case 'type': {
|
||||
return event.itemType?.toString().toLowerCase() === lowercasedValue
|
||||
return event.itemType?.toString()?.toLowerCase() === lowercasedValue
|
||||
}
|
||||
case 'label': {
|
||||
const labels = event.labelNames as string[] | undefined
|
||||
@ -1495,7 +1495,7 @@ export const filterItemEvents = (
|
||||
// get camel case column name
|
||||
const key = camelCase(columnName) as 'subscription' | 'itemLanguage'
|
||||
|
||||
return event[key]?.toString().toLowerCase() === lowercasedValue
|
||||
return event[key]?.toString()?.toLowerCase() === lowercasedValue
|
||||
}
|
||||
// match filters
|
||||
case 'author':
|
||||
@ -1512,7 +1512,7 @@ export const filterItemEvents = (
|
||||
| 'siteName'
|
||||
|
||||
// TODO: Implement full text search
|
||||
return event[key]?.toString().match(new RegExp(lowercasedValue, 'i'))
|
||||
return event[key]?.toString()?.match(new RegExp(lowercasedValue, 'i'))
|
||||
}
|
||||
case 'includes': {
|
||||
const ids = lowercasedValue.split(',')
|
||||
|
||||
@ -41,6 +41,7 @@ export const addRecommendation = async (
|
||||
uploadFile: item.uploadFile,
|
||||
wordCount: item.wordCount,
|
||||
publishedAt: item.publishedAt,
|
||||
recommenderNames: item.recommenderNames,
|
||||
}
|
||||
|
||||
recommendedItem = await createOrUpdateLibraryItem(newItem, userId)
|
||||
|
||||
Reference in New Issue
Block a user