create a job for update labels in library item
This commit is contained in:
@ -1,20 +1,18 @@
|
||||
import { authTrx } from '../repository'
|
||||
|
||||
export const UPDATE_LABELS_IN_LIBRARY_ITEM_JOB = 'update-labels-in-library-item'
|
||||
export const UPDATE_LABELS_JOB = 'update-labels'
|
||||
|
||||
export interface UpdateLabelsInLibraryItemData {
|
||||
export interface UpdateLabelsData {
|
||||
libraryItemId: string
|
||||
userId: string
|
||||
}
|
||||
|
||||
export const updateLabelsInLibraryItem = async (
|
||||
data: UpdateLabelsInLibraryItemData
|
||||
) => {
|
||||
export const updateLabels = async (data: UpdateLabelsData) => {
|
||||
return authTrx(
|
||||
async (tx) =>
|
||||
tx.query(
|
||||
`WITH labels_agg AS (
|
||||
SELECT array_agg(l.name) AS names_agg
|
||||
SELECT array_agg(DISTINCT l.name) AS names_agg
|
||||
FROM omnivore.labels l
|
||||
INNER JOIN omnivore.entity_labels el ON el.label_id = l.id
|
||||
LEFT JOIN omnivore.highlight h ON h.id = el.highlight_id
|
||||
|
||||
@ -12,10 +12,7 @@ import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule'
|
||||
import {
|
||||
updateLabelsInLibraryItem,
|
||||
UPDATE_LABELS_IN_LIBRARY_ITEM_JOB,
|
||||
} from './jobs/update_db'
|
||||
import { updateLabels, UPDATE_LABELS_JOB } from './jobs/update_db'
|
||||
import { updatePDFContentJob } from './jobs/update_pdf_content'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
@ -129,8 +126,8 @@ const main = async () => {
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
case UPDATE_LABELS_IN_LIBRARY_ITEM_JOB:
|
||||
return updateLabelsInLibraryItem(job.data)
|
||||
case UPDATE_LABELS_JOB:
|
||||
return updateLabels(job.data)
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@ -31,6 +31,7 @@ import {
|
||||
import { labelRepository } from '../../repository/label'
|
||||
import { userRepository } from '../../repository/user'
|
||||
import {
|
||||
deleteLabelById,
|
||||
findOrCreateLabels,
|
||||
saveLabelsInHighlight,
|
||||
saveLabelsInLibraryItem,
|
||||
@ -118,13 +119,10 @@ export const deleteLabelResolver = authorized<
|
||||
DeleteLabelSuccess,
|
||||
DeleteLabelError,
|
||||
MutationDeleteLabelArgs
|
||||
>(async (_, { id: labelId }, { authTrx, log, uid }) => {
|
||||
>(async (_, { id: labelId }, { log, uid }) => {
|
||||
try {
|
||||
const deleteResult = await authTrx(async (tx) => {
|
||||
return tx.withRepository(labelRepository).deleteById(labelId)
|
||||
})
|
||||
|
||||
if (!deleteResult.affected) {
|
||||
const deleted = await deleteLabelById(labelId, uid)
|
||||
if (!deleted) {
|
||||
return {
|
||||
errorCodes: [DeleteLabelErrorCode.NotFound],
|
||||
}
|
||||
@ -281,7 +279,7 @@ export const setLabelsForHighlightResolver = authorized<
|
||||
}
|
||||
}
|
||||
|
||||
// save labels in the library item
|
||||
// save labels in the highlight
|
||||
await saveLabelsInHighlight(labelsSet, input.highlightId, uid, pubsub)
|
||||
|
||||
analytics.track({
|
||||
|
||||
@ -5,8 +5,9 @@ import { Label } from '../entity/label'
|
||||
import { createPubSubClient, EntityType, PubsubClient } from '../pubsub'
|
||||
import { authTrx } from '../repository'
|
||||
import { CreateLabelInput, labelRepository } from '../repository/label'
|
||||
import { enqueueUpdateLabelsInLibraryItem } from '../utils/createTask'
|
||||
import { bulkEnqueueUpdateLabels } from '../utils/createTask'
|
||||
import { logger } from '../utils/logger'
|
||||
import { findLibraryItemIdsByLabelId } from './library_item'
|
||||
|
||||
type AddLabelsToLibraryItemEvent = {
|
||||
pageId: string
|
||||
@ -124,8 +125,8 @@ export const saveLabelsInLibraryItem = async (
|
||||
}
|
||||
|
||||
// update labels in library item
|
||||
const job = await enqueueUpdateLabelsInLibraryItem({ libraryItemId, userId })
|
||||
logger.info('update labels in library item job created', job)
|
||||
const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
logger.info('update labels jobs enqueued', jobs)
|
||||
}
|
||||
|
||||
export const addLabelsToLibraryItem = async (
|
||||
@ -151,6 +152,10 @@ export const addLabelsToLibraryItem = async (
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
// update labels in library item
|
||||
const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
logger.info('update labels jobs enqueued', jobs)
|
||||
}
|
||||
|
||||
export const saveLabelsInHighlight = async (
|
||||
@ -224,12 +229,33 @@ export const deleteLabels = async (
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteLabelById = async (labelId: string, userId: string) => {
|
||||
const libraryItemIds = await findLibraryItemIdsByLabelId(labelId, userId)
|
||||
|
||||
const deleteResult = await authTrx(async (tx) => {
|
||||
return tx.withRepository(labelRepository).deleteById(labelId)
|
||||
})
|
||||
|
||||
if (!deleteResult.affected) {
|
||||
return false
|
||||
}
|
||||
|
||||
const data = libraryItemIds.map((libraryItemId) => ({
|
||||
libraryItemId,
|
||||
userId,
|
||||
}))
|
||||
const jobs = await bulkEnqueueUpdateLabels(data)
|
||||
logger.info('update labels jobs enqueued', jobs)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
export const updateLabel = async (
|
||||
id: string,
|
||||
label: QueryDeepPartialEntity<Label>,
|
||||
userId: string
|
||||
) => {
|
||||
return authTrx(
|
||||
const updatedLabel = await authTrx(
|
||||
async (t) => {
|
||||
const repo = t.withRepository(labelRepository)
|
||||
await repo.updateLabel(id, label)
|
||||
@ -239,6 +265,19 @@ export const updateLabel = async (
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
if (label.name) {
|
||||
const libraryItemIds = await findLibraryItemIdsByLabelId(id, userId)
|
||||
|
||||
const data = libraryItemIds.map((libraryItemId) => ({
|
||||
libraryItemId,
|
||||
userId,
|
||||
}))
|
||||
const jobs = await bulkEnqueueUpdateLabels(data)
|
||||
logger.info('update labels jobs enqueued', jobs)
|
||||
}
|
||||
|
||||
return updatedLabel
|
||||
}
|
||||
|
||||
export const findLabelsByUserId = async (userId: string): Promise<Label[]> => {
|
||||
|
||||
@ -1132,3 +1132,36 @@ export const batchDelete = async (criteria: FindOptionsWhere<LibraryItem>) => {
|
||||
|
||||
return authTrx(async (t) => t.query(sql))
|
||||
}
|
||||
|
||||
export const findLibraryItemIdsByLabelId = async (
|
||||
labelId: string,
|
||||
userId: string
|
||||
) => {
|
||||
return authTrx(
|
||||
async (tx) => {
|
||||
// find library items have the label or have highlights with the label
|
||||
const result = (await tx.query(
|
||||
`
|
||||
SELECT library_item_id
|
||||
FROM (
|
||||
SELECT library_item_id
|
||||
FROM omnivore.entity_labels
|
||||
WHERE label_id = $1
|
||||
AND library_item_id IS NOT NULL
|
||||
UNION
|
||||
SELECT h.library_item_id
|
||||
FROM omnivore.highlight h
|
||||
INNER JOIN omnivore.entity_labels ON entity_labels.highlight_id = h.id
|
||||
WHERE label_id = $1
|
||||
AND highlight_id IS NOT NULL
|
||||
) AS combined_results
|
||||
`,
|
||||
[labelId]
|
||||
)) as { library_item_id: string }[]
|
||||
|
||||
return result.map((r) => r.library_item_id)
|
||||
},
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
@ -17,10 +17,7 @@ import {
|
||||
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
|
||||
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
|
||||
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
|
||||
import {
|
||||
UpdateLabelsInLibraryItemData,
|
||||
UPDATE_LABELS_IN_LIBRARY_ITEM_JOB,
|
||||
} from '../jobs/update_db'
|
||||
import { UpdateLabelsData, UPDATE_LABELS_JOB } from '../jobs/update_db'
|
||||
import { getBackendQueue } from '../queue-processor'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { signFeatureToken } from '../services/features'
|
||||
@ -664,17 +661,25 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
|
||||
})
|
||||
}
|
||||
|
||||
export const enqueueUpdateLabelsInLibraryItem = async (
|
||||
data: UpdateLabelsInLibraryItemData
|
||||
) => {
|
||||
export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
return queue.add(UPDATE_LABELS_IN_LIBRARY_ITEM_JOB, data, {
|
||||
priority: 1,
|
||||
})
|
||||
const jobs = data.map((d) => ({
|
||||
name: UPDATE_LABELS_JOB,
|
||||
data: d,
|
||||
opts: {
|
||||
priority: 1,
|
||||
},
|
||||
}))
|
||||
|
||||
try {
|
||||
return queue.addBulk(jobs)
|
||||
} catch (error) {
|
||||
logger.error('error enqueuing update labels jobs', error)
|
||||
}
|
||||
}
|
||||
|
||||
export default createHttpTaskWithToken
|
||||
|
||||
Reference in New Issue
Block a user