implement justReadFeed API

This commit is contained in:
Hongbo Wu
2024-05-23 22:07:19 +08:00
parent b058952c2d
commit cb4fc23507
12 changed files with 163 additions and 38 deletions

View File

@ -207,4 +207,7 @@ export class LibraryItem {
@Column('timestamptz')
seenAt?: Date
@Column('text')
topic!: string
}

View File

@ -52,4 +52,7 @@ export class PublicItem {
@UpdateDateColumn()
updatedAt!: Date
@Column('text')
topic!: string
}

View File

@ -1392,7 +1392,6 @@ export type JustReadFeedItem = {
thumbnail?: Maybe<Scalars['String']>;
title: Scalars['String'];
topic: Scalars['String'];
type: Scalars['String'];
updatedAt?: Maybe<Scalars['Date']>;
url: Scalars['String'];
};
@ -2272,6 +2271,8 @@ export type QueryIntegrationArgs = {
export type QueryJustReadFeedArgs = {
after?: InputMaybe<Scalars['String']>;
first?: InputMaybe<Scalars['Int']>;
language?: InputMaybe<Scalars['String']>;
location?: InputMaybe<Scalars['String']>;
};
@ -6089,7 +6090,6 @@ export type JustReadFeedItemResolvers<ContextType = ResolverContext, ParentType
thumbnail?: Resolver<Maybe<ResolversTypes['String']>, ParentType, ContextType>;
title?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
topic?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
type?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
updatedAt?: Resolver<Maybe<ResolversTypes['Date']>, ParentType, ContextType>;
url?: Resolver<ResolversTypes['String'], ParentType, ContextType>;
__isTypeOf?: IsTypeOfResolverFn<ParentType, ContextType>;

View File

@ -1249,7 +1249,6 @@ type JustReadFeedItem {
thumbnail: String
title: String!
topic: String!
type: String!
updatedAt: Date
url: String!
}
@ -1693,7 +1692,7 @@ type Query {
hello: String
integration(name: String!): IntegrationResult!
integrations: IntegrationsResult!
justReadFeed(language: String, location: String): JustReadFeedResult!
justReadFeed(after: String, first: Int, language: String, location: String): JustReadFeedResult!
labels: LabelsResult!
me: User
newsletterEmails: NewsletterEmailsResult!

View File

@ -1,18 +1,21 @@
import { LibraryItem } from '../entity/library_item'
import { PublicItem } from '../entity/public_item'
import { redisDataSource } from '../redis_data_source'
import { findUnseenPublicItems } from '../services/just_read_feed'
import { searchLibraryItems } from '../services/library_item'
import { findUnseenPublicItems } from '../services/public_item'
import { logger } from '../utils/logger'
interface JustReadFeedUpdateData {
export const UPDATE_JUST_READ_FEED_JOB = 'UPDATE_JUST_READ_FEED_JOB'
export interface UpdateJustReadFeedJobData {
userId: string
}
interface Candidate {
interface JustReadFeedItem {
id: string
title: string
url: string
topic: string
thumbnail?: string
previewContent?: string
languageCode?: string
@ -22,7 +25,17 @@ interface Candidate {
subscription?: string
}
const libraryItemToCandidate = (item: LibraryItem): Candidate => ({
// A named group of feed items, as assembled by getJustReadFeed.
interface JustReadFeedTopic {
// topic name shared by every item in `items`
name: string
items: Array<JustReadFeedItem>
// getJustReadFeed sets this to the thumbnail of the first item it sees for
// the topic, or '' when that item has no thumbnail
thumbnail: string
}
// Shape returned by getJustReadFeed: one page of feed items grouped by topic.
interface JustReadFeed {
topics: Array<JustReadFeedTopic>
}
const libraryItemToFeedItem = (item: LibraryItem): JustReadFeedItem => ({
id: item.id,
title: item.title,
url: item.originalUrl,
@ -33,9 +46,10 @@ const libraryItemToCandidate = (item: LibraryItem): Candidate => ({
dir: item.directionality || undefined,
publishedAt: item.publishedAt || undefined,
subscription: item.subscription || undefined,
topic: item.topic,
})
const publicItemToCandidate = (item: PublicItem): Candidate => ({
const publicItemToFeedItem = (item: PublicItem): JustReadFeedItem => ({
id: item.id,
title: item.title,
url: item.url,
@ -46,9 +60,12 @@ const publicItemToCandidate = (item: PublicItem): Candidate => ({
dir: item.dir,
publishedAt: item.publishedAt,
subscription: item.source_name,
topic: item.topic,
})
const selectCandidates = async (userId: string): Promise<Array<Candidate>> => {
const selectCandidates = async (
userId: string
): Promise<Array<JustReadFeedItem>> => {
// get last 100 library items saved and not seen by user
const libraryItems = await searchLibraryItems(
{
@ -60,8 +77,8 @@ const selectCandidates = async (userId: string): Promise<Array<Candidate>> => {
)
// map library items to candidates
const privateCandidates: Array<Candidate> = libraryItems.map(
libraryItemToCandidate
const privateCandidates: Array<JustReadFeedItem> = libraryItems.map(
libraryItemToFeedItem
)
// get candidates from public inventory
@ -70,19 +87,18 @@ const selectCandidates = async (userId: string): Promise<Array<Candidate>> => {
})
// map public items to candidates
const publicCandidates: Array<Candidate> = publicItems.map(
publicItemToCandidate
)
const publicCandidates: Array<JustReadFeedItem> =
publicItems.map(publicItemToFeedItem)
// combine candidates
return [...privateCandidates, ...publicCandidates]
}
const rankCandidates = async (
candidates: Array<Candidate>
): Promise<Array<Candidate>> => {
if (candidates.length <= 10) {
return candidates
const rankFeedItems = async (
feedItems: Array<JustReadFeedItem>
): Promise<Array<JustReadFeedItem>> => {
if (feedItems.length <= 10) {
return feedItems
}
// TODO: rank candidates
@ -93,43 +109,83 @@ const rankCandidates = async (
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ candidates }),
body: JSON.stringify({ candidates: feedItems }),
})
if (!response.ok) {
throw new Error(`Failed to rank candidates: ${response.statusText}`)
}
return response.json() as Promise<Array<Candidate>>
return response.json() as Promise<Array<JustReadFeedItem>>
}
const prependCandidatesToFeed = async (
candidates: Array<Candidate>,
userId: string
) => {
const redisKey = `just-read-feed:${userId}`
/** Builds the Redis key of the list that holds the given user's just-read feed. */
const redisKey = (userId: string): string => {
  return 'just-read-feed:' + userId
}
/**
 * Reads one page of a user's just-read feed from Redis and groups the items
 * by topic.
 *
 * @param userId - owner of the feed; used to build the Redis list key
 * @param limit - maximum number of items to read; a non-positive or
 *   non-finite value yields an empty feed
 * @param offset - zero-based index of the first item to read; negative or
 *   NaN values are treated as 0
 * @returns the topics in order of first appearance within the page, each
 *   holding its items in list order
 * @throws Error when the Redis client is not available
 */
export const getJustReadFeed = async (
  userId: string,
  limit: number,
  offset: number
): Promise<JustReadFeed> => {
  const redisClient = redisDataSource.redisClient
  if (!redisClient) {
    throw new Error('Redis client not available')
  }

  // LRANGE interprets negative indexes as "from the tail", so a limit of 0
  // would turn the stop index into -1 and return the whole list.
  const count = Math.trunc(limit)
  if (!Number.isFinite(count) || count <= 0) {
    return { topics: [] }
  }
  // `offset > 0` is false for NaN as well, so bad cursors fall back to 0
  const start = offset > 0 ? Math.trunc(offset) : 0

  const results = await redisClient.lrange(
    redisKey(userId),
    start,
    start + count - 1
  )

  // A Map keeps insertion order, so topics stay in order of first appearance
  const topicsByName = new Map<string, JustReadFeedTopic>()
  for (const raw of results) {
    const item = JSON.parse(raw) as JustReadFeedItem
    const existing = topicsByName.get(item.topic)
    if (existing) {
      existing.items.push(item)
    } else {
      topicsByName.set(item.topic, {
        name: item.topic,
        // the first item seen for a topic provides the topic thumbnail
        thumbnail: item.thumbnail || '',
        items: [item],
      })
    }
  }

  return { topics: Array.from(topicsByName.values()) }
}
/**
 * Pushes the given items onto the head of the user's just-read feed list in
 * Redis, then trims the list to its first 100 entries.
 *
 * @param candidates - feed items to store; each is serialized with JSON.stringify
 * @param userId - owner of the feed; used to build the Redis list key
 * @throws Error when the Redis client is not available, or the first error
 *   reported by any command in the pipeline
 */
const prependItemsToFeed = async (
  candidates: Array<JustReadFeedItem>,
  userId: string
): Promise<void> => {
  const redisClient = redisDataSource.redisClient
  if (!redisClient) {
    throw new Error('Redis client not available')
  }

  // nothing to store, so skip the round trip
  if (candidates.length === 0) {
    return
  }

  const key = redisKey(userId)
  const pipeline = redisClient.pipeline()

  // NOTE(review): pushing one by one with LPUSH leaves the LAST candidate at
  // the head of the list. If the ranked array is best-first, the new items
  // end up in reverse rank order; confirm the intended ordering.
  for (const candidate of candidates) {
    pipeline.lpush(key, JSON.stringify(candidate))
  }

  // keep only the first 100 items
  pipeline.ltrim(key, 0, 99)

  // NOTE(review): assumes an ioredis-style pipeline whose exec() resolves to
  // [error, result] tuples instead of rejecting on command errors; surface
  // the first error so a failed write is not silently ignored.
  const replies = await pipeline.exec()
  for (const [error] of replies || []) {
    if (error) {
      throw error
    }
  }
}
const updateJustReadFeed = async (data: JustReadFeedUpdateData) => {
const updateJustReadFeed = async (data: UpdateJustReadFeedJobData) => {
const { userId } = data
logger.info(`Updating just read feed for user ${userId}`)
const candidates = await selectCandidates(userId)
logger.info(`Found ${candidates.length} candidates`)
const feedItems = await selectCandidates(userId)
logger.info(`Found ${feedItems.length} candidates`)
// TODO: integrity check on candidates?
const rankedCandidates = await rankCandidates(candidates)
const rankedFeedItems = await rankFeedItems(feedItems)
await prependCandidatesToFeed(rankedCandidates, userId)
await prependItemsToFeed(rankedFeedItems, userId)
}

View File

@ -33,7 +33,6 @@ import {
wordsCount,
} from '../utils/helpers'
import { createImageProxyUrl } from '../utils/imageproxy'
import { logger } from '../utils/logger'
import { contentConverter } from '../utils/parser'
import {
generateDownloadSignedUrl,
@ -153,6 +152,7 @@ import {
webhookResolver,
webhooksResolver,
} from './index'
import { justReadFeedResolver } from './just_read_feed'
import {
markEmailAsItemResolver,
recentEmailsResolver,
@ -360,6 +360,7 @@ export const functionResolvers = {
feeds: feedsResolver,
scanFeeds: scanFeedsResolver,
integration: integrationResolver,
justReadFeed: justReadFeedResolver,
},
User: {
async intercomHash(
@ -722,4 +723,5 @@ export const functionResolvers = {
...resultResolveTypeResolver('Integration'),
...resultResolveTypeResolver('ExportToIntegration'),
...resultResolveTypeResolver('ReplyToEmail'),
...resultResolveTypeResolver('JustReadFeed'),
}

View File

@ -0,0 +1,31 @@
import {
JustReadFeedError,
JustReadFeedSuccess,
QueryJustReadFeedArgs,
} from '../../generated/graphql'
import { getJustReadFeed } from '../../jobs/update_just_read_feed'
import { enqueueUpdateJustReadFeed } from '../../utils/createTask'
import { authorized } from '../../utils/gql-utils'
/**
 * Resolver for the `justReadFeed` query: returns one page of the caller's
 * just-read feed, grouped by topic.
 *
 * `first` is the page size (defaults to 10, capped at 100) and `after` is a
 * numeric offset encoded as a string (defaults to 0). When the first page is
 * empty, a job that rebuilds the feed is enqueued and an empty feed is
 * returned for this request.
 */
export const justReadFeedResolver = authorized<
  JustReadFeedSuccess,
  JustReadFeedError,
  QueryJustReadFeedArgs
>(async (_, { first, after }, { uid, log }) => {
  const defaultPageSize = 10
  // the stored feed is trimmed to 100 entries, so larger pages cannot return more
  const maxPageSize = 100

  // missing, zero or negative page sizes fall back to the default
  const limit =
    first && first > 0 ? Math.min(first, maxPageSize) : defaultPageSize

  // a malformed or negative cursor restarts from the beginning instead of
  // passing NaN or a tail-relative index down to Redis
  const parsedOffset = parseInt(after || '0', 10)
  const offset =
    Number.isNaN(parsedOffset) || parsedOffset < 0 ? 0 : parsedOffset

  const feed = await getJustReadFeed(uid, limit, offset)
  if (feed.topics.length === 0) {
    // Only an empty FIRST page means the feed itself is empty; an empty later
    // page just means the client paged past the end.
    if (offset === 0) {
      log.info('No feed items found, updating feed')
      await enqueueUpdateJustReadFeed({ userId: uid })
    }

    return {
      topics: [],
    }
  }

  return feed
})

View File

@ -3128,7 +3128,6 @@ const schema = gql`
updatedAt: Date
comments: [String!]
author: String
type: String!
languageCode: String
dir: String
seen_at: Date
@ -3350,7 +3349,12 @@ const schema = gql`
feeds(input: FeedsInput!): FeedsResult!
discoverFeeds: DiscoverFeedResult!
scanFeeds(input: ScanFeedsInput!): ScanFeedsResult!
justReadFeed(location: String, language: String): JustReadFeedResult!
justReadFeed(
location: String
language: String
first: Int
after: String
): JustReadFeedResult!
}
`

View File

@ -53,6 +53,10 @@ import {
UPDATE_HIGHLIGHT_JOB,
UPDATE_LABELS_JOB,
} from '../jobs/update_db'
import {
UpdateJustReadFeedJobData,
UPDATE_JUST_READ_FEED_JOB,
} from '../jobs/update_just_read_feed'
import {
UploadContentJobData,
UPLOAD_CONTENT_JOB,
@ -85,6 +89,7 @@ export const getJobPriority = (jobName: string): number => {
case UPDATE_HIGHLIGHT_JOB:
case SYNC_READ_POSITIONS_JOB_NAME:
case SEND_EMAIL_JOB:
case UPDATE_JUST_READ_FEED_JOB:
return 1
case TRIGGER_RULE_JOB_NAME:
case CALL_WEBHOOK_JOB_NAME:
@ -981,4 +986,21 @@ export const enqueueBulkUploadContentJob = async (
return queue.addBulk(jobs)
}
/**
 * Adds an UPDATE_JUST_READ_FEED_JOB for one user to the backend queue.
 *
 * The job id is built from the job name, the user id and JOB_VERSION.
 *
 * @param data - job payload carrying the user id
 * @returns the result of `queue.add`, or undefined when no backend queue is
 *   available
 */
export const enqueueUpdateJustReadFeed = async (
  data: UpdateJustReadFeedJobData
) => {
  const backendQueue = await getBackendQueue()
  if (backendQueue) {
    const jobName = UPDATE_JUST_READ_FEED_JOB
    const uniqueJobId = `${jobName}_${data.userId}_${JOB_VERSION}`

    return backendQueue.add(jobName, data, {
      jobId: uniqueJobId,
      removeOnComplete: true,
      removeOnFail: true,
      priority: getJobPriority(jobName),
      attempts: 3,
    })
  }

  return undefined
}
export default createHttpTaskWithToken

View File

@ -26,6 +26,7 @@ CREATE TABLE omnivore.public_item (
type TEXT NOT NULL, -- public feeds, newsletters, or user recommended
title TEXT NOT NULL,
url TEXT NOT NULL,
topic TEXT NOT NULL,
approved BOOLEAN NOT NULL DEFAULT FALSE,
thumbnail TEXT,
preview_content TEXT,
@ -71,6 +72,8 @@ CREATE INDEX public_item_interaction_public_item_id_idx ON omnivore.public_item_
CREATE TRIGGER update_public_item_interactions_modtime BEFORE UPDATE ON omnivore.public_item_interactions FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
ALTER TABLE omnivore.library_item ADD COLUMN seen_at timestamptz;
-- Adds the seen_at and topic columns to library_item.
-- topic is NOT NULL, so it must carry a DEFAULT: without one, adding the
-- column to a table that already contains rows fails, because every existing
-- row would hold NULL in the new column.
-- NOTE(review): '' is a placeholder default; confirm the fallback topic or
-- plan a backfill for existing rows.
ALTER TABLE omnivore.library_item
    ADD COLUMN seen_at timestamptz,
    ADD COLUMN topic TEXT NOT NULL DEFAULT '';
COMMIT;

View File

@ -9,6 +9,8 @@ DROP TABLE omnivore.public_item_stats;
DROP TABLE omnivore.public_item;
DROP TABLE omnivore.public_item_source;
ALTER TABLE omnivore.library_item DROP COLUMN seen_at;
-- Removes the seen_at and topic columns from library_item.
ALTER TABLE omnivore.library_item
DROP COLUMN seen_at,
DROP COLUMN topic;
COMMIT;