diff --git a/packages/api/src/datasources/reading_progress_data_source.ts b/packages/api/src/datasources/reading_progress_data_source.ts index 1421bedcf..5f9e7358b 100644 --- a/packages/api/src/datasources/reading_progress_data_source.ts +++ b/packages/api/src/datasources/reading_progress_data_source.ts @@ -4,80 +4,64 @@ type ReadingProgressCacheItem = { readingProgressPercent: number readingProgressTopPercent: number | undefined readingProgressAnchorIndex: number | undefined - updatedAt: Date + updatedAt: string } export class ReadingProgressDataSource { private cacheItems: { [id: string]: ReadingProgressCacheItem } = {} + constructor() {} + async getReadingProgress( + uid: string, libraryItemID: string ): Promise { - const cacheKey = `omnivore:reading-progress:${libraryItemID}` + const cacheKey = `omnivore:reading-progress:${uid}:${libraryItemID}` const cached = this.cacheItems[cacheKey] if (cached) { return cached } - return this.valueFromRedis(libraryItemID) + return this.valueFromRedis(cacheKey) } async updateReadingProgress( + uid: string, libraryItemID: string, progress: { readingProgressPercent: number - readingProgressTopPercent: number | undefined | null - readingProgressAnchorIndex: number | undefined | null + readingProgressTopPercent: number | undefined + readingProgressAnchorIndex: number | undefined } ): Promise { - const cacheKey = `omnivore:reading-progress:${libraryItemID}` - const existingItem = await this.valueFromRedis(cacheKey) - const cacheItem = { - readingProgressPercent: Math.max( - progress.readingProgressPercent, - existingItem?.readingProgressPercent ?? 0 - ), - readingProgressTopPercent: Math.max( - progress.readingProgressTopPercent ?? 0, - existingItem?.readingProgressTopPercent ?? 0 - ), - readingProgressAnchorIndex: Math.max( - progress.readingProgressAnchorIndex ?? 0, - existingItem?.readingProgressAnchorIndex ?? 0 - ), - updatedAt: new Date(), + const cacheKey = `omnivore:reading-progress:${uid}:${libraryItemID}` + const cacheItem: ReadingProgressCacheItem = { + ...progress, + updatedAt: new Date().toISOString(), } this.cacheItems[cacheKey] = cacheItem - if (await redisDataSource.redisClient?.hmset(cacheKey, cacheItem)) { - console.log('cached reading progress') + if ( + await redisDataSource.redisClient?.lpush( + cacheKey, + JSON.stringify(cacheItem) + ) + ) { + console.log('cached reading progress', cacheKey) } else { console.log('failed to cache reading progress') } } async valueFromRedis( - libraryItemID: string + cacheKey: string ): Promise { - const cacheKey = `omnivore:reading-progress:${libraryItemID}` - const redisCached = await redisDataSource.redisClient?.hgetall(cacheKey) - if (redisCached) { - const readingProgressPercent = parseInt( - redisCached.readingProgressPercent, - 10 - ) - const updatedAt = new Date(parseInt(redisCached.updatedAt, 10)) - if (!Number.isNaN(readingProgressPercent) && updatedAt) { - return { - readingProgressPercent, - readingProgressTopPercent: redisCached.readingProgressTopPercent - ? parseInt(redisCached.readingProgressTopPercent, 10) - : undefined, - readingProgressAnchorIndex: redisCached.readingProgressAnchorIndex - ? parseInt(redisCached.readingProgressAnchorIndex, 10) - : undefined, - updatedAt, - } - } + const redisCached = await redisDataSource.redisClient?.lrange( + cacheKey, + 0, + 0 + ) + if (redisCached && redisCached.length > 0) { + return JSON.parse(redisCached[0]) } return undefined } diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index ebf6fcfdf..afaa5b3ab 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -28,6 +28,10 @@ import { import { updatePDFContentJob } from './jobs/update_pdf_content' import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' +import { + SYNC_READ_POSITIONS_JOB_NAME, + syncReadPositionsJob, +} from './jobs/sync_read_positions' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -152,6 +156,21 @@ const main = async () => { const worker = createWorker(workerRedisClient) + const queue = await getBackendQueue() + if (queue) { + await queue.add( + SYNC_READ_POSITIONS_JOB_NAME, + {}, + { + priority: 1, + repeat: { + every: 10000, + limit: 100, + }, + } + ) + } + const queueEvents = new QueueEvents(QUEUE_NAME, { connection: workerRedisClient, }) diff --git a/packages/api/src/resolvers/article/index.ts b/packages/api/src/resolvers/article/index.ts index 5a5aa38f9..2b95913f8 100644 --- a/packages/api/src/resolvers/article/index.ts +++ b/packages/api/src/resolvers/article/index.ts @@ -60,7 +60,7 @@ import { UpdatesSinceError, UpdatesSinceSuccess, } from '../../generated/graphql' -import { getColumns } from '../../repository' +import { authTrx, getColumns } from '../../repository' import { getInternalLabelWithColor } from '../../repository/label' import { libraryItemRepository } from '../../repository/library_item' import { userRepository } from '../../repository/user' @@ -640,19 +640,37 @@ export const saveArticleReadingProgressResolver = authorized< } } - dataSources.readingProgress.updateReadingProgress(id, { - readingProgressPercent, - readingProgressTopPercent, - readingProgressAnchorIndex, - }) - // update reading progress only if the current value is lower - const updatedItem = await updateLibraryItemReadingProgress( - id, - uid, - readingProgressPercent, - readingProgressTopPercent, - readingProgressAnchorIndex - ) + let updatedItem: LibraryItem | null + if (env.redis.cache && env.redis.mq) { + // If redis caching and queueing are available we delay this write + dataSources.readingProgress.updateReadingProgress(uid, id, { + readingProgressPercent, + readingProgressTopPercent: readingProgressTopPercent ?? undefined, + readingProgressAnchorIndex: readingProgressAnchorIndex ?? undefined, + }) + + updatedItem = await authTrx( + async (t) => { + return t.getRepository(LibraryItem).findOne({ + where: { + id, + }, + }) + }, + undefined, + uid + ) + } else { + // update reading progress only if the current value is lower + updatedItem = await updateLibraryItemReadingProgress( + id, + uid, + readingProgressPercent, + readingProgressTopPercent, + readingProgressAnchorIndex + ) + } + if (!updatedItem) { return { errorCodes: [SaveArticleReadingProgressErrorCode.BadData] } } diff --git a/packages/api/src/resolvers/function_resolvers.ts b/packages/api/src/resolvers/function_resolvers.ts index bcc17360b..8d3fb3187 100644 --- a/packages/api/src/resolvers/function_resolvers.ts +++ b/packages/api/src/resolvers/function_resolvers.ts @@ -158,6 +158,60 @@ const resultResolveTypeResolver = ( }, }) +const readingProgressHandlers = { + async readingProgressPercent( + article: { id: string; readingProgressPercent?: number }, + _: unknown, + ctx: WithDataSourcesContext + ) { + if (ctx.claims?.uid) { + const readingProgress = + await ctx.dataSources.readingProgress.getReadingProgress( + ctx.claims?.uid, + article.id + ) + if (readingProgress) { + return readingProgress.readingProgressPercent + } + } + return article.readingProgressPercent + }, + async readingProgressAnchorIndex( + article: { id: string; readingProgressAnchorIndex?: number }, + _: unknown, + ctx: WithDataSourcesContext + ) { + if (ctx.claims?.uid) { + const readingProgress = + await ctx.dataSources.readingProgress.getReadingProgress( + ctx.claims?.uid, + article.id + ) + if (readingProgress) { + return readingProgress.readingProgressAnchorIndex + } + } + return article.readingProgressAnchorIndex + }, + async readingProgressTopPercent( + article: { id: string; readingProgressTopPercent?: number }, + _: unknown, + ctx: WithDataSourcesContext + ) { + if (ctx.claims?.uid) { + const readingProgress = + await ctx.dataSources.readingProgress.getReadingProgress( + ctx.claims?.uid, + article.id + ) + if (readingProgress) { + return readingProgress.readingProgressTopPercent + } + } + return article.readingProgressTopPercent + }, +} + // Provide resolver functions for your schema fields export const functionResolvers = { Mutation: { @@ -328,42 +382,7 @@ export const functionResolvers = { return findLabelsByLibraryItemId(article.id, ctx.uid) }, - async readingProgressPercent( - article: { id: string; readingProgressPercent?: number }, - _: unknown, - ctx: WithDataSourcesContext - ) { - const readingProgress = - await ctx.dataSources.readingProgress.getReadingProgress(article.id) - if (readingProgress) { - return readingProgress.readingProgressPercent - } - return article.readingProgressPercent - }, - async readingProgressAnchorIndex( - article: { id: string; readingProgressAnchorIndex?: number }, - _: unknown, - ctx: WithDataSourcesContext - ) { - const readingProgress = - await ctx.dataSources.readingProgress.getReadingProgress(article.id) - if (readingProgress) { - return readingProgress.readingProgressAnchorIndex - } - return article.readingProgressAnchorIndex - }, - async readingProgressTopPercent( - article: { id: string; readingProgressTopPercent?: number }, - _: unknown, - ctx: WithDataSourcesContext - ) { - const readingProgress = - await ctx.dataSources.readingProgress.getReadingProgress(article.id) - if (readingProgress) { - return readingProgress.readingProgressTopPercent - } - return article.readingProgressTopPercent - }, + ...readingProgressHandlers, }, Highlight: { // async reactions( @@ -469,42 +488,7 @@ export const functionResolvers = { const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid) return highlights.map(highlightDataToHighlight) }, - async readingProgressPercent( - article: { id: string; readingProgressPercent?: number }, - _: unknown, - ctx: WithDataSourcesContext - ) { - const readingProgress = - await ctx.dataSources.readingProgress.getReadingProgress(article.id) - if (readingProgress) { - return readingProgress.readingProgressPercent - } - return article.readingProgressPercent - }, - async readingProgressAnchorIndex( - article: { id: string; readingProgressAnchorIndex?: number }, - _: unknown, - ctx: WithDataSourcesContext - ) { - const readingProgress = - await ctx.dataSources.readingProgress.getReadingProgress(article.id) - if (readingProgress) { - return readingProgress.readingProgressAnchorIndex - } - return article.readingProgressAnchorIndex - }, - async readingProgressTopPercent( - article: { id: string; readingProgressTopPercent?: number }, - _: unknown, - ctx: WithDataSourcesContext - ) { - const readingProgress = - await ctx.dataSources.readingProgress.getReadingProgress(article.id) - if (readingProgress) { - return readingProgress.readingProgressTopPercent - } - return article.readingProgressTopPercent - }, + ...readingProgressHandlers, }, Subscription: { newsletterEmail(subscription: Subscription) { diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 6a30a9ccd..b05387574 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -666,6 +666,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => { attempts: 1, removeOnComplete: true, removeOnFail: true, + priority: 1, }) } diff --git a/packages/web/components/templates/article/Article.tsx b/packages/web/components/templates/article/Article.tsx index 2c3f4a5e4..091ad7e92 100644 --- a/packages/web/components/templates/article/Article.tsx +++ b/packages/web/components/templates/article/Article.tsx @@ -92,7 +92,7 @@ export function Article(props: ArticleProps): JSX.Element { setReadingProgress(bottomProgress * 100) } - }, 2500) + }, 3500) // Scroll to initial anchor position useEffect(() => {