Add job to sync read position data

This commit is contained in:
Jackson Harper
2024-01-31 12:15:32 +08:00
parent a40f5bed55
commit 7bb3718a8f
2 changed files with 89 additions and 1 deletions

View File

@ -0,0 +1,68 @@
import Redis from 'ioredis'
import { redisDataSource } from '../redis_data_source'
import {
CACHED_READING_POSITION_PREFIX,
componentsForCachedReadingPositionKey,
fetchCachedReadingPosition,
} from '../services/cached_reading_position'
import { logger } from '../utils/logger'
import { updateLibraryItemReadingProgress } from '../services/library_item'
export const SYNC_READ_POSITIONS_JOB_NAME = 'sync-read-positions'
async function* getSyncUpdatesIterator(redis: Redis) {
const match = `${CACHED_READING_POSITION_PREFIX}:*`
let [cursor, batch]: [string | number, string[]] = [0, []]
do {
;[cursor, batch] = await redis.scan(cursor, 'MATCH', match, 'COUNT', 100)
if (batch.length) {
for (const key of batch) {
yield key
}
}
} while (cursor !== '0')
return
}
const syncReadPosition = async (cacheKey: string) => {
const components = componentsForCachedReadingPositionKey(cacheKey)
const position = components
? await fetchCachedReadingPosition(components.uid, components.libraryItemID)
: undefined
if (components && position) {
const result = await updateLibraryItemReadingProgress(
components.libraryItemID,
components.uid,
position.readingProgressPercent,
position.readingProgressTopPercent,
position.readingProgressAnchorIndex
)
if (!result) {
logger.error('unable to update reading progress', { cacheKey })
}
} else {
logger.warning(
'potential error, reading position cache key found with no data',
{ cacheKey }
)
}
// Even if there are errors above we want to delete the key, otherwise
// in error scenarios we could accumulate a huge number of keys for
// something that is not critical (reading position)
const result = await redisDataSource.redisClient?.del(cacheKey)
if (!result || result < 1) {
logger.warning('error deleting cache key', { cacheKey })
}
}
export const syncReadPositionsJob = async (data: any, attempts: number) => {
const redis = redisDataSource.redisClient
if (!redis) {
throw new Error('unable to sync reading position, no redis client')
}
const updates = getSyncUpdatesIterator(redis)
for await (const value of updates) {
await syncReadPosition(value)
}
}

View File

@ -10,11 +10,28 @@ export type ReadingProgressCacheItem = {
updatedAt: string | undefined
}
export const CACHED_READING_POSITION_PREFIX = `omnivore:reading-progress`
export const keyForCachedReadingPosition = (
uid: string,
libraryItemID: string
): string => {
return `omnivore:reading-progress:${uid}:${libraryItemID}`
return `${CACHED_READING_POSITION_PREFIX}:${uid}:${libraryItemID}`
}
export const componentsForCachedReadingPositionKey = (
cacheKey: string
): { uid: string; libraryItemID: string } | undefined => {
try {
const [_owner, _prefix, uid, libraryItemID] = cacheKey.split(':')
return {
uid,
libraryItemID,
}
} catch (error) {
logger.log('exception getting cache key components', { cacheKey, error })
}
return undefined
}
// Reading positions are cached as an array of positions, when
@ -60,6 +77,7 @@ export const fetchCachedReadingPosition = async (
uid: string,
libraryItemID: string
): Promise<ReadingProgressCacheItem | undefined> => {
console.log('checking uid', uid, 'libraryItemId', libraryItemID)
const cacheKey = keyForCachedReadingPosition(uid, libraryItemID)
try {
const cacheItemList = await redisDataSource.redisClient?.lrange(
@ -67,7 +85,9 @@ export const fetchCachedReadingPosition = async (
0,
-1
)
console.log('cacheItemList: ', cacheKey, cacheItemList)
const items = cacheItemList?.map((item) => JSON.parse(item))
console.log(' items[]: ', items)
if (!items || items.length < 1) {
return undefined
}