break save page into different db transactions from one big transaction to reduce lock time
This commit is contained in:
@ -64,7 +64,8 @@ export const createHighlight = async (
|
||||
highlight: DeepPartial<Highlight>,
|
||||
libraryItemId: string,
|
||||
userId: string,
|
||||
pubsub = createPubSubClient()
|
||||
pubsub = createPubSubClient(),
|
||||
updateLibraryItem = true
|
||||
) => {
|
||||
const newHighlight = await authTrx(
|
||||
async (tx) => {
|
||||
@ -90,10 +91,12 @@ export const createHighlight = async (
|
||||
userId
|
||||
)
|
||||
|
||||
await enqueueUpdateHighlight({
|
||||
libraryItemId,
|
||||
userId,
|
||||
})
|
||||
if (updateLibraryItem) {
|
||||
await enqueueUpdateHighlight({
|
||||
libraryItemId,
|
||||
userId,
|
||||
})
|
||||
}
|
||||
|
||||
return newHighlight
|
||||
}
|
||||
@ -214,6 +217,21 @@ export const deleteHighlightById = async (
|
||||
return deletedHighlight
|
||||
}
|
||||
|
||||
export const deleteHighlightByLibraryItemId = async (
|
||||
userId: string,
|
||||
libraryItemId: string
|
||||
) => {
|
||||
await authTrx(
|
||||
async (tx) =>
|
||||
tx.getRepository(Highlight).delete({
|
||||
libraryItemId,
|
||||
}),
|
||||
{
|
||||
uid: userId,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteHighlightsByIds = async (
|
||||
userId: string,
|
||||
highlightIds: string[]
|
||||
|
||||
@ -107,7 +107,8 @@ export const createAndAddLabelsToLibraryItem = async (
|
||||
newLabels.map((l) => l.id),
|
||||
libraryItemId,
|
||||
userId,
|
||||
source
|
||||
source,
|
||||
false
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -191,7 +192,8 @@ export const addLabelsToLibraryItem = async (
|
||||
labelIds: string[],
|
||||
libraryItemId: string,
|
||||
userId: string,
|
||||
source: LabelSource = 'user'
|
||||
source: LabelSource = 'user',
|
||||
updateLibraryItem = true
|
||||
) => {
|
||||
await authTrx(
|
||||
async (tx) => {
|
||||
@ -224,8 +226,10 @@ export const addLabelsToLibraryItem = async (
|
||||
}
|
||||
)
|
||||
|
||||
// update labels in library item
|
||||
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
if (updateLibraryItem) {
|
||||
// update labels in library item
|
||||
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
}
|
||||
}
|
||||
|
||||
export const saveLabelsInHighlight = async (
|
||||
@ -294,6 +298,21 @@ export const deleteLabels = async (
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteLabelsByLibraryItemId = async (
|
||||
userId: string,
|
||||
libraryItemId: string
|
||||
) => {
|
||||
return authTrx(
|
||||
async (t) =>
|
||||
t.getRepository(EntityLabel).delete({
|
||||
libraryItemId,
|
||||
}),
|
||||
{
|
||||
uid: userId,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteLabelById = async (labelId: string, userId: string) => {
|
||||
const libraryItemIds = await findLibraryItemIdsByLabelId(labelId, userId)
|
||||
|
||||
|
||||
@ -11,7 +11,6 @@ import {
|
||||
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
|
||||
import { ReadingProgressDataSource } from '../datasources/reading_progress_data_source'
|
||||
import { appDataSource } from '../data_source'
|
||||
import { EntityLabel } from '../entity/entity_label'
|
||||
import { Highlight } from '../entity/highlight'
|
||||
import { Label } from '../entity/label'
|
||||
import { LibraryItem, LibraryItemState } from '../entity/library_item'
|
||||
@ -37,8 +36,12 @@ import {
|
||||
} from '../utils/helpers'
|
||||
import { logger } from '../utils/logger'
|
||||
import { parseSearchQuery } from '../utils/search'
|
||||
import { HighlightEvent } from './highlights'
|
||||
import { addLabelsToLibraryItem, LabelEvent } from './labels'
|
||||
import { deleteHighlightByLibraryItemId, HighlightEvent } from './highlights'
|
||||
import {
|
||||
addLabelsToLibraryItem,
|
||||
deleteLabelsByLibraryItemId,
|
||||
LabelEvent,
|
||||
} from './labels'
|
||||
|
||||
const columnsToDelete = [
|
||||
'user',
|
||||
@ -1064,81 +1067,98 @@ export const createOrUpdateLibraryItem = async (
|
||||
pubsub = createPubSubClient(),
|
||||
skipPubSub = false
|
||||
): Promise<LibraryItem> => {
|
||||
const newLibraryItem = await authTrx(
|
||||
async (tx) => {
|
||||
const repo = tx.withRepository(libraryItemRepository)
|
||||
// find existing library item by user_id and url for update
|
||||
const existingLibraryItem = await repo.findByUserIdAndUrl(
|
||||
userId,
|
||||
libraryItem.originalUrl,
|
||||
true
|
||||
)
|
||||
let libraryItemCreated: LibraryItem
|
||||
|
||||
if (existingLibraryItem) {
|
||||
const id = existingLibraryItem.id
|
||||
const existingLibraryItem = await authTrx(
|
||||
async (tx) =>
|
||||
tx
|
||||
.withRepository(libraryItemRepository)
|
||||
.findByUserIdAndUrl(userId, libraryItem.originalUrl),
|
||||
{ uid: userId }
|
||||
)
|
||||
|
||||
try {
|
||||
// delete labels and highlights if the item was deleted
|
||||
if (existingLibraryItem.state === LibraryItemState.Deleted) {
|
||||
logger.info('Deleting labels and highlights for item', {
|
||||
id,
|
||||
})
|
||||
await tx.getRepository(Highlight).delete({
|
||||
libraryItem: { id: existingLibraryItem.id },
|
||||
})
|
||||
if (existingLibraryItem) {
|
||||
const id = existingLibraryItem.id
|
||||
|
||||
await tx.getRepository(EntityLabel).delete({
|
||||
libraryItemId: existingLibraryItem.id,
|
||||
})
|
||||
try {
|
||||
// delete labels and highlights if the item was deleted
|
||||
if (existingLibraryItem.state === LibraryItemState.Deleted) {
|
||||
logger.info('Deleting labels and highlights for item', {
|
||||
id,
|
||||
})
|
||||
|
||||
libraryItem.labelNames = []
|
||||
libraryItem.highlightAnnotations = []
|
||||
}
|
||||
} catch (error) {
|
||||
// continue to save the item even if we failed to delete labels and highlights
|
||||
logger.error('Failed to delete labels and highlights', error)
|
||||
if (existingLibraryItem.highlightAnnotations?.length) {
|
||||
await deleteHighlightByLibraryItemId(userId, id)
|
||||
existingLibraryItem.highlightAnnotations = []
|
||||
}
|
||||
|
||||
// update existing library item
|
||||
const newItem = await repo.save({
|
||||
if (existingLibraryItem.labelNames?.length) {
|
||||
await deleteLabelsByLibraryItemId(userId, id)
|
||||
existingLibraryItem.labelNames = []
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// continue to save the item even if we failed to delete labels and highlights
|
||||
logger.error('Failed to delete labels and highlights', error)
|
||||
}
|
||||
|
||||
const existingLabels = existingLibraryItem.labelNames || []
|
||||
const newLabels = libraryItem.labelNames || []
|
||||
const combinedLabels = [...new Set([...existingLabels, ...newLabels])]
|
||||
|
||||
const existingHighlights = existingLibraryItem.highlightAnnotations || []
|
||||
const newHighlights = libraryItem.highlightAnnotations || []
|
||||
const combinedHighlights = [...existingHighlights, ...newHighlights]
|
||||
|
||||
// update existing library item
|
||||
libraryItemCreated = await authTrx(
|
||||
async (tx) =>
|
||||
tx.getRepository(LibraryItem).save({
|
||||
...libraryItem,
|
||||
id,
|
||||
slug: existingLibraryItem.slug, // keep the original slug
|
||||
})
|
||||
|
||||
// delete the new item if it's different from the existing one
|
||||
if (libraryItem.id && libraryItem.id !== id) {
|
||||
await repo.delete(libraryItem.id)
|
||||
}
|
||||
|
||||
return newItem
|
||||
labelNames: combinedLabels,
|
||||
highlightAnnotations: combinedHighlights,
|
||||
}),
|
||||
{
|
||||
uid: userId,
|
||||
}
|
||||
)
|
||||
|
||||
// create or update library item
|
||||
return repo.upsertLibraryItemById(libraryItem)
|
||||
},
|
||||
{
|
||||
uid: userId,
|
||||
// delete the new item if it's different from the existing one
|
||||
if (libraryItem.id && libraryItem.id !== id) {
|
||||
await deleteLibraryItemById(libraryItem.id, userId)
|
||||
}
|
||||
)
|
||||
} else {
|
||||
// upsert library item
|
||||
libraryItemCreated = await authTrx(
|
||||
async (tx) =>
|
||||
tx
|
||||
.withRepository(libraryItemRepository)
|
||||
.upsertLibraryItemById(libraryItem),
|
||||
{
|
||||
uid: userId,
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
// set recently saved item in redis if redis is enabled
|
||||
if (redisDataSource.redisClient) {
|
||||
await setRecentlySavedItemInRedis(
|
||||
redisDataSource.redisClient,
|
||||
userId,
|
||||
newLibraryItem.originalUrl
|
||||
libraryItemCreated.originalUrl
|
||||
)
|
||||
}
|
||||
|
||||
if (skipPubSub || libraryItem.state === LibraryItemState.Processing) {
|
||||
return newLibraryItem
|
||||
return libraryItemCreated
|
||||
}
|
||||
|
||||
const data = deepDelete(newLibraryItem, columnsToDelete)
|
||||
const data = deepDelete(libraryItemCreated, columnsToDelete)
|
||||
await pubsub.entityCreated<ItemEvent>(EntityType.ITEM, data, userId)
|
||||
|
||||
return newLibraryItem
|
||||
return libraryItemCreated
|
||||
}
|
||||
|
||||
export const findLibraryItemsByPrefix = async (
|
||||
|
||||
@ -2,15 +2,24 @@ import { LibraryItemState } from '../entity/library_item'
|
||||
import { User } from '../entity/user'
|
||||
import { homePageURL } from '../env'
|
||||
import { SaveErrorCode, SaveFileInput, SaveResult } from '../generated/graphql'
|
||||
import { logger } from '../utils/logger'
|
||||
import { getStorageFileDetails } from '../utils/uploads'
|
||||
import { createAndAddLabelsToLibraryItem } from './labels'
|
||||
import { updateLibraryItem } from './library_item'
|
||||
import { findLibraryItemById, updateLibraryItem } from './library_item'
|
||||
import { findUploadFileById, setFileUploadComplete } from './upload_file'
|
||||
|
||||
export const saveFile = async (
|
||||
input: SaveFileInput,
|
||||
user: User
|
||||
): Promise<SaveResult> => {
|
||||
const libraryItem = await findLibraryItemById(input.clientRequestId, user.id)
|
||||
if (!libraryItem) {
|
||||
return {
|
||||
__typename: 'SaveError',
|
||||
errorCodes: [SaveErrorCode.Unauthorized],
|
||||
}
|
||||
}
|
||||
|
||||
const uploadFile = await findUploadFileById(input.uploadFileId)
|
||||
if (!uploadFile) {
|
||||
return {
|
||||
@ -30,27 +39,46 @@ export const saveFile = async (
|
||||
}
|
||||
}
|
||||
|
||||
await updateLibraryItem(
|
||||
input.clientRequestId,
|
||||
{
|
||||
state:
|
||||
(input.state as unknown as LibraryItemState) ||
|
||||
LibraryItemState.Succeeded,
|
||||
folder: input.folder || undefined,
|
||||
savedAt: input.savedAt ? new Date(input.savedAt) : undefined,
|
||||
publishedAt: input.publishedAt ? new Date(input.publishedAt) : undefined,
|
||||
labelNames: input.labels?.map((label) => label.name) || undefined,
|
||||
},
|
||||
user.id
|
||||
)
|
||||
try {
|
||||
const existingLabels = libraryItem.labelNames || []
|
||||
const newLabels = input.labels?.map((l) => l.name) || []
|
||||
const combinedLabels = [...new Set([...existingLabels, ...newLabels])]
|
||||
|
||||
// add labels to item
|
||||
await createAndAddLabelsToLibraryItem(
|
||||
input.clientRequestId,
|
||||
user.id,
|
||||
input.labels,
|
||||
input.subscription
|
||||
)
|
||||
await updateLibraryItem(
|
||||
input.clientRequestId,
|
||||
{
|
||||
state:
|
||||
(input.state as unknown as LibraryItemState) ||
|
||||
LibraryItemState.Succeeded,
|
||||
folder: input.folder || undefined,
|
||||
savedAt: input.savedAt ? new Date(input.savedAt) : undefined,
|
||||
publishedAt: input.publishedAt
|
||||
? new Date(input.publishedAt)
|
||||
: undefined,
|
||||
labelNames: combinedLabels,
|
||||
},
|
||||
user.id
|
||||
)
|
||||
|
||||
// add labels to item
|
||||
await createAndAddLabelsToLibraryItem(
|
||||
input.clientRequestId,
|
||||
user.id,
|
||||
input.labels,
|
||||
input.subscription
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error('Failed to update library item', {
|
||||
error,
|
||||
clientRequestId: input.clientRequestId,
|
||||
userId: user.id,
|
||||
})
|
||||
|
||||
return {
|
||||
__typename: 'SaveError',
|
||||
errorCodes: [SaveErrorCode.Unknown],
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
clientRequestId: input.clientRequestId,
|
||||
|
||||
@ -169,7 +169,13 @@ export const savePage = async (
|
||||
|
||||
// merge highlights
|
||||
try {
|
||||
await createHighlight(highlight, clientRequestId, user.id)
|
||||
await createHighlight(
|
||||
highlight,
|
||||
clientRequestId,
|
||||
user.id,
|
||||
undefined,
|
||||
false
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error('Failed to create highlight', {
|
||||
highlight,
|
||||
@ -189,7 +195,6 @@ export const savePage = async (
|
||||
export const parsedContentToLibraryItem = ({
|
||||
url,
|
||||
userId,
|
||||
originalHtml,
|
||||
itemId,
|
||||
parsedContent,
|
||||
slug,
|
||||
|
||||
@ -637,7 +637,7 @@ export const enqueueThumbnailJob = async (
|
||||
return queue.add(THUMBNAIL_JOB, payload, {
|
||||
priority: getJobPriority(THUMBNAIL_JOB),
|
||||
attempts: 1,
|
||||
delay: 5000,
|
||||
delay: 1000,
|
||||
})
|
||||
}
|
||||
|
||||
@ -701,7 +701,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
|
||||
return queue.add(TRIGGER_RULE_JOB_NAME, data, {
|
||||
priority: getJobPriority(TRIGGER_RULE_JOB_NAME),
|
||||
attempts: 1,
|
||||
delay: 3000,
|
||||
delay: 500,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -404,7 +404,9 @@ describe('Article API', () => {
|
||||
archivedAt: new Date(),
|
||||
state: LibraryItemState.Archived,
|
||||
},
|
||||
user.id
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
itemId = item.id
|
||||
})
|
||||
@ -718,7 +720,12 @@ describe('Article API', () => {
|
||||
originalUrl: 'https://blog.omnivore.app/setBookmarkArticle',
|
||||
slug: 'test-with-omnivore',
|
||||
}
|
||||
const item = await createOrUpdateLibraryItem(itemToSave, user.id)
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
itemToSave,
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
itemId = item.id
|
||||
})
|
||||
|
||||
@ -832,7 +839,9 @@ describe('Article API', () => {
|
||||
readingProgressBottomPercent: 100,
|
||||
readingProgressTopPercent: 80,
|
||||
},
|
||||
user.id
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
).id
|
||||
})
|
||||
@ -873,7 +882,9 @@ describe('Article API', () => {
|
||||
readingProgressBottomPercent: 100,
|
||||
readingProgressTopPercent: 80,
|
||||
},
|
||||
user.id
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
itemId = item.id
|
||||
|
||||
@ -948,7 +959,12 @@ describe('Article API', () => {
|
||||
siteName: 'Example',
|
||||
readingProgressBottomPercent: readingProgressArray[i],
|
||||
}
|
||||
const item = await createOrUpdateLibraryItem(itemToSave, user.id)
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
itemToSave,
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
items.push(item)
|
||||
|
||||
// Create some test highlights
|
||||
@ -2007,7 +2023,12 @@ describe('Article API', () => {
|
||||
slug: '',
|
||||
originalUrl: `https://blog.omnivore.app/p/typeahead-search-${i}`,
|
||||
}
|
||||
const item = await createOrUpdateLibraryItem(itemToSave, user.id)
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
itemToSave,
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
items.push(item)
|
||||
}
|
||||
})
|
||||
@ -2081,7 +2102,12 @@ describe('Article API', () => {
|
||||
originalUrl: `https://blog.omnivore.app/p/updates-since-${i}`,
|
||||
user,
|
||||
}
|
||||
const item = await createOrUpdateLibraryItem(itemToSave, user.id)
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
itemToSave,
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
items.push(item)
|
||||
}
|
||||
|
||||
@ -2205,7 +2231,9 @@ describe('Article API', () => {
|
||||
i == 0 ? LibraryItemState.Failed : LibraryItemState.Succeeded,
|
||||
originalUrl: `https://blog.omnivore.app/p/bulk-action-${i}`,
|
||||
},
|
||||
user.id
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
}
|
||||
})
|
||||
@ -2298,7 +2326,9 @@ describe('Article API', () => {
|
||||
i == 0 ? LibraryItemState.Failed : LibraryItemState.Succeeded,
|
||||
originalUrl: `https://blog.omnivore.app/p/bulk-action-${i}`,
|
||||
},
|
||||
user.id
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
items.push(item)
|
||||
}
|
||||
@ -2340,7 +2370,9 @@ describe('Article API', () => {
|
||||
slug: '',
|
||||
originalUrl: `https://blog.omnivore.app/p/bulk-action-${i}`,
|
||||
},
|
||||
user.id
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
|
||||
items.push(item)
|
||||
@ -2394,7 +2426,12 @@ describe('Article API', () => {
|
||||
readableContent: '<p>test</p>',
|
||||
originalUrl: `https://blog.omnivore.app/p/setFavoriteArticle`,
|
||||
}
|
||||
const item = await createOrUpdateLibraryItem(itemToSave, user.id)
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
itemToSave,
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
articleId = item.id
|
||||
})
|
||||
|
||||
@ -2445,7 +2482,12 @@ describe('Article API', () => {
|
||||
deletedAt: new Date(),
|
||||
state: LibraryItemState.Deleted,
|
||||
}
|
||||
const item = await createOrUpdateLibraryItem(itemToSave, user.id)
|
||||
const item = await createOrUpdateLibraryItem(
|
||||
itemToSave,
|
||||
user.id,
|
||||
undefined,
|
||||
true
|
||||
)
|
||||
items.push(item)
|
||||
}
|
||||
})
|
||||
|
||||
@ -37,7 +37,9 @@ export const stopWorker = async () => {
|
||||
}
|
||||
|
||||
export const waitUntilJobsDone = async (jobs: Job[]) => {
|
||||
await Promise.all(jobs.map((job) => job.waitUntilFinished(queueEvents)))
|
||||
await Promise.all(
|
||||
jobs.map((job) => job.waitUntilFinished(queueEvents, 10000))
|
||||
)
|
||||
}
|
||||
|
||||
export const graphqlRequest = (
|
||||
|
||||
Reference in New Issue
Block a user