Merge remote-tracking branch 'origin/main' into feat/android-mark-ad-read
This commit is contained in:
@ -18,4 +18,7 @@ export const appDataSource = new DataSource({
|
||||
logger: new CustomTypeOrmLogger(['query', 'info']),
|
||||
connectTimeoutMS: 40000, // 40 seconds
|
||||
maxQueryExecutionTime: 10000, // 10 seconds
|
||||
extra: {
|
||||
options: process.env.PG_EXTRA_OPTIONS,
|
||||
},
|
||||
})
|
||||
|
||||
@ -38,7 +38,6 @@ export enum DirectionalityType {
|
||||
RTL = 'RTL',
|
||||
}
|
||||
|
||||
@Unique('library_item_user_original_url', ['user', 'originalUrl'])
|
||||
@Entity({ name: 'library_item' })
|
||||
export class LibraryItem {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
|
||||
44
packages/api/src/jobs/bulk_action.ts
Normal file
44
packages/api/src/jobs/bulk_action.ts
Normal file
@ -0,0 +1,44 @@
|
||||
import { BulkActionType } from '../generated/graphql'
|
||||
import { getBackendQueue } from '../queue-processor'
|
||||
import { batchUpdateLibraryItems } from '../services/library_item'
|
||||
import { logger } from '../utils/logger'
|
||||
|
||||
export interface BulkActionData {
|
||||
count: number
|
||||
userId: string
|
||||
action: BulkActionType
|
||||
query: string
|
||||
batchSize: number
|
||||
labelIds?: string[]
|
||||
args?: unknown
|
||||
}
|
||||
|
||||
export const BULK_ACTION_JOB_NAME = 'bulk-action'
|
||||
|
||||
export const bulkAction = async (data: BulkActionData) => {
|
||||
const { userId, action, query, labelIds, args, batchSize, count } = data
|
||||
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
throw new Error('Queue not initialized')
|
||||
}
|
||||
const now = new Date().toISOString()
|
||||
let offset = 0
|
||||
|
||||
do {
|
||||
const searchArgs = {
|
||||
size: batchSize,
|
||||
query: `(${query}) AND updated:*..${now}`, // only process items that have not been updated
|
||||
}
|
||||
|
||||
try {
|
||||
await batchUpdateLibraryItems(action, searchArgs, userId, labelIds, args)
|
||||
} catch (error) {
|
||||
logger.error('batch update error', error)
|
||||
}
|
||||
|
||||
offset += batchSize
|
||||
} while (offset < count)
|
||||
|
||||
return true
|
||||
}
|
||||
56
packages/api/src/jobs/call_webhook.ts
Normal file
56
packages/api/src/jobs/call_webhook.ts
Normal file
@ -0,0 +1,56 @@
|
||||
import axios, { Method } from 'axios'
|
||||
import { findWebhooksByEventType } from '../services/webhook'
|
||||
import { logger } from '../utils/logger'
|
||||
|
||||
export interface CallWebhookJobData {
|
||||
data: unknown
|
||||
userId: string
|
||||
type: string
|
||||
action: string
|
||||
}
|
||||
|
||||
export const CALL_WEBHOOK_JOB_NAME = 'call-webhook'
|
||||
const TIMEOUT = 5000 // 5s
|
||||
|
||||
export const callWebhook = async (jobData: CallWebhookJobData) => {
|
||||
const { data, type, action, userId } = jobData
|
||||
const eventType = `${type}_${action}`.toUpperCase()
|
||||
const webhooks = await findWebhooksByEventType(userId, eventType)
|
||||
|
||||
if (webhooks.length <= 0) {
|
||||
return
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
webhooks.map((webhook) => {
|
||||
const url = webhook.url
|
||||
const method = webhook.method as Method
|
||||
const body = {
|
||||
action,
|
||||
userId,
|
||||
[type]: data,
|
||||
}
|
||||
|
||||
logger.info('triggering webhook', { url, method })
|
||||
|
||||
return axios
|
||||
.request({
|
||||
url,
|
||||
method,
|
||||
headers: {
|
||||
'Content-Type': webhook.contentType,
|
||||
},
|
||||
data: body,
|
||||
timeout: TIMEOUT,
|
||||
})
|
||||
.then(() => logger.info('webhook triggered'))
|
||||
.catch((error) => {
|
||||
if (axios.isAxiosError(error)) {
|
||||
logger.info('webhook failed', error.response)
|
||||
} else {
|
||||
logger.info('webhook failed', error)
|
||||
}
|
||||
})
|
||||
})
|
||||
)
|
||||
}
|
||||
@ -126,6 +126,15 @@ export const isContentFetchBlocked = (feedUrl: string) => {
|
||||
if (feedUrl.startsWith('https://arxiv.org/')) {
|
||||
return true
|
||||
}
|
||||
if (feedUrl.startsWith('https://rss.arxiv.org')) {
|
||||
return true
|
||||
}
|
||||
if (feedUrl.startsWith('https://xkcd.com')) {
|
||||
return true
|
||||
}
|
||||
if (feedUrl.startsWith('https://daringfireball.net/feeds/')) {
|
||||
return true
|
||||
}
|
||||
if (feedUrl.startsWith('https://lwn.net/headlines/newrss')) {
|
||||
return true
|
||||
}
|
||||
@ -517,6 +526,9 @@ const processSubscription = async (
|
||||
|
||||
// Max limit per-feed update
|
||||
if (itemCount > 99) {
|
||||
if (itemCount == 100) {
|
||||
logger.info(`Max limit reached for feed ${feedUrl}`)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@ -25,6 +25,14 @@ async function* getSyncUpdatesIterator(redis: Redis) {
|
||||
return
|
||||
}
|
||||
|
||||
const isMoreThan60SecondsOld = (iso8601String: string): boolean => {
|
||||
const currentTime = new Date()
|
||||
const parsedDate = new Date(iso8601String)
|
||||
const timeDifferenceInSeconds =
|
||||
(currentTime.getTime() - parsedDate.getTime()) / 1000
|
||||
return timeDifferenceInSeconds > 60
|
||||
}
|
||||
|
||||
const syncReadPosition = async (cacheKey: string) => {
|
||||
const components = componentsForCachedReadingPositionKey(cacheKey)
|
||||
const positions = components
|
||||
@ -37,7 +45,9 @@ const syncReadPosition = async (cacheKey: string) => {
|
||||
components &&
|
||||
positions &&
|
||||
positions.positionItems &&
|
||||
positions.positionItems.length > 0
|
||||
positions.positionItems.length > 0 &&
|
||||
positions.positionItems[0].updatedAt &&
|
||||
isMoreThan60SecondsOld(positions.positionItems[0].updatedAt)
|
||||
) {
|
||||
const position = reduceCachedReadingPositionMembers(
|
||||
components.uid,
|
||||
@ -65,11 +75,6 @@ const syncReadPosition = async (cacheKey: string) => {
|
||||
{ cacheKey }
|
||||
)
|
||||
}
|
||||
} else {
|
||||
logger.warning(
|
||||
'potential error, reading position cache key found with no data',
|
||||
{ cacheKey }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -3,7 +3,7 @@ import express from 'express'
|
||||
import { RuleEventType } from './entity/rule'
|
||||
import { env } from './env'
|
||||
import { ReportType } from './generated/graphql'
|
||||
import { enqueueTriggerRuleJob } from './utils/createTask'
|
||||
import { enqueueTriggerRuleJob, enqueueWebhookJob } from './utils/createTask'
|
||||
import { deepDelete } from './utils/helpers'
|
||||
import { buildLogger } from './utils/logger'
|
||||
|
||||
@ -63,6 +63,13 @@ export const createPubSubClient = (): PubsubClient => {
|
||||
[...fieldsToDelete]
|
||||
)
|
||||
|
||||
await enqueueWebhookJob({
|
||||
userId,
|
||||
type,
|
||||
action: 'created',
|
||||
data,
|
||||
})
|
||||
|
||||
return publish(
|
||||
'entityCreated',
|
||||
Buffer.from(JSON.stringify({ type, userId, ...cleanData }))
|
||||
@ -88,6 +95,13 @@ export const createPubSubClient = (): PubsubClient => {
|
||||
[...fieldsToDelete]
|
||||
)
|
||||
|
||||
await enqueueWebhookJob({
|
||||
userId,
|
||||
type,
|
||||
action: 'updated',
|
||||
data,
|
||||
})
|
||||
|
||||
return publish(
|
||||
'entityUpdated',
|
||||
Buffer.from(JSON.stringify({ type, userId, ...cleanData }))
|
||||
|
||||
@ -14,10 +14,16 @@ import express, { Express } from 'express'
|
||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
|
||||
import { appDataSource } from './data_source'
|
||||
import { env } from './env'
|
||||
import { bulkAction, BULK_ACTION_JOB_NAME } from './jobs/bulk_action'
|
||||
import { CALL_WEBHOOK_JOB_NAME, callWebhook } from './jobs/call_webhook'
|
||||
import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
import {
|
||||
syncReadPositionsJob,
|
||||
SYNC_READ_POSITIONS_JOB_NAME,
|
||||
} from './jobs/sync_read_positions'
|
||||
import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule'
|
||||
import {
|
||||
updateHighlight,
|
||||
@ -27,12 +33,8 @@ import {
|
||||
} from './jobs/update_db'
|
||||
import { updatePDFContentJob } from './jobs/update_pdf_content'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { logger, CustomTypeOrmLogger } from './utils/logger'
|
||||
import {
|
||||
SYNC_READ_POSITIONS_JOB_NAME,
|
||||
syncReadPositionsJob,
|
||||
} from './jobs/sync_read_positions'
|
||||
import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_position'
|
||||
import { CustomTypeOrmLogger, logger } from './utils/logger'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
|
||||
@ -84,6 +86,10 @@ export const createWorker = (connection: ConnectionOptions) =>
|
||||
return updateHighlight(job.data)
|
||||
case SYNC_READ_POSITIONS_JOB_NAME:
|
||||
return syncReadPositionsJob(job.data)
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
return bulkAction(job.data)
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
return callWebhook(job.data)
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@ -8,10 +8,12 @@ export const libraryItemRepository = appDataSource
|
||||
return this.findOneBy({ id })
|
||||
},
|
||||
|
||||
findByUrl(url: string) {
|
||||
return this.findOneBy({
|
||||
originalUrl: url,
|
||||
})
|
||||
findByUserIdAndUrl(userId: string, url: string) {
|
||||
// md5 is used to hash the url to avoid the length limit of the index
|
||||
return this.createQueryBuilder()
|
||||
.where('user_id = :userId', { userId })
|
||||
.andWhere('md5(original_url) = md5(:url)', { url })
|
||||
.getOne()
|
||||
},
|
||||
|
||||
countByCreatedAt(createdAt: Date) {
|
||||
|
||||
@ -16,7 +16,6 @@ import {
|
||||
BulkActionError,
|
||||
BulkActionErrorCode,
|
||||
BulkActionSuccess,
|
||||
BulkActionType,
|
||||
ContentReader,
|
||||
CreateArticleError,
|
||||
CreateArticleErrorCode,
|
||||
@ -64,12 +63,12 @@ import { authTrx, getColumns } from '../../repository'
|
||||
import { getInternalLabelWithColor } from '../../repository/label'
|
||||
import { libraryItemRepository } from '../../repository/library_item'
|
||||
import { userRepository } from '../../repository/user'
|
||||
import { clearCachedReadingPosition } from '../../services/cached_reading_position'
|
||||
import { createPageSaveRequest } from '../../services/create_page_save_request'
|
||||
import { findHighlightsByLibraryItemId } from '../../services/highlights'
|
||||
import {
|
||||
addLabelsToLibraryItem,
|
||||
createAndSaveLabelsInLibraryItem,
|
||||
findLabelsByIds,
|
||||
findOrCreateLabels,
|
||||
} from '../../services/labels'
|
||||
import {
|
||||
@ -93,6 +92,7 @@ import {
|
||||
import { traceAs } from '../../tracing'
|
||||
import { analytics } from '../../utils/analytics'
|
||||
import { isSiteBlockedForParse } from '../../utils/blocked'
|
||||
import { enqueueBulkAction } from '../../utils/createTask'
|
||||
import { authorized } from '../../utils/gql-utils'
|
||||
import {
|
||||
cleanUrl,
|
||||
@ -112,10 +112,6 @@ import {
|
||||
parsePreparedContent,
|
||||
} from '../../utils/parser'
|
||||
import { getStorageFileDetails } from '../../utils/uploads'
|
||||
import {
|
||||
clearCachedReadingPosition,
|
||||
fetchCachedReadingPosition,
|
||||
} from '../../services/cached_reading_position'
|
||||
|
||||
export enum ArticleFormat {
|
||||
Markdown = 'markdown',
|
||||
@ -879,36 +875,41 @@ export const bulkActionResolver = authorized<
|
||||
},
|
||||
})
|
||||
|
||||
// the query size is limited to 4000 characters to allow for 100 items
|
||||
if (!query || query.length > 4000) {
|
||||
log.error('bulkActionResolver error', {
|
||||
error: 'QueryTooLong',
|
||||
query,
|
||||
})
|
||||
const batchSize = 100
|
||||
const searchArgs = {
|
||||
query,
|
||||
size: 0,
|
||||
}
|
||||
const searchResult = await searchLibraryItems(searchArgs, uid)
|
||||
const count = searchResult.count
|
||||
if (count === 0) {
|
||||
log.info('No items found for bulk action')
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
if (count <= batchSize) {
|
||||
searchArgs.size = count
|
||||
// if there are less than 100 items, update them synchronously
|
||||
await batchUpdateLibraryItems(action, searchArgs, uid, labelIds, args)
|
||||
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
// if there are more than 100 items, update them asynchronously
|
||||
const data = {
|
||||
userId: uid,
|
||||
action,
|
||||
labelIds: labelIds || undefined,
|
||||
query,
|
||||
count,
|
||||
args,
|
||||
batchSize,
|
||||
}
|
||||
const job = await enqueueBulkAction(data)
|
||||
if (!job) {
|
||||
return { errorCodes: [BulkActionErrorCode.BadRequest] }
|
||||
}
|
||||
|
||||
// get labels if needed
|
||||
let labels = undefined
|
||||
if (action === BulkActionType.AddLabels) {
|
||||
if (!labelIds || labelIds.length === 0) {
|
||||
return { errorCodes: [BulkActionErrorCode.BadRequest] }
|
||||
}
|
||||
|
||||
labels = await findLabelsByIds(labelIds, uid)
|
||||
}
|
||||
|
||||
await batchUpdateLibraryItems(
|
||||
action,
|
||||
{
|
||||
query,
|
||||
useFolders: query.includes('use:folders'),
|
||||
},
|
||||
uid,
|
||||
labels,
|
||||
args
|
||||
)
|
||||
|
||||
return { success: true }
|
||||
} catch (error) {
|
||||
log.error('bulkActionResolver error', error)
|
||||
|
||||
@ -1,7 +1,13 @@
|
||||
import { ExpressionToken, LiqeQuery } from '@omnivore/liqe'
|
||||
import { DateTime } from 'luxon'
|
||||
import { DeepPartial, FindOptionsWhere, ObjectLiteral } from 'typeorm'
|
||||
import {
|
||||
DeepPartial,
|
||||
EntityManager,
|
||||
FindOptionsWhere,
|
||||
ObjectLiteral,
|
||||
} from 'typeorm'
|
||||
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
|
||||
import { ReadingProgressDataSource } from '../datasources/reading_progress_data_source'
|
||||
import { Highlight } from '../entity/highlight'
|
||||
import { Label } from '../entity/label'
|
||||
import { LibraryItem, LibraryItemState } from '../entity/library_item'
|
||||
@ -13,7 +19,6 @@ import {
|
||||
getColumns,
|
||||
isUniqueViolation,
|
||||
queryBuilderToRawSql,
|
||||
valuesToRawSql,
|
||||
} from '../repository'
|
||||
import { libraryItemRepository } from '../repository/library_item'
|
||||
import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers'
|
||||
@ -103,6 +108,20 @@ interface Select {
|
||||
alias?: string
|
||||
}
|
||||
|
||||
const readingProgressDataSource = new ReadingProgressDataSource()
|
||||
|
||||
const markItemAsRead = async (libraryItemId: string, userId: string) => {
|
||||
return await readingProgressDataSource.updateReadingProgress(
|
||||
userId,
|
||||
libraryItemId,
|
||||
{
|
||||
readingProgressPercent: 100,
|
||||
readingProgressTopPercent: 100,
|
||||
readingProgressAnchorIndex: undefined,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
const handleNoCase = (value: string) => {
|
||||
const keywordRegexMap: Record<string, RegExp> = {
|
||||
highlight: /^highlight(s)?$/i,
|
||||
@ -669,7 +688,7 @@ export const findLibraryItemByUrl = async (
|
||||
.leftJoinAndSelect('recommender.profile', 'profile')
|
||||
.leftJoinAndSelect('recommendations.group', 'group')
|
||||
.where('library_item.user_id = :userId', { userId })
|
||||
.andWhere('library_item.original_url = :url', { url })
|
||||
.andWhere('md5(library_item.original_url) = md5(:url)', { url })
|
||||
.getOne(),
|
||||
undefined,
|
||||
userId
|
||||
@ -927,7 +946,7 @@ export const batchUpdateLibraryItems = async (
|
||||
action: BulkActionType,
|
||||
searchArgs: SearchArgs,
|
||||
userId: string,
|
||||
labels?: Label[],
|
||||
labelIds?: string[] | null,
|
||||
args?: unknown
|
||||
) => {
|
||||
interface FolderArguments {
|
||||
@ -938,46 +957,24 @@ export const batchUpdateLibraryItems = async (
|
||||
return 'folder' in args
|
||||
}
|
||||
|
||||
const now = new Date().toISOString()
|
||||
// build the script
|
||||
let values: Record<string, string | number> = {}
|
||||
let addLabels = false
|
||||
switch (action) {
|
||||
case BulkActionType.Archive:
|
||||
values = {
|
||||
archived_at: now,
|
||||
state: LibraryItemState.Archived,
|
||||
}
|
||||
break
|
||||
case BulkActionType.Delete:
|
||||
values = {
|
||||
state: LibraryItemState.Deleted,
|
||||
deleted_at: now,
|
||||
}
|
||||
break
|
||||
case BulkActionType.AddLabels:
|
||||
addLabels = true
|
||||
break
|
||||
case BulkActionType.MarkAsRead:
|
||||
values = {
|
||||
read_at: now,
|
||||
reading_progress_top_percent: 100,
|
||||
reading_progress_bottom_percent: 100,
|
||||
}
|
||||
break
|
||||
case BulkActionType.MoveToFolder:
|
||||
if (!args || !isFolderArguments(args)) {
|
||||
throw new Error('Invalid arguments')
|
||||
}
|
||||
const getQueryBuilder = (userId: string, em: EntityManager) => {
|
||||
const queryBuilder = em
|
||||
.createQueryBuilder(LibraryItem, 'library_item')
|
||||
.where('library_item.user_id = :userId', { userId })
|
||||
if (query) {
|
||||
queryBuilder
|
||||
.andWhere(`(${query})`)
|
||||
.setParameters(paramtersToObject(parameters))
|
||||
}
|
||||
return queryBuilder
|
||||
}
|
||||
|
||||
values = {
|
||||
folder: args.folder,
|
||||
saved_at: now,
|
||||
}
|
||||
|
||||
break
|
||||
default:
|
||||
throw new Error('Invalid bulk action')
|
||||
const getLibraryItemIds = async (
|
||||
userId: string,
|
||||
em: EntityManager
|
||||
): Promise<{ id: string }[]> => {
|
||||
const queryBuilder = getQueryBuilder(userId, em)
|
||||
return queryBuilder.select('library_item.id', 'id').getRawMany()
|
||||
}
|
||||
|
||||
if (!searchArgs.query) {
|
||||
@ -988,63 +985,73 @@ export const batchUpdateLibraryItems = async (
|
||||
const parameters: ObjectLiteral[] = []
|
||||
const query = buildQuery(searchQuery, parameters)
|
||||
|
||||
await authTrx(async (tx) => {
|
||||
const queryBuilder = tx
|
||||
.createQueryBuilder(LibraryItem, 'library_item')
|
||||
.where('library_item.user_id = :userId', { userId })
|
||||
|
||||
if (query) {
|
||||
queryBuilder
|
||||
.andWhere(`(${query})`)
|
||||
.setParameters(paramtersToObject(parameters))
|
||||
}
|
||||
|
||||
if (addLabels) {
|
||||
if (!labels) {
|
||||
const now = new Date().toISOString()
|
||||
// build the script
|
||||
let values: Record<string, string | number> = {}
|
||||
switch (action) {
|
||||
case BulkActionType.Archive:
|
||||
values = {
|
||||
archivedAt: now,
|
||||
state: LibraryItemState.Archived,
|
||||
}
|
||||
break
|
||||
case BulkActionType.Delete:
|
||||
values = {
|
||||
state: LibraryItemState.Deleted,
|
||||
deletedAt: now,
|
||||
}
|
||||
break
|
||||
case BulkActionType.AddLabels: {
|
||||
if (!labelIds) {
|
||||
throw new Error('Labels are required for this action')
|
||||
}
|
||||
|
||||
const labelIds = labels.map((label) => label.id)
|
||||
const libraryItems = await queryBuilder.getMany()
|
||||
// add labels in library items
|
||||
const libraryItems = await authTrx(
|
||||
async (tx) => getLibraryItemIds(userId, tx),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
// add labels to library items
|
||||
for (const libraryItem of libraryItems) {
|
||||
await addLabelsToLibraryItem(labelIds, libraryItem.id, userId)
|
||||
|
||||
libraryItem.labels = labels
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
case BulkActionType.MarkAsRead: {
|
||||
const libraryItems = await authTrx(
|
||||
async (tx) => getLibraryItemIds(userId, tx),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
// update reading progress for library items
|
||||
for (const libraryItem of libraryItems) {
|
||||
await markItemAsRead(libraryItem.id, userId)
|
||||
}
|
||||
|
||||
// generate raw sql because postgres doesn't support prepared statements in DO blocks
|
||||
const countSql = queryBuilderToRawSql(queryBuilder.select('COUNT(1)'))
|
||||
const subQuery = queryBuilderToRawSql(queryBuilder.select('id'))
|
||||
const valuesSql = valuesToRawSql(values)
|
||||
return
|
||||
}
|
||||
case BulkActionType.MoveToFolder:
|
||||
if (!args || !isFolderArguments(args)) {
|
||||
throw new Error('Invalid arguments')
|
||||
}
|
||||
|
||||
const start = new Date().toISOString()
|
||||
const batchSize = 1000
|
||||
const sql = `
|
||||
-- Set batch size
|
||||
DO $$
|
||||
DECLARE
|
||||
batch_size INT := ${batchSize};
|
||||
BEGIN
|
||||
-- Loop through batches
|
||||
FOR i IN 0..CEIL((${countSql}) * 1.0 / batch_size) - 1 LOOP
|
||||
-- Update the batch
|
||||
UPDATE omnivore.library_item
|
||||
SET ${valuesSql}
|
||||
WHERE id = ANY(
|
||||
${subQuery}
|
||||
AND updated_at < '${start}'
|
||||
LIMIT batch_size
|
||||
);
|
||||
END LOOP;
|
||||
END $$
|
||||
`
|
||||
values = {
|
||||
folder: args.folder,
|
||||
savedAt: now,
|
||||
}
|
||||
|
||||
return tx.query(sql)
|
||||
})
|
||||
break
|
||||
default:
|
||||
throw new Error('Invalid bulk action')
|
||||
}
|
||||
|
||||
await authTrx(
|
||||
async (tx) =>
|
||||
getQueryBuilder(userId, tx).update(LibraryItem).set(values).execute(),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteLibraryItemById = async (id: string, userId?: string) => {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { ArrayContainedBy, ArrayContains, ILike } from 'typeorm'
|
||||
import { ArrayContains, ILike } from 'typeorm'
|
||||
import { Rule, RuleAction, RuleEventType } from '../entity/rule'
|
||||
import { authTrx, getRepository } from '../repository'
|
||||
|
||||
@ -61,6 +61,6 @@ export const findEnabledRules = async (
|
||||
return getRepository(Rule).findBy({
|
||||
user: { id: userId },
|
||||
enabled: true,
|
||||
eventTypes: ArrayContainedBy([eventType]),
|
||||
eventTypes: ArrayContains([eventType]),
|
||||
})
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import {
|
||||
SaveResult,
|
||||
} from '../generated/graphql'
|
||||
import { authTrx } from '../repository'
|
||||
import { libraryItemRepository } from '../repository/library_item'
|
||||
import { enqueueThumbnailJob } from '../utils/createTask'
|
||||
import {
|
||||
cleanUrl,
|
||||
@ -119,10 +120,9 @@ export const savePage = async (
|
||||
} else {
|
||||
// check if the item already exists
|
||||
const existingLibraryItem = await authTrx((t) =>
|
||||
t.getRepository(LibraryItem).findOneBy({
|
||||
user: { id: user.id },
|
||||
originalUrl: itemToSave.originalUrl,
|
||||
})
|
||||
t
|
||||
.withRepository(libraryItemRepository)
|
||||
.findByUserIdAndUrl(user.id, input.url)
|
||||
)
|
||||
if (existingLibraryItem) {
|
||||
clientRequestId = existingLibraryItem.id
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { DeepPartial, EntityManager } from 'typeorm'
|
||||
import { ArrayContains, DeepPartial, EntityManager } from 'typeorm'
|
||||
import { Webhook } from '../entity/webhook'
|
||||
import { authTrx } from '../repository'
|
||||
|
||||
@ -34,6 +34,22 @@ export const findWebhooks = async (userId: string) => {
|
||||
)
|
||||
}
|
||||
|
||||
export const findWebhooksByEventType = async (
|
||||
userId: string,
|
||||
eventType: string
|
||||
) => {
|
||||
return authTrx(
|
||||
(tx) =>
|
||||
tx.getRepository(Webhook).findBy({
|
||||
user: { id: userId },
|
||||
enabled: true,
|
||||
eventTypes: ArrayContains([eventType]),
|
||||
}),
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
export const findWebhookById = async (id: string, userId: string) => {
|
||||
return authTrx(
|
||||
(tx) => tx.getRepository(Webhook).findOneBy({ id, user: { id: userId } }),
|
||||
|
||||
@ -14,6 +14,8 @@ import {
|
||||
ArticleSavingRequestStatus,
|
||||
CreateLabelInput,
|
||||
} from '../generated/graphql'
|
||||
import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
|
||||
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
|
||||
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
|
||||
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
|
||||
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
|
||||
@ -662,7 +664,21 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
|
||||
}
|
||||
|
||||
return queue.add(TRIGGER_RULE_JOB_NAME, data, {
|
||||
priority: 1,
|
||||
priority: 5,
|
||||
attempts: 1,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
})
|
||||
}
|
||||
|
||||
export const enqueueWebhookJob = async (data: CallWebhookJobData) => {
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
return queue.add(CALL_WEBHOOK_JOB_NAME, data, {
|
||||
priority: 5,
|
||||
attempts: 1,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
@ -716,4 +732,25 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
|
||||
}
|
||||
}
|
||||
|
||||
export const enqueueBulkAction = async (data: BulkActionData) => {
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
const jobId = `${BULK_ACTION_JOB_NAME}-${data.userId}`
|
||||
|
||||
try {
|
||||
return queue.add(BULK_ACTION_JOB_NAME, data, {
|
||||
attempts: 1,
|
||||
priority: 10,
|
||||
jobId, // deduplication
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('error enqueuing bulk action job', error)
|
||||
}
|
||||
}
|
||||
|
||||
export default createHttpTaskWithToken
|
||||
|
||||
@ -2159,7 +2159,7 @@ describe('Article API', () => {
|
||||
}
|
||||
`
|
||||
|
||||
context('when action is MarkAsRead and query is in:unread', () => {
|
||||
xcontext('when action is MarkAsRead and query is in:unread', () => {
|
||||
before(async () => {
|
||||
// Create some test items
|
||||
for (let i = 0; i < 5; i++) {
|
||||
|
||||
11
packages/db/migrations/0160.do.drop_slow_db_triggers.sql
Executable file
11
packages/db/migrations/0160.do.drop_slow_db_triggers.sql
Executable file
@ -0,0 +1,11 @@
|
||||
-- Type: DO
|
||||
-- Name: drop_slow_db_triggers
|
||||
-- Description: Drop some db triggers which are slow and have cascading effect
|
||||
|
||||
BEGIN;
|
||||
|
||||
DROP TRIGGER IF EXISTS library_item_labels_update ON omnivore.entity_labels;
|
||||
DROP TRIGGER IF EXISTS library_item_highlight_annotations_update ON omnivore.highlight;
|
||||
DROP TRIGGER IF EXISTS label_names_update ON omnivore.labels;
|
||||
|
||||
COMMIT;
|
||||
23
packages/db/migrations/0160.undo.drop_slow_db_triggers.sql
Executable file
23
packages/db/migrations/0160.undo.drop_slow_db_triggers.sql
Executable file
@ -0,0 +1,23 @@
|
||||
-- Type: UNDO
|
||||
-- Name: drop_slow_db_triggers
|
||||
-- Description: Drop some db triggers which are slow and have cascading effect
|
||||
|
||||
BEGIN;
|
||||
|
||||
CREATE TRIGGER label_names_update
|
||||
AFTER UPDATE ON omnivore.labels
|
||||
FOR EACH ROW
|
||||
WHEN (OLD.name <> NEW.name)
|
||||
EXECUTE FUNCTION update_label_names();
|
||||
|
||||
CREATE TRIGGER library_item_highlight_annotations_update
|
||||
AFTER INSERT OR UPDATE OR DELETE ON omnivore.highlight
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_library_item_highlight_annotations();
|
||||
|
||||
CREATE TRIGGER library_item_labels_update
|
||||
AFTER INSERT OR UPDATE OR DELETE ON omnivore.entity_labels
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION update_library_item_labels();
|
||||
|
||||
COMMIT;
|
||||
@ -89,7 +89,9 @@ export default function Home(): JSX.Element {
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
router.push(`/home`)
|
||||
const query = window.sessionStorage.getItem('q')
|
||||
router.push(`/home?${query}`)
|
||||
// router.push(`/home`)
|
||||
}, [router, viewerData, article])
|
||||
|
||||
const goPreviousOrHome = useCallback(() => {
|
||||
@ -103,7 +105,9 @@ export default function Home(): JSX.Element {
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
router.push(`/home`)
|
||||
const query = window.sessionStorage.getItem('q')
|
||||
router.push(`/home?${query}`)
|
||||
// router.push(`/home`)
|
||||
}, [router, viewerData, article])
|
||||
|
||||
const actionHandler = useCallback(
|
||||
|
||||
Reference in New Issue
Block a user