update entity subscriber

This commit is contained in:
Hongbo Wu
2023-08-30 10:25:26 +08:00
parent 648a16615b
commit 5dce78eedb
11 changed files with 303 additions and 100 deletions

View File

@ -80,7 +80,7 @@ export class Highlight {
@Column('text', { nullable: true })
color?: string | null
@ManyToMany(() => Label)
@ManyToMany(() => Label, { cascade: true })
@JoinTable({
name: 'entity_labels',
joinColumn: { name: 'highlight_id' },

View File

@ -197,4 +197,16 @@ export class LibraryItem {
inverseJoinColumn: { name: 'id' },
})
highlights?: Highlight[]
@Column('text', { nullable: true })
labelNames?: string[] | null
@Column('text', { nullable: true })
highlightLabels?: string[] | null
@Column('text', { nullable: true })
highlightAnnotations?: string[] | null
@Column('text', { nullable: true })
note?: string | null
}

View File

@ -1,26 +0,0 @@
import {
CreateDateColumn,
Entity,
JoinColumn,
ManyToOne,
PrimaryGeneratedColumn,
} from 'typeorm'
import { Link } from './link'
import { Label } from './label'
@Entity({ name: 'link_labels' })
export class LinkLabel {
@PrimaryGeneratedColumn('uuid')
id!: string
@ManyToOne(() => Link)
@JoinColumn({ name: 'link_id' })
link!: Link
@ManyToOne(() => Label)
@JoinColumn({ name: 'label_id' })
label!: Label
@CreateDateColumn()
createdAt!: Date
}

View File

@ -0,0 +1,103 @@
import {
EntityManager,
EntitySubscriberInterface,
EventSubscriber,
InsertEvent,
ObjectLiteral,
RemoveEvent,
UpdateEvent,
} from 'typeorm'
import { Highlight } from '../entity/highlight'
import { Label } from '../entity/label'
import { LibraryItem } from '../entity/library_item'
import { createPubSubClient, EntityType } from '../pubsub'
@EventSubscriber()
export class HighlightSubscriber
implements EntitySubscriberInterface<Highlight>
{
private readonly pubsubClient = createPubSubClient()
async updateLibraryItem(manager: EntityManager, libraryItemId: string) {
// get all the highlights belonging to the library_item
const highlights = await manager.getRepository(Highlight).find({
where: { libraryItem: { id: libraryItemId } },
relations: {
labels: true,
},
})
const highlightLabels: string[] = []
const highlightAnnotations: string[] = []
// for each highlight, add the lowercased label names to highlight_labels
// and the annotation to highlight_annotations
highlights.forEach((highlight) => {
highlight.labels &&
highlightLabels.push(
...highlight.labels.map((label) => label.name.toLowerCase())
)
highlightAnnotations.push(highlight.annotation || '')
})
// update highlight_labels and highlight_annotations on library_item
await manager.update(LibraryItem, libraryItemId, {
highlightAnnotations,
highlightLabels,
})
}
listenTo() {
return Highlight
}
async afterInsert(event: InsertEvent<Highlight>): Promise<void> {
await this.updateLibraryItem(event.manager, event.entity.libraryItem.id)
await this.pubsubClient.entityCreated<Highlight>(
EntityType.HIGHLIGHT,
event.entity,
event.entity.libraryItem.user.id
)
}
async afterUpdate(event: UpdateEvent<Highlight>): Promise<void> {
if (event.entity) {
await this.updateLibraryItem(
event.manager,
event.databaseEntity.libraryItem.id
)
// publish update event
await this.pubsubClient.entityUpdated<ObjectLiteral>(
EntityType.HIGHLIGHT,
{ ...event.entity, libraryItem: event.databaseEntity.libraryItem },
event.databaseEntity.libraryItem.user.id
)
// publish label added event if a label was added
if (event.entity.labels) {
await this.pubsubClient.entityCreated<Label>(
EntityType.LABEL,
event.entity.labels,
event.entity.libraryItem.user.id
)
}
}
}
async afterRemove(event: RemoveEvent<Highlight>): Promise<void> {
if (event.entityId) {
await this.updateLibraryItem(
event.manager,
event.databaseEntity.libraryItem.id
)
await this.pubsubClient.entityDeleted(
EntityType.HIGHLIGHT,
event.entityId,
event.databaseEntity.libraryItem.user.id
)
}
}
}

View File

@ -0,0 +1,82 @@
import {
EntitySubscriberInterface,
EventSubscriber,
InsertEvent,
ObjectLiteral,
UpdateEvent,
} from 'typeorm'
import { Label } from '../entity/label'
import { LibraryItem, LibraryItemState } from '../entity/library_item'
import { createPubSubClient, EntityType } from '../pubsub'
@EventSubscriber()
export class LibraryItemSubscriber
implements EntitySubscriberInterface<LibraryItem>
{
private readonly pubsubClient = createPubSubClient()
listenTo() {
return LibraryItem
}
async afterInsert(event: InsertEvent<LibraryItem>): Promise<void> {
// Only publish the event if the library item has been successfully created
if (event.entity.state === LibraryItemState.Succeeded) {
await this.pubsubClient.entityCreated<LibraryItem>(
EntityType.PAGE,
event.entity,
event.entity.user.id
)
}
}
async afterUpdate(event: UpdateEvent<LibraryItem>): Promise<void> {
if (event.entity) {
// publish delete event if the library item has been deleted
if (event.entity.state === LibraryItemState.Deleted) {
return this.pubsubClient.entityDeleted(
EntityType.PAGE,
event.databaseEntity.id,
event.databaseEntity.user.id
)
}
// publish create event if the library item has finished processing
if (
event.databaseEntity.state === LibraryItemState.Processing &&
event.entity.state === LibraryItemState.Succeeded
) {
return this.pubsubClient.entityCreated<LibraryItem>(
EntityType.PAGE,
{
...event.databaseEntity,
...event.entity,
},
event.databaseEntity.user.id
)
}
// publish update event for all other cases
await this.pubsubClient.entityUpdated<ObjectLiteral>(
EntityType.PAGE,
event.entity,
event.databaseEntity.user.id
)
// publish label added event if a label was added
if (event.entity.labels) {
await event.manager
.getRepository(LibraryItem)
.update(event.databaseEntity.id, {
labelNames: event.entity.labels,
})
await this.pubsubClient.entityCreated<Label>(
EntityType.LABEL,
event.entity.labels,
event.databaseEntity.user.id
)
}
}
}
}

View File

@ -1,9 +1,30 @@
import { EntityManager, EntityTarget, Repository } from 'typeorm'
import { AppDataSource } from '../data-source'
import { ApiKey } from '../entity/api_key'
import { Feature } from '../entity/feature'
import { Filter } from '../entity/filter'
import { Follower } from '../entity/follower'
import { Group } from '../entity/groups/group'
import { GroupMembership } from '../entity/groups/group_membership'
import { Invite } from '../entity/groups/invite'
import { Highlight } from '../entity/highlight'
import { Integration } from '../entity/integration'
import { Label } from '../entity/label'
import { LibraryItem } from '../entity/library_item'
import { NewsletterEmail } from '../entity/newsletter_email'
import { Profile } from '../entity/profile'
import { ReceivedEmail } from '../entity/received_email'
import { Recommendation } from '../entity/recommendation'
import { Reminder } from '../entity/reminder'
import { AbuseReport } from '../entity/reports/abuse_report'
import { ContentDisplayReport } from '../entity/reports/content_display_report'
import { Rule } from '../entity/rule'
import { Subscription } from '../entity/subscription'
import { UploadFile } from '../entity/upload_file'
import { User } from '../entity/user'
import { UserDeviceToken } from '../entity/user_device_tokens'
import { UserPersonalization } from '../entity/user_personalization'
import { Webhook } from '../entity/webhook'
export const setClaims = async (
t: EntityManager,
@ -19,9 +40,31 @@ export const getRepository = <T>(entity: EntityTarget<T>): Repository<T> => {
return entityManager.getRepository(entity)
}
export const entityManager = AppDataSource.createEntityManager()
export const entityManager = AppDataSource.manager
export const userRepository = getRepository(User)
export const uploadFileRepository = getRepository(UploadFile)
export const reminderRepository = getRepository(Reminder)
export const libraryItemRepository = getRepository(LibraryItem)
export const groupMembershipRepository = getRepository(GroupMembership)
export const groupRepository = getRepository(Group)
export const inviteRepository = getRepository(Invite)
export const abuseReportRepository = getRepository(AbuseReport)
export const contentDisplayReportRepository =
getRepository(ContentDisplayReport)
export const apiKeyRepository = getRepository(ApiKey)
export const featureRepository = getRepository(Feature)
export const filterRepository = getRepository(Filter)
export const followerRepository = getRepository(Follower)
export const highlightRepository = getRepository(Highlight)
export const integrationRepository = getRepository(Integration)
export const labelRepository = getRepository(Label)
export const newsletterEmailRepository = getRepository(NewsletterEmail)
export const profileRepository = getRepository(Profile)
export const receivedEmailRepository = getRepository(ReceivedEmail)
export const recommendationRepository = getRepository(Recommendation)
export const ruleRepository = getRepository(Rule)
export const subscriptionRepository = getRepository(Subscription)
export const userDeviceTokenRepository = getRepository(UserDeviceToken)
export const userPersonalizationRepository = getRepository(UserPersonalization)
export const webhookRepository = getRepository(Webhook)

View File

@ -70,7 +70,7 @@ import {
} from '../../services/labels'
import { searchLibraryItems } from '../../services/library_item'
import { setFileUploadComplete } from '../../services/save_file'
import { parsedContentToPage } from '../../services/save_page'
import { parsedContentToLibraryItem } from '../../services/save_page'
import { traceAs } from '../../tracing'
import { Merge } from '../../util'
import { analytics } from '../../utils/analytics'
@ -295,7 +295,7 @@ export const createArticleResolver = authorized<
const saveTime = new Date()
const slug = generateSlug(parsedContent?.title || croppedPathname)
const articleToSave = parsedContentToPage({
const articleToSave = parsedContentToLibraryItem({
url,
title,
parsedContent,

View File

@ -8,7 +8,7 @@ import {
SaveErrorCode,
SaveSuccess,
} from '../../generated/graphql'
import { getRepository } from '../../repository'
import { getRepository, userRepository } from '../../repository'
import { saveFile } from '../../services/save_file'
import { savePage } from '../../services/save_page'
import { saveUrl } from '../../services/save_url'
@ -31,7 +31,7 @@ export const savePageResolver = authorized<
},
})
const user = await getRepository(User).findOneBy({
const user = await userRepository.findOneBy({
id: ctx.uid,
})
if (!user) {
@ -61,7 +61,7 @@ export const saveUrlResolver = authorized<
},
})
const user = await getRepository(User).findOneBy({
const user = await userRepository.findOneBy({
id: uid,
})
if (!user) {
@ -87,7 +87,7 @@ export const saveFileResolver = authorized<
},
})
const user = await getRepository(User).findOneBy({
const user = await userRepository.findOneBy({
id: ctx.uid,
})
if (!user) {

View File

@ -193,3 +193,7 @@ export const getTopUsers = async (): Promise<User[]> => {
.limit(MAX_RECORDS_LIMIT)
.getMany()
}
export const getUserById = async (id: string): Promise<User | null> => {
return userRepository.findOneBy({ id })
}

View File

@ -281,9 +281,3 @@ export const searchLibraryItems = async (
return [libraryItems, count]
}
export const libraryItemToSearchItem = (
libraryItem: LibraryItem,
userId: string
): SearchItem => {

View File

@ -1,10 +1,14 @@
import { Readability } from '@omnivore/readability'
import { addHighlightToPage } from '../elastic/highlights'
import { createPage, getPageByParam, updatePage } from '../elastic/pages'
import { ArticleSavingRequestStatus, Page, PageType } from '../elastic/types'
import { DeepPartial } from 'typeorm'
import {
LibraryItem,
LibraryItemState,
LibraryItemType,
} from '../entity/library_item'
import { User } from '../entity/user'
import { homePageURL } from '../env'
import {
ArticleSavingRequestStatus,
HighlightType,
Maybe,
PreparedDocumentInput,
@ -12,6 +16,7 @@ import {
SavePageInput,
SaveResult,
} from '../generated/graphql'
import { libraryItemRepository } from '../repository'
import { WithDataSourcesContext } from '../resolvers/types'
import { enqueueThumbnailTask } from '../utils/createTask'
import {
@ -26,6 +31,7 @@ import { logger } from '../utils/logger'
import { parsePreparedContent } from '../utils/parser'
import { createPageSaveRequest } from './create_page_save_request'
import { createLabels } from './labels'
import { createLibraryItem } from './library_item'
// where we can use APIs to fetch their underlying content.
const FORCE_PUPPETEER_URLS = [
@ -72,7 +78,7 @@ export const savePage = async (
const [newSlug, croppedPathname] = createSlug(input.url, input.title)
let slug = newSlug
let pageId = input.clientRequestId
const articleToSave = parsedContentToPage({
const itemToSave = parsedContentToLibraryItem({
url: input.url,
title: input.title,
userId: user.id,
@ -80,7 +86,7 @@ export const savePage = async (
slug,
croppedPathname,
parsedContent: parseResult.parsedContent,
pageType: parseResult.pageType,
itemType: parseResult.pageType as unknown as LibraryItemType,
originalHtml: parseResult.domContent,
canonicalUrl: parseResult.canonicalUrl,
rssFeedUrl: input.rssFeedUrl,
@ -89,10 +95,10 @@ export const savePage = async (
})
// save state
articleToSave.archivedAt =
const archivedAt =
input.state === ArticleSavingRequestStatus.Archived ? new Date() : null
// add labels to page
articleToSave.labels = input.labels
const labels = input.labels
? await createLabels(ctx, input.labels)
: undefined
@ -103,11 +109,11 @@ export const savePage = async (
try {
await createPageSaveRequest({
userId: user.id,
url: articleToSave.url,
url: itemToSave.originalUrl!,
pubsub: ctx.pubsub,
articleSavingRequestId: input.clientRequestId,
archivedAt: articleToSave.archivedAt,
labels: articleToSave.labels,
archivedAt,
labels,
})
} catch (e) {
return {
@ -117,15 +123,15 @@ export const savePage = async (
}
} else {
// check if the page already exists
const existingPage = await getPageByParam({
userId: user.id,
url: articleToSave.url,
const existingLibraryItem = await libraryItemRepository.findOne({
where: { user: { id: user.id }, originalUrl: itemToSave.originalUrl },
relations: ['subscriptions'],
})
if (existingPage) {
if (existingLibraryItem) {
// we don't want to update an rss feed page if rss-feeder is tring to re-save it
if (
existingPage.rssFeedUrl &&
existingPage.rssFeedUrl === input.rssFeedUrl
existingLibraryItem.subscription &&
existingLibraryItem.subscription.url === input.rssFeedUrl
) {
return {
clientRequestId: pageId,
@ -133,21 +139,9 @@ export const savePage = async (
}
}
pageId = existingPage.id
slug = existingPage.slug
if (
!(await updatePage(
existingPage.id,
{
// update the page with the new content
...articleToSave,
id: pageId, // we don't want to update the id
slug, // we don't want to update the slug
createdAt: existingPage.createdAt, // we don't want to update the createdAt
},
ctx
))
) {
pageId = existingLibraryItem.id
slug = existingLibraryItem.slug
if (!(await libraryItemRepository.save(itemToSave))) {
return {
errorCodes: [SaveErrorCode.Unknown],
message: 'Failed to update existing page',
@ -155,10 +149,7 @@ export const savePage = async (
}
} else {
// do not publish a pubsub event if the page is imported
const newPageId = await createPage(articleToSave, {
...ctx,
shouldPublish: !isImported,
})
const newPageId = await createLibraryItem(itemToSave)
if (!newPageId) {
return {
errorCodes: [SaveErrorCode.Unknown],
@ -204,8 +195,8 @@ export const savePage = async (
}
}
// convert parsed content to an elastic page
export const parsedContentToPage = ({
// convert parsed content to an library item
export const parsedContentToLibraryItem = ({
url,
userId,
originalHtml,
@ -216,7 +207,7 @@ export const parsedContentToPage = ({
title,
preparedDocument,
canonicalUrl,
pageType,
itemType,
uploadFileHash,
uploadFileId,
saveTime,
@ -227,7 +218,7 @@ export const parsedContentToPage = ({
userId: string
slug: string
croppedPathname: string
pageType: PageType
itemType: LibraryItemType
parsedContent: Readability.ParseResult | null
originalHtml?: string | null
pageId?: string | null
@ -239,13 +230,13 @@ export const parsedContentToPage = ({
saveTime?: Date
rssFeedUrl?: string | null
publishedAt?: Date | null
}): Page => {
}): DeepPartial<LibraryItem> => {
return {
id: pageId || '',
id: pageId ?? undefined,
slug,
userId,
originalHtml,
content: parsedContent?.content || '',
user: { id: userId },
originalContent: originalHtml,
readableContent: parsedContent?.content || '',
description: parsedContent?.excerpt,
title:
title ||
@ -254,24 +245,24 @@ export const parsedContentToPage = ({
croppedPathname ||
parsedContent?.siteName ||
url,
author: parsedContent?.byline ?? undefined,
url: cleanUrl(canonicalUrl || url),
pageType,
hash: uploadFileHash || stringToHash(parsedContent?.content || url),
image: parsedContent?.previewImage ?? undefined,
author: parsedContent?.byline,
originalUrl: cleanUrl(canonicalUrl || url),
itemType,
textContentHash:
uploadFileHash || stringToHash(parsedContent?.content || url),
thumbnail: parsedContent?.previewImage ?? undefined,
publishedAt: validatedDate(
publishedAt || parsedContent?.publishedDate || undefined
),
uploadFileId,
readingProgressPercent: 0,
readingProgressAnchorIndex: 0,
state: ArticleSavingRequestStatus.Succeeded,
createdAt: validatedDate(saveTime) || new Date(),
savedAt: validatedDate(saveTime) || new Date(),
siteName: parsedContent?.siteName ?? undefined,
language: parsedContent?.language ?? undefined,
siteIcon: parsedContent?.siteIcon ?? undefined,
wordsCount: wordsCount(parsedContent?.textContent || ''),
rssFeedUrl: rssFeedUrl || undefined,
uploadFile: { id: uploadFileId ?? undefined },
readingProgressTopPercent: 0,
readingProgressHighestReadAnchor: 0,
state: LibraryItemState.Succeeded,
createdAt: validatedDate(saveTime),
savedAt: validatedDate(saveTime),
siteName: parsedContent?.siteName,
itemLanguage: parsedContent?.language,
siteIcon: parsedContent?.siteIcon,
wordCount: wordsCount(parsedContent?.textContent || ''),
}
}