running worker in the test

This commit is contained in:
Hongbo Wu
2024-01-31 17:45:49 +08:00
parent cf898299e4
commit 5e883cb2ba
14 changed files with 152 additions and 155 deletions

View File

@ -93,6 +93,7 @@ jobs:
PG_DB: omnivore_test
PG_LOGGER: debug
REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }}
MQ_REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }}
build-docker-images:
name: Build docker images
runs-on: ubuntu-latest

View File

@ -35,6 +35,9 @@ export class Highlight {
@JoinColumn({ name: 'library_item_id' })
libraryItem!: LibraryItem
@Column('uuid')
libraryItemId!: string
@Column('text')
quote?: string | null

View File

@ -183,15 +183,6 @@ export class LibraryItem {
})
highlights?: Highlight[]
@Column('text', { nullable: true })
labelNames?: string[] | null
@Column('text', { nullable: true })
highlightLabels?: string[] | null
@Column('text', { nullable: true })
highlightAnnotations?: string[] | null
@Column('text', { nullable: true })
note?: string | null

View File

@ -2,7 +2,14 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/require-await */
/* eslint-disable @typescript-eslint/no-misused-promises */
import { Job, JobType, Queue, QueueEvents, Worker } from 'bullmq'
import {
ConnectionOptions,
Job,
JobType,
Queue,
QueueEvents,
Worker,
} from 'bullmq'
import express, { Express } from 'express'
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
import { appDataSource } from './data_source'
@ -40,6 +47,43 @@ export const getBackendQueue = async (): Promise<Queue | undefined> => {
return backendQueue
}
export const createWorker = (connection: ConnectionOptions) =>
new Worker(
QUEUE_NAME,
async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getBackendQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
}
return await refreshAllFeeds(appDataSource)
}
case 'refresh-feed': {
return await refreshFeed(job.data)
}
case 'save-page': {
return savePageJob(job.data, job.attemptsMade)
}
case 'update-pdf-content': {
return updatePDFContentJob(job.data)
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
case UPDATE_LABELS_JOB:
return updateLabels(job.data)
case UPDATE_HIGHLIGHT_JOB:
return updateHighlight(job.data)
}
},
{
connection,
}
)
const main = async () => {
console.log('[queue-processor]: starting queue processor')
@ -106,41 +150,7 @@ const main = async () => {
throw '[queue-processor] error redis is not initialized'
}
const worker = new Worker(
QUEUE_NAME,
async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getBackendQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
}
return await refreshAllFeeds(appDataSource)
}
case 'refresh-feed': {
return await refreshFeed(job.data)
}
case 'save-page': {
return savePageJob(job.data, job.attemptsMade)
}
case 'update-pdf-content': {
return updatePDFContentJob(job.data)
}
case THUMBNAIL_JOB:
return findThumbnail(job.data)
case TRIGGER_RULE_JOB_NAME:
return triggerRule(job.data)
case UPDATE_LABELS_JOB:
return updateLabels(job.data)
case UPDATE_HIGHLIGHT_JOB:
return updateHighlight(job.data)
}
},
{
connection: workerRedisClient,
}
)
const worker = createWorker(workerRedisClient)
const queueEvents = new QueueEvents(QUEUE_NAME, {
connection: workerRedisClient,

View File

@ -701,15 +701,10 @@ export const searchResolver = authorized<
const edges = await Promise.all(
libraryItems.map(async (libraryItem) => {
if (
libraryItem.highlightAnnotations &&
libraryItem.highlightAnnotations.length > 0
) {
libraryItem.highlights = await findHighlightsByLibraryItemId(
libraryItem.id,
uid
)
}
libraryItem.highlights = await findHighlightsByLibraryItemId(
libraryItem.id,
uid
)
if (params.includeContent && libraryItem.readableContent) {
// convert html to the requested format

View File

@ -334,17 +334,13 @@ export const functionResolvers = {
return article.content ? wordsCount(article.content) : undefined
},
async labels(
article: { id: string; labels?: Label[]; labelNames?: string[] | null },
article: { id: string; labels?: Label[] },
_: unknown,
ctx: WithDataSourcesContext
) {
if (article.labels) return article.labels
if (article.labelNames && article.labelNames.length > 0) {
return findLabelsByLibraryItemId(article.id, ctx.uid)
}
return []
return findLabelsByLibraryItemId(article.id, ctx.uid)
},
},
Highlight: {
@ -409,17 +405,13 @@ export const functionResolvers = {
return item.siteIcon
},
async labels(
item: { id: string; labels?: Label[]; labelNames?: string[] | null },
item: { id: string; labels?: Label[] },
_: unknown,
ctx: WithDataSourcesContext
) {
if (item.labels) return item.labels
if (item.labelNames && item.labelNames.length > 0) {
return findLabelsByLibraryItemId(item.id, ctx.uid)
}
return []
return findLabelsByLibraryItemId(item.id, ctx.uid)
},
async recommendations(
item: {
@ -446,19 +438,14 @@ export const functionResolvers = {
item: {
id: string
highlights?: Highlight[]
highlightAnnotations?: string[] | null
},
_: unknown,
ctx: WithDataSourcesContext
) {
if (item.highlights) return item.highlights
if (item.highlightAnnotations && item.highlightAnnotations.length > 0) {
const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid)
return highlights.map(highlightDataToHighlight)
}
return []
const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid)
return highlights.map(highlightDataToHighlight)
},
},
Subscription: {

View File

@ -9,7 +9,6 @@ import { createPubSubClient, EntityType } from '../pubsub'
import { authTrx } from '../repository'
import { highlightRepository } from '../repository/highlight'
import { enqueueUpdateHighlight } from '../utils/createTask'
import { logger } from '../utils/logger'
type HighlightEvent = { id: string; pageId: string }
type CreateHighlightEvent = DeepPartial<Highlight> & HighlightEvent
@ -63,13 +62,10 @@ export const createHighlight = async (
userId
)
if (newHighlight.annotation) {
const job = await enqueueUpdateHighlight({
libraryItemId,
userId,
})
logger.info('update highlight job enqueued', job)
}
await enqueueUpdateHighlight({
libraryItemId,
userId,
})
return newHighlight
}
@ -113,13 +109,10 @@ export const mergeHighlights = async (
userId
)
if (newHighlight.annotation) {
const job = await enqueueUpdateHighlight({
libraryItemId,
userId,
})
logger.info('update highlight job enqueued', job)
}
await enqueueUpdateHighlight({
libraryItemId,
userId,
})
return newHighlight
}
@ -150,13 +143,10 @@ export const updateHighlight = async (
userId
)
if (highlight.annotation) {
const job = await enqueueUpdateHighlight({
libraryItemId,
userId,
})
logger.info('update highlight job enqueued', job)
}
await enqueueUpdateHighlight({
libraryItemId,
userId,
})
return updatedHighlight
}
@ -168,7 +158,6 @@ export const deleteHighlightById = async (highlightId: string) => {
where: { id: highlightId },
relations: {
user: true,
libraryItem: true,
},
})
@ -176,13 +165,10 @@ export const deleteHighlightById = async (highlightId: string) => {
return highlight
})
if (deletedHighlight.annotation) {
const job = await enqueueUpdateHighlight({
libraryItemId: deletedHighlight.libraryItem.id,
userId: deletedHighlight.user.id,
})
logger.info('update highlight job enqueued', job)
}
await enqueueUpdateHighlight({
libraryItemId: deletedHighlight.libraryItemId,
userId: deletedHighlight.user.id,
})
return deletedHighlight
}

