diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 70714c14c..42196640c 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -93,6 +93,7 @@ jobs: PG_DB: omnivore_test PG_LOGGER: debug REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }} + MQ_REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }} build-docker-images: name: Build docker images runs-on: ubuntu-latest diff --git a/packages/api/src/entity/highlight.ts b/packages/api/src/entity/highlight.ts index 356c8bddc..f34ba69e8 100644 --- a/packages/api/src/entity/highlight.ts +++ b/packages/api/src/entity/highlight.ts @@ -35,6 +35,9 @@ export class Highlight { @JoinColumn({ name: 'library_item_id' }) libraryItem!: LibraryItem + @Column('uuid') + libraryItemId!: string + @Column('text') quote?: string | null diff --git a/packages/api/src/entity/library_item.ts b/packages/api/src/entity/library_item.ts index 5765c1469..ce803891f 100644 --- a/packages/api/src/entity/library_item.ts +++ b/packages/api/src/entity/library_item.ts @@ -183,15 +183,6 @@ export class LibraryItem { }) highlights?: Highlight[] - @Column('text', { nullable: true }) - labelNames?: string[] | null - - @Column('text', { nullable: true }) - highlightLabels?: string[] | null - - @Column('text', { nullable: true }) - highlightAnnotations?: string[] | null - @Column('text', { nullable: true }) note?: string | null diff --git a/packages/api/src/jobs/update_db.ts b/packages/api/src/jobs/update_db.ts new file mode 100644 index 000000000..dcde84383 --- /dev/null +++ b/packages/api/src/jobs/update_db.ts @@ -0,0 +1,54 @@ +import { authTrx } from '../repository' + +export const UPDATE_LABELS_JOB = 'update-labels' +export const UPDATE_HIGHLIGHT_JOB = 'update-highlight' + +export interface UpdateLabelsData { + libraryItemId: string + userId: string +} + +export interface UpdateHighlightData { + libraryItemId: string + userId: string +} + +export const updateLabels = async (data: UpdateLabelsData) => { + return authTrx( + async (tx) => + tx.query( + `WITH labels_agg AS ( + SELECT array_agg(DISTINCT l.name) AS names_agg + FROM omnivore.labels l + INNER JOIN omnivore.entity_labels el ON el.label_id = l.id + LEFT JOIN omnivore.highlight h ON h.id = el.highlight_id + WHERE el.library_item_id = $1 OR h.library_item_id = $1 + ) + UPDATE omnivore.library_item li + SET label_names = COALESCE((SELECT names_agg FROM labels_agg), ARRAY[]::TEXT[]) + WHERE li.id = $1`, + [data.libraryItemId] + ), + undefined, + data.userId + ) +} + +export const updateHighlight = async (data: UpdateHighlightData) => { + return authTrx( + async (tx) => + tx.query( + `WITH highlight_agg AS ( + SELECT array_agg(COALESCE(annotation, '')) AS annotation_agg + FROM omnivore.highlight + WHERE library_item_id = $1 + ) + UPDATE omnivore.library_item + SET highlight_annotations = COALESCE((SELECT annotation_agg FROM highlight_agg), ARRAY[]::TEXT[]) + WHERE id = $1`, + [data.libraryItemId] + ), + undefined, + data.userId + ) +} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index cd89e1c69..ebf6fcfdf 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,14 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, Queue, QueueEvents, Worker, JobType } from 'bullmq' +import { + ConnectionOptions, + Job, + JobType, + Queue, + QueueEvents, + Worker, +} from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -11,10 +18,16 @@ import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' +import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' +import { + updateHighlight, + updateLabels, + UPDATE_HIGHLIGHT_JOB, + UPDATE_LABELS_JOB, +} from './jobs/update_db' import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' -import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -34,6 +47,43 @@ export const getBackendQueue = async (): Promise => { return backendQueue } +export const createWorker = (connection: ConnectionOptions) => + new Worker( + QUEUE_NAME, + async (job: Job) => { + switch (job.name) { + case 'refresh-all-feeds': { + const queue = await getBackendQueue() + const counts = await queue?.getJobCounts('prioritized') + if (counts && counts.wait > 1000) { + return + } + return await refreshAllFeeds(appDataSource) + } + case 'refresh-feed': { + return await refreshFeed(job.data) + } + case 'save-page': { + return savePageJob(job.data, job.attemptsMade) + } + case 'update-pdf-content': { + return updatePDFContentJob(job.data) + } + case THUMBNAIL_JOB: + return findThumbnail(job.data) + case TRIGGER_RULE_JOB_NAME: + return triggerRule(job.data) + case UPDATE_LABELS_JOB: + return updateLabels(job.data) + case UPDATE_HIGHLIGHT_JOB: + return updateHighlight(job.data) + } + }, + { + connection, + } + ) + const main = async () => { console.log('[queue-processor]: starting queue processor') @@ -100,37 +150,7 @@ const main = async () => { throw '[queue-processor] error redis is not initialized' } - const worker = new Worker( - QUEUE_NAME, - async (job: Job) => { - switch (job.name) { - case 'refresh-all-feeds': { - const queue = await getBackendQueue() - const counts = await queue?.getJobCounts('prioritized') - if (counts && counts.wait > 1000) { - return - } - return await refreshAllFeeds(appDataSource) - } - case 'refresh-feed': { - return await refreshFeed(job.data) - } - case 'save-page': { - return savePageJob(job.data, job.attemptsMade) - } - case 'update-pdf-content': { - return updatePDFContentJob(job.data) - } - case THUMBNAIL_JOB: - return findThumbnail(job.data) - case TRIGGER_RULE_JOB_NAME: - return triggerRule(job.data) - } - }, - { - connection: workerRedisClient, - } - ) + const worker = createWorker(workerRedisClient) const queueEvents = new QueueEvents(QUEUE_NAME, { connection: workerRedisClient, diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index 93710f98f..921a01cf8 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -701,15 +701,10 @@ export const searchResolver = authorized< const edges = await Promise.all( libraryItems.map(async (libraryItem) => { - if ( - libraryItem.highlightAnnotations && - libraryItem.highlightAnnotations.length > 0 - ) { - libraryItem.highlights = await findHighlightsByLibraryItemId( - libraryItem.id, - uid - ) - } + libraryItem.highlights = await findHighlightsByLibraryItemId( + libraryItem.id, + uid + ) if (params.includeContent && libraryItem.readableContent) { // convert html to the requested format diff --git a/packages/api/src/resolvers/function_resolvers.ts b/packages/api/src/resolvers/function_resolvers.ts index ad3e3ac96..871782068 100644 --- a/packages/api/src/resolvers/function_resolvers.ts +++ b/packages/api/src/resolvers/function_resolvers.ts @@ -334,17 +334,13 @@ export const functionResolvers = { return article.content ? wordsCount(article.content) : undefined }, async labels( - article: { id: string; labels?: Label[]; labelNames?: string[] | null }, + article: { id: string; labels?: Label[] }, _: unknown, ctx: WithDataSourcesContext ) { if (article.labels) return article.labels - if (article.labelNames && article.labelNames.length > 0) { - return findLabelsByLibraryItemId(article.id, ctx.uid) - } - - return [] + return findLabelsByLibraryItemId(article.id, ctx.uid) }, }, Highlight: { @@ -409,17 +405,13 @@ export const functionResolvers = { return item.siteIcon }, async labels( - item: { id: string; labels?: Label[]; labelNames?: string[] | null }, + item: { id: string; labels?: Label[] }, _: unknown, ctx: WithDataSourcesContext ) { if (item.labels) return item.labels - if (item.labelNames && item.labelNames.length > 0) { - return findLabelsByLibraryItemId(item.id, ctx.uid) - } - - return [] + return findLabelsByLibraryItemId(item.id, ctx.uid) }, async recommendations( item: { @@ -446,19 +438,14 @@ export const functionResolvers = { item: { id: string highlights?: Highlight[] - highlightAnnotations?: string[] | null }, _: unknown, ctx: WithDataSourcesContext ) { if (item.highlights) return item.highlights - if (item.highlightAnnotations && item.highlightAnnotations.length > 0) { - const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid) - return highlights.map(highlightDataToHighlight) - } - - return [] + const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid) + return highlights.map(highlightDataToHighlight) }, }, Subscription: { diff --git a/packages/api/src/resolvers/labels/index.ts b/packages/api/src/resolvers/labels/index.ts index 6a6860940..cf9d6e3b6 100644 --- a/packages/api/src/resolvers/labels/index.ts +++ b/packages/api/src/resolvers/labels/index.ts @@ -31,6 +31,7 @@ import { import { labelRepository } from '../../repository/label' import { userRepository } from '../../repository/user' import { + deleteLabelById, findOrCreateLabels, saveLabelsInHighlight, saveLabelsInLibraryItem, @@ -118,13 +119,10 @@ export const deleteLabelResolver = authorized< DeleteLabelSuccess, DeleteLabelError, MutationDeleteLabelArgs ->(async (_, { id: labelId }, { authTrx, log, uid }) => { +>(async (_, { id: labelId }, { log, uid }) => { try { - const deleteResult = await authTrx(async (tx) => { - return tx.withRepository(labelRepository).deleteById(labelId) - }) - - if (!deleteResult.affected) { + const deleted = await deleteLabelById(labelId, uid) + if (!deleted) { return { errorCodes: [DeleteLabelErrorCode.NotFound], } @@ -281,7 +279,7 @@ export const setLabelsForHighlightResolver = authorized< } } - // save labels in the library item + // save labels in the highlight await saveLabelsInHighlight(labelsSet, input.highlightId, uid, pubsub) analytics.track({ diff --git a/packages/api/src/services/highlights.ts b/packages/api/src/services/highlights.ts index 2d062418f..436a3dd1a 100644 --- a/packages/api/src/services/highlights.ts +++ b/packages/api/src/services/highlights.ts @@ -8,6 +8,7 @@ import { homePageURL } from '../env' import { createPubSubClient, EntityType } from '../pubsub' import { authTrx } from '../repository' import { highlightRepository } from '../repository/highlight' +import { enqueueUpdateHighlight } from '../utils/createTask' type HighlightEvent = { id: string; pageId: string } type CreateHighlightEvent = DeepPartial & HighlightEvent @@ -61,6 +62,11 @@ export const createHighlight = async ( userId ) + await enqueueUpdateHighlight({ + libraryItemId, + userId, + }) + return newHighlight } @@ -103,6 +109,11 @@ export const mergeHighlights = async ( userId ) + await enqueueUpdateHighlight({ + libraryItemId, + userId, + }) + return newHighlight } @@ -125,17 +136,23 @@ export const updateHighlight = async ( }) }) + const libraryItemId = updatedHighlight.libraryItem.id await pubsub.entityUpdated( EntityType.HIGHLIGHT, - { ...highlight, id: highlightId, pageId: updatedHighlight.libraryItem.id }, + { ...highlight, id: highlightId, pageId: libraryItemId }, userId ) + await enqueueUpdateHighlight({ + libraryItemId, + userId, + }) + return updatedHighlight } export const deleteHighlightById = async (highlightId: string) => { - return authTrx(async (tx) => { + const deletedHighlight = await authTrx(async (tx) => { const highlightRepo = tx.withRepository(highlightRepository) const highlight = await highlightRepo.findOneOrFail({ where: { id: highlightId }, @@ -147,6 +164,13 @@ export const deleteHighlightById = async (highlightId: string) => { await highlightRepo.delete(highlightId) return highlight }) + + await enqueueUpdateHighlight({ + libraryItemId: deletedHighlight.libraryItemId, + userId: deletedHighlight.user.id, + }) + + return deletedHighlight } export const findHighlightById = async ( diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 564a1e2da..c4f8d4d13 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -5,6 +5,9 @@ import { Label } from '../entity/label' import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { CreateLabelInput, labelRepository } from '../repository/label' +import { bulkEnqueueUpdateLabels } from '../utils/createTask' +import { findHighlightById } from './highlights' +import { findLibraryItemIdsByLabelId } from './library_item' type AddLabelsToLibraryItemEvent = { pageId: string @@ -120,6 +123,9 @@ export const saveLabelsInLibraryItem = async ( userId ) } + + // update labels in library item + return bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) } export const addLabelsToLibraryItem = async ( @@ -145,6 +151,9 @@ export const addLabelsToLibraryItem = async ( undefined, userId ) + + // update labels in library item + await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) } export const saveLabelsInHighlight = async ( @@ -176,6 +185,12 @@ export const saveLabelsInHighlight = async ( { highlightId, labels }, userId ) + + const highlight = await findHighlightById(highlightId, userId) + // update labels in library item + await bulkEnqueueUpdateLabels([ + { libraryItemId: highlight.libraryItemId, userId }, + ]) } export const findLabelsByIds = async ( @@ -218,12 +233,32 @@ export const deleteLabels = async ( ) } +export const deleteLabelById = async (labelId: string, userId: string) => { + const libraryItemIds = await findLibraryItemIdsByLabelId(labelId, userId) + + const deleteResult = await authTrx(async (tx) => { + return tx.withRepository(labelRepository).deleteById(labelId) + }) + + if (!deleteResult.affected) { + return false + } + + const data = libraryItemIds.map((libraryItemId) => ({ + libraryItemId, + userId, + })) + await bulkEnqueueUpdateLabels(data) + + return true +} + export const updateLabel = async ( id: string, label: QueryDeepPartialEntity