Store only the feed item id and type in Redis, and fetch the details from the DB when the pagination API is called

This commit is contained in:
Hongbo Wu
2024-05-27 17:38:36 +08:00
parent 87ad71fbe9
commit f603c84b27
12 changed files with 227 additions and 104 deletions

View File

@ -32,6 +32,7 @@ import { ClaimsToSet, RequestContext, ResolverContext } from './resolvers/types'
import ScalarResolvers from './scalars'
import typeDefs from './schema'
import { batchGetHighlightsFromLibraryItemIds } from './services/highlights'
import { batchGetJustReadFeedItems } from './services/just_read_feed'
import { batchGetLabelsFromLibraryItemIds } from './services/labels'
import { batchGetRecommendationsFromLibraryItemIds } from './services/recommendation'
import {
@ -112,6 +113,7 @@ const contextFunc: ContextFunction<ExpressContext, ResolverContext> = async ({
batchGetRecommendationsFromLibraryItemIds
),
uploadFiles: new DataLoader(batchGetUploadFilesByIds),
justReadFeedItems: new DataLoader(batchGetJustReadFeedItems),
},
}

View File

@ -1391,11 +1391,10 @@ export type JustReadFeedItem = {
canShare?: Maybe<Scalars['Boolean']>;
date: Scalars['Date'];
dir?: Maybe<Scalars['String']>;
highlights?: Maybe<Scalars['String']>;
id: Scalars['ID'];
likedCount?: Maybe<Scalars['Int']>;
likeCount?: Maybe<Scalars['Int']>;
previewContent?: Maybe<Scalars['String']>;
savedCount?: Maybe<Scalars['Int']>;
saveCount?: Maybe<Scalars['Int']>;
seen_at?: Maybe<Scalars['Date']>;
subscription: JustReadFeedSubscription;
thumbnail?: Maybe<Scalars['String']>;
@ -6103,11 +6102,10 @@ export type JustReadFeedItemResolvers<ContextType = ResolverContext, ParentType
canShare?: Resolver<Maybe<ResolversTypes['Boolean']>, ParentType, ContextType>;
date?: Resolver<ResolversTypes['Date'], ParentType, ContextType>;
dir?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
highlights?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
id?: Resolver<ResolversTypes['ID'], ParentType, ContextType>;
likedCount?: Resolver<Maybe<ResolversTypes['Int']>, ParentType, ContextType>;
likeCount?: Resolver<Maybe<ResolversTypes['Int']>, ParentType, ContextType>;
previewContent?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
savedCount?: Resolver<Maybe<ResolversTypes['Int']>, ParentType, ContextType>;
saveCount?: Resolver<Maybe<ResolversTypes['Int']>, ParentType, ContextType>;
seen_at?: Resolver<Maybe<ResolversTypes['Date']>, ParentType, ContextType>;
subscription?: Resolver<ResolversTypes['JustReadFeedSubscription'], ParentType, ContextType>;
thumbnail?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;

View File

@ -1253,11 +1253,10 @@ type JustReadFeedItem {
canShare: Boolean
date: Date!
dir: String
highlights: String
id: ID!
likedCount: Int
likeCount: Int
previewContent: String
savedCount: Int
saveCount: Int
seen_at: Date
subscription: JustReadFeedSubscription!
thumbnail: String

View File

@ -53,6 +53,7 @@ export const scoreLibraryItem = async (
author: libraryItem.author,
language: lanaugeToCode(libraryItem.itemLanguage || 'English'),
word_count: libraryItem.wordCount,
published_at: libraryItem.publishedAt,
} as Feature,
}
@ -77,4 +78,5 @@ export const scoreLibraryItem = async (
undefined,
true
)
logger.info('Library item scored', data)
}

View File

