Merge pull request #2375 from omnivore-app/fix/pocket-importer
fix pocket paginated api
This commit is contained in:
@ -228,16 +228,19 @@ export function integrationsServiceRouter() {
|
||||
})
|
||||
stringifier.pipe(writeStream)
|
||||
|
||||
let hasMore = true
|
||||
let offset = 0
|
||||
let since = integration.syncedAt?.getTime() || 0
|
||||
while (hasMore) {
|
||||
const since = integration.syncedAt?.getTime() || 0
|
||||
let syncedAt = since
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
// get pages from integration
|
||||
const retrieved = await integrationService.retrieve({
|
||||
token: integration.token,
|
||||
since,
|
||||
offset: offset,
|
||||
offset,
|
||||
})
|
||||
syncedAt = retrieved.since || Date.now()
|
||||
|
||||
const retrievedData = retrieved.data
|
||||
if (retrievedData.length === 0) {
|
||||
break
|
||||
@ -245,13 +248,15 @@ export function integrationsServiceRouter() {
|
||||
// write the list of urls, state and labels to the stream
|
||||
retrievedData.forEach((row) => stringifier.write(row))
|
||||
|
||||
hasMore = !!retrieved.hasMore
|
||||
offset += retrievedData.length
|
||||
since = retrieved.since || Date.now()
|
||||
console.debug('retrieved data', {
|
||||
total: offset,
|
||||
size: retrievedData.length,
|
||||
})
|
||||
}
|
||||
// update the integration's syncedAt
|
||||
await getRepository(Integration).update(integration.id, {
|
||||
syncedAt: new Date(since),
|
||||
syncedAt: new Date(syncedAt),
|
||||
})
|
||||
} catch (err) {
|
||||
logger.error('import pages from integration failed', err)
|
||||
|
||||
@ -135,9 +135,13 @@ export class PocketIntegration extends IntegrationService {
|
||||
: undefined,
|
||||
state: statusToState[item.status],
|
||||
}))
|
||||
|
||||
if (pocketData.error) {
|
||||
throw new Error(`Error retrieving pocket data: ${pocketData.error}`)
|
||||
}
|
||||
|
||||
return {
|
||||
data,
|
||||
hasMore: pocketData.complete !== 1,
|
||||
since: pocketData.since * 1000,
|
||||
}
|
||||
}
|
||||
|
||||
@ -480,7 +480,17 @@ export const enqueueImportFromIntegration = async (
|
||||
}
|
||||
// If there is no Google Cloud Project Id exposed, it means that we are in local environment
|
||||
if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) {
|
||||
return nanoid()
|
||||
// Calling the handler function directly.
|
||||
setTimeout(() => {
|
||||
axios
|
||||
.post(`${env.queue.integrationTaskHandlerUrl}/import`, payload, {
|
||||
headers,
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error(error)
|
||||
})
|
||||
}, 0)
|
||||
return ''
|
||||
}
|
||||
|
||||
const createdTasks = await createHttpTaskWithToken({
|
||||
|
||||
@ -1,30 +1,30 @@
|
||||
import 'mocha'
|
||||
import { createTestElasticPage, request } from '../util'
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { expect } from 'chai'
|
||||
import { DateTime } from 'luxon'
|
||||
import 'mocha'
|
||||
import nock from 'nock'
|
||||
import sinon from 'sinon'
|
||||
import {
|
||||
createPubSubClient,
|
||||
PubSubRequestBody,
|
||||
} from '../../src/datalayer/pubsub'
|
||||
import { User } from '../../src/entity/user'
|
||||
import { createTestUser, deleteTestIntegrations, deleteTestUser } from '../db'
|
||||
import { Integration, IntegrationType } from '../../src/entity/integration'
|
||||
import { getRepository } from '../../src/entity/utils'
|
||||
import { addHighlightToPage } from '../../src/elastic/highlights'
|
||||
import { deletePage } from '../../src/elastic/pages'
|
||||
import {
|
||||
Highlight,
|
||||
HighlightType,
|
||||
Page,
|
||||
PageContext,
|
||||
} from '../../src/elastic/types'
|
||||
import nock from 'nock'
|
||||
import { addHighlightToPage } from '../../src/elastic/highlights'
|
||||
import { getHighlightUrl } from '../../src/services/highlights'
|
||||
import { deletePage } from '../../src/elastic/pages'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations/readwise'
|
||||
import sinon from 'sinon'
|
||||
import { Storage } from '@google-cloud/storage'
|
||||
import { MockBucket } from '../mock_storage'
|
||||
import { Integration, IntegrationType } from '../../src/entity/integration'
|
||||
import { User } from '../../src/entity/user'
|
||||
import { getRepository } from '../../src/entity/utils'
|
||||
import { env } from '../../src/env'
|
||||
import { getHighlightUrl } from '../../src/services/highlights'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations/readwise'
|
||||
import { createTestUser, deleteTestIntegrations, deleteTestUser } from '../db'
|
||||
import { MockBucket } from '../mock_storage'
|
||||
import { createTestElasticPage, request } from '../util'
|
||||
|
||||
describe('Integrations routers', () => {
|
||||
const baseUrl = '/svc/pubsub/integrations'
|
||||
@ -349,22 +349,23 @@ describe('Integrations routers', () => {
|
||||
})
|
||||
|
||||
// mock Pocket API
|
||||
const reqBody = {
|
||||
access_token: token,
|
||||
consumer_key: env.pocket.consumerKey,
|
||||
state: 'all',
|
||||
detailType: 'complete',
|
||||
since: 0,
|
||||
sort: 'oldest',
|
||||
count: 100,
|
||||
offset: 0,
|
||||
}
|
||||
nock('https://getpocket.com', {
|
||||
reqheaders: {
|
||||
'content-type': 'application/json',
|
||||
'x-accept': 'application/json',
|
||||
},
|
||||
})
|
||||
.post('/v3/get', {
|
||||
access_token: token,
|
||||
consumer_key: env.pocket.consumerKey,
|
||||
state: 'all',
|
||||
detailType: 'complete',
|
||||
since: 0,
|
||||
sort: 'oldest',
|
||||
count: 100,
|
||||
offset: 0,
|
||||
})
|
||||
.post('/v3/get', reqBody)
|
||||
.reply(200, {
|
||||
complete: 1,
|
||||
list: {
|
||||
@ -383,6 +384,13 @@ describe('Integrations routers', () => {
|
||||
},
|
||||
since: Date.now() / 1000,
|
||||
})
|
||||
.post('/v3/get', {
|
||||
...reqBody,
|
||||
offset: 1,
|
||||
})
|
||||
.reply(200, {
|
||||
list: {},
|
||||
})
|
||||
|
||||
// mock cloud storage
|
||||
const mockBucket = new MockBucket('test')
|
||||
|
||||
Reference in New Issue
Block a user