From cb4fc23507abeb1689245d4bd592af3a2d8dcce8 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 23 May 2024 22:07:19 +0800 Subject: [PATCH] implement justReadFeed API --- packages/api/src/entity/library_item.ts | 3 + packages/api/src/entity/public_item.ts | 3 + packages/api/src/generated/graphql.ts | 4 +- packages/api/src/generated/schema.graphql | 3 +- .../api/src/jobs/update_just_read_feed.ts | 114 +++++++++++++----- .../api/src/resolvers/function_resolvers.ts | 4 +- .../api/src/resolvers/just_read_feed/index.ts | 31 +++++ packages/api/src/schema.ts | 8 +- .../{public_item.ts => just_read_feed.ts} | 0 packages/api/src/utils/createTask.ts | 22 ++++ .../db/migrations/0177.do.public_item.sql | 5 +- .../db/migrations/0177.undo.public_item.sql | 4 +- 12 files changed, 163 insertions(+), 38 deletions(-) create mode 100644 packages/api/src/resolvers/just_read_feed/index.ts rename packages/api/src/services/{public_item.ts => just_read_feed.ts} (100%) diff --git a/packages/api/src/entity/library_item.ts b/packages/api/src/entity/library_item.ts index f6a2f83de..0e9d7c259 100644 --- a/packages/api/src/entity/library_item.ts +++ b/packages/api/src/entity/library_item.ts @@ -207,4 +207,7 @@ export class LibraryItem { @Column('timestamptz') seenAt?: Date + + @Column('text') + topic!: string } diff --git a/packages/api/src/entity/public_item.ts b/packages/api/src/entity/public_item.ts index 00554c3a8..30e318a3a 100644 --- a/packages/api/src/entity/public_item.ts +++ b/packages/api/src/entity/public_item.ts @@ -52,4 +52,7 @@ export class PublicItem { @UpdateDateColumn() updatedAt!: Date + + @Column('text') + topic!: string } diff --git a/packages/api/src/generated/graphql.ts b/packages/api/src/generated/graphql.ts index 2eaa03ac2..55db3b4a0 100644 --- a/packages/api/src/generated/graphql.ts +++ b/packages/api/src/generated/graphql.ts @@ -1392,7 +1392,6 @@ export type JustReadFeedItem = { thumbnail?: Maybe; title: Scalars['String']; topic: Scalars['String']; - type: Scalars['String']; updatedAt?: Maybe; url: Scalars['String']; }; @@ -2272,6 +2271,8 @@ export type QueryIntegrationArgs = { export type QueryJustReadFeedArgs = { + after?: InputMaybe; + first?: InputMaybe; language?: InputMaybe; location?: InputMaybe; }; @@ -6089,7 +6090,6 @@ export type JustReadFeedItemResolvers, ParentType, ContextType>; title?: Resolver; topic?: Resolver; - type?: Resolver; updatedAt?: Resolver, ParentType, ContextType>; url?: Resolver; __isTypeOf?: IsTypeOfResolverFn; diff --git a/packages/api/src/generated/schema.graphql b/packages/api/src/generated/schema.graphql index 696fb8b3e..5dc0b0dc9 100644 --- a/packages/api/src/generated/schema.graphql +++ b/packages/api/src/generated/schema.graphql @@ -1249,7 +1249,6 @@ type JustReadFeedItem { thumbnail: String title: String! topic: String! - type: String! updatedAt: Date url: String! } @@ -1693,7 +1692,7 @@ type Query { hello: String integration(name: String!): IntegrationResult! integrations: IntegrationsResult! - justReadFeed(language: String, location: String): JustReadFeedResult! + justReadFeed(after: String, first: Int, language: String, location: String): JustReadFeedResult! labels: LabelsResult! me: User newsletterEmails: NewsletterEmailsResult! diff --git a/packages/api/src/jobs/update_just_read_feed.ts b/packages/api/src/jobs/update_just_read_feed.ts index 413dd961c..33ff23d65 100644 --- a/packages/api/src/jobs/update_just_read_feed.ts +++ b/packages/api/src/jobs/update_just_read_feed.ts @@ -1,18 +1,21 @@ import { LibraryItem } from '../entity/library_item' import { PublicItem } from '../entity/public_item' import { redisDataSource } from '../redis_data_source' +import { findUnseenPublicItems } from '../services/just_read_feed' import { searchLibraryItems } from '../services/library_item' -import { findUnseenPublicItems } from '../services/public_item' import { logger } from '../utils/logger' -interface JustReadFeedUpdateData { +export const UPDATE_JUST_READ_FEED_JOB = 'UPDATE_JUST_READ_FEED_JOB' + +export interface UpdateJustReadFeedJobData { userId: string } -interface Candidate { +interface JustReadFeedItem { id: string title: string url: string + topic: string thumbnail?: string previewContent?: string languageCode?: string @@ -22,7 +25,17 @@ interface Candidate { subscription?: string } -const libraryItemToCandidate = (item: LibraryItem): Candidate => ({ +interface JustReadFeedTopic { + name: string + items: Array + thumbnail: string +} + +interface JustReadFeed { + topics: Array +} + +const libraryItemToFeedItem = (item: LibraryItem): JustReadFeedItem => ({ id: item.id, title: item.title, url: item.originalUrl, @@ -33,9 +46,10 @@ const libraryItemToCandidate = (item: LibraryItem): Candidate => ({ dir: item.directionality || undefined, publishedAt: item.publishedAt || undefined, subscription: item.subscription || undefined, + topic: item.topic, }) -const publicItemToCandidate = (item: PublicItem): Candidate => ({ +const publicItemToFeedItem = (item: PublicItem): JustReadFeedItem => ({ id: item.id, title: item.title, url: item.url, @@ -46,9 +60,12 @@ const publicItemToCandidate = (item: PublicItem): Candidate => ({ dir: item.dir, publishedAt: item.publishedAt, subscription: item.source_name, + topic: item.topic, }) -const selectCandidates = async (userId: string): Promise> => { +const selectCandidates = async ( + userId: string +): Promise> => { // get last 100 library items saved and not seen by user const libraryItems = await searchLibraryItems( { @@ -60,8 +77,8 @@ const selectCandidates = async (userId: string): Promise> => { ) // map library items to candidates - const privateCandidates: Array = libraryItems.map( - libraryItemToCandidate + const privateCandidates: Array = libraryItems.map( + libraryItemToFeedItem ) // get candidates from public inventory @@ -70,19 +87,18 @@ const selectCandidates = async (userId: string): Promise> => { }) // map public items to candidates - const publicCandidates: Array = publicItems.map( - publicItemToCandidate - ) + const publicCandidates: Array = + publicItems.map(publicItemToFeedItem) // combine candidates return [...privateCandidates, ...publicCandidates] } -const rankCandidates = async ( - candidates: Array -): Promise> => { - if (candidates.length <= 10) { - return candidates +const rankFeedItems = async ( + feedItems: Array +): Promise> => { + if (feedItems.length <= 10) { + return feedItems } // TODO: rank candidates @@ -93,43 +109,83 @@ const rankCandidates = async ( headers: { 'Content-Type': 'application/json', }, - body: JSON.stringify({ candidates }), + body: JSON.stringify({ candidates: feedItems }), }) if (!response.ok) { throw new Error(`Failed to rank candidates: ${response.statusText}`) } - return response.json() as Promise> + return response.json() as Promise> } -const prependCandidatesToFeed = async ( - candidates: Array, - userId: string -) => { - const redisKey = `just-read-feed:${userId}` +const redisKey = (userId: string) => `just-read-feed:${userId}` + +export const getJustReadFeed = async ( + userId: string, + limit: number, + offset: number +): Promise => { const redisClient = redisDataSource.redisClient if (!redisClient) { throw new Error('Redis client not available') } + const key = redisKey(userId) + + const results = await redisClient.lrange(key, offset, offset + limit - 1) + + const feedItems = results.map((item) => JSON.parse(item) as JustReadFeedItem) + + const topics: Array = [] + + feedItems.forEach((item) => { + const topic = topics.find((topic) => topic.name === item.topic) + if (topic) { + topic.items.push(item) + } else { + topics.push({ + name: item.topic, + thumbnail: item.thumbnail || '', + items: [item], + }) + } + }) + + return { topics } +} + +const prependItemsToFeed = async ( + candidates: Array, + userId: string +) => { + const redisClient = redisDataSource.redisClient + if (!redisClient) { + throw new Error('Redis client not available') + } + + const key = redisKey(userId) + const pipeline = redisClient.pipeline() candidates.forEach((candidate) => - pipeline.lpush(redisKey, JSON.stringify(candidate)) + pipeline.lpush(key, JSON.stringify(candidate)) ) + // keep only the first 100 items + pipeline.ltrim(key, 0, 99) + await pipeline.exec() } -const updateJustReadFeed = async (data: JustReadFeedUpdateData) => { +const updateJustReadFeed = async (data: UpdateJustReadFeedJobData) => { const { userId } = data logger.info(`Updating just read feed for user ${userId}`) - const candidates = await selectCandidates(userId) - logger.info(`Found ${candidates.length} candidates`) + const feedItems = await selectCandidates(userId) + logger.info(`Found ${feedItems.length} candidates`) // TODO: integrity check on candidates? - const rankedCandidates = await rankCandidates(candidates) + const rankedFeedItems = await rankFeedItems(feedItems) - await prependCandidatesToFeed(rankedCandidates, userId) + await prependItemsToFeed(rankedFeedItems, userId) } diff --git a/packages/api/src/resolvers/function_resolvers.ts b/packages/api/src/resolvers/function_resolvers.ts index aa41b724d..91f8b942f 100644 --- a/packages/api/src/resolvers/function_resolvers.ts +++ b/packages/api/src/resolvers/function_resolvers.ts @@ -33,7 +33,6 @@ import { wordsCount, } from '../utils/helpers' import { createImageProxyUrl } from '../utils/imageproxy' -import { logger } from '../utils/logger' import { contentConverter } from '../utils/parser' import { generateDownloadSignedUrl, @@ -153,6 +152,7 @@ import { webhookResolver, webhooksResolver, } from './index' +import { justReadFeedResolver } from './just_read_feed' import { markEmailAsItemResolver, recentEmailsResolver, @@ -360,6 +360,7 @@ export const functionResolvers = { feeds: feedsResolver, scanFeeds: scanFeedsResolver, integration: integrationResolver, + justReadFeed: justReadFeedResolver, }, User: { async intercomHash( @@ -722,4 +723,5 @@ export const functionResolvers = { ...resultResolveTypeResolver('Integration'), ...resultResolveTypeResolver('ExportToIntegration'), ...resultResolveTypeResolver('ReplyToEmail'), + ...resultResolveTypeResolver('JustReadFeed'), } diff --git a/packages/api/src/resolvers/just_read_feed/index.ts b/packages/api/src/resolvers/just_read_feed/index.ts new file mode 100644 index 000000000..92e78a417 --- /dev/null +++ b/packages/api/src/resolvers/just_read_feed/index.ts @@ -0,0 +1,31 @@ +import { + JustReadFeedError, + JustReadFeedSuccess, + QueryJustReadFeedArgs, +} from '../../generated/graphql' +import { getJustReadFeed } from '../../jobs/update_just_read_feed' +import { enqueueUpdateJustReadFeed } from '../../utils/createTask' +import { authorized } from '../../utils/gql-utils' + +export const justReadFeedResolver = authorized< + JustReadFeedSuccess, + JustReadFeedError, + QueryJustReadFeedArgs +>(async (_, { first, after }, { uid, log }) => { + first = first || 10 + after = after || '0' + const offset = parseInt(after, 10) + + const feed = await getJustReadFeed(uid, first, offset) + if (feed.topics.length === 0) { + log.info('No feed items found, updating feed') + + await enqueueUpdateJustReadFeed({ userId: uid }) + + return { + topics: [], + } + } + + return feed +}) diff --git a/packages/api/src/schema.ts b/packages/api/src/schema.ts index c6dd0ed50..9eabe1a73 100755 --- a/packages/api/src/schema.ts +++ b/packages/api/src/schema.ts @@ -3128,7 +3128,6 @@ const schema = gql` updatedAt: Date comments: [String!] author: String - type: String! languageCode: String dir: String seen_at: Date @@ -3350,7 +3349,12 @@ const schema = gql` feeds(input: FeedsInput!): FeedsResult! discoverFeeds: DiscoverFeedResult! scanFeeds(input: ScanFeedsInput!): ScanFeedsResult! - justReadFeed(location: String, language: String): JustReadFeedResult! + justReadFeed( + location: String + language: String + first: Int + after: String + ): JustReadFeedResult! } ` diff --git a/packages/api/src/services/public_item.ts b/packages/api/src/services/just_read_feed.ts similarity index 100% rename from packages/api/src/services/public_item.ts rename to packages/api/src/services/just_read_feed.ts diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index e06c1bb99..5903d8193 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -53,6 +53,10 @@ import { UPDATE_HIGHLIGHT_JOB, UPDATE_LABELS_JOB, } from '../jobs/update_db' +import { + UpdateJustReadFeedJobData, + UPDATE_JUST_READ_FEED_JOB, +} from '../jobs/update_just_read_feed' import { UploadContentJobData, UPLOAD_CONTENT_JOB, @@ -85,6 +89,7 @@ export const getJobPriority = (jobName: string): number => { case UPDATE_HIGHLIGHT_JOB: case SYNC_READ_POSITIONS_JOB_NAME: case SEND_EMAIL_JOB: + case UPDATE_JUST_READ_FEED_JOB: return 1 case TRIGGER_RULE_JOB_NAME: case CALL_WEBHOOK_JOB_NAME: @@ -981,4 +986,21 @@ export const enqueueBulkUploadContentJob = async ( return queue.addBulk(jobs) } +export const enqueueUpdateJustReadFeed = async ( + data: UpdateJustReadFeedJobData +) => { + const queue = await getBackendQueue() + if (!queue) { + return undefined + } + + return queue.add(UPDATE_JUST_READ_FEED_JOB, data, { + jobId: `${UPDATE_JUST_READ_FEED_JOB}_${data.userId}_${JOB_VERSION}`, + removeOnComplete: true, + removeOnFail: true, + priority: getJobPriority(UPDATE_JUST_READ_FEED_JOB), + attempts: 3, + }) +} + export default createHttpTaskWithToken diff --git a/packages/db/migrations/0177.do.public_item.sql b/packages/db/migrations/0177.do.public_item.sql index 597fec5fb..d5c3b31d4 100755 --- a/packages/db/migrations/0177.do.public_item.sql +++ b/packages/db/migrations/0177.do.public_item.sql @@ -26,6 +26,7 @@ CREATE TABLE omnivore.public_item ( type TEXT NOT NULL, -- public feeds, newsletters, or user recommended title TEXT NOT NULL, url TEXT NOT NULL, + topic TEXT NOT NULL, approved BOOLEAN NOT NULL DEFAULT FALSE, thumbnail TEXT, preview_content TEXT, @@ -71,6 +72,8 @@ CREATE INDEX public_item_interaction_public_item_id_idx ON omnivore.public_item_ CREATE TRIGGER update_public_item_interactions_modtime BEFORE UPDATE ON omnivore.public_item_interactions FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column(); -ALTER TABLE omnivore.library_item ADD COLUMN seen_at timestamptz; +ALTER TABLE omnivore.library_item + ADD COLUMN seen_at timestamptz, + ADD COLUMN topic TEXT NOT NULL; COMMIT; diff --git a/packages/db/migrations/0177.undo.public_item.sql b/packages/db/migrations/0177.undo.public_item.sql index 36c5f8165..5b9704f4c 100755 --- a/packages/db/migrations/0177.undo.public_item.sql +++ b/packages/db/migrations/0177.undo.public_item.sql @@ -9,6 +9,8 @@ DROP TABLE omnivore.public_item_stats; DROP TABLE omnivore.public_item; DROP TABLE omnivore.public_item_source; -ALTER TABLE omnivore.library_item DROP COLUMN seen_at; +ALTER TABLE omnivore.library_item + DROP COLUMN seen_at, + DROP COLUMN topic; COMMIT;