Merge pull request #3769 from omnivore-app/fix/notion
export last 100 items in one job
This commit is contained in:
@ -1,8 +1,11 @@
|
||||
import { IntegrationType } from '../../entity/integration'
|
||||
import { findIntegration, updateIntegration } from '../../services/integrations'
|
||||
import {
|
||||
findIntegration,
|
||||
getIntegrationClient,
|
||||
updateIntegration,
|
||||
} from '../../services/integrations'
|
||||
import { findRecentLibraryItems } from '../../services/library_item'
|
||||
import { findActiveUser } from '../../services/user'
|
||||
import { enqueueExportItem } from '../../utils/createTask'
|
||||
import { logger } from '../../utils/logger'
|
||||
|
||||
export interface ExportAllItemsJobData {
|
||||
@ -39,17 +42,23 @@ export const exportAllItems = async (jobData: ExportAllItemsJobData) => {
|
||||
return
|
||||
}
|
||||
|
||||
const client = getIntegrationClient(
|
||||
integration.name,
|
||||
integration.token,
|
||||
integration
|
||||
)
|
||||
|
||||
const maxItems = 100
|
||||
const limit = 10
|
||||
let offset = 0
|
||||
// get max 1000 most recent items from the database
|
||||
// get max 100 most recent items from the database
|
||||
while (offset < maxItems) {
|
||||
const libraryItems = await findRecentLibraryItems(userId, limit, offset)
|
||||
if (libraryItems.length === 0) {
|
||||
logger.info('no library items found', {
|
||||
userId,
|
||||
})
|
||||
return
|
||||
break
|
||||
}
|
||||
|
||||
logger.info('enqueuing export item...', {
|
||||
@ -58,24 +67,42 @@ export const exportAllItems = async (jobData: ExportAllItemsJobData) => {
|
||||
integrationId,
|
||||
})
|
||||
|
||||
await enqueueExportItem({
|
||||
userId,
|
||||
libraryItemIds: libraryItems.map((item) => item.id),
|
||||
integrationId,
|
||||
const synced = await client.export(libraryItems)
|
||||
if (!synced) {
|
||||
logger.error('failed to export item', jobData)
|
||||
continue
|
||||
}
|
||||
|
||||
const syncedAt = new Date()
|
||||
logger.info('updating integration...', {
|
||||
...jobData,
|
||||
syncedAt,
|
||||
})
|
||||
|
||||
// update integration syncedAt if successful
|
||||
const updated = await updateIntegration(
|
||||
integration.id,
|
||||
{
|
||||
syncedAt,
|
||||
},
|
||||
userId
|
||||
)
|
||||
logger.info('integration updated', {
|
||||
...jobData,
|
||||
updated,
|
||||
})
|
||||
|
||||
offset += libraryItems.length
|
||||
|
||||
logger.info('exported items', {
|
||||
userId,
|
||||
...jobData,
|
||||
offset,
|
||||
integrationId,
|
||||
})
|
||||
}
|
||||
|
||||
logger.info('exported all items', {
|
||||
userId,
|
||||
integrationId,
|
||||
...jobData,
|
||||
offset,
|
||||
})
|
||||
|
||||
// clear task name in integration
|
||||
|
||||
@ -716,14 +716,23 @@ export const findRecentLibraryItems = async (
|
||||
limit = 1000,
|
||||
offset?: number
|
||||
) => {
|
||||
const selectColumns = getColumns(libraryItemRepository)
|
||||
.filter(
|
||||
(column) => column !== 'readableContent' && column !== 'originalContent'
|
||||
)
|
||||
.map((column) => `library_item.${column}`)
|
||||
|
||||
return authTrx(
|
||||
async (tx) =>
|
||||
tx
|
||||
.createQueryBuilder(LibraryItem, 'library_item')
|
||||
.where('library_item.user_id = :userId', { userId })
|
||||
.andWhere('library_item.state = :state', {
|
||||
state: LibraryItemState.Succeeded,
|
||||
})
|
||||
.select(selectColumns)
|
||||
.leftJoinAndSelect('library_item.labels', 'labels')
|
||||
.leftJoinAndSelect('library_item.highlights', 'highlights')
|
||||
.where(
|
||||
'library_item.user_id = :userId AND library_item.state = :state',
|
||||
{ userId, state: LibraryItemState.Succeeded }
|
||||
)
|
||||
.orderBy('library_item.saved_at', 'DESC', 'NULLS LAST')
|
||||
.take(limit)
|
||||
.skip(offset)
|
||||
|
||||
Reference in New Issue
Block a user