From 5e883cb2ba12ff0ae44cd3ef5608794fb0411546 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 31 Jan 2024 17:45:49 +0800 Subject: [PATCH] running worker in the test --- .github/workflows/run-tests.yaml | 1 + packages/api/src/entity/highlight.ts | 3 + packages/api/src/entity/library_item.ts | 9 -- packages/api/src/queue-processor.ts | 82 +++++++++++-------- packages/api/src/resolvers/article/index.ts | 13 +-- .../api/src/resolvers/function_resolvers.ts | 25 ++---- packages/api/src/services/highlights.ts | 46 ++++------- packages/api/src/services/labels.ts | 45 ++++------ packages/api/src/services/library_item.ts | 28 +++---- packages/api/test/global-setup.ts | 7 +- packages/api/test/global-teardown.ts | 7 +- packages/api/test/resolvers/article.test.ts | 15 +++- packages/api/test/resolvers/highlight.test.ts | 11 ++- packages/api/test/util.ts | 15 ++++ 14 files changed, 152 insertions(+), 155 deletions(-) diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 70714c14c..42196640c 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -93,6 +93,7 @@ jobs: PG_DB: omnivore_test PG_LOGGER: debug REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }} + MQ_REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }} build-docker-images: name: Build docker images runs-on: ubuntu-latest diff --git a/packages/api/src/entity/highlight.ts b/packages/api/src/entity/highlight.ts index 356c8bddc..f34ba69e8 100644 --- a/packages/api/src/entity/highlight.ts +++ b/packages/api/src/entity/highlight.ts @@ -35,6 +35,9 @@ export class Highlight { @JoinColumn({ name: 'library_item_id' }) libraryItem!: LibraryItem + @Column('uuid') + libraryItemId!: string + @Column('text') quote?: string | null diff --git a/packages/api/src/entity/library_item.ts b/packages/api/src/entity/library_item.ts index 5765c1469..ce803891f 100644 --- a/packages/api/src/entity/library_item.ts +++ b/packages/api/src/entity/library_item.ts @@ -183,15 +183,6 @@ export class LibraryItem { }) highlights?: Highlight[] - @Column('text', { nullable: true }) - labelNames?: string[] | null - - @Column('text', { nullable: true }) - highlightLabels?: string[] | null - - @Column('text', { nullable: true }) - highlightAnnotations?: string[] | null - @Column('text', { nullable: true }) note?: string | null diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index f80a9d8ce..ebf6fcfdf 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -2,7 +2,14 @@ /* eslint-disable @typescript-eslint/restrict-template-expressions */ /* eslint-disable @typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-misused-promises */ -import { Job, JobType, Queue, QueueEvents, Worker } from 'bullmq' +import { + ConnectionOptions, + Job, + JobType, + Queue, + QueueEvents, + Worker, +} from 'bullmq' import express, { Express } from 'express' import { SnakeNamingStrategy } from 'typeorm-naming-strategies' import { appDataSource } from './data_source' @@ -40,6 +47,43 @@ export const getBackendQueue = async (): Promise => { return backendQueue } +export const createWorker = (connection: ConnectionOptions) => + new Worker( + QUEUE_NAME, + async (job: Job) => { + switch (job.name) { + case 'refresh-all-feeds': { + const queue = await getBackendQueue() + const counts = await queue?.getJobCounts('prioritized') + if (counts && counts.wait > 1000) { + return + } + return await refreshAllFeeds(appDataSource) + } + case 'refresh-feed': { + return await refreshFeed(job.data) + } + case 'save-page': { + return savePageJob(job.data, job.attemptsMade) + } + case 'update-pdf-content': { + return updatePDFContentJob(job.data) + } + case THUMBNAIL_JOB: + return findThumbnail(job.data) + case TRIGGER_RULE_JOB_NAME: + return triggerRule(job.data) + case UPDATE_LABELS_JOB: + return updateLabels(job.data) + case UPDATE_HIGHLIGHT_JOB: + return updateHighlight(job.data) + } + }, + { + connection, + } + ) + const main = async () => { console.log('[queue-processor]: starting queue processor') @@ -106,41 +150,7 @@ const main = async () => { throw '[queue-processor] error redis is not initialized' } - const worker = new Worker( - QUEUE_NAME, - async (job: Job) => { - switch (job.name) { - case 'refresh-all-feeds': { - const queue = await getBackendQueue() - const counts = await queue?.getJobCounts('prioritized') - if (counts && counts.wait > 1000) { - return - } - return await refreshAllFeeds(appDataSource) - } - case 'refresh-feed': { - return await refreshFeed(job.data) - } - case 'save-page': { - return savePageJob(job.data, job.attemptsMade) - } - case 'update-pdf-content': { - return updatePDFContentJob(job.data) - } - case THUMBNAIL_JOB: - return findThumbnail(job.data) - case TRIGGER_RULE_JOB_NAME: - return triggerRule(job.data) - case UPDATE_LABELS_JOB: - return updateLabels(job.data) - case UPDATE_HIGHLIGHT_JOB: - return updateHighlight(job.data) - } - }, - { - connection: workerRedisClient, - } - ) + const worker = createWorker(workerRedisClient) const queueEvents = new QueueEvents(QUEUE_NAME, { connection: workerRedisClient, diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index 93710f98f..921a01cf8 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -701,15 +701,10 @@ export const searchResolver = authorized< const edges = await Promise.all( libraryItems.map(async (libraryItem) => { - if ( - libraryItem.highlightAnnotations && - libraryItem.highlightAnnotations.length > 0 - ) { - libraryItem.highlights = await findHighlightsByLibraryItemId( - libraryItem.id, - uid - ) - } + libraryItem.highlights = await findHighlightsByLibraryItemId( + libraryItem.id, + uid + ) if (params.includeContent && libraryItem.readableContent) { // convert html to the requested format diff --git a/packages/api/src/resolvers/function_resolvers.ts b/packages/api/src/resolvers/function_resolvers.ts index ad3e3ac96..871782068 100644 --- a/packages/api/src/resolvers/function_resolvers.ts +++ b/packages/api/src/resolvers/function_resolvers.ts @@ -334,17 +334,13 @@ export const functionResolvers = { return article.content ? wordsCount(article.content) : undefined }, async labels( - article: { id: string; labels?: Label[]; labelNames?: string[] | null }, + article: { id: string; labels?: Label[] }, _: unknown, ctx: WithDataSourcesContext ) { if (article.labels) return article.labels - if (article.labelNames && article.labelNames.length > 0) { - return findLabelsByLibraryItemId(article.id, ctx.uid) - } - - return [] + return findLabelsByLibraryItemId(article.id, ctx.uid) }, }, Highlight: { @@ -409,17 +405,13 @@ export const functionResolvers = { return item.siteIcon }, async labels( - item: { id: string; labels?: Label[]; labelNames?: string[] | null }, + item: { id: string; labels?: Label[] }, _: unknown, ctx: WithDataSourcesContext ) { if (item.labels) return item.labels - if (item.labelNames && item.labelNames.length > 0) { - return findLabelsByLibraryItemId(item.id, ctx.uid) - } - - return [] + return findLabelsByLibraryItemId(item.id, ctx.uid) }, async recommendations( item: { @@ -446,19 +438,14 @@ export const functionResolvers = { item: { id: string highlights?: Highlight[] - highlightAnnotations?: string[] | null }, _: unknown, ctx: WithDataSourcesContext ) { if (item.highlights) return item.highlights - if (item.highlightAnnotations && item.highlightAnnotations.length > 0) { - const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid) - return highlights.map(highlightDataToHighlight) - } - - return [] + const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid) + return highlights.map(highlightDataToHighlight) }, }, Subscription: { diff --git a/packages/api/src/services/highlights.ts b/packages/api/src/services/highlights.ts index 509f666b4..436a3dd1a 100644 --- a/packages/api/src/services/highlights.ts +++ b/packages/api/src/services/highlights.ts @@ -9,7 +9,6 @@ import { createPubSubClient, EntityType } from '../pubsub' import { authTrx } from '../repository' import { highlightRepository } from '../repository/highlight' import { enqueueUpdateHighlight } from '../utils/createTask' -import { logger } from '../utils/logger' type HighlightEvent = { id: string; pageId: string } type CreateHighlightEvent = DeepPartial & HighlightEvent @@ -63,13 +62,10 @@ export const createHighlight = async ( userId ) - if (newHighlight.annotation) { - const job = await enqueueUpdateHighlight({ - libraryItemId, - userId, - }) - logger.info('update highlight job enqueued', job) - } + await enqueueUpdateHighlight({ + libraryItemId, + userId, + }) return newHighlight } @@ -113,13 +109,10 @@ export const mergeHighlights = async ( userId ) - if (newHighlight.annotation) { - const job = await enqueueUpdateHighlight({ - libraryItemId, - userId, - }) - logger.info('update highlight job enqueued', job) - } + await enqueueUpdateHighlight({ + libraryItemId, + userId, + }) return newHighlight } @@ -150,13 +143,10 @@ export const updateHighlight = async ( userId ) - if (highlight.annotation) { - const job = await enqueueUpdateHighlight({ - libraryItemId, - userId, - }) - logger.info('update highlight job enqueued', job) - } + await enqueueUpdateHighlight({ + libraryItemId, + userId, + }) return updatedHighlight } @@ -168,7 +158,6 @@ export const deleteHighlightById = async (highlightId: string) => { where: { id: highlightId }, relations: { user: true, - libraryItem: true, }, }) @@ -176,13 +165,10 @@ export const deleteHighlightById = async (highlightId: string) => { return highlight }) - if (deletedHighlight.annotation) { - const job = await enqueueUpdateHighlight({ - libraryItemId: deletedHighlight.libraryItem.id, - userId: deletedHighlight.user.id, - }) - logger.info('update highlight job enqueued', job) - } + await enqueueUpdateHighlight({ + libraryItemId: deletedHighlight.libraryItemId, + userId: deletedHighlight.user.id, + }) return deletedHighlight } diff --git a/packages/api/src/services/labels.ts b/packages/api/src/services/labels.ts index 8ad7c24a6..8d98514c2 100644 --- a/packages/api/src/services/labels.ts +++ b/packages/api/src/services/labels.ts @@ -1,13 +1,12 @@ import { DeepPartial, FindOptionsWhere, In } from 'typeorm' import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity' import { EntityLabel, LabelSource } from '../entity/entity_label' -import { Highlight } from '../entity/highlight' import { Label } from '../entity/label' import { createPubSubClient, EntityType, PubsubClient } from '../pubsub' import { authTrx } from '../repository' import { CreateLabelInput, labelRepository } from '../repository/label' import { bulkEnqueueUpdateLabels } from '../utils/createTask' -import { logger } from '../utils/logger' +import { findHighlightById } from './highlights' import { findLibraryItemIdsByLabelId } from './library_item' type AddLabelsToLibraryItemEvent = { @@ -126,8 +125,7 @@ export const saveLabelsInLibraryItem = async ( } // update labels in library item - const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) - logger.info('update labels jobs enqueued', jobs) + await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) } export const addLabelsToLibraryItem = async ( @@ -155,8 +153,7 @@ export const addLabelsToLibraryItem = async ( ) // update labels in library item - const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) - logger.info('update labels jobs enqueued', jobs) + await bulkEnqueueUpdateLabels([{ libraryItemId, userId }]) } export const saveLabelsInHighlight = async ( @@ -189,19 +186,11 @@ export const saveLabelsInHighlight = async ( userId ) - const highlight = await authTrx(async (tx) => - tx.getRepository(Highlight).findOne({ - where: { id: highlightId }, - relations: ['libraryItem'], - }) - ) - if (highlight) { - // update labels in library item - const jobs = await bulkEnqueueUpdateLabels([ - { libraryItemId: highlight.libraryItem.id, userId }, - ]) - logger.info('update labels jobs enqueued', jobs) - } + const highlight = await findHighlightById(highlightId, userId) + // update labels in library item + await bulkEnqueueUpdateLabels([ + { libraryItemId: highlight.libraryItemId, userId }, + ]) } export const findLabelsByIds = async ( @@ -259,8 +248,7 @@ export const deleteLabelById = async (labelId: string, userId: string) => { libraryItemId, userId, })) - const jobs = await bulkEnqueueUpdateLabels(data) - logger.info('update labels jobs enqueued', jobs) + await bulkEnqueueUpdateLabels(data) return true } @@ -281,16 +269,13 @@ export const updateLabel = async ( userId ) - if (label.name) { - const libraryItemIds = await findLibraryItemIdsByLabelId(id, userId) + const libraryItemIds = await findLibraryItemIdsByLabelId(id, userId) - const data = libraryItemIds.map((libraryItemId) => ({ - libraryItemId, - userId, - })) - const jobs = await bulkEnqueueUpdateLabels(data) - logger.info('update labels jobs enqueued', jobs) - } + const data = libraryItemIds.map((libraryItemId) => ({ + libraryItemId, + userId, + })) + await bulkEnqueueUpdateLabels(data) return updatedLabel } diff --git a/packages/api/src/services/library_item.ts b/packages/api/src/services/library_item.ts index 7151c8d2b..d8c927151 100644 --- a/packages/api/src/services/library_item.ts +++ b/packages/api/src/services/library_item.ts @@ -1008,30 +1008,24 @@ export const batchUpdateLibraryItems = async ( const libraryItems = await queryBuilder.getMany() // add labels in library items const labelsToAdd = libraryItems.flatMap((libraryItem) => - labels - .map((label) => ({ - labelId: label.id, - libraryItemId: libraryItem.id, - name: label.name, - })) - .filter((entityLabel) => { - const existingLabel = libraryItem.labelNames?.find( - (l) => l.toLowerCase() === entityLabel.name.toLowerCase() - ) - return !existingLabel - }) + labels.map((label) => ({ + labelId: label.id, + libraryItemId: libraryItem.id, + name: label.name, + // put an zero uuid for highlight to avoid unique constraint violation + highlightId: '00000000-0000-0000-0000-000000000000', + })) ) const labelsAdded = await tx.getRepository(EntityLabel).save(labelsToAdd) - const data = labelsAdded.map((label) => ({ - libraryItemId: label.libraryItemId, + const data = libraryItems.map((item) => ({ + libraryItemId: item.id, userId, })) // update labels in library item - const jobs = await bulkEnqueueUpdateLabels(data) - logger.info('update labels jobs enqueued', jobs) + await bulkEnqueueUpdateLabels(data) - return + return labelsAdded } // generate raw sql because postgres doesn't support prepared statements in DO blocks diff --git a/packages/api/test/global-setup.ts b/packages/api/test/global-setup.ts index 31f9a3d86..56c82a57f 100644 --- a/packages/api/test/global-setup.ts +++ b/packages/api/test/global-setup.ts @@ -1,7 +1,7 @@ import { env } from '../src/env' import { redisDataSource } from '../src/redis_data_source' import { createTestConnection } from './db' -import { startApolloServer } from './util' +import { startApolloServer, startWorker } from './util' export const mochaGlobalSetup = async () => { await createTestConnection() @@ -10,6 +10,11 @@ export const mochaGlobalSetup = async () => { if (env.redis.cache.url) { await redisDataSource.initialize() console.log('redis connection created') + + if (redisDataSource.workerRedisClient) { + startWorker(redisDataSource.workerRedisClient) + console.log('worker started') + } } await startApolloServer() diff --git a/packages/api/test/global-teardown.ts b/packages/api/test/global-teardown.ts index 2e476c55e..45b0760b6 100644 --- a/packages/api/test/global-teardown.ts +++ b/packages/api/test/global-teardown.ts @@ -1,7 +1,7 @@ import { appDataSource } from '../src/data_source' import { env } from '../src/env' import { redisDataSource } from '../src/redis_data_source' -import { stopApolloServer } from './util' +import { stopApolloServer, stopWorker } from './util' export const mochaGlobalTeardown = async () => { await stopApolloServer() @@ -13,5 +13,10 @@ export const mochaGlobalTeardown = async () => { if (env.redis.cache.url) { await redisDataSource.shutdown() console.log('redis connection closed') + + if (redisDataSource.workerRedisClient) { + stopWorker() + console.log('worker closed') + } } } diff --git a/packages/api/test/resolvers/article.test.ts b/packages/api/test/resolvers/article.test.ts index c25d1447f..3fb6405f6 100644 --- a/packages/api/test/resolvers/article.test.ts +++ b/packages/api/test/resolvers/article.test.ts @@ -42,7 +42,12 @@ import { deleteUser } from '../../src/services/user' import * as createTask from '../../src/utils/createTask' import * as uploads from '../../src/utils/uploads' import { createTestLibraryItem, createTestUser } from '../db' -import { generateFakeUuid, graphqlRequest, request } from '../util' +import { + generateFakeUuid, + graphqlRequest, + request, + waitUntilJobsDone, +} from '../util' chai.use(chaiString) @@ -942,6 +947,8 @@ describe('Article API', () => { ) highlights.push(highlight) } + + await waitUntilJobsDone() }) beforeEach(async () => { @@ -1287,6 +1294,8 @@ describe('Article API', () => { ) await saveLabelsInLibraryItem([label1], items[0].id, user.id) await saveLabelsInLibraryItem([label2], items[1].id, user.id) + + await waitUntilJobsDone() }) after(async () => { @@ -1784,6 +1793,8 @@ describe('Article API', () => { ) await saveLabelsInLibraryItem([label1], items[0].id, user.id) await saveLabelsInLibraryItem([label2], items[1].id, user.id) + + await waitUntilJobsDone() }) after(async () => { @@ -1842,6 +1853,8 @@ describe('Article API', () => { ) await saveLabelsInLibraryItem([label1], items[0].id, user.id) await saveLabelsInLibraryItem([label2], items[1].id, user.id) + + await waitUntilJobsDone() }) after(async () => { diff --git a/packages/api/test/resolvers/highlight.test.ts b/packages/api/test/resolvers/highlight.test.ts index e0fe8a668..1d839d515 100644 --- a/packages/api/test/resolvers/highlight.test.ts +++ b/packages/api/test/resolvers/highlight.test.ts @@ -2,7 +2,9 @@ import * as chai from 'chai' import { expect } from 'chai' import chaiString from 'chai-string' import 'mocha' +import { Highlight } from '../../src/entity/highlight' import { User } from '../../src/entity/user' +import { getRepository } from '../../src/repository' import { createHighlight, deleteHighlightById, @@ -11,7 +13,12 @@ import { import { createLabel, saveLabelsInHighlight } from '../../src/services/labels' import { deleteUser } from '../../src/services/user' import { createTestLibraryItem, createTestUser } from '../db' -import { generateFakeShortId, generateFakeUuid, graphqlRequest, request } from '../util' +import { + generateFakeShortId, + generateFakeUuid, + graphqlRequest, + request, +} from '../util' chai.use(chaiString) @@ -281,7 +288,7 @@ describe('Highlights API', () => { itemId, newHighlightId, newShortHighlightId, - [highlightId], + [highlightId] ) const res = await graphqlRequest(query, authToken).expect(200) diff --git a/packages/api/test/util.ts b/packages/api/test/util.ts index 5d2567f83..cfd2976a3 100644 --- a/packages/api/test/util.ts +++ b/packages/api/test/util.ts @@ -1,11 +1,14 @@ +import { ConnectionOptions, Worker } from 'bullmq' import { nanoid } from 'nanoid' import supertest from 'supertest' import { v4 } from 'uuid' +import { createWorker } from '../src/queue-processor' import { createApp } from '../src/server' import { corsConfig } from '../src/utils/corsConfig' const { app, apollo } = createApp() export const request = supertest(app) +let worker: Worker | undefined export const startApolloServer = async () => { await apollo.start() @@ -16,6 +19,18 @@ export const stopApolloServer = async () => { await apollo.stop() } +export const startWorker = async (connection: ConnectionOptions) => { + worker = createWorker(connection) +} + +export const stopWorker = async () => { + worker?.close() +} + +export const waitUntilJobsDone = async () => { + await worker?.waitUntilReady() +} + export const graphqlRequest = ( query: string, authToken: string,