View File

@ -1,13 +1,12 @@
import { DeepPartial, FindOptionsWhere, In } from 'typeorm'
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
import { EntityLabel, LabelSource } from '../entity/entity_label'
import { Highlight } from '../entity/highlight'
import { Label } from '../entity/label'
import { createPubSubClient, EntityType, PubsubClient } from '../pubsub'
import { authTrx } from '../repository'
import { CreateLabelInput, labelRepository } from '../repository/label'
import { bulkEnqueueUpdateLabels } from '../utils/createTask'
import { logger } from '../utils/logger'
import { findHighlightById } from './highlights'
import { findLibraryItemIdsByLabelId } from './library_item'
type AddLabelsToLibraryItemEvent = {
@ -126,8 +125,7 @@ export const saveLabelsInLibraryItem = async (
}
// update labels in library item
const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
logger.info('update labels jobs enqueued', jobs)
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
}
export const addLabelsToLibraryItem = async (
@ -155,8 +153,7 @@ export const addLabelsToLibraryItem = async (
)
// update labels in library item
const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
logger.info('update labels jobs enqueued', jobs)
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
}
export const saveLabelsInHighlight = async (
@ -189,19 +186,11 @@ export const saveLabelsInHighlight = async (
userId
)
const highlight = await authTrx(async (tx) =>
tx.getRepository(Highlight).findOne({
where: { id: highlightId },
relations: ['libraryItem'],
})
)
if (highlight) {
// update labels in library item
const jobs = await bulkEnqueueUpdateLabels([
{ libraryItemId: highlight.libraryItem.id, userId },
])
logger.info('update labels jobs enqueued', jobs)
}
const highlight = await findHighlightById(highlightId, userId)
// update labels in library item
await bulkEnqueueUpdateLabels([
{ libraryItemId: highlight.libraryItemId, userId },
])
}
export const findLabelsByIds = async (
@ -259,8 +248,7 @@ export const deleteLabelById = async (labelId: string, userId: string) => {
libraryItemId,
userId,
}))
const jobs = await bulkEnqueueUpdateLabels(data)
logger.info('update labels jobs enqueued', jobs)
await bulkEnqueueUpdateLabels(data)
return true
}
@ -281,16 +269,13 @@ export const updateLabel = async (
userId
)
if (label.name) {
const libraryItemIds = await findLibraryItemIdsByLabelId(id, userId)
const libraryItemIds = await findLibraryItemIdsByLabelId(id, userId)
const data = libraryItemIds.map((libraryItemId) => ({
libraryItemId,
userId,
}))
const jobs = await bulkEnqueueUpdateLabels(data)
logger.info('update labels jobs enqueued', jobs)
}
const data = libraryItemIds.map((libraryItemId) => ({
libraryItemId,
userId,
}))
await bulkEnqueueUpdateLabels(data)
return updatedLabel
}

