Merge pull request #3466 from omnivore-app/fix/rewrite-trigger-functions-in-db
create a job for db trigger
This commit is contained in:
1
.github/workflows/run-tests.yaml
vendored
1
.github/workflows/run-tests.yaml
vendored
@ -93,6 +93,7 @@ jobs:
|
||||
PG_DB: omnivore_test
|
||||
PG_LOGGER: debug
|
||||
REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }}
|
||||
MQ_REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }}
|
||||
build-docker-images:
|
||||
name: Build docker images
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@ -35,6 +35,9 @@ export class Highlight {
|
||||
@JoinColumn({ name: 'library_item_id' })
|
||||
libraryItem!: LibraryItem
|
||||
|
||||
@Column('uuid')
|
||||
libraryItemId!: string
|
||||
|
||||
@Column('text')
|
||||
quote?: string | null
|
||||
|
||||
|
||||
@ -183,15 +183,6 @@ export class LibraryItem {
|
||||
})
|
||||
highlights?: Highlight[]
|
||||
|
||||
@Column('text', { nullable: true })
|
||||
labelNames?: string[] | null
|
||||
|
||||
@Column('text', { nullable: true })
|
||||
highlightLabels?: string[] | null
|
||||
|
||||
@Column('text', { nullable: true })
|
||||
highlightAnnotations?: string[] | null
|
||||
|
||||
@Column('text', { nullable: true })
|
||||
note?: string | null
|
||||
|
||||
|
||||
54
packages/api/src/jobs/update_db.ts
Normal file
54
packages/api/src/jobs/update_db.ts
Normal file
@ -0,0 +1,54 @@
|
||||
import { authTrx } from '../repository'
|
||||
|
||||
export const UPDATE_LABELS_JOB = 'update-labels'
|
||||
export const UPDATE_HIGHLIGHT_JOB = 'update-highlight'
|
||||
|
||||
export interface UpdateLabelsData {
|
||||
libraryItemId: string
|
||||
userId: string
|
||||
}
|
||||
|
||||
export interface UpdateHighlightData {
|
||||
libraryItemId: string
|
||||
userId: string
|
||||
}
|
||||
|
||||
export const updateLabels = async (data: UpdateLabelsData) => {
|
||||
return authTrx(
|
||||
async (tx) =>
|
||||
tx.query(
|
||||
`WITH labels_agg AS (
|
||||
SELECT array_agg(DISTINCT l.name) AS names_agg
|
||||
FROM omnivore.labels l
|
||||
INNER JOIN omnivore.entity_labels el ON el.label_id = l.id
|
||||
LEFT JOIN omnivore.highlight h ON h.id = el.highlight_id
|
||||
WHERE el.library_item_id = $1 OR h.library_item_id = $1
|
||||
)
|
||||
UPDATE omnivore.library_item li
|
||||
SET label_names = COALESCE((SELECT names_agg FROM labels_agg), ARRAY[]::TEXT[])
|
||||
WHERE li.id = $1`,
|
||||
[data.libraryItemId]
|
||||
),
|
||||
undefined,
|
||||
data.userId
|
||||
)
|
||||
}
|
||||
|
||||
export const updateHighlight = async (data: UpdateHighlightData) => {
|
||||
return authTrx(
|
||||
async (tx) =>
|
||||
tx.query(
|
||||
`WITH highlight_agg AS (
|
||||
SELECT array_agg(COALESCE(annotation, '')) AS annotation_agg
|
||||
FROM omnivore.highlight
|
||||
WHERE library_item_id = $1
|
||||
)
|
||||
UPDATE omnivore.library_item
|
||||
SET highlight_annotations = COALESCE((SELECT annotation_agg FROM highlight_agg), ARRAY[]::TEXT[])
|
||||
WHERE id = $1`,
|
||||
[data.libraryItemId]
|
||||
),
|
||||
undefined,
|
||||
data.userId
|
||||
)
|
||||
}
|
||||
@ -2,7 +2,14 @@
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
/* eslint-disable @typescript-eslint/require-await */
|
||||
/* eslint-disable @typescript-eslint/no-misused-promises */
|
||||
import { Job, Queue, QueueEvents, Worker, JobType } from 'bullmq'
|
||||
import {
|
||||
ConnectionOptions,
|
||||
Job,
|
||||
JobType,
|
||||
Queue,
|
||||
QueueEvents,
|
||||
Worker,
|
||||
} from 'bullmq'
|
||||
import express, { Express } from 'express'
|
||||
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'
|
||||
import { appDataSource } from './data_source'
|
||||
@ -11,10 +18,16 @@ import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail'
|
||||
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
|
||||
import { refreshFeed } from './jobs/rss/refreshFeed'
|
||||
import { savePageJob } from './jobs/save_page'
|
||||
import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule'
|
||||
import {
|
||||
updateHighlight,
|
||||
updateLabels,
|
||||
UPDATE_HIGHLIGHT_JOB,
|
||||
UPDATE_LABELS_JOB,
|
||||
} from './jobs/update_db'
|
||||
import { updatePDFContentJob } from './jobs/update_pdf_content'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
import { triggerRule, TRIGGER_RULE_JOB_NAME } from './jobs/trigger_rule'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
|
||||
@ -34,6 +47,43 @@ export const getBackendQueue = async (): Promise<Queue | undefined> => {
|
||||
return backendQueue
|
||||
}
|
||||
|
||||
export const createWorker = (connection: ConnectionOptions) =>
|
||||
new Worker(
|
||||
QUEUE_NAME,
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
const queue = await getBackendQueue()
|
||||
const counts = await queue?.getJobCounts('prioritized')
|
||||
if (counts && counts.wait > 1000) {
|
||||
return
|
||||
}
|
||||
return await refreshAllFeeds(appDataSource)
|
||||
}
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
case 'update-pdf-content': {
|
||||
return updatePDFContentJob(job.data)
|
||||
}
|
||||
case THUMBNAIL_JOB:
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
case UPDATE_LABELS_JOB:
|
||||
return updateLabels(job.data)
|
||||
case UPDATE_HIGHLIGHT_JOB:
|
||||
return updateHighlight(job.data)
|
||||
}
|
||||
},
|
||||
{
|
||||
connection,
|
||||
}
|
||||
)
|
||||
|
||||
const main = async () => {
|
||||
console.log('[queue-processor]: starting queue processor')
|
||||
|
||||
@ -100,37 +150,7 @@ const main = async () => {
|
||||
throw '[queue-processor] error redis is not initialized'
|
||||
}
|
||||
|
||||
const worker = new Worker(
|
||||
QUEUE_NAME,
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case 'refresh-all-feeds': {
|
||||
const queue = await getBackendQueue()
|
||||
const counts = await queue?.getJobCounts('prioritized')
|
||||
if (counts && counts.wait > 1000) {
|
||||
return
|
||||
}
|
||||
return await refreshAllFeeds(appDataSource)
|
||||
}
|
||||
case 'refresh-feed': {
|
||||
return await refreshFeed(job.data)
|
||||
}
|
||||
case 'save-page': {
|
||||
return savePageJob(job.data, job.attemptsMade)
|
||||
}
|
||||
case 'update-pdf-content': {
|
||||
return updatePDFContentJob(job.data)
|
||||
}
|
||||
case THUMBNAIL_JOB:
|
||||
return findThumbnail(job.data)
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
return triggerRule(job.data)
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: workerRedisClient,
|
||||
}
|
||||
)
|
||||
const worker = createWorker(workerRedisClient)
|
||||
|
||||
const queueEvents = new QueueEvents(QUEUE_NAME, {
|
||||
connection: workerRedisClient,
|
||||
|
||||
@ -701,15 +701,10 @@ export const searchResolver = authorized<
|
||||
|
||||
const edges = await Promise.all(
|
||||
libraryItems.map(async (libraryItem) => {
|
||||
if (
|
||||
libraryItem.highlightAnnotations &&
|
||||
libraryItem.highlightAnnotations.length > 0
|
||||
) {
|
||||
libraryItem.highlights = await findHighlightsByLibraryItemId(
|
||||
libraryItem.id,
|
||||
uid
|
||||
)
|
||||
}
|
||||
libraryItem.highlights = await findHighlightsByLibraryItemId(
|
||||
libraryItem.id,
|
||||
uid
|
||||
)
|
||||
|
||||
if (params.includeContent && libraryItem.readableContent) {
|
||||
// convert html to the requested format
|
||||
|
||||
@ -334,17 +334,13 @@ export const functionResolvers = {
|
||||
return article.content ? wordsCount(article.content) : undefined
|
||||
},
|
||||
async labels(
|
||||
article: { id: string; labels?: Label[]; labelNames?: string[] | null },
|
||||
article: { id: string; labels?: Label[] },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
if (article.labels) return article.labels
|
||||
|
||||
if (article.labelNames && article.labelNames.length > 0) {
|
||||
return findLabelsByLibraryItemId(article.id, ctx.uid)
|
||||
}
|
||||
|
||||
return []
|
||||
return findLabelsByLibraryItemId(article.id, ctx.uid)
|
||||
},
|
||||
},
|
||||
Highlight: {
|
||||
@ -409,17 +405,13 @@ export const functionResolvers = {
|
||||
return item.siteIcon
|
||||
},
|
||||
async labels(
|
||||
item: { id: string; labels?: Label[]; labelNames?: string[] | null },
|
||||
item: { id: string; labels?: Label[] },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
if (item.labels) return item.labels
|
||||
|
||||
if (item.labelNames && item.labelNames.length > 0) {
|
||||
return findLabelsByLibraryItemId(item.id, ctx.uid)
|
||||
}
|
||||
|
||||
return []
|
||||
return findLabelsByLibraryItemId(item.id, ctx.uid)
|
||||
},
|
||||
async recommendations(
|
||||
item: {
|
||||
@ -446,19 +438,14 @@ export const functionResolvers = {
|
||||
item: {
|
||||
id: string
|
||||
highlights?: Highlight[]
|
||||
highlightAnnotations?: string[] | null
|
||||
},
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
if (item.highlights) return item.highlights
|
||||
|
||||
if (item.highlightAnnotations && item.highlightAnnotations.length > 0) {
|
||||
const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid)
|
||||
return highlights.map(highlightDataToHighlight)
|
||||
}
|
||||
|
||||
return []
|
||||
const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid)
|
||||
return highlights.map(highlightDataToHighlight)
|
||||
},
|
||||
},
|
||||
Subscription: {
|
||||
|
||||
@ -31,6 +31,7 @@ import {
|
||||
import { labelRepository } from '../../repository/label'
|
||||
import { userRepository } from '../../repository/user'
|
||||
import {
|
||||
deleteLabelById,
|
||||
findOrCreateLabels,
|
||||
saveLabelsInHighlight,
|
||||
saveLabelsInLibraryItem,
|
||||
@ -118,13 +119,10 @@ export const deleteLabelResolver = authorized<
|
||||
DeleteLabelSuccess,
|
||||
DeleteLabelError,
|
||||
MutationDeleteLabelArgs
|
||||
>(async (_, { id: labelId }, { authTrx, log, uid }) => {
|
||||
>(async (_, { id: labelId }, { log, uid }) => {
|
||||
try {
|
||||
const deleteResult = await authTrx(async (tx) => {
|
||||
return tx.withRepository(labelRepository).deleteById(labelId)
|
||||
})
|
||||
|
||||
if (!deleteResult.affected) {
|
||||
const deleted = await deleteLabelById(labelId, uid)
|
||||
if (!deleted) {
|
||||
return {
|
||||
errorCodes: [DeleteLabelErrorCode.NotFound],
|
||||
}
|
||||
@ -281,7 +279,7 @@ export const setLabelsForHighlightResolver = authorized<
|
||||
}
|
||||
}
|
||||
|
||||
// save labels in the library item
|
||||
// save labels in the highlight
|
||||
await saveLabelsInHighlight(labelsSet, input.highlightId, uid, pubsub)
|
||||
|
||||
analytics.track({
|
||||
|
||||
@ -8,6 +8,7 @@ import { homePageURL } from '../env'
|
||||
import { createPubSubClient, EntityType } from '../pubsub'
|
||||
import { authTrx } from '../repository'
|
||||
import { highlightRepository } from '../repository/highlight'
|
||||
import { enqueueUpdateHighlight } from '../utils/createTask'
|
||||
|
||||
type HighlightEvent = { id: string; pageId: string }
|
||||
type CreateHighlightEvent = DeepPartial<Highlight> & HighlightEvent
|
||||
@ -61,6 +62,11 @@ export const createHighlight = async (
|
||||
userId
|
||||
)
|
||||
|
||||
await enqueueUpdateHighlight({
|
||||
libraryItemId,
|
||||
userId,
|
||||
})
|
||||
|
||||
return newHighlight
|
||||
}
|
||||
|
||||
@ -103,6 +109,11 @@ export const mergeHighlights = async (
|
||||
userId
|
||||
)
|
||||
|
||||
await enqueueUpdateHighlight({
|
||||
libraryItemId,
|
||||
userId,
|
||||
})
|
||||
|
||||
return newHighlight
|
||||
}
|
||||
|
||||
@ -125,17 +136,23 @@ export const updateHighlight = async (
|
||||
})
|
||||
})
|
||||
|
||||
const libraryItemId = updatedHighlight.libraryItem.id
|
||||
await pubsub.entityUpdated<UpdateHighlightEvent>(
|
||||
EntityType.HIGHLIGHT,
|
||||
{ ...highlight, id: highlightId, pageId: updatedHighlight.libraryItem.id },
|
||||
{ ...highlight, id: highlightId, pageId: libraryItemId },
|
||||
userId
|
||||
)
|
||||
|
||||
await enqueueUpdateHighlight({
|
||||
libraryItemId,
|
||||
userId,
|
||||
})
|
||||
|
||||
return updatedHighlight
|
||||
}
|
||||
|
||||
export const deleteHighlightById = async (highlightId: string) => {
|
||||
return authTrx(async (tx) => {
|
||||
const deletedHighlight = await authTrx(async (tx) => {
|
||||
const highlightRepo = tx.withRepository(highlightRepository)
|
||||
const highlight = await highlightRepo.findOneOrFail({
|
||||
where: { id: highlightId },
|
||||
@ -147,6 +164,13 @@ export const deleteHighlightById = async (highlightId: string) => {
|
||||
await highlightRepo.delete(highlightId)
|
||||
return highlight
|
||||
})
|
||||
|
||||
await enqueueUpdateHighlight({
|
||||
libraryItemId: deletedHighlight.libraryItemId,
|
||||
userId: deletedHighlight.user.id,
|
||||
})
|
||||
|
||||
return deletedHighlight
|
||||
}
|
||||
|
||||
export const findHighlightById = async (
|
||||
|
||||
@ -5,6 +5,9 @@ import { Label } from '../entity/label'
|
||||
import { createPubSubClient, EntityType, PubsubClient } from '../pubsub'
|
||||
import { authTrx } from '../repository'
|
||||
import { CreateLabelInput, labelRepository } from '../repository/label'
|
||||
import { bulkEnqueueUpdateLabels } from '../utils/createTask'
|
||||
import { findHighlightById } from './highlights'
|
||||
import { findLibraryItemIdsByLabelId } from './library_item'
|
||||
|
||||
type AddLabelsToLibraryItemEvent = {
|
||||
pageId: string
|
||||
@ -120,6 +123,9 @@ export const saveLabelsInLibraryItem = async (
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
// update labels in library item
|
||||
return bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
}
|
||||
|
||||
export const addLabelsToLibraryItem = async (
|
||||
@ -145,6 +151,9 @@ export const addLabelsToLibraryItem = async (
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
// update labels in library item
|
||||
await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
}
|
||||
|
||||
export const saveLabelsInHighlight = async (
|
||||
@ -176,6 +185,12 @@ export const saveLabelsInHighlight = async (
|
||||
{ highlightId, labels },
|
||||
userId
|
||||
)
|
||||
|
||||
const highlight = await findHighlightById(highlightId, userId)
|
||||
// update labels in library item
|
||||
await bulkEnqueueUpdateLabels([
|
||||
{ libraryItemId: highlight.libraryItemId, userId },
|
||||
])
|
||||
}
|
||||
|
||||
export const findLabelsByIds = async (
|
||||
@ -218,12 +233,32 @@ export const deleteLabels = async (
|
||||
)
|
||||
}
|
||||
|
||||
export const deleteLabelById = async (labelId: string, userId: string) => {
|
||||
const libraryItemIds = await findLibraryItemIdsByLabelId(labelId, userId)
|
||||
|
||||
const deleteResult = await authTrx(async (tx) => {
|
||||
return tx.withRepository(labelRepository).deleteById(labelId)
|
||||
})
|
||||
|
||||
if (!deleteResult.affected) {
|
||||
return false
|
||||
}
|
||||
|
||||
const data = libraryItemIds.map((libraryItemId) => ({
|
||||
libraryItemId,
|
||||
userId,
|
||||
}))
|
||||
await bulkEnqueueUpdateLabels(data)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
export const updateLabel = async (
|
||||
id: string,
|
||||
label: QueryDeepPartialEntity<Label>,
|
||||
userId: string
|
||||
) => {
|
||||
return authTrx(
|
||||
const updatedLabel = await authTrx(
|
||||
async (t) => {
|
||||
const repo = t.withRepository(labelRepository)
|
||||
await repo.updateLabel(id, label)
|
||||
@ -233,6 +268,16 @@ export const updateLabel = async (
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
const libraryItemIds = await findLibraryItemIdsByLabelId(id, userId)
|
||||
|
||||
const data = libraryItemIds.map((libraryItemId) => ({
|
||||
libraryItemId,
|
||||
userId,
|
||||
}))
|
||||
await bulkEnqueueUpdateLabels(data)
|
||||
|
||||
return updatedLabel
|
||||
}
|
||||
|
||||
export const findLabelsByUserId = async (userId: string): Promise<Label[]> => {
|
||||
|
||||
@ -2,7 +2,6 @@ import { ExpressionToken, LiqeQuery } from '@omnivore/liqe'
|
||||
import { DateTime } from 'luxon'
|
||||
import { DeepPartial, FindOptionsWhere, ObjectLiteral } from 'typeorm'
|
||||
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
|
||||
import { EntityLabel } from '../entity/entity_label'
|
||||
import { Highlight } from '../entity/highlight'
|
||||
import { Label } from '../entity/label'
|
||||
import { LibraryItem, LibraryItemState } from '../entity/library_item'
|
||||
@ -20,6 +19,7 @@ import { libraryItemRepository } from '../repository/library_item'
|
||||
import { setRecentlySavedItemInRedis, wordsCount } from '../utils/helpers'
|
||||
import { logger } from '../utils/logger'
|
||||
import { parseSearchQuery } from '../utils/search'
|
||||
import { addLabelsToLibraryItem } from './labels'
|
||||
|
||||
enum ReadFilter {
|
||||
ALL = 'all',
|
||||
@ -1004,23 +1004,16 @@ export const batchUpdateLibraryItems = async (
|
||||
throw new Error('Labels are required for this action')
|
||||
}
|
||||
|
||||
const labelIds = labels.map((label) => label.id)
|
||||
const libraryItems = await queryBuilder.getMany()
|
||||
// add labels in library items
|
||||
const labelsToAdd = libraryItems.flatMap((libraryItem) =>
|
||||
labels
|
||||
.map((label) => ({
|
||||
labelId: label.id,
|
||||
libraryItemId: libraryItem.id,
|
||||
name: label.name,
|
||||
}))
|
||||
.filter((entityLabel) => {
|
||||
const existingLabel = libraryItem.labelNames?.find(
|
||||
(l) => l.toLowerCase() === entityLabel.name.toLowerCase()
|
||||
)
|
||||
return !existingLabel
|
||||
})
|
||||
)
|
||||
return tx.getRepository(EntityLabel).save(labelsToAdd)
|
||||
for (const libraryItem of libraryItems) {
|
||||
await addLabelsToLibraryItem(labelIds, libraryItem.id, userId)
|
||||
|
||||
libraryItem.labels = labels
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// generate raw sql because postgres doesn't support prepared statements in DO blocks
|
||||
@ -1132,3 +1125,36 @@ export const batchDelete = async (criteria: FindOptionsWhere<LibraryItem>) => {
|
||||
|
||||
return authTrx(async (t) => t.query(sql))
|
||||
}
|
||||
|
||||
export const findLibraryItemIdsByLabelId = async (
|
||||
labelId: string,
|
||||
userId: string
|
||||
) => {
|
||||
return authTrx(
|
||||
async (tx) => {
|
||||
// find library items have the label or have highlights with the label
|
||||
const result = (await tx.query(
|
||||
`
|
||||
SELECT library_item_id
|
||||
FROM (
|
||||
SELECT library_item_id
|
||||
FROM omnivore.entity_labels
|
||||
WHERE label_id = $1
|
||||
AND library_item_id IS NOT NULL
|
||||
UNION
|
||||
SELECT h.library_item_id
|
||||
FROM omnivore.highlight h
|
||||
INNER JOIN omnivore.entity_labels ON entity_labels.highlight_id = h.id
|
||||
WHERE label_id = $1
|
||||
AND highlight_id IS NOT NULL
|
||||
) AS combined_results
|
||||
`,
|
||||
[labelId]
|
||||
)) as { library_item_id: string }[]
|
||||
|
||||
return result.map((r) => r.library_item_id)
|
||||
},
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
}
|
||||
|
||||
@ -17,6 +17,12 @@ import {
|
||||
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
|
||||
import { queueRSSRefreshFeedJob } from '../jobs/rss/refreshAllFeeds'
|
||||
import { TriggerRuleJobData, TRIGGER_RULE_JOB_NAME } from '../jobs/trigger_rule'
|
||||
import {
|
||||
UpdateHighlightData,
|
||||
UpdateLabelsData,
|
||||
UPDATE_HIGHLIGHT_JOB,
|
||||
UPDATE_LABELS_JOB,
|
||||
} from '../jobs/update_db'
|
||||
import { getBackendQueue } from '../queue-processor'
|
||||
import { redisDataSource } from '../redis_data_source'
|
||||
import { signFeatureToken } from '../services/features'
|
||||
@ -663,4 +669,41 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
|
||||
})
|
||||
}
|
||||
|
||||
export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
return []
|
||||
}
|
||||
|
||||
const jobs = data.map((d) => ({
|
||||
name: UPDATE_LABELS_JOB,
|
||||
data: d,
|
||||
opts: {
|
||||
priority: 1,
|
||||
},
|
||||
}))
|
||||
|
||||
try {
|
||||
return queue.addBulk(jobs)
|
||||
} catch (error) {
|
||||
logger.error('error enqueuing update labels jobs', error)
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
|
||||
const queue = await getBackendQueue()
|
||||
if (!queue) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
try {
|
||||
return queue.add(UPDATE_HIGHLIGHT_JOB, data, {
|
||||
priority: 1,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('error enqueuing update highlight job', error)
|
||||
}
|
||||
}
|
||||
|
||||
export default createHttpTaskWithToken
|
||||
|
||||
@ -1,18 +1,24 @@
|
||||
import { DeepPartial } from 'typeorm'
|
||||
import { appDataSource } from '../src/data_source'
|
||||
import { EntityLabel, LabelSource } from '../src/entity/entity_label'
|
||||
import { Filter } from '../src/entity/filter'
|
||||
import { Highlight } from '../src/entity/highlight'
|
||||
import { Label } from '../src/entity/label'
|
||||
import { LibraryItem } from '../src/entity/library_item'
|
||||
import { Reminder } from '../src/entity/reminder'
|
||||
import { User } from '../src/entity/user'
|
||||
import { UserDeviceToken } from '../src/entity/user_device_tokens'
|
||||
import { getRepository, setClaims } from '../src/repository'
|
||||
import { authTrx, getRepository, setClaims } from '../src/repository'
|
||||
import { highlightRepository } from '../src/repository/highlight'
|
||||
import { userRepository } from '../src/repository/user'
|
||||
import { createUser } from '../src/services/create_user'
|
||||
import { saveLabelsInLibraryItem } from '../src/services/labels'
|
||||
import { createLibraryItem } from '../src/services/library_item'
|
||||
import { createDeviceToken } from '../src/services/user_device_tokens'
|
||||
import { generateFakeUuid } from './util'
|
||||
import {
|
||||
bulkEnqueueUpdateLabels,
|
||||
enqueueUpdateHighlight,
|
||||
} from '../src/utils/createTask'
|
||||
import { generateFakeUuid, waitUntilJobsDone } from './util'
|
||||
|
||||
export const createTestConnection = async (): Promise<void> => {
|
||||
appDataSource.setOptions({
|
||||
@ -121,3 +127,68 @@ export const createTestLibraryItem = async (
|
||||
|
||||
return createdItem
|
||||
}
|
||||
|
||||
export const saveLabelsInLibraryItem = async (
|
||||
labels: Label[],
|
||||
libraryItemId: string,
|
||||
userId: string,
|
||||
source: LabelSource = 'user'
|
||||
) => {
|
||||
await authTrx(
|
||||
async (tx) => {
|
||||
const repo = tx.getRepository(EntityLabel)
|
||||
|
||||
// delete existing labels
|
||||
await repo.delete({
|
||||
libraryItemId,
|
||||
})
|
||||
|
||||
// save new labels
|
||||
await repo.save(
|
||||
labels.map((l) => ({
|
||||
labelId: l.id,
|
||||
libraryItemId,
|
||||
source,
|
||||
}))
|
||||
)
|
||||
},
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
// update labels in library item
|
||||
const jobs = await bulkEnqueueUpdateLabels([{ libraryItemId, userId }])
|
||||
|
||||
await waitUntilJobsDone(jobs)
|
||||
}
|
||||
|
||||
export const createHighlight = async (
|
||||
highlight: DeepPartial<Highlight>,
|
||||
libraryItemId: string,
|
||||
userId: string
|
||||
) => {
|
||||
const newHighlight = await authTrx(
|
||||
async (tx) => {
|
||||
const repo = tx.withRepository(highlightRepository)
|
||||
const newHighlight = await repo.createAndSave(highlight)
|
||||
return repo.findOneOrFail({
|
||||
where: { id: newHighlight.id },
|
||||
relations: {
|
||||
user: true,
|
||||
},
|
||||
})
|
||||
},
|
||||
undefined,
|
||||
userId
|
||||
)
|
||||
|
||||
const job = await enqueueUpdateHighlight({
|
||||
libraryItemId,
|
||||
userId,
|
||||
})
|
||||
if (job) {
|
||||
await waitUntilJobsDone([job])
|
||||
}
|
||||
|
||||
return newHighlight
|
||||
}
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import { env } from '../src/env'
|
||||
import { redisDataSource } from '../src/redis_data_source'
|
||||
import { createTestConnection } from './db'
|
||||
import { startApolloServer } from './util'
|
||||
import { startApolloServer, startWorker } from './util'
|
||||
|
||||
export const mochaGlobalSetup = async () => {
|
||||
await createTestConnection()
|
||||
@ -10,6 +10,11 @@ export const mochaGlobalSetup = async () => {
|
||||
if (env.redis.cache.url) {
|
||||
await redisDataSource.initialize()
|
||||
console.log('redis connection created')
|
||||
|
||||
if (redisDataSource.workerRedisClient) {
|
||||
startWorker(redisDataSource.workerRedisClient)
|
||||
console.log('worker started')
|
||||
}
|
||||
}
|
||||
|
||||
await startApolloServer()
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import { appDataSource } from '../src/data_source'
|
||||
import { env } from '../src/env'
|
||||
import { redisDataSource } from '../src/redis_data_source'
|
||||
import { stopApolloServer } from './util'
|
||||
import { stopApolloServer, stopWorker } from './util'
|
||||
|
||||
export const mochaGlobalTeardown = async () => {
|
||||
await stopApolloServer()
|
||||
@ -13,5 +13,10 @@ export const mochaGlobalTeardown = async () => {
|
||||
if (env.redis.cache.url) {
|
||||
await redisDataSource.shutdown()
|
||||
console.log('redis connection closed')
|
||||
|
||||
if (redisDataSource.workerRedisClient) {
|
||||
stopWorker()
|
||||
console.log('worker closed')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,12 +21,7 @@ import {
|
||||
} from '../../src/generated/graphql'
|
||||
import { getRepository } from '../../src/repository'
|
||||
import { createGroup, deleteGroup } from '../../src/services/groups'
|
||||
import { createHighlight } from '../../src/services/highlights'
|
||||
import {
|
||||
createLabel,
|
||||
deleteLabels,
|
||||
saveLabelsInLibraryItem,
|
||||
} from '../../src/services/labels'
|
||||
import { createLabel, deleteLabels } from '../../src/services/labels'
|
||||
import {
|
||||
createLibraryItem,
|
||||
createLibraryItems,
|
||||
@ -41,7 +36,12 @@ import {
|
||||
import { deleteUser } from '../../src/services/user'
|
||||
import * as createTask from '../../src/utils/createTask'
|
||||
import * as uploads from '../../src/utils/uploads'
|
||||
import { createTestLibraryItem, createTestUser } from '../db'
|
||||
import {
|
||||
createHighlight,
|
||||
createTestLibraryItem,
|
||||
createTestUser,
|
||||
saveLabelsInLibraryItem,
|
||||
} from '../db'
|
||||
import { generateFakeUuid, graphqlRequest, request } from '../util'
|
||||
|
||||
chai.use(chaiString)
|
||||
|
||||
@ -2,7 +2,9 @@ import * as chai from 'chai'
|
||||
import { expect } from 'chai'
|
||||
import chaiString from 'chai-string'
|
||||
import 'mocha'
|
||||
import { Highlight } from '../../src/entity/highlight'
|
||||
import { User } from '../../src/entity/user'
|
||||
import { getRepository } from '../../src/repository'
|
||||
import {
|
||||
createHighlight,
|
||||
deleteHighlightById,
|
||||
@ -11,7 +13,12 @@ import {
|
||||
import { createLabel, saveLabelsInHighlight } from '../../src/services/labels'
|
||||
import { deleteUser } from '../../src/services/user'
|
||||
import { createTestLibraryItem, createTestUser } from '../db'
|
||||
import { generateFakeShortId, generateFakeUuid, graphqlRequest, request } from '../util'
|
||||
import {
|
||||
generateFakeShortId,
|
||||
generateFakeUuid,
|
||||
graphqlRequest,
|
||||
request,
|
||||
} from '../util'
|
||||
|
||||
chai.use(chaiString)
|
||||
|
||||
@ -281,7 +288,7 @@ describe('Highlights API', () => {
|
||||
itemId,
|
||||
newHighlightId,
|
||||
newShortHighlightId,
|
||||
[highlightId],
|
||||
[highlightId]
|
||||
)
|
||||
const res = await graphqlRequest(query, authToken).expect(200)
|
||||
|
||||
|
||||
@ -1,11 +1,15 @@
|
||||
import { ConnectionOptions, Job, QueueEvents, Worker } from 'bullmq'
|
||||
import { nanoid } from 'nanoid'
|
||||
import supertest from 'supertest'
|
||||
import { v4 } from 'uuid'
|
||||
import { createWorker, QUEUE_NAME } from '../src/queue-processor'
|
||||
import { createApp } from '../src/server'
|
||||
import { corsConfig } from '../src/utils/corsConfig'
|
||||
|
||||
const { app, apollo } = createApp()
|
||||
export const request = supertest(app)
|
||||
let worker: Worker
|
||||
let queueEvents: QueueEvents
|
||||
|
||||
export const startApolloServer = async () => {
|
||||
await apollo.start()
|
||||
@ -16,6 +20,22 @@ export const stopApolloServer = async () => {
|
||||
await apollo.stop()
|
||||
}
|
||||
|
||||
export const startWorker = async (connection: ConnectionOptions) => {
|
||||
worker = createWorker(connection)
|
||||
queueEvents = new QueueEvents(QUEUE_NAME, {
|
||||
connection,
|
||||
})
|
||||
}
|
||||
|
||||
export const stopWorker = async () => {
|
||||
queueEvents.close()
|
||||
worker.close()
|
||||
}
|
||||
|
||||
export const waitUntilJobsDone = async (jobs: Job[]) => {
|
||||
await Promise.all(jobs.map((job) => job.waitUntilFinished(queueEvents)))
|
||||
}
|
||||
|
||||
export const graphqlRequest = (
|
||||
query: string,
|
||||
authToken: string,
|
||||
|
||||
Reference in New Issue
Block a user