Write behind reading progress as a list that can be reduced at commit or read time
This commit is contained in:
@ -4,80 +4,64 @@ type ReadingProgressCacheItem = {
|
||||
readingProgressPercent: number
|
||||
readingProgressTopPercent: number | undefined
|
||||
readingProgressAnchorIndex: number | undefined
|
||||
updatedAt: Date
|
||||
updatedAt: string
|
||||
}
|
||||
|
||||
export class ReadingProgressDataSource {
|
||||
private cacheItems: { [id: string]: ReadingProgressCacheItem } = {}
|
||||
|
||||
constructor() {}
|
||||
|
||||
async getReadingProgress(
|
||||
uid: string,
|
||||
libraryItemID: string
|
||||
): Promise<ReadingProgressCacheItem | undefined> {
|
||||
const cacheKey = `omnivore:reading-progress:${libraryItemID}`
|
||||
const cacheKey = `omnivore:reading-progress:${uid}:${libraryItemID}`
|
||||
const cached = this.cacheItems[cacheKey]
|
||||
if (cached) {
|
||||
return cached
|
||||
}
|
||||
return this.valueFromRedis(libraryItemID)
|
||||
return this.valueFromRedis(cacheKey)
|
||||
}
|
||||
|
||||
async updateReadingProgress(
|
||||
uid: string,
|
||||
libraryItemID: string,
|
||||
progress: {
|
||||
readingProgressPercent: number
|
||||
readingProgressTopPercent: number | undefined | null
|
||||
readingProgressAnchorIndex: number | undefined | null
|
||||
readingProgressTopPercent: number | undefined
|
||||
readingProgressAnchorIndex: number | undefined
|
||||
}
|
||||
): Promise<void> {
|
||||
const cacheKey = `omnivore:reading-progress:${libraryItemID}`
|
||||
const existingItem = await this.valueFromRedis(cacheKey)
|
||||
const cacheItem = {
|
||||
readingProgressPercent: Math.max(
|
||||
progress.readingProgressPercent,
|
||||
existingItem?.readingProgressPercent ?? 0
|
||||
),
|
||||
readingProgressTopPercent: Math.max(
|
||||
progress.readingProgressTopPercent ?? 0,
|
||||
existingItem?.readingProgressTopPercent ?? 0
|
||||
),
|
||||
readingProgressAnchorIndex: Math.max(
|
||||
progress.readingProgressAnchorIndex ?? 0,
|
||||
existingItem?.readingProgressAnchorIndex ?? 0
|
||||
),
|
||||
updatedAt: new Date(),
|
||||
const cacheKey = `omnivore:reading-progress:${uid}:${libraryItemID}`
|
||||
const cacheItem: ReadingProgressCacheItem = {
|
||||
...progress,
|
||||
updatedAt: new Date().toISOString(),
|
||||
}
|
||||
|
||||
this.cacheItems[cacheKey] = cacheItem
|
||||
if (await redisDataSource.redisClient?.hmset(cacheKey, cacheItem)) {
|
||||
console.log('cached reading progress')
|
||||
if (
|
||||
await redisDataSource.redisClient?.lpush(
|
||||
cacheKey,
|
||||
JSON.stringify(cacheItem)
|
||||
)
|
||||
) {
|
||||
console.log('cached reading progress', cacheKey)
|
||||
} else {
|
||||
console.log('failed to cache reading progress')
|
||||
}
|
||||
}
|
||||
|
||||
async valueFromRedis(
|
||||
libraryItemID: string
|
||||
cacheKey: string
|
||||
): Promise<ReadingProgressCacheItem | undefined> {
|
||||
const cacheKey = `omnivore:reading-progress:${libraryItemID}`
|
||||
const redisCached = await redisDataSource.redisClient?.hgetall(cacheKey)
|
||||
if (redisCached) {
|
||||
const readingProgressPercent = parseInt(
|
||||
redisCached.readingProgressPercent,
|
||||
10
|
||||
)
|
||||
const updatedAt = new Date(parseInt(redisCached.updatedAt, 10))
|
||||
if (!Number.isNaN(readingProgressPercent) && updatedAt) {
|
||||
return {
|
||||
readingProgressPercent,
|
||||
readingProgressTopPercent: redisCached.readingProgressTopPercent
|
||||
? parseInt(redisCached.readingProgressTopPercent, 10)
|
||||
: undefined,
|
||||
readingProgressAnchorIndex: redisCached.readingProgressAnchorIndex
|
||||
? parseInt(redisCached.readingProgressAnchorIndex, 10)
|
||||
: undefined,
|
||||
updatedAt,
|
||||
}
|
||||
}
|
||||
const redisCached = await redisDataSource.redisClient?.lrange(
|
||||
cacheKey,
|
||||
0,
|
||||
0
|
||||
)
|
||||
if (redisCached && redisCached.length > 0) {
|
||||
return JSON.parse(redisCached[0])
|
||||
}
|
||||
return undefined
|
||||
}
|
||||
|
||||
@ -28,6 +28,10 @@ import {
|
||||
import { updatePDFContentJob } from './jobs/update_pdf_content'
|
||||
import { redisDataSource } from './redis_data_source'
|
||||
import { CustomTypeOrmLogger } from './utils/logger'
|
||||
import {
|
||||
SYNC_READ_POSITIONS_JOB_NAME,
|
||||
syncReadPositionsJob,
|
||||
} from './jobs/sync_read_positions'
|
||||
|
||||
export const QUEUE_NAME = 'omnivore-backend-queue'
|
||||
|
||||
@ -152,6 +156,21 @@ const main = async () => {
|
||||
|
||||
const worker = createWorker(workerRedisClient)
|
||||
|
||||
const queue = await getBackendQueue()
|
||||
if (queue) {
|
||||
await queue.add(
|
||||
SYNC_READ_POSITIONS_JOB_NAME,
|
||||
{},
|
||||
{
|
||||
priority: 1,
|
||||
repeat: {
|
||||
every: 10000,
|
||||
limit: 100,
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
const queueEvents = new QueueEvents(QUEUE_NAME, {
|
||||
connection: workerRedisClient,
|
||||
})
|
||||
|
||||
@ -60,7 +60,7 @@ import {
|
||||
UpdatesSinceError,
|
||||
UpdatesSinceSuccess,
|
||||
} from '../../generated/graphql'
|
||||
import { getColumns } from '../../repository'
|
||||
import { authTrx, getColumns } from '../../repository'
|
||||
import { getInternalLabelWithColor } from '../../repository/label'
|
||||
import { libraryItemRepository } from '../../repository/library_item'
|
||||
import { userRepository } from '../../repository/user'
|
||||
@ -640,19 +640,37 @@ export const saveArticleReadingProgressResolver = authorized<
|
||||
}
|
||||
}
|
||||
|
||||
dataSources.readingProgress.updateReadingProgress(id, {
|
||||
readingProgressPercent,
|
||||
readingProgressTopPercent,
|
||||
readingProgressAnchorIndex,
|
||||
})
|
||||
// update reading progress only if the current value is lower
|
||||
const updatedItem = await updateLibraryItemReadingProgress(
|
||||
id,
|
||||
uid,
|
||||
readingProgressPercent,
|
||||
readingProgressTopPercent,
|
||||
readingProgressAnchorIndex
|
||||
)
|
||||
let updatedItem: LibraryItem | null
|
||||
if (env.redis.cache && env.redis.mq) {
|
||||
// If redis caching and queueing are available we delay this write
|
||||
dataSources.readingProgress.updateReadingProgress(uid, id, {
|
||||
readingProgressPercent,
|
||||
readingProgressTopPercent: readingProgressTopPercent ?? undefined,
|
||||
readingProgressAnchorIndex: readingProgressAnchorIndex ?? undefined,
|
||||
})
|
||||
|
||||
updatedItem = await authTrx(
|
||||
async (t) => {
|
||||
return t.getRepository(LibraryItem).findOne({
|
||||
where: {
|
||||
id,
|
||||
},
|
||||
})
|
||||
},
|
||||
undefined,
|
||||
uid
|
||||
)
|
||||
} else {
|
||||
// update reading progress only if the current value is lower
|
||||
updatedItem = await updateLibraryItemReadingProgress(
|
||||
id,
|
||||
uid,
|
||||
readingProgressPercent,
|
||||
readingProgressTopPercent,
|
||||
readingProgressAnchorIndex
|
||||
)
|
||||
}
|
||||
|
||||
if (!updatedItem) {
|
||||
return { errorCodes: [SaveArticleReadingProgressErrorCode.BadData] }
|
||||
}
|
||||
|
||||
@ -158,6 +158,60 @@ const resultResolveTypeResolver = (
|
||||
},
|
||||
})
|
||||
|
||||
const readingProgressHandlers = {
|
||||
async readingProgressPercent(
|
||||
article: { id: string; readingProgressPercent?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
if (ctx.claims?.uid) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(
|
||||
ctx.claims?.uid,
|
||||
article.id
|
||||
)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressPercent
|
||||
}
|
||||
}
|
||||
return article.readingProgressPercent
|
||||
},
|
||||
async readingProgressAnchorIndex(
|
||||
article: { id: string; readingProgressAnchorIndex?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
if (ctx.claims?.uid) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(
|
||||
ctx.claims?.uid,
|
||||
article.id
|
||||
)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressAnchorIndex
|
||||
}
|
||||
}
|
||||
return article.readingProgressAnchorIndex
|
||||
},
|
||||
async readingProgressTopPercent(
|
||||
article: { id: string; readingProgressTopPercent?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
if (ctx.claims?.uid) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(
|
||||
ctx.claims?.uid,
|
||||
article.id
|
||||
)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressTopPercent
|
||||
}
|
||||
}
|
||||
return article.readingProgressTopPercent
|
||||
},
|
||||
}
|
||||
|
||||
// Provide resolver functions for your schema fields
|
||||
export const functionResolvers = {
|
||||
Mutation: {
|
||||
@ -328,42 +382,7 @@ export const functionResolvers = {
|
||||
|
||||
return findLabelsByLibraryItemId(article.id, ctx.uid)
|
||||
},
|
||||
async readingProgressPercent(
|
||||
article: { id: string; readingProgressPercent?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(article.id)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressPercent
|
||||
}
|
||||
return article.readingProgressPercent
|
||||
},
|
||||
async readingProgressAnchorIndex(
|
||||
article: { id: string; readingProgressAnchorIndex?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(article.id)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressAnchorIndex
|
||||
}
|
||||
return article.readingProgressAnchorIndex
|
||||
},
|
||||
async readingProgressTopPercent(
|
||||
article: { id: string; readingProgressTopPercent?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(article.id)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressTopPercent
|
||||
}
|
||||
return article.readingProgressTopPercent
|
||||
},
|
||||
...readingProgressHandlers,
|
||||
},
|
||||
Highlight: {
|
||||
// async reactions(
|
||||
@ -469,42 +488,7 @@ export const functionResolvers = {
|
||||
const highlights = await findHighlightsByLibraryItemId(item.id, ctx.uid)
|
||||
return highlights.map(highlightDataToHighlight)
|
||||
},
|
||||
async readingProgressPercent(
|
||||
article: { id: string; readingProgressPercent?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(article.id)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressPercent
|
||||
}
|
||||
return article.readingProgressPercent
|
||||
},
|
||||
async readingProgressAnchorIndex(
|
||||
article: { id: string; readingProgressAnchorIndex?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(article.id)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressAnchorIndex
|
||||
}
|
||||
return article.readingProgressAnchorIndex
|
||||
},
|
||||
async readingProgressTopPercent(
|
||||
article: { id: string; readingProgressTopPercent?: number },
|
||||
_: unknown,
|
||||
ctx: WithDataSourcesContext
|
||||
) {
|
||||
const readingProgress =
|
||||
await ctx.dataSources.readingProgress.getReadingProgress(article.id)
|
||||
if (readingProgress) {
|
||||
return readingProgress.readingProgressTopPercent
|
||||
}
|
||||
return article.readingProgressTopPercent
|
||||
},
|
||||
...readingProgressHandlers,
|
||||
},
|
||||
Subscription: {
|
||||
newsletterEmail(subscription: Subscription) {
|
||||
|
||||
@ -666,6 +666,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
|
||||
attempts: 1,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
priority: 1,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -92,7 +92,7 @@ export function Article(props: ArticleProps): JSX.Element {
|
||||
|
||||
setReadingProgress(bottomProgress * 100)
|
||||
}
|
||||
}, 2500)
|
||||
}, 3500)
|
||||
|
||||
// Scroll to initial anchor position
|
||||
useEffect(() => {
|
||||
|
||||
Reference in New Issue
Block a user