View File

@ -1008,30 +1008,24 @@ export const batchUpdateLibraryItems = async (
const libraryItems = await queryBuilder.getMany()
// add labels in library items
const labelsToAdd = libraryItems.flatMap((libraryItem) =>
labels
.map((label) => ({
labelId: label.id,
libraryItemId: libraryItem.id,
name: label.name,
}))
.filter((entityLabel) => {
const existingLabel = libraryItem.labelNames?.find(
(l) => l.toLowerCase() === entityLabel.name.toLowerCase()
)
return !existingLabel
})
labels.map((label) => ({
labelId: label.id,
libraryItemId: libraryItem.id,
name: label.name,
// put an zero uuid for highlight to avoid unique constraint violation
highlightId: '00000000-0000-0000-0000-000000000000',
}))
)
const labelsAdded = await tx.getRepository(EntityLabel).save(labelsToAdd)
const data = labelsAdded.map((label) => ({
libraryItemId: label.libraryItemId,
const data = libraryItems.map((item) => ({
libraryItemId: item.id,
userId,
}))
// update labels in library item
const jobs = await bulkEnqueueUpdateLabels(data)
logger.info('update labels jobs enqueued', jobs)
await bulkEnqueueUpdateLabels(data)
return
return labelsAdded
}
// generate raw sql because postgres doesn't support prepared statements in DO blocks

View File

@ -1,7 +1,7 @@
import { env } from '../src/env'
import { redisDataSource } from '../src/redis_data_source'
import { createTestConnection } from './db'
import { startApolloServer } from './util'
import { startApolloServer, startWorker } from './util'
export const mochaGlobalSetup = async () => {
await createTestConnection()
@ -10,6 +10,11 @@ export const mochaGlobalSetup = async () => {
if (env.redis.cache.url) {
await redisDataSource.initialize()
console.log('redis connection created')
if (redisDataSource.workerRedisClient) {
startWorker(redisDataSource.workerRedisClient)
console.log('worker started')
}
}
await startApolloServer()

View File

@ -1,7 +1,7 @@
import { appDataSource } from '../src/data_source'
import { env } from '../src/env'
import { redisDataSource } from '../src/redis_data_source'
import { stopApolloServer } from './util'
import { stopApolloServer, stopWorker } from './util'
export const mochaGlobalTeardown = async () => {
await stopApolloServer()
@ -13,5 +13,10 @@ export const mochaGlobalTeardown = async () => {
if (env.redis.cache.url) {
await redisDataSource.shutdown()
console.log('redis connection closed')
if (redisDataSource.workerRedisClient) {
stopWorker()
console.log('worker closed')
}
}
}

View File

@ -42,7 +42,12 @@ import { deleteUser } from '../../src/services/user'
import * as createTask from '../../src/utils/createTask'
import * as uploads from '../../src/utils/uploads'
import { createTestLibraryItem, createTestUser } from '../db'
import { generateFakeUuid, graphqlRequest, request } from '../util'
import {
generateFakeUuid,
graphqlRequest,
request,
waitUntilJobsDone,
} from '../util'
chai.use(chaiString)
@ -942,6 +947,8 @@ describe('Article API', () => {
)
highlights.push(highlight)
}
await waitUntilJobsDone()
})
beforeEach(async () => {
@ -1287,6 +1294,8 @@ describe('Article API', () => {
)
await saveLabelsInLibraryItem([label1], items[0].id, user.id)
await saveLabelsInLibraryItem([label2], items[1].id, user.id)
await waitUntilJobsDone()
})
after(async () => {
@ -1784,6 +1793,8 @@ describe('Article API', () => {
)
await saveLabelsInLibraryItem([label1], items[0].id, user.id)
await saveLabelsInLibraryItem([label2], items[1].id, user.id)
await waitUntilJobsDone()
})
after(async () => {
@ -1842,6 +1853,8 @@ describe('Article API', () => {
)
await saveLabelsInLibraryItem([label1], items[0].id, user.id)
await saveLabelsInLibraryItem([label2], items[1].id, user.id)
await waitUntilJobsDone()
})
after(async () => {

View File

@ -2,7 +2,9 @@ import * as chai from 'chai'
import { expect } from 'chai'
import chaiString from 'chai-string'
import 'mocha'
import { Highlight } from '../../src/entity/highlight'
import { User } from '../../src/entity/user'
import { getRepository } from '../../src/repository'
import {
createHighlight,
deleteHighlightById,
@ -11,7 +13,12 @@ import {
import { createLabel, saveLabelsInHighlight } from '../../src/services/labels'
import { deleteUser } from '../../src/services/user'
import { createTestLibraryItem, createTestUser } from '../db'
import { generateFakeShortId, generateFakeUuid, graphqlRequest, request } from '../util'
import {
generateFakeShortId,
generateFakeUuid,
graphqlRequest,
request,
} from '../util'
chai.use(chaiString)
@ -281,7 +288,7 @@ describe('Highlights API', () => {
itemId,
newHighlightId,
newShortHighlightId,
[highlightId],
[highlightId]
)
const res = await graphqlRequest(query, authToken).expect(200)

View File

@ -1,11 +1,14 @@
import { ConnectionOptions, Worker } from 'bullmq'
import { nanoid } from 'nanoid'
import supertest from 'supertest'
import { v4 } from 'uuid'
import { createWorker } from '../src/queue-processor'
import { createApp } from '../src/server'
import { corsConfig } from '../src/utils/corsConfig'
const { app, apollo } = createApp()
export const request = supertest(app)
let worker: Worker | undefined
export const startApolloServer = async () => {
await apollo.start()
@ -16,6 +19,18 @@ export const stopApolloServer = async () => {
await apollo.stop()
}
export const startWorker = async (connection: ConnectionOptions) => {
worker = createWorker(connection)
}
export const stopWorker = async () => {
worker?.close()
}
export const waitUntilJobsDone = async () => {
await worker?.waitUntilReady()
}
export const graphqlRequest = (
query: string,
authToken: string,