wait until all jobs are done in test

This commit is contained in:
Hongbo Wu
2024-01-31 19:10:57 +08:00
parent 5e883cb2ba
commit 1b8111b907
5 changed files with 95 additions and 31 deletions

View File

@ -125,7 +125,7 @@ export const saveLabelsInLibraryItem = async (
}
// update labels in library item
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
return bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
}
export const addLabelsToLibraryItem = async (

View File

@ -669,7 +669,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
const queue = await getBackendQueue()
if (!queue) {
return undefined
return []
}
const jobs = data.map((d) => ({
@ -684,6 +684,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
return queue.addBulk(jobs)
} catch (error) {
logger.error('error enqueuing update labels jobs', error)
return []
}
}

View File

@ -1,18 +1,24 @@
import { DeepPartial } from 'typeorm'
import { appDataSource } from '../src/data_source'
import { EntityLabel, LabelSource } from '../src/entity/entity_label'
import { Filter } from '../src/entity/filter'
import { Highlight } from '../src/entity/highlight'
import { Label } from '../src/entity/label'
import { LibraryItem } from '../src/entity/library_item'
import { Reminder } from '../src/entity/reminder'
import { User } from '../src/entity/user'
import { UserDeviceToken } from '../src/entity/user_device_tokens'
import { getRepository, setClaims } from '../src/repository'
import { authTrx, getRepository, setClaims } from '../src/repository'
import { highlightRepository } from '../src/repository/highlight'
import { userRepository } from '../src/repository/user'
import { createUser } from '../src/services/create_user'
import { saveLabelsInLibraryItem } from '../src/services/labels'
import { createLibraryItem } from '../src/services/library_item'
import { createDeviceToken } from '../src/services/user_device_tokens'
import { generateFakeUuid } from './util'
import {
bulkEnqueueUpdateLabels,
enqueueUpdateHighlight,
} from '../src/utils/createTask'
import { generateFakeUuid, waitUntilJobsDone } from './util'
export const createTestConnection = async (): Promise<void> => {
appDataSource.setOptions({
@ -121,3 +127,68 @@ export const createTestLibraryItem = async (
return createdItem
}
export const saveLabelsInLibraryItem = async (
labels: Label[],
libraryItemId: string,
userId: string,
source: LabelSource = 'user'
) => {
await authTrx(
async (tx) => {
const repo = tx.getRepository(EntityLabel)
// delete existing labels
await repo.delete({
libraryItemId,
})
// save new labels
await repo.save(
labels.map((l) => ({
labelId: l.id,
libraryItemId,
source,
}))
)
},
undefined,
userId
)
// update labels in library item
const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
await waitUntilJobsDone(jobs)
}
export const createHighlight = async (
highlight: DeepPartial<Highlight>,
libraryItemId: string,
userId: string
) => {
const newHighlight = await authTrx(
async (tx) => {
const repo = tx.withRepository(highlightRepository)
const newHighlight = await repo.createAndSave(highlight)
return repo.findOneOrFail({
where: { id: newHighlight.id },
relations: {
user: true,
},
})
},
undefined,
userId
)
const job = await enqueueUpdateHighlight({
libraryItemId,
userId,
})
if (job) {
await waitUntilJobsDone([job])
}
return newHighlight
}

View File

@ -21,12 +21,7 @@ import {
} from '../../src/generated/graphql'
import { getRepository } from '../../src/repository'
import { createGroup, deleteGroup } from '../../src/services/groups'
import { createHighlight } from '../../src/services/highlights'
import {
createLabel,
deleteLabels,
saveLabelsInLibraryItem,
} from '../../src/services/labels'
import { createLabel, deleteLabels } from '../../src/services/labels'
import {
createLibraryItem,
createLibraryItems,
@ -41,13 +36,13 @@ import {
import { deleteUser } from '../../src/services/user'
import * as createTask from '../../src/utils/createTask'
import * as uploads from '../../src/utils/uploads'
import { createTestLibraryItem, createTestUser } from '../db'
import {
generateFakeUuid,
graphqlRequest,
request,
waitUntilJobsDone,
} from '../util'
createHighlight,
createTestLibraryItem,
createTestUser,
saveLabelsInLibraryItem,
} from '../db'
import { generateFakeUuid, graphqlRequest, request } from '../util'
chai.use(chaiString)
@ -947,8 +942,6 @@ describe('Article API', () => {
)
highlights.push(highlight)
}
await waitUntilJobsDone()
})
beforeEach(async () => {
@ -1294,8 +1287,6 @@ describe('Article API', () => {
)
await saveLabelsInLibraryItem([label1], items[0].id, user.id)
await saveLabelsInLibraryItem([label2], items[1].id, user.id)
await waitUntilJobsDone()
})
after(async () => {
@ -1793,8 +1784,6 @@ describe('Article API', () => {
)
await saveLabelsInLibraryItem([label1], items[0].id, user.id)
await saveLabelsInLibraryItem([label2], items[1].id, user.id)
await waitUntilJobsDone()
})
after(async () => {
@ -1853,8 +1842,6 @@ describe('Article API', () => {
)
await saveLabelsInLibraryItem([label1], items[0].id, user.id)
await saveLabelsInLibraryItem([label2], items[1].id, user.id)
await waitUntilJobsDone()
})
after(async () => {

View File

@ -1,14 +1,15 @@
import { ConnectionOptions, Worker } from 'bullmq'
import { ConnectionOptions, Job, QueueEvents, Worker } from 'bullmq'
import { nanoid } from 'nanoid'
import supertest from 'supertest'
import { v4 } from 'uuid'
import { createWorker } from '../src/queue-processor'
import { createWorker, QUEUE_NAME } from '../src/queue-processor'
import { createApp } from '../src/server'
import { corsConfig } from '../src/utils/corsConfig'
const { app, apollo } = createApp()
export const request = supertest(app)
let worker: Worker | undefined
let worker: Worker
let queueEvents: QueueEvents
export const startApolloServer = async () => {
await apollo.start()
@ -21,14 +22,18 @@ export const stopApolloServer = async () => {
export const startWorker = async (connection: ConnectionOptions) => {
worker = createWorker(connection)
queueEvents = new QueueEvents(QUEUE_NAME, {
connection,
})
}
export const stopWorker = async () => {
worker?.close()
queueEvents.close()
worker.close()
}
export const waitUntilJobsDone = async () => {
await worker?.waitUntilReady()
export const waitUntilJobsDone = async (jobs: Job[]) => {
await Promise.all(jobs.map((job) => job.waitUntilFinished(queueEvents)))
}
export const graphqlRequest = (