diff --git a/packages/api/src/jobs/sync_read_positions.ts b/packages/api/src/jobs/sync_read_positions.ts new file mode 100644 index 000000000..a91182048 --- /dev/null +++ b/packages/api/src/jobs/sync_read_positions.ts @@ -0,0 +1,68 @@ +import Redis from 'ioredis' +import { redisDataSource } from '../redis_data_source' +import { + CACHED_READING_POSITION_PREFIX, + componentsForCachedReadingPositionKey, + fetchCachedReadingPosition, +} from '../services/cached_reading_position' +import { logger } from '../utils/logger' +import { updateLibraryItemReadingProgress } from '../services/library_item' + +export const SYNC_READ_POSITIONS_JOB_NAME = 'sync-read-positions' + +async function* getSyncUpdatesIterator(redis: Redis) { + const match = `${CACHED_READING_POSITION_PREFIX}:*` + let [cursor, batch]: [string | number, string[]] = [0, []] + do { + ;[cursor, batch] = await redis.scan(cursor, 'MATCH', match, 'COUNT', 100) + if (batch.length) { + for (const key of batch) { + yield key + } + } + } while (cursor !== '0') + return +} + +const syncReadPosition = async (cacheKey: string) => { + const components = componentsForCachedReadingPositionKey(cacheKey) + const position = components + ? await fetchCachedReadingPosition(components.uid, components.libraryItemID) + : undefined + if (components && position) { + const result = await updateLibraryItemReadingProgress( + components.libraryItemID, + components.uid, + position.readingProgressPercent, + position.readingProgressTopPercent, + position.readingProgressAnchorIndex + ) + if (!result) { + logger.error('unable to update reading progress', { cacheKey }) + } + } else { + logger.warning( + 'potential error, reading position cache key found with no data', + { cacheKey } + ) + } + // Even if there are errors above we want to delete the key, otherwise + // in error scenarios we could accumulate a huge number of keys for + // something that is not critical (reading position) + const result = await redisDataSource.redisClient?.del(cacheKey) + if (!result || result < 1) { + logger.warning('error deleting cache key', { cacheKey }) + } +} + +export const syncReadPositionsJob = async (data: any, attempts: number) => { + const redis = redisDataSource.redisClient + if (!redis) { + throw new Error('unable to sync reading position, no redis client') + } + + const updates = getSyncUpdatesIterator(redis) + for await (const value of updates) { + await syncReadPosition(value) + } +} diff --git a/packages/api/src/services/cached_reading_position.ts b/packages/api/src/services/cached_reading_position.ts index 3ae1965ac..44e7edc72 100644 --- a/packages/api/src/services/cached_reading_position.ts +++ b/packages/api/src/services/cached_reading_position.ts @@ -10,11 +10,28 @@ export type ReadingProgressCacheItem = { updatedAt: string | undefined } +export const CACHED_READING_POSITION_PREFIX = `omnivore:reading-progress` + export const keyForCachedReadingPosition = ( uid: string, libraryItemID: string ): string => { - return `omnivore:reading-progress:${uid}:${libraryItemID}` + return `${CACHED_READING_POSITION_PREFIX}:${uid}:${libraryItemID}` +} + +export const componentsForCachedReadingPositionKey = ( + cacheKey: string +): { uid: string; libraryItemID: string } | undefined => { + try { + const [_owner, _prefix, uid, libraryItemID] = cacheKey.split(':') + return { + uid, + libraryItemID, + } + } catch (error) { + logger.log('exception getting cache key components', { cacheKey, error }) + } + return undefined } // Reading positions are cached as an array of positions, when @@ -60,6 +77,7 @@ export const fetchCachedReadingPosition = async ( uid: string, libraryItemID: string ): Promise => { + console.log('checking uid', uid, 'libraryItemId', libraryItemID) const cacheKey = keyForCachedReadingPosition(uid, libraryItemID) try { const cacheItemList = await redisDataSource.redisClient?.lrange( @@ -67,7 +85,9 @@ export const fetchCachedReadingPosition = async ( 0, -1 ) + console.log('cacheItemList: ', cacheKey, cacheItemList) const items = cacheItemList?.map((item) => JSON.parse(item)) + console.log(' items[]: ', items) if (!items || items.length < 1) { return undefined }