@ -1,11 +1,12 @@
import { LibraryItem } from '../entity/library_item'
import { PublicItem } from '../entity/public_item'
import { Subscription } from '../entity/subscription'
import { User } from '../entity/user'
import { JustReadFeedSection } from '../generated/graphql'
import { redisDataSource } from '../redis_data_source'
import { findUnseenPublicItems } from '../services/just_read_feed'
import { searchLibraryItems } from '../services/library_item'
import { Feature, getScores, ScoreApiResponse } from '../services/score'
import { findSubscriptionsByNames } from '../services/subscriptions'
import { findActiveUser } from '../services/user'
import { lanaugeToCode } from '../utils/helpers'
import { logger } from '../utils/logger'
@ -16,10 +17,11 @@ export interface UpdateJustReadFeedJobData {
userId: string
}
interface FeedItem {
interface Candidate {
id: string
title: string
url: string
type: string
thumbnail?: string
previewContent?: string
languageCode: string
@ -27,27 +29,36 @@ interface FeedItem {
dir: string
date: Date
topic?: string
wordCount?: number
wordCount: number
siteIcon?: string
siteName?: string
saveCount?: number
likeCount?: number
broadcastCount?: number
folder?: string
subscriptionType: string
score?: number
publishedAt?: Date
subscription: {
id: string
subscription?: {
name: string
icon?: string
type: string
}
}
const libraryItemToCandidate = (user: User, item: LibraryItem): FeedItem => ({
// Lightweight reference to a feed item: only the id and the item type are kept,
// not the item details.
interface Item {
id: string
// candidates in this file are tagged 'library_item' or 'public_item'
type: string
}
// A feed section in the form that is JSON-serialized into the feed's sorted set:
// a list of item references plus the layout used to present them.
interface Section {
items: Array<Item>
// layouts produced in this file are 'long' and 'quick links'
layout: string
}
const libraryItemToCandidate = (
item: LibraryItem,
subscriptions: Array<Subscription>
): Candidate => ({
id: item.id,
title: item.title,
url: item.originalUrl,
type: 'library_item',
thumbnail: item.thumbnail || undefined,
previewContent: item.description || undefined,
languageCode: lanaugeToCode(item.itemLanguage || 'English'),
@ -55,24 +66,24 @@ const libraryItemToCandidate = (user: User, item: LibraryItem): FeedItem => ({
dir: item.directionality || 'ltr',
date: item.createdAt,
topic: item.topic,
wordCount: item.wordCount || undefined,
wordCount: item.wordCount!,
siteName: item.siteName || undefined,
siteIcon: item.siteIcon || undefined,
folder: item.folder,
subscriptionType: 'library',
score: item.score,
publishedAt: item.publishedAt || undefined,
subscription: {
id: user.id,
name: user.name,
icon: user.profile.pictureUrl || undefined,
},
subscription: subscriptions.find(
(subscription) =>
subscription.name === item.subscription ||
subscription.url === item.subscription
),
})
const publicItemToCandidate = (item: PublicItem): FeedItem => ({
const publicItemToCandidate = (item: PublicItem): Candidate => ({
id: item.id,
title: item.title,
url: item.url,
type: 'public_item',
thumbnail: item.thumbnail,
previewContent: item.previewContent,
languageCode: item.languageCode || 'en',
@ -80,38 +91,42 @@ const publicItemToCandidate = (item: PublicItem): FeedItem => ({
dir: item.dir || 'ltr',
date: item.createdAt,
topic: item.topic,
wordCount: item.wordCount,
siteName: item.siteName,
saveCount: item.stats.saveCount,
likeCount: item.stats.likeCount,
broadcastCount: item.stats.broadcastCount,
wordCount: item.wordCount!,
siteIcon: item.siteIcon,
subscriptionType: item.type,
publishedAt: item.publishedAt,
subscription: {
id: item.source.id,
name: item.source.name,
icon: item.source.icon,
type: item.source.type,
},
})
const selectCandidates = async (user: User): Promise<Array<FeedItem>> => {
const selectCandidates = async (user: User): Promise<Array<Candidate>> => {
const userId = user.id
// get last 100 library items saved and not seen by user
const libraryItems = await searchLibraryItems(
{
size: 100,
includeContent: false,
query: `-is:seen`,
query: `-is:seen wordsCount:>0`,
},
userId
)
logger.info(`Found ${libraryItems.length} library items`)
// get subscriptions for the library items
const subscriptionNames = libraryItems
.filter((item) => !!item.subscription)
.map((item) => item.subscription as string)
const subscriptions = await findSubscriptionsByNames(
userId,
subscriptionNames
)
// map library items to candidates and limit to 70
const privateCandidates: Array<FeedItem> = libraryItems
.map((libraryItem) => libraryItemToCandidate(user, libraryItem))
const privateCandidates: Array<Candidate> = libraryItems
.map((item) => libraryItemToCandidate(item, subscriptions))
.slice(0, 70)
const privateCandidatesSize = privateCandidates.length
@ -125,7 +140,7 @@ const selectCandidates = async (user: User): Promise<Array<FeedItem>> => {
logger.info(`Found ${publicItems.length} public items`)
// map public items to candidates and limit to the remaining vacancies
const publicCandidates: Array<FeedItem> = publicItems
const publicCandidates: Array<Candidate> = publicItems
.map(publicItemToCandidate)
.slice(0, 100 - privateCandidatesSize)
@ -138,8 +153,8 @@ const selectCandidates = async (user: User): Promise<Array<FeedItem>> => {
const rankCandidates = async (
userId: string,
candidates: Array<FeedItem>
): Promise<Array<FeedItem>> => {
candidates: Array<Candidate>
): Promise<Array<Candidate>> => {
if (candidates.length <= 10) {
// no need to rank if there are 10 or fewer candidates
return candidates
@ -155,13 +170,13 @@ const rankCandidates = async (
acc[item.id] = {
title: item.title,
has_thumbnail: !!item.thumbnail,
has_site_icon: !!item.subscription.icon,
has_site_icon: !!item.siteIcon,
saved_at: item.date,
site: item.siteName,
language: item.languageCode,
directionality: item.dir,
folder: item.subscription.name,
subscription_type: item.subscriptionType,
folder: item.folder,
subscription_type: item.subscription?.type,
author: item.author,
word_count: item.wordCount,
published_at: item.publishedAt,
@ -196,8 +211,8 @@ const MAX_FEED_ITEMS = 500
export const getJustReadFeedSections = async (
userId: string,
limit: number,
minScore?: number
): Promise<Array<{ member: JustReadFeedSection; score: number }>> => {
maxScore?: number
): Promise<Array<{ member: Section; score: number }>> => {
const redisClient = redisDataSource.redisClient
if (!redisClient) {
throw new Error('Redis client not available')
@ -211,8 +226,8 @@ export const getJustReadFeedSections = async (
// response is an array of [member1, score1, member2, score2, ...]
const results = await redisClient.zrevrangebyscore(
key,
'+inf',
minScore || '-inf',
maxScore ? maxScore - 1 : '+inf',
'-inf',
'WITHSCORES',
'LIMIT',
0,
@ -221,7 +236,7 @@ export const getJustReadFeedSections = async (
const sections = []
for (let i = 0; i < results.length; i += 2) {
const member = JSON.parse(results[i]) as JustReadFeedSection
const member = JSON.parse(results[i]) as Section
const score = Number(results[i + 1])
sections.push({ member, score })
}
@ -231,7 +246,7 @@ export const getJustReadFeedSections = async (
const appendSectionsToFeed = async (
userId: string,
sections: Array<JustReadFeedSection>
sections: Array<Section>
) => {
const redisClient = redisDataSource.redisClient
if (!redisClient) {
@ -243,8 +258,8 @@ const appendSectionsToFeed = async (
// store candidates in redis sorted set
const pipeline = redisClient.pipeline()
const scoreMembers = sections.flatMap((section) => [
Date.now() + 86_400_000, // sections expire in 24 hours
const scoreMembers = sections.flatMap((section, index) => [
Date.now() + index + 86_400_000, // sections expire in 24 hours
JSON.stringify(section),
])
// add section to the sorted set
@ -258,32 +273,33 @@ const appendSectionsToFeed = async (
await pipeline.exec()
}
const mixFeedItems = (
rankedFeedItems: Array<FeedItem>
): Array<JustReadFeedSection> => {
const mixFeedItems = (rankedFeedItems: Array<Candidate>): Array<Section> => {
// find the median word count
const wordCounts = rankedFeedItems.map((item) => item.wordCount || 0)
wordCounts.sort()
const wordCounts = rankedFeedItems.map((item) => item.wordCount)
wordCounts.sort((a, b) => a - b)
const medianWordCount = wordCounts[Math.floor(wordCounts.length / 2)]
// separate items into two groups based on word count
const shortItems: Array<FeedItem> = []
const longItems: Array<FeedItem> = []
const shortItems: Array<Candidate> = []
const longItems: Array<Candidate> = []
for (const item of rankedFeedItems) {
if (item.wordCount && item.wordCount < medianWordCount) {
if (item.wordCount < medianWordCount) {
shortItems.push(item)
} else {
longItems.push(item)
}
}
// initialize empty batches
const batches = [[]]
const batches: Array<Array<Candidate>> = Array.from(
{ length: Math.floor(rankedFeedItems.length / 10) },
() => []
)
const checkConstraints = (batch: Array<FeedItem>, item: FeedItem) => {
const checkConstraints = (batch: Array<Candidate>, item: Candidate) => {
const titleCount = batch.filter((i) => i.title === item.title).length
const authorCount = batch.filter((i) => i.author === item.author).length
const siteCount = batch.filter((i) => i.siteName === item.siteName).length
const subscriptionCount = batch.filter(
(i) => i.subscription.name === item.subscription.name
(i) => i.subscription?.name === item.subscription?.name
).length
return (
@ -295,8 +311,8 @@ const mixFeedItems = (
}
const distributeItems = (
items: Array<FeedItem>,
batches: Array<Array<FeedItem>>
items: Array<Candidate>,
batches: Array<Array<Candidate>>
) => {
for (const item of items) {
let added = false
@ -310,7 +326,7 @@ const mixFeedItems = (
if (!added) {
for (const batch of batches) {
if (batch.length < 5) {
if (batch.length < 10) {
batch.push(item)
break
}
@ -328,15 +344,23 @@ const mixFeedItems = (
for (const batch of batches) {
// create a section for each long item
for (let i = 0; i < 5; i++) {
const section: JustReadFeedSection = {
items: [batch[i]],
const section: Section = {
items: [
{
id: batch[i].id,
type: batch[i].type,
},
],
layout: 'long',
}
sections.push(section)
}
// create a section for short items
sections.push({
items: batch.slice(5),
items: batch.slice(5).map((item) => ({
id: item.id,
type: item.type,
})),
layout: 'quick links',
})
}
@ -372,7 +396,7 @@ export const updateJustReadFeed = async (data: UpdateJustReadFeedJobData) => {
const rankedSections = mixFeedItems(rankedCandidates)
logger.info(`Created ${rankedSections.length} sections`)
logger.info('Appending feed items to feed')
logger.info('Appending sections to feed')
await appendSectionsToFeed(userId, rankedSections)
logger.info('Feed updated for user', { userId })
}

View File

@ -624,6 +624,17 @@ export const functionResolvers = {
return newsletterEmail.folder || EXISTING_NEWSLETTER_FOLDER
},
},
JustReadFeedSection: {
  // Resolves a section's items: the parent section only carries item ids, so
  // the full items are requested from the justReadFeedItems data loader.
  async items(
    section: { items: Array<{ id: string }> },
    _args: unknown,
    ctx: WithDataSourcesContext
  ) {
    const itemIds = section.items.map(({ id }) => id)
    const { justReadFeedItems } = ctx.dataLoaders

    return justReadFeedItems.loadMany(itemIds)
  },
},
...resultResolveTypeResolver('Login'),
...resultResolveTypeResolver('LogOut'),
...resultResolveTypeResolver('GoogleSignup'),

View File

@ -1,29 +1,46 @@
import {
JustReadFeedError,
JustReadFeedErrorCode,
JustReadFeedItem,
JustReadFeedSection,
JustReadFeedSuccess,
QueryJustReadFeedArgs,
} from '../../generated/graphql'
import { getJustReadFeedSections } from '../../jobs/update_just_read_feed'
import { getJob } from '../../queue-processor'
import { Merge } from '../../util'
import {
enqueueUpdateJustReadFeed,
updateJustReadFeedJobId,
} from '../../utils/createTask'
import { authorized } from '../../utils/gql-utils'
// Feed item shape returned by this resolver: built from Partial<JustReadFeedItem>
// (all generated fields optional) merged with a string `type` field.
// NOTE(review): the exact override behavior comes from the Merge helper in ../../util — confirm.
type PartialJustReadFeedItem = Merge<
Partial<JustReadFeedItem>,
{ type: string }
>
// Feed section whose `items` are the partial items declared above instead of
// fully populated JustReadFeedItem values.
type PartialJustReadFeedSection = Merge<
JustReadFeedSection,
{ items: Array<PartialJustReadFeedItem> }
>
// Success payload whose edges carry a string cursor and a partial section node.
type PartialJustReadFeedSuccess = Merge<
JustReadFeedSuccess,
{
edges: Array<{ cursor: string; node: PartialJustReadFeedSection }>
}
>
// This resolver is used to fetch the just read feed for the user.
// When the feed is empty, it enqueues a job to update the feed.
// When the client tries to fetch more than the feed has, it enqueues a job to update the feed.
export const justReadFeedResolver = authorized<
JustReadFeedSuccess,
PartialJustReadFeedSuccess,
JustReadFeedError,
QueryJustReadFeedArgs
>(async (_, { first, after }, { uid, log }) => {
const limit = first || 10
const minScore = after ? Number(after) : 0
const cursor = after ? parseInt(after) : undefined
const sections = await getJustReadFeedSections(uid, limit, minScore)
const sections = await getJustReadFeedSections(uid, limit, cursor)
log.info('Just read feed sections fetched')
if (sections.length === 0) {

View File

@ -10,6 +10,7 @@ import { Highlight } from '../entity/highlight'
import { Label } from '../entity/label'
import { Recommendation } from '../entity/recommendation'
import { UploadFile } from '../entity/upload_file'
import { JustReadFeedItem } from '../generated/graphql'
import { PubsubClient } from '../pubsub'
export interface Claims {
@ -51,6 +52,7 @@ export interface RequestContext {
highlights: DataLoader<string, Highlight[]>
recommendations: DataLoader<string, Recommendation[]>
uploadFiles: DataLoader<string, UploadFile | undefined>
justReadFeedItems: DataLoader<string, JustReadFeedItem>
}
}

View File

@ -3114,9 +3114,8 @@ const schema = gql`
url: String!
thumbnail: String
previewContent: String
highlights: String
savedCount: Int
likedCount: Int
saveCount: Int
likeCount: Int
broadcastCount: Int
date: Date!
author: String

View File

@ -1,5 +1,64 @@
import { PublicItem } from '../entity/public_item'
import { JustReadFeedItem } from '../generated/graphql'
import { authTrx } from '../repository'
import { findLibraryItemsByIds } from './library_item'
// Batch function for the just-read feed item DataLoader.
//
// Looks the given ids up first among library items and then among public
// items, and maps each match to the JustReadFeedItem shape.
//
// DataLoader requires a batch function to return exactly one entry per key,
// in the same order as the keys, where an entry is either the value or an
// Error. The result therefore always has the same length as `ids`; an id that
// matches neither a library item nor a public item yields an Error in its
// slot instead of being dropped (dropping it would shorten the array and make
// DataLoader reject the whole batch).
//
// @param ids - feed item ids, in the order the results must be returned
// @returns one JustReadFeedItem or Error per id, aligned with `ids`
export const batchGetJustReadFeedItems = async (
  ids: readonly string[]
): Promise<Array<JustReadFeedItem | Error>> => {
  // avoid building an empty `IN ()` list
  if (ids.length === 0) {
    return []
  }

  // NOTE(review): the user id argument is an empty string here — confirm that
  // findLibraryItemsByIds is meant to be called without a user scope.
  const libraryItems = await findLibraryItemsByIds(ids as string[], '')

  // NOTE(review): innerJoin joins without selecting the joined rows, yet
  // `publicItem.stats` and `publicItem.source` are read below — confirm these
  // are populated (innerJoinAndSelect on the entity relations may be intended).
  const publicItems = await authTrx(async (tx) =>
    tx
      .getRepository(PublicItem)
      .createQueryBuilder('public_item')
      .innerJoin(
        'public_item_stats',
        'stats',
        'stats.public_item_id = public_item.id'
      )
      .innerJoin(
        'public_item_source',
        'source',
        'source.id = public_item.source_id'
      )
      .where('public_item.id IN (:...ids)', { ids })
      .getMany()
  )

  // index both result sets once so each id is resolved with a constant-time lookup
  const libraryItemsById = new Map(
    libraryItems.map((item) => [item.id, item] as const)
  )
  const publicItemsById = new Map(
    publicItems.map((item) => [item.id, item] as const)
  )

  const results = ids.map((id) => {
    // library items take precedence over public items, as before
    const libraryItem = libraryItemsById.get(id)
    if (libraryItem) {
      return {
        ...libraryItem,
        date: libraryItem.savedAt,
        url: libraryItem.originalUrl,
        canArchive: !libraryItem.archivedAt,
        canDelete: !libraryItem.deletedAt,
        canSave: false,
        dir: libraryItem.directionality,
      }
    }

    const publicItem = publicItemsById.get(id)
    if (publicItem) {
      return {
        ...publicItem,
        date: publicItem.createdAt,
        url: publicItem.url,
        canArchive: false,
        canDelete: false,
        canSave: true,
        broadcastCount: publicItem.stats.broadcastCount,
        likeCount: publicItem.stats.likeCount,
        saveCount: publicItem.stats.saveCount,
        subscription: publicItem.source,
      }
    }

    // keep the slot so the result stays aligned with `ids`
    return new Error(`Just read feed item not found: ${id}`)
  })

  return results as Array<JustReadFeedItem | Error>
}
export const findUnseenPublicItems = async (
userId: string,

View File

@ -8,8 +8,8 @@ export interface Feature {
author?: string
directionality: string
word_count?: number
subscription_type: string
folder: string
subscription_type?: string
folder?: string
published_at?: Date
}
@ -23,33 +23,33 @@ export type ScoreApiResponse = Record<string, number> // item_id -> score
export const getScores = async (
data: ScoreApiRequestBody
): Promise<ScoreApiResponse> => {
const API_URL = 'http://127.0.0.1:5000/predictions'
const token = process.env.SCORE_API_TOKEN
// const API_URL = 'http://127.0.0.1:5000/predictions'
// const token = process.env.SCORE_API_TOKEN
if (!token) {
throw new Error('No score API token found')
}
// if (!token) {
// throw new Error('No score API token found')
// }
const response = await fetch(API_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${token}`,
},
body: JSON.stringify(data),
})
// const response = await fetch(API_URL, {
// method: 'POST',
// headers: {
// 'Content-Type': 'application/json',
// Authorization: `Bearer ${token}`,
// },
// body: JSON.stringify(data),
// })
if (!response.ok) {
throw new Error(`Failed to score candidates: ${response.statusText}`)
}
// if (!response.ok) {
// throw new Error(`Failed to score candidates: ${response.statusText}`)
// }
const scores = (await response.json()) as ScoreApiResponse
return scores
// const scores = (await response.json()) as ScoreApiResponse
// return scores
// fake random scores
// const scores: ScoreApiResponse = {}
// for (const itemId of Object.keys(data.item_features)) {
// scores[itemId] = Math.random()
// }
// return Promise.resolve(scores)
const scores: ScoreApiResponse = {}
for (const itemId of Object.keys(data.item_features)) {
scores[itemId] = Math.random()
}
return Promise.resolve(scores)
}

View File

@ -1,5 +1,5 @@
import axios from 'axios'
import { DeepPartial, DeleteResult } from 'typeorm'
import { DeepPartial, DeleteResult, In } from 'typeorm'
import { appDataSource } from '../data_source'
import { NewsletterEmail } from '../entity/newsletter_email'
import { Subscription } from '../entity/subscription'
@ -214,3 +214,13 @@ export const createRssSubscriptions = async (
) => {
return getRepository(Subscription).save(subscriptions)
}
// Finds the subscriptions of a user whose name OR url equals one of the given
// values (the two conditions in the array passed to findBy are alternatives).
//
// @param userId - owner of the subscriptions
// @param names - subscription names or urls to match; may be empty
// @returns the matching subscriptions, or an empty array when `names` is empty
export const findSubscriptionsByNames = async (
  userId: string,
  names: string[]
): Promise<Subscription[]> => {
  // nothing to match: skip the query instead of sending empty IN lists
  if (names.length === 0) {
    return []
  }

  // several library items can share one subscription, so drop repeated values
  const uniqueNames = Array.from(new Set(names))

  return getRepository(Subscription).findBy([
    { user: { id: userId }, name: In(uniqueNames) },
    { user: { id: userId }, url: In(uniqueNames) },
  ])
}