From f603c84b270c17f6820c7d4e7fe804e8fb13f7c1 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 27 May 2024 17:38:36 +0800 Subject: [PATCH] store only feed item id and type in redis and fetch the details from DB when pagination API is called --- packages/api/src/apollo.ts | 2 + packages/api/src/generated/graphql.ts | 10 +- packages/api/src/generated/schema.graphql | 5 +- packages/api/src/jobs/score_library_item.ts | 2 + .../api/src/jobs/update_just_read_feed.ts | 150 ++++++++++-------- .../api/src/resolvers/function_resolvers.ts | 11 ++ .../api/src/resolvers/just_read_feed/index.ts | 23 ++- packages/api/src/resolvers/types.ts | 2 + packages/api/src/schema.ts | 5 +- packages/api/src/services/just_read_feed.ts | 59 +++++++ packages/api/src/services/score.ts | 50 +++--- packages/api/src/services/subscriptions.ts | 12 +- 12 files changed, 227 insertions(+), 104 deletions(-) diff --git a/packages/api/src/apollo.ts b/packages/api/src/apollo.ts index 11008c18b..76d7681be 100644 --- a/packages/api/src/apollo.ts +++ b/packages/api/src/apollo.ts @@ -32,6 +32,7 @@ import { ClaimsToSet, RequestContext, ResolverContext } from './resolvers/types' import ScalarResolvers from './scalars' import typeDefs from './schema' import { batchGetHighlightsFromLibraryItemIds } from './services/highlights' +import { batchGetJustReadFeedItems } from './services/just_read_feed' import { batchGetLabelsFromLibraryItemIds } from './services/labels' import { batchGetRecommendationsFromLibraryItemIds } from './services/recommendation' import { @@ -112,6 +113,7 @@ const contextFunc: ContextFunction = async ({ batchGetRecommendationsFromLibraryItemIds ), uploadFiles: new DataLoader(batchGetUploadFilesByIds), + justReadFeedItems: new DataLoader(batchGetJustReadFeedItems), }, } diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index b3e7b91c3..99b993511 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -1391,11 +1391,10 @@ export type JustReadFeedItem = { canShare?: Maybe; date: Scalars['Date']; dir?: Maybe; - highlights?: Maybe; id: Scalars['ID']; - likedCount?: Maybe; + likeCount?: Maybe; previewContent?: Maybe; - savedCount?: Maybe; + saveCount?: Maybe; seen_at?: Maybe; subscription: JustReadFeedSubscription; thumbnail?: Maybe; @@ -6103,11 +6102,10 @@ export type JustReadFeedItemResolvers, ParentType, ContextType>; date?: Resolver; dir?: Resolver, ParentType, ContextType>; - highlights?: Resolver, ParentType, ContextType>; id?: Resolver; - likedCount?: Resolver, ParentType, ContextType>; + likeCount?: Resolver, ParentType, ContextType>; previewContent?: Resolver, ParentType, ContextType>; - savedCount?: Resolver, ParentType, ContextType>; + saveCount?: Resolver, ParentType, ContextType>; seen_at?: Resolver, ParentType, ContextType>; subscription?: Resolver; thumbnail?: Resolver, ParentType, ContextType>; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index 6a31ae89e..c79c05d23 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -1253,11 +1253,10 @@ type JustReadFeedItem { canShare: Boolean date: Date! dir: String - highlights: String id: ID! - likedCount: Int + likeCount: Int previewContent: String - savedCount: Int + saveCount: Int seen_at: Date subscription: JustReadFeedSubscription! thumbnail: String diff --git a/packages/api/src/jobs/score_library_item.ts b/packages/api/src/jobs/score_library_item.ts index 5708316c6..3e001bf45 100644 --- a/packages/api/src/jobs/score_library_item.ts +++ b/packages/api/src/jobs/score_library_item.ts @@ -53,6 +53,7 @@ export const scoreLibraryItem = async ( author: libraryItem.author, language: lanaugeToCode(libraryItem.itemLanguage || 'English'), word_count: libraryItem.wordCount, + published_at: libraryItem.publishedAt, } as Feature, } @@ -77,4 +78,5 @@ export const scoreLibraryItem = async ( undefined, true ) + logger.info('Library item scored', data) } diff --git a/packages/api/src/jobs/update_just_read_feed.ts b/packages/api/src/jobs/update_just_read_feed.ts index e2d5c5e38..267dfcfb3 100644 --- a/packages/api/src/jobs/update_just_read_feed.ts +++ b/packages/api/src/jobs/update_just_read_feed.ts @@ -1,11 +1,12 @@ import { LibraryItem } from '../entity/library_item' import { PublicItem } from '../entity/public_item' +import { Subscription } from '../entity/subscription' import { User } from '../entity/user' -import { JustReadFeedSection } from '../generated/graphql' import { redisDataSource } from '../redis_data_source' import { findUnseenPublicItems } from '../services/just_read_feed' import { searchLibraryItems } from '../services/library_item' import { Feature, getScores, ScoreApiResponse } from '../services/score' +import { findSubscriptionsByNames } from '../services/subscriptions' import { findActiveUser } from '../services/user' import { lanaugeToCode } from '../utils/helpers' import { logger } from '../utils/logger' @@ -16,10 +17,11 @@ export interface UpdateJustReadFeedJobData { userId: string } -interface FeedItem { +interface Candidate { id: string title: string url: string + type: string thumbnail?: string previewContent?: string languageCode: string @@ -27,27 +29,36 @@ interface FeedItem { dir: string date: Date topic?: string - wordCount?: number + wordCount: number siteIcon?: string siteName?: string - saveCount?: number - likeCount?: number - broadcastCount?: number folder?: string - subscriptionType: string score?: number publishedAt?: Date - subscription: { - id: string + subscription?: { name: string - icon?: string + type: string } } -const libraryItemToCandidate = (user: User, item: LibraryItem): FeedItem => ({ +interface Item { + id: string + type: string +} + +interface Section { + items: Array + layout: string +} + +const libraryItemToCandidate = ( + item: LibraryItem, + subscriptions: Array +): Candidate => ({ id: item.id, title: item.title, url: item.originalUrl, + type: 'library_item', thumbnail: item.thumbnail || undefined, previewContent: item.description || undefined, languageCode: lanaugeToCode(item.itemLanguage || 'English'), @@ -55,24 +66,24 @@ const libraryItemToCandidate = (user: User, item: LibraryItem): FeedItem => ({ dir: item.directionality || 'ltr', date: item.createdAt, topic: item.topic, - wordCount: item.wordCount || undefined, + wordCount: item.wordCount!, siteName: item.siteName || undefined, siteIcon: item.siteIcon || undefined, folder: item.folder, - subscriptionType: 'library', score: item.score, publishedAt: item.publishedAt || undefined, - subscription: { - id: user.id, - name: user.name, - icon: user.profile.pictureUrl || undefined, - }, + subscription: subscriptions.find( + (subscription) => + subscription.name === item.subscription || + subscription.url === item.subscription + ), }) -const publicItemToCandidate = (item: PublicItem): FeedItem => ({ +const publicItemToCandidate = (item: PublicItem): Candidate => ({ id: item.id, title: item.title, url: item.url, + type: 'public_item', thumbnail: item.thumbnail, previewContent: item.previewContent, languageCode: item.languageCode || 'en', @@ -80,38 +91,42 @@ const publicItemToCandidate = (item: PublicItem): FeedItem => ({ dir: item.dir || 'ltr', date: item.createdAt, topic: item.topic, - wordCount: item.wordCount, - siteName: item.siteName, - saveCount: item.stats.saveCount, - likeCount: item.stats.likeCount, - broadcastCount: item.stats.broadcastCount, + wordCount: item.wordCount!, siteIcon: item.siteIcon, - subscriptionType: item.type, publishedAt: item.publishedAt, subscription: { - id: item.source.id, name: item.source.name, - icon: item.source.icon, + type: item.source.type, }, }) -const selectCandidates = async (user: User): Promise> => { +const selectCandidates = async (user: User): Promise> => { const userId = user.id // get last 100 library items saved and not seen by user const libraryItems = await searchLibraryItems( { size: 100, includeContent: false, - query: `-is:seen`, + query: `-is:seen wordsCount:>0`, }, userId ) logger.info(`Found ${libraryItems.length} library items`) + // get subscriptions for the library items + const subscriptionNames = libraryItems + .filter((item) => !!item.subscription) + .map((item) => item.subscription as string) + + const subscriptions = await findSubscriptionsByNames( + userId, + subscriptionNames + ) + // map library items to candidates and limit to 70 - const privateCandidates: Array = libraryItems - .map((libraryItem) => libraryItemToCandidate(user, libraryItem)) + const privateCandidates: Array = libraryItems + .map((item) => libraryItemToCandidate(item, subscriptions)) .slice(0, 70) const privateCandidatesSize = privateCandidates.length @@ -125,7 +140,7 @@ const selectCandidates = async (user: User): Promise> => { logger.info(`Found ${publicItems.length} public items`) // map public items to candidates and limit to the remaining vacancies - const publicCandidates: Array = publicItems + const publicCandidates: Array = publicItems .map(publicItemToCandidate) .slice(0, 100 - privateCandidatesSize) @@ -138,8 +153,8 @@ const selectCandidates = async (user: User): Promise> => { const rankCandidates = async ( userId: string, - candidates: Array -): Promise> => { + candidates: Array +): Promise> => { if (candidates.length <= 10) { // no need to rank if there are less than 10 candidates return candidates @@ -155,13 +170,13 @@ const rankCandidates = async ( acc[item.id] = { title: item.title, has_thumbnail: !!item.thumbnail, - has_site_icon: !!item.subscription.icon, + has_site_icon: !!item.siteIcon, saved_at: item.date, site: item.siteName, language: item.languageCode, directionality: item.dir, - folder: item.subscription.name, - subscription_type: item.subscriptionType, + folder: item.folder, + subscription_type: item.subscription?.type, author: item.author, word_count: item.wordCount, published_at: item.publishedAt, @@ -196,8 +211,8 @@ const MAX_FEED_ITEMS = 500 export const getJustReadFeedSections = async ( userId: string, limit: number, - minScore?: number -): Promise> => { + maxScore?: number +): Promise> => { const redisClient = redisDataSource.redisClient if (!redisClient) { throw new Error('Redis client not available') @@ -211,8 +226,8 @@ export const getJustReadFeedSections = async ( // response is an array of [member1, score1, member2, score2, ...] const results = await redisClient.zrevrangebyscore( key, - '+inf', - minScore || '-inf', + maxScore ? maxScore - 1 : '+inf', + '-inf', 'WITHSCORES', 'LIMIT', 0, @@ -221,7 +236,7 @@ export const getJustReadFeedSections = async ( const sections = [] for (let i = 0; i < results.length; i += 2) { - const member = JSON.parse(results[i]) as JustReadFeedSection + const member = JSON.parse(results[i]) as Section const score = Number(results[i + 1]) sections.push({ member, score }) } @@ -231,7 +246,7 @@ export const getJustReadFeedSections = async ( const appendSectionsToFeed = async ( userId: string, - sections: Array + sections: Array
) => { const redisClient = redisDataSource.redisClient if (!redisClient) { @@ -243,8 +258,8 @@ const appendSectionsToFeed = async ( // store candidates in redis sorted set const pipeline = redisClient.pipeline() - const scoreMembers = sections.flatMap((section) => [ - Date.now() + 86_400_000, // sections expire in 24 hours + const scoreMembers = sections.flatMap((section, index) => [ + Date.now() + index + 86_400_000, // sections expire in 24 hours JSON.stringify(section), ]) // add section to the sorted set @@ -258,32 +273,33 @@ const appendSectionsToFeed = async ( await pipeline.exec() } -const mixFeedItems = ( - rankedFeedItems: Array -): Array => { +const mixFeedItems = (rankedFeedItems: Array): Array
=> { // find the median word count - const wordCounts = rankedFeedItems.map((item) => item.wordCount || 0) - wordCounts.sort() + const wordCounts = rankedFeedItems.map((item) => item.wordCount) + wordCounts.sort((a, b) => a - b) const medianWordCount = wordCounts[Math.floor(wordCounts.length / 2)] // separate items into two groups based on word count - const shortItems: Array = [] - const longItems: Array = [] + const shortItems: Array = [] + const longItems: Array = [] for (const item of rankedFeedItems) { - if (item.wordCount && item.wordCount < medianWordCount) { + if (item.wordCount < medianWordCount) { shortItems.push(item) } else { longItems.push(item) } } // initialize empty batches - const batches = [[]] + const batches: Array> = Array.from( + { length: Math.floor(rankedFeedItems.length / 10) }, + () => [] + ) - const checkConstraints = (batch: Array, item: FeedItem) => { + const checkConstraints = (batch: Array, item: Candidate) => { const titleCount = batch.filter((i) => i.title === item.title).length const authorCount = batch.filter((i) => i.author === item.author).length const siteCount = batch.filter((i) => i.siteName === item.siteName).length const subscriptionCount = batch.filter( - (i) => i.subscription.name === item.subscription.name + (i) => i.subscription?.name === item.subscription?.name ).length return ( @@ -295,8 +311,8 @@ const mixFeedItems = ( } const distributeItems = ( - items: Array, - batches: Array> + items: Array, + batches: Array> ) => { for (const item of items) { let added = false @@ -310,7 +326,7 @@ const mixFeedItems = ( if (!added) { for (const batch of batches) { - if (batch.length < 5) { + if (batch.length < 10) { batch.push(item) break } @@ -328,15 +344,23 @@ const mixFeedItems = ( for (const batch of batches) { // create a section for each long item for (let i = 0; i < 5; i++) { - const section: JustReadFeedSection = { - items: [batch[i]], + const section: Section = { + items: [ + { + id: batch[i].id, + type: batch[i].type, + }, + ], layout: 'long', } sections.push(section) } // create a section for short items sections.push({ - items: batch.slice(5), + items: batch.slice(5).map((item) => ({ + id: item.id, + type: item.type, + })), layout: 'quick links', }) } @@ -372,7 +396,7 @@ export const updateJustReadFeed = async (data: UpdateJustReadFeedJobData) => { const rankedSections = mixFeedItems(rankedCandidates) logger.info(`Created ${rankedSections.length} sections`) - logger.info('Appending feed items to feed') + logger.info('Appending sections to feed') await appendSectionsToFeed(userId, rankedSections) logger.info('Feed updated for user', { userId }) } diff --git a/packages/api/src/resolvers/function_resolvers.ts b/packages/api/src/resolvers/function_resolvers.ts index 91f8b942f..13a750a9f 100644 --- a/packages/api/src/resolvers/function_resolvers.ts +++ b/packages/api/src/resolvers/function_resolvers.ts @@ -624,6 +624,17 @@ export const functionResolvers = { return newsletterEmail.folder || EXISTING_NEWSLETTER_FOLDER }, }, + JustReadFeedSection: { + async items( + section: { items: Array<{ id: string }> }, + _: unknown, + ctx: WithDataSourcesContext + ) { + return ctx.dataLoaders.justReadFeedItems.loadMany( + section.items.map((i) => i.id) + ) + }, + }, ...resultResolveTypeResolver('Login'), ...resultResolveTypeResolver('LogOut'), ...resultResolveTypeResolver('GoogleSignup'), diff --git a/packages/api/src/resolvers/just_read_feed/index.ts b/packages/api/src/resolvers/just_read_feed/index.ts index e6ff119a6..1a225dc87 100644 --- a/packages/api/src/resolvers/just_read_feed/index.ts +++ b/packages/api/src/resolvers/just_read_feed/index.ts @@ -1,29 +1,46 @@ import { JustReadFeedError, JustReadFeedErrorCode, + JustReadFeedItem, + JustReadFeedSection, JustReadFeedSuccess, QueryJustReadFeedArgs, } from '../../generated/graphql' import { getJustReadFeedSections } from '../../jobs/update_just_read_feed' import { getJob } from '../../queue-processor' +import { Merge } from '../../util' import { enqueueUpdateJustReadFeed, updateJustReadFeedJobId, } from '../../utils/createTask' import { authorized } from '../../utils/gql-utils' +type PartialJustReadFeedItem = Merge< + Partial, + { type: string } +> +type PartialJustReadFeedSection = Merge< + JustReadFeedSection, + { items: Array } +> +type PartialJustReadFeedSuccess = Merge< + JustReadFeedSuccess, + { + edges: Array<{ cursor: string; node: PartialJustReadFeedSection }> + } +> // This resolver is used to fetch the just read feed for the user. // when the feed is empty, it enqueues a job to update the feed. // when client tries to fetch more then the feed has, it enqueues a job to update the feed. export const justReadFeedResolver = authorized< - JustReadFeedSuccess, + PartialJustReadFeedSuccess, JustReadFeedError, QueryJustReadFeedArgs >(async (_, { first, after }, { uid, log }) => { const limit = first || 10 - const minScore = after ? Number(after) : 0 + const cursor = after ? parseInt(after) : undefined - const sections = await getJustReadFeedSections(uid, limit, minScore) + const sections = await getJustReadFeedSections(uid, limit, cursor) log.info('Just read feed sections fetched') if (sections.length === 0) { diff --git a/packages/api/src/resolvers/types.ts b/packages/api/src/resolvers/types.ts index f880a0145..dacc50476 100644 --- a/packages/api/src/resolvers/types.ts +++ b/packages/api/src/resolvers/types.ts @@ -10,6 +10,7 @@ import { Highlight } from '../entity/highlight' import { Label } from '../entity/label' import { Recommendation } from '../entity/recommendation' import { UploadFile } from '../entity/upload_file' +import { JustReadFeedItem } from '../generated/graphql' import { PubsubClient } from '../pubsub' export interface Claims { @@ -51,6 +52,7 @@ export interface RequestContext { highlights: DataLoader recommendations: DataLoader uploadFiles: DataLoader + justReadFeedItems: DataLoader } } diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index 3a7c0bd4c..c0b9345cb 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -3114,9 +3114,8 @@ const schema = gql` url: String! thumbnail: String previewContent: String - highlights: String - savedCount: Int - likedCount: Int + saveCount: Int + likeCount: Int broadcastCount: Int date: Date! author: String diff --git a/packages/api/src/services/just_read_feed.ts b/packages/api/src/services/just_read_feed.ts index 895192062..a38da1650 100644 --- a/packages/api/src/services/just_read_feed.ts +++ b/packages/api/src/services/just_read_feed.ts @@ -1,5 +1,64 @@ import { PublicItem } from '../entity/public_item' +import { JustReadFeedItem } from '../generated/graphql' import { authTrx } from '../repository' +import { findLibraryItemsByIds } from './library_item' + +export const batchGetJustReadFeedItems = async ( + ids: readonly string[] +): Promise> => { + const libraryItems = await findLibraryItemsByIds(ids as string[], '') + + const publicItems = await authTrx(async (tx) => + tx + .getRepository(PublicItem) + .createQueryBuilder('public_item') + .innerJoin( + 'public_item_stats', + 'stats', + 'stats.public_item_id = public_item.id' + ) + .innerJoin( + 'public_item_source', + 'source', + 'source.id = public_item.source_id' + ) + .where('public_item.id IN (:...ids)', { ids }) + .getMany() + ) + + return ids + .map((id) => { + const libraryItem = libraryItems.find((li) => li.id === id) + if (libraryItem) { + return { + ...libraryItem, + date: libraryItem.savedAt, + url: libraryItem.originalUrl, + canArchive: !libraryItem.archivedAt, + canDelete: !libraryItem.deletedAt, + canSave: false, + dir: libraryItem.directionality, + } + } else { + const publicItem = publicItems.find((pi) => pi.id === id) + return publicItem + ? { + ...publicItem, + date: publicItem.createdAt, + url: publicItem.url, + canArchive: false, + canDelete: false, + canSave: true, + broadcastCount: publicItem.stats.broadcastCount, + likeCount: publicItem.stats.likeCount, + saveCount: publicItem.stats.saveCount, + subscription: publicItem.source, + } + : undefined + } + }) + .filter((item) => item !== undefined) as JustReadFeedItem[] +} export const findUnseenPublicItems = async ( userId: string, diff --git a/packages/api/src/services/score.ts b/packages/api/src/services/score.ts index ed1820efc..e2dc8d1ca 100644 --- a/packages/api/src/services/score.ts +++ b/packages/api/src/services/score.ts @@ -8,8 +8,8 @@ export interface Feature { author?: string directionality: string word_count?: number - subscription_type: string - folder: string + subscription_type?: string + folder?: string published_at?: Date } @@ -23,33 +23,33 @@ export type ScoreApiResponse = Record // item_id -> score export const getScores = async ( data: ScoreApiRequestBody ): Promise => { - const API_URL = 'http://127.0.0.1:5000/predictions' - const token = process.env.SCORE_API_TOKEN + // const API_URL = 'http://127.0.0.1:5000/predictions' + // const token = process.env.SCORE_API_TOKEN - if (!token) { - throw new Error('No score API token found') - } + // if (!token) { + // throw new Error('No score API token found') + // } - const response = await fetch(API_URL, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${token}`, - }, - body: JSON.stringify(data), - }) + // const response = await fetch(API_URL, { + // method: 'POST', + // headers: { + // 'Content-Type': 'application/json', + // Authorization: `Bearer ${token}`, + // }, + // body: JSON.stringify(data), + // }) - if (!response.ok) { - throw new Error(`Failed to score candidates: ${response.statusText}`) - } + // if (!response.ok) { + // throw new Error(`Failed to score candidates: ${response.statusText}`) + // } - const scores = (await response.json()) as ScoreApiResponse - return scores + // const scores = (await response.json()) as ScoreApiResponse + // return scores // fake random scores - // const scores: ScoreApiResponse = {} - // for (const itemId of Object.keys(data.item_features)) { - // scores[itemId] = Math.random() - // } - // return Promise.resolve(scores) + const scores: ScoreApiResponse = {} + for (const itemId of Object.keys(data.item_features)) { + scores[itemId] = Math.random() + } + return Promise.resolve(scores) } diff --git a/packages/api/src/services/subscriptions.ts b/packages/api/src/services/subscriptions.ts index 22be8c2f4..72b0fc9d2 100644 --- a/packages/api/src/services/subscriptions.ts +++ b/packages/api/src/services/subscriptions.ts @@ -1,5 +1,5 @@ import axios from 'axios' -import { DeepPartial, DeleteResult } from 'typeorm' +import { DeepPartial, DeleteResult, In } from 'typeorm' import { appDataSource } from '../data_source' import { NewsletterEmail } from '../entity/newsletter_email' import { Subscription } from '../entity/subscription' @@ -214,3 +214,13 @@ export const createRssSubscriptions = async ( ) => { return getRepository(Subscription).save(subscriptions) } + +export const findSubscriptionsByNames = async ( + userId: string, + names: string[] +): Promise => { + return getRepository(Subscription).findBy([ + { user: { id: userId }, name: In(names) }, + { user: { id: userId }, url: In(names) }, + ]) +}