Merge pull request #3172 from omnivore-app/fix/pocket-importer
fix pocket importer stops running when there is no unarchived items in the batch
This commit is contained in:
@ -187,11 +187,11 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction(
|
||||
let offset = 0
|
||||
let syncedAt = req.body.syncAt
|
||||
const since = syncedAt
|
||||
const state = req.body.state || State.UNARCHIVED // default to unarchived
|
||||
const stateToImport = req.body.state || State.UNARCHIVED // default to unarchived
|
||||
|
||||
console.log('importing pages from integration...', {
|
||||
userId,
|
||||
state,
|
||||
stateToImport,
|
||||
since,
|
||||
})
|
||||
// get pages from integration
|
||||
@ -199,7 +199,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction(
|
||||
token: claims.token,
|
||||
since,
|
||||
offset,
|
||||
state,
|
||||
state: stateToImport,
|
||||
})
|
||||
syncedAt = retrieved.since || Date.now()
|
||||
let retrievedData = retrieved.data
|
||||
@ -232,7 +232,20 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction(
|
||||
// paginate api calls to the integration
|
||||
do {
|
||||
// write the list of urls, state and labels to the stream
|
||||
retrievedData.forEach((row) => stringifier.write(row))
|
||||
retrievedData
|
||||
.filter((row) => {
|
||||
// filter out items that are deleted
|
||||
if (row.state === State.DELETED) {
|
||||
return false
|
||||
}
|
||||
|
||||
// filter out items that archived if the stateToImport is unarchived
|
||||
return (
|
||||
stateToImport !== State.UNARCHIVED ||
|
||||
row.state !== State.ARCHIVED
|
||||
)
|
||||
})
|
||||
.forEach((row) => stringifier.write(row))
|
||||
|
||||
// get next pages from the integration
|
||||
offset += retrievedData.length
|
||||
@ -241,7 +254,7 @@ export const importer = Sentry.GCPFunction.wrapHttpFunction(
|
||||
token: claims.token,
|
||||
since,
|
||||
offset,
|
||||
state,
|
||||
state: stateToImport,
|
||||
})
|
||||
syncedAt = retrieved.since || Date.now()
|
||||
retrievedData = retrieved.data
|
||||
|
||||
@ -57,7 +57,7 @@ export const updateIntegration = async (
|
||||
token: integrationToken,
|
||||
enabled: true,
|
||||
type,
|
||||
taskName,
|
||||
// taskName, // TODO: remove this
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
@ -1,16 +1,18 @@
|
||||
import { Item } from '../item'
|
||||
|
||||
export enum State {
|
||||
SUCCEEDED = 'SUCCEEDED',
|
||||
ARCHIVED = 'ARCHIVED',
|
||||
UNREAD = 'UNREAD',
|
||||
UNARCHIVED = 'UNARCHIVED',
|
||||
DELETED = 'DELETED',
|
||||
ALL = 'ALL',
|
||||
}
|
||||
|
||||
export interface RetrievedData {
|
||||
url: string
|
||||
labels?: string[]
|
||||
state?: string
|
||||
state?: State
|
||||
}
|
||||
export interface RetrievedResult {
|
||||
data: RetrievedData[]
|
||||
|
||||
@ -122,26 +122,18 @@ export class PocketClient extends IntegrationClient {
|
||||
}
|
||||
|
||||
const pocketItems = Object.values(pocketData.list)
|
||||
const statusToState: Record<string, string> = {
|
||||
'0': 'SUCCEEDED',
|
||||
'1': 'ARCHIVED',
|
||||
'2': 'DELETED',
|
||||
const statusToState: Record<string, State> = {
|
||||
'0': State.SUCCEEDED,
|
||||
'1': State.ARCHIVED,
|
||||
'2': State.DELETED,
|
||||
}
|
||||
const data = pocketItems
|
||||
.map((item) => ({
|
||||
url: item.given_url,
|
||||
labels: item.tags
|
||||
? Object.values(item.tags).map((tag) => tag.tag)
|
||||
: undefined,
|
||||
state: statusToState[item.status],
|
||||
}))
|
||||
.filter((item) => {
|
||||
if (item.state === 'DELETED') {
|
||||
return false
|
||||
}
|
||||
|
||||
return state !== State.UNARCHIVED || item.state !== 'ARCHIVED'
|
||||
})
|
||||
const data = pocketItems.map((item) => ({
|
||||
url: item.given_url,
|
||||
labels: item.tags
|
||||
? Object.values(item.tags).map((tag) => tag.tag)
|
||||
: undefined,
|
||||
state: statusToState[item.status],
|
||||
}))
|
||||
|
||||
if (pocketData.error) {
|
||||
throw new Error(`Error retrieving pocket data: ${pocketData.error}`)
|
||||
|
||||
Reference in New Issue
Block a user