From 0d8a3f734654e8e5d1516f1191b06f7d2f81f234 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 16 Jun 2023 18:11:31 +0800 Subject: [PATCH 1/2] fix pocket paginated api --- packages/api/src/routers/svc/integrations.ts | 19 ++++++++++++------- .../api/src/services/integrations/pocket.ts | 6 +++++- packages/api/src/utils/createTask.ts | 12 +++++++++++- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/packages/api/src/routers/svc/integrations.ts b/packages/api/src/routers/svc/integrations.ts index bb891750c..2b115c5ff 100644 --- a/packages/api/src/routers/svc/integrations.ts +++ b/packages/api/src/routers/svc/integrations.ts @@ -228,16 +228,19 @@ export function integrationsServiceRouter() { }) stringifier.pipe(writeStream) - let hasMore = true let offset = 0 - let since = integration.syncedAt?.getTime() || 0 - while (hasMore) { + const since = integration.syncedAt?.getTime() || 0 + let syncedAt = since + // eslint-disable-next-line no-constant-condition + while (true) { // get pages from integration const retrieved = await integrationService.retrieve({ token: integration.token, since, - offset: offset, + offset, }) + syncedAt = retrieved.since || Date.now() + const retrievedData = retrieved.data if (retrievedData.length === 0) { break @@ -245,13 +248,15 @@ export function integrationsServiceRouter() { // write the list of urls, state and labels to the stream retrievedData.forEach((row) => stringifier.write(row)) - hasMore = !!retrieved.hasMore offset += retrievedData.length - since = retrieved.since || Date.now() + console.debug('retrieved data', { + total: offset, + size: retrievedData.length, + }) } // update the integration's syncedAt await getRepository(Integration).update(integration.id, { - syncedAt: new Date(since), + syncedAt: new Date(syncedAt), }) } catch (err) { logger.error('import pages from integration failed', err) diff --git a/packages/api/src/services/integrations/pocket.ts b/packages/api/src/services/integrations/pocket.ts index c2a269cbe..ff3373ba4 100644 --- a/packages/api/src/services/integrations/pocket.ts +++ b/packages/api/src/services/integrations/pocket.ts @@ -135,9 +135,13 @@ export class PocketIntegration extends IntegrationService { : undefined, state: statusToState[item.status], })) + + if (pocketData.error) { + throw new Error(`Error retrieving pocket data: ${pocketData.error}`) + } + return { data, - hasMore: pocketData.complete !== 1, since: pocketData.since * 1000, } } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index c6cbe328c..939eacc5b 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -480,7 +480,17 @@ export const enqueueImportFromIntegration = async ( } // If there is no Google Cloud Project Id exposed, it means that we are in local environment if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) { - return nanoid() + // Calling the handler function directly. + setTimeout(() => { + axios + .post(`${env.queue.integrationTaskHandlerUrl}/import`, payload, { + headers, + }) + .catch((error) => { + console.error(error) + }) + }, 0) + return '' } const createdTasks = await createHttpTaskWithToken({ From 43b2789e471f7cce48ddd79db169298d1f35a22e Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Fri, 16 Jun 2023 18:54:42 +0800 Subject: [PATCH 2/2] fix test --- .../api/test/routers/integrations.test.ts | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/packages/api/test/routers/integrations.test.ts b/packages/api/test/routers/integrations.test.ts index 458e97d47..cef2d1116 100644 --- a/packages/api/test/routers/integrations.test.ts +++ b/packages/api/test/routers/integrations.test.ts @@ -1,30 +1,30 @@ -import 'mocha' -import { createTestElasticPage, request } from '../util' +import { Storage } from '@google-cloud/storage' import { expect } from 'chai' import { DateTime } from 'luxon' +import 'mocha' +import nock from 'nock' +import sinon from 'sinon' import { createPubSubClient, PubSubRequestBody, } from '../../src/datalayer/pubsub' -import { User } from '../../src/entity/user' -import { createTestUser, deleteTestIntegrations, deleteTestUser } from '../db' -import { Integration, IntegrationType } from '../../src/entity/integration' -import { getRepository } from '../../src/entity/utils' +import { addHighlightToPage } from '../../src/elastic/highlights' +import { deletePage } from '../../src/elastic/pages' import { Highlight, HighlightType, Page, PageContext, } from '../../src/elastic/types' -import nock from 'nock' -import { addHighlightToPage } from '../../src/elastic/highlights' -import { getHighlightUrl } from '../../src/services/highlights' -import { deletePage } from '../../src/elastic/pages' -import { READWISE_API_URL } from '../../src/services/integrations/readwise' -import sinon from 'sinon' -import { Storage } from '@google-cloud/storage' -import { MockBucket } from '../mock_storage' +import { Integration, IntegrationType } from '../../src/entity/integration' +import { User } from '../../src/entity/user' +import { getRepository } from '../../src/entity/utils' import { env } from '../../src/env' +import { getHighlightUrl } from '../../src/services/highlights' +import { READWISE_API_URL } from '../../src/services/integrations/readwise' +import { createTestUser, deleteTestIntegrations, deleteTestUser } from '../db' +import { MockBucket } from '../mock_storage' +import { createTestElasticPage, request } from '../util' describe('Integrations routers', () => { const baseUrl = '/svc/pubsub/integrations' @@ -349,22 +349,23 @@ describe('Integrations routers', () => { }) // mock Pocket API + const reqBody = { + access_token: token, + consumer_key: env.pocket.consumerKey, + state: 'all', + detailType: 'complete', + since: 0, + sort: 'oldest', + count: 100, + offset: 0, + } nock('https://getpocket.com', { reqheaders: { 'content-type': 'application/json', 'x-accept': 'application/json', }, }) - .post('/v3/get', { - access_token: token, - consumer_key: env.pocket.consumerKey, - state: 'all', - detailType: 'complete', - since: 0, - sort: 'oldest', - count: 100, - offset: 0, - }) + .post('/v3/get', reqBody) .reply(200, { complete: 1, list: { @@ -383,6 +384,13 @@ describe('Integrations routers', () => { }, since: Date.now() / 1000, }) + .post('/v3/get', { + ...reqBody, + offset: 1, + }) + .reply(200, { + list: {}, + }) // mock cloud storage const mockBucket = new MockBucket('test')