Merge pull request #3608 from omnivore-app/fix/export-job
fix: export-item job takes too long and save-page job was delayed
This commit is contained in:
@ -35,47 +35,41 @@ export const exportItem = async (jobData: ExportItemJobData) => {
|
||||
return
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
integrations.map(async (integration) => {
|
||||
const logObject = {
|
||||
userId,
|
||||
integrationId: integration.id,
|
||||
}
|
||||
logger.info('exporting item...', logObject)
|
||||
// currently only readwise integration is supported
|
||||
const integration = integrations[0]
|
||||
|
||||
try {
|
||||
const client = getIntegrationClient(integration.name)
|
||||
const logObject = {
|
||||
userId,
|
||||
integrationId: integration.id,
|
||||
}
|
||||
logger.info('exporting item...', logObject)
|
||||
|
||||
const synced = await client.export(integration.token, libraryItems)
|
||||
if (!synced) {
|
||||
logger.error('failed to export item', logObject)
|
||||
return Promise.resolve(false)
|
||||
}
|
||||
const client = getIntegrationClient(integration.name)
|
||||
|
||||
const syncedAt = new Date()
|
||||
logger.info('updating integration...', {
|
||||
...logObject,
|
||||
syncedAt,
|
||||
})
|
||||
const synced = await client.export(integration.token, libraryItems)
|
||||
if (!synced) {
|
||||
logger.error('failed to export item', logObject)
|
||||
return false
|
||||
}
|
||||
|
||||
// update integration syncedAt if successful
|
||||
const updated = await updateIntegration(
|
||||
integration.id,
|
||||
{
|
||||
syncedAt,
|
||||
},
|
||||
userId
|
||||
)
|
||||
logger.info('integration updated', {
|
||||
...logObject,
|
||||
updated,
|
||||
})
|
||||
const syncedAt = new Date()
|
||||
logger.info('updating integration...', {
|
||||
...logObject,
|
||||
syncedAt,
|
||||
})
|
||||
|
||||
return Promise.resolve(true)
|
||||
} catch (err) {
|
||||
logger.error('export with integration failed', err)
|
||||
return Promise.resolve(false)
|
||||
}
|
||||
})
|
||||
// update integration syncedAt if successful
|
||||
const updated = await updateIntegration(
|
||||
integration.id,
|
||||
{
|
||||
syncedAt,
|
||||
},
|
||||
userId
|
||||
)
|
||||
logger.info('integration updated', {
|
||||
...logObject,
|
||||
updated,
|
||||
})
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import axios from 'axios'
|
||||
import { LibraryItem } from '../../entity/library_item'
|
||||
import { highlightUrl, wait } from '../../utils/helpers'
|
||||
import { highlightUrl } from '../../utils/helpers'
|
||||
import { logger } from '../../utils/logger'
|
||||
import { IntegrationClient } from './integration'
|
||||
|
||||
@ -95,40 +95,22 @@ export class ReadwiseClient implements IntegrationClient {
|
||||
|
||||
syncWithReadwise = async (
|
||||
token: string,
|
||||
highlights: ReadwiseHighlight[],
|
||||
retryCount = 0
|
||||
highlights: ReadwiseHighlight[]
|
||||
): Promise<boolean> => {
|
||||
const url = `${this.apiUrl}/highlights`
|
||||
try {
|
||||
const response = await axios.post(
|
||||
url,
|
||||
{
|
||||
highlights,
|
||||
const response = await axios.post(
|
||||
url,
|
||||
{
|
||||
highlights,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Token ${token}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Token ${token}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
timeout: 5000, // 5 seconds
|
||||
}
|
||||
)
|
||||
return response.status === 200
|
||||
} catch (error) {
|
||||
console.error(error)
|
||||
|
||||
if (axios.isAxiosError(error)) {
|
||||
if (error.response?.status === 429 && retryCount < 3) {
|
||||
console.log('Readwise API rate limit exceeded, retrying...')
|
||||
// wait for Retry-After seconds in the header if rate limited
|
||||
// max retry count is 3
|
||||
const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds
|
||||
await wait(parseInt(retryAfter, 10) * 1000)
|
||||
return this.syncWithReadwise(token, highlights, retryCount + 1)
|
||||
}
|
||||
timeout: 5000, // 5 seconds
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
)
|
||||
return response.status === 200
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,13 +66,13 @@ export const getJobPriority = (jobName: string): number => {
|
||||
return 1
|
||||
case TRIGGER_RULE_JOB_NAME:
|
||||
case CALL_WEBHOOK_JOB_NAME:
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
case AI_SUMMARIZE_JOB_NAME:
|
||||
return 5
|
||||
case BULK_ACTION_JOB_NAME:
|
||||
case `${REFRESH_FEED_JOB_NAME}_high`:
|
||||
return 10
|
||||
case `${REFRESH_FEED_JOB_NAME}_low`:
|
||||
case EXPORT_ITEM_JOB_NAME:
|
||||
return 50
|
||||
case EXPORT_ALL_ITEMS_JOB_NAME:
|
||||
case REFRESH_ALL_FEEDS_JOB_NAME:
|
||||
@ -781,8 +781,12 @@ export const enqueueExportItem = async (jobData: ExportItemJobData) => {
|
||||
}
|
||||
|
||||
return queue.add(EXPORT_ITEM_JOB_NAME, jobData, {
|
||||
attempts: 1,
|
||||
attempts: 3,
|
||||
priority: getJobPriority(EXPORT_ITEM_JOB_NAME),
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 10000, // 10 seconds
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user