Refactor with factor pattern
This commit is contained in:
@ -17,7 +17,7 @@ import { User } from '../../entity/user'
|
||||
import { Integration } from '../../entity/integration'
|
||||
import { analytics } from '../../utils/analytics'
|
||||
import { env } from '../../env'
|
||||
import { validateToken } from '../../services/integrations'
|
||||
import { getIntegrationService } from '../../services/integrations'
|
||||
import { deleteTask, enqueueSyncWithIntegration } from '../../utils/createTask'
|
||||
|
||||
export const setIntegrationResolver = authorized<
|
||||
@ -64,8 +64,9 @@ export const setIntegrationResolver = authorized<
|
||||
}
|
||||
} else {
|
||||
// Create
|
||||
const integrationService = getIntegrationService(input.name)
|
||||
// validate token
|
||||
if (!(await validateToken(input.token, input.name))) {
|
||||
if (!(await integrationService.validateToken(input.token))) {
|
||||
return {
|
||||
errorCodes: [SetIntegrationErrorCode.InvalidToken],
|
||||
}
|
||||
|
||||
@ -65,6 +65,7 @@ export function integrationsServiceRouter() {
|
||||
}
|
||||
|
||||
const action = req.params.action.toUpperCase()
|
||||
const integrationService = getIntegrationService(integration.name)
|
||||
if (action === 'SYNC_UPDATED') {
|
||||
// get updated page by id
|
||||
let id: string | undefined
|
||||
@ -100,7 +101,7 @@ export function integrationsServiceRouter() {
|
||||
pageId: page.id,
|
||||
})
|
||||
|
||||
const synced = await syncWithIntegration(integration, [page])
|
||||
const synced = await integrationService.exportPages(integration, [page])
|
||||
if (!synced) {
|
||||
logger.info('failed to sync page', {
|
||||
integrationId: integration.id,
|
||||
@ -131,7 +132,10 @@ export function integrationsServiceRouter() {
|
||||
|
||||
logger.info('syncing pages', { pageIds })
|
||||
|
||||
const synced = await syncWithIntegration(integration, pages)
|
||||
const synced = await integrationService.exportPages(
|
||||
integration,
|
||||
pages
|
||||
)
|
||||
if (!synced) {
|
||||
logger.info('failed to sync pages', {
|
||||
pageIds,
|
||||
|
||||
@ -1,169 +0,0 @@
|
||||
import { env } from '../env'
|
||||
import axios from 'axios'
|
||||
import { wait } from '../utils/helpers'
|
||||
import { HighlightType, Page } from '../elastic/types'
|
||||
import { getHighlightUrl } from './highlights'
|
||||
import { Integration } from '../entity/integration'
|
||||
import { getRepository } from '../entity/utils'
|
||||
|
||||
interface ReadwiseHighlight {
|
||||
// The highlight text, (technically the only field required in a highlight object)
|
||||
text: string
|
||||
// The title of the page the highlight is on
|
||||
title?: string
|
||||
// The author of the page the highlight is on
|
||||
author?: string
|
||||
// The URL of the page image
|
||||
image_url?: string
|
||||
// The URL of the page
|
||||
source_url?: string
|
||||
// A meaningful unique identifier for your app
|
||||
source_type?: string
|
||||
// One of: books, articles, tweets or podcasts
|
||||
category?: string
|
||||
// Annotation note attached to the specific highlight
|
||||
note?: string
|
||||
// Highlight's location in the source text. Used to order the highlights
|
||||
location?: number
|
||||
// One of: page, order or time_offset
|
||||
location_type?: string
|
||||
// A datetime representing when the highlight was taken in the ISO 8601 format
|
||||
highlighted_at?: string
|
||||
// Unique url of the specific highlight
|
||||
highlight_url?: string
|
||||
}
|
||||
|
||||
export const READWISE_API_URL = 'https://readwise.io/api/v2'
|
||||
|
||||
export const validateToken = async (
|
||||
token: string,
|
||||
name: string
|
||||
): Promise<boolean> => {
|
||||
switch (name) {
|
||||
case 'READWISE':
|
||||
return validateReadwiseToken(token)
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
const validateReadwiseToken = async (token: string): Promise<boolean> => {
|
||||
const authUrl = `${env.readwise.apiUrl || READWISE_API_URL}/auth`
|
||||
try {
|
||||
const response = await axios.get(authUrl, {
|
||||
headers: {
|
||||
Authorization: `Token ${token}`,
|
||||
},
|
||||
})
|
||||
return response.status === 204
|
||||
} catch (error) {
|
||||
console.log('error validating readwise token', error)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
const pageToReadwiseHighlight = (page: Page): ReadwiseHighlight[] => {
|
||||
if (!page.highlights) return []
|
||||
const category = page.siteName === 'Twitter' ? 'tweets' : 'articles'
|
||||
return (
|
||||
page.highlights
|
||||
// filter out highlights with no quote and are not of type Highlight
|
||||
.filter(
|
||||
(highlight) =>
|
||||
highlight.type === HighlightType.Highlight && highlight.quote
|
||||
)
|
||||
.map((highlight) => {
|
||||
return {
|
||||
text: highlight.quote!,
|
||||
title: page.title,
|
||||
author: page.author || undefined,
|
||||
highlight_url: getHighlightUrl(page.slug, highlight.id),
|
||||
highlighted_at: new Date(highlight.createdAt).toISOString(),
|
||||
category,
|
||||
image_url: page.image || undefined,
|
||||
// location: highlight.highlightPositionAnchorIndex || undefined,
|
||||
location_type: 'order',
|
||||
note: highlight.annotation || undefined,
|
||||
source_type: 'omnivore',
|
||||
source_url: page.url,
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
export const syncWithIntegration = async (
|
||||
integration: Integration,
|
||||
pages: Page[]
|
||||
): Promise<boolean> => {
|
||||
let result = true
|
||||
switch (integration.name) {
|
||||
case 'READWISE': {
|
||||
const highlights = pages.flatMap(pageToReadwiseHighlight)
|
||||
// If there are no highlights, we will skip the sync
|
||||
if (highlights.length > 0) {
|
||||
result = await syncWithReadwise(integration.token, highlights)
|
||||
}
|
||||
break
|
||||
}
|
||||
default:
|
||||
return false
|
||||
}
|
||||
// update integration syncedAt if successful
|
||||
if (result) {
|
||||
console.log('updating integration syncedAt')
|
||||
await getRepository(Integration).update(integration.id, {
|
||||
syncedAt: new Date(),
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
export const syncWithReadwise = async (
|
||||
token: string,
|
||||
highlights: ReadwiseHighlight[],
|
||||
retryCount = 0
|
||||
): Promise<boolean> => {
|
||||
const url = `${env.readwise.apiUrl || READWISE_API_URL}/highlights`
|
||||
try {
|
||||
const response = await axios.post(
|
||||
url,
|
||||
{
|
||||
highlights,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Token ${token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
}
|
||||
)
|
||||
return response.status === 200
|
||||
} catch (error) {
|
||||
if (axios.isAxiosError(error)) {
|
||||
if (error.response) {
|
||||
if (error.response.status === 429 && retryCount < 3) {
|
||||
console.log('Readwise API rate limit exceeded, retrying...')
|
||||
// wait for Retry-After seconds in the header if rate limited
|
||||
// max retry count is 3
|
||||
const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds
|
||||
await wait(parseInt(retryAfter, 10) * 1000)
|
||||
return syncWithReadwise(token, highlights, retryCount + 1)
|
||||
}
|
||||
// The request was made and the server responded with a status code
|
||||
// that falls out of the range of 2xx
|
||||
console.error('Readwise error, response data', error.response.data)
|
||||
} else if (error.request) {
|
||||
// The request was made but no response was received
|
||||
// `error.request` is an instance of XMLHttpRequest in the browser and an instance of
|
||||
// http.ClientRequest in node.js
|
||||
console.error('Readwise error, request', error.request)
|
||||
} else {
|
||||
// Something happened in setting up the request that triggered an Error
|
||||
console.error('Error', error.message)
|
||||
}
|
||||
} else {
|
||||
console.error('Error syncing with readwise', error)
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
12
packages/api/src/services/integrations/index.ts
Normal file
12
packages/api/src/services/integrations/index.ts
Normal file
@ -0,0 +1,12 @@
|
||||
import { ReadwiseIntegration } from './readwise'
|
||||
import { IntegrationService } from './integration'
|
||||
|
||||
const integrations: IntegrationService[] = [new ReadwiseIntegration()]
|
||||
|
||||
export const getIntegrationService = (name: string): IntegrationService => {
|
||||
const service = integrations.find((s) => s.name === name)
|
||||
if (!service) {
|
||||
throw new Error(`Integration service not found: ${name}`)
|
||||
}
|
||||
return service
|
||||
}
|
||||
22
packages/api/src/services/integrations/integration.ts
Normal file
22
packages/api/src/services/integrations/integration.ts
Normal file
@ -0,0 +1,22 @@
|
||||
import { Integration } from '../../entity/integration'
|
||||
import { Page } from '../../elastic/types'
|
||||
|
||||
export abstract class IntegrationService {
|
||||
abstract name: string
|
||||
|
||||
validateToken = async (token: string): Promise<boolean> => {
|
||||
return Promise.resolve(true)
|
||||
}
|
||||
exportPages = async (
|
||||
integration: Integration,
|
||||
pages: Page[]
|
||||
): Promise<boolean> => {
|
||||
return Promise.resolve(true)
|
||||
}
|
||||
importPages = async (
|
||||
integration: Integration,
|
||||
pages: Page[]
|
||||
): Promise<boolean> => {
|
||||
return Promise.resolve(true)
|
||||
}
|
||||
}
|
||||
5
packages/api/src/services/integrations/pocket.ts
Normal file
5
packages/api/src/services/integrations/pocket.ts
Normal file
@ -0,0 +1,5 @@
|
||||
import { IntegrationService } from './integration'
|
||||
|
||||
export class PocketIntegration extends IntegrationService {
|
||||
name = 'POCKET'
|
||||
}
|
||||
134
packages/api/src/services/integrations/readwise.ts
Normal file
134
packages/api/src/services/integrations/readwise.ts
Normal file
@ -0,0 +1,134 @@
|
||||
import { env } from '../../env'
|
||||
import axios from 'axios'
|
||||
import { Page } from '../../elastic/types'
|
||||
import { getHighlightUrl } from '../highlights'
|
||||
import { getRepository } from '../../entity/utils'
|
||||
import { Integration } from '../../entity/integration'
|
||||
import { wait } from '../../utils/helpers'
|
||||
import { IntegrationService } from './integration'
|
||||
|
||||
interface ReadwiseHighlight {
|
||||
// The highlight text, (technically the only field required in a highlight object)
|
||||
text: string
|
||||
// The title of the page the highlight is on
|
||||
title?: string
|
||||
// The author of the page the highlight is on
|
||||
author?: string
|
||||
// The URL of the page image
|
||||
image_url?: string
|
||||
// The URL of the page
|
||||
source_url?: string
|
||||
// A meaningful unique identifier for your app
|
||||
source_type?: string
|
||||
// One of: books, articles, tweets or podcasts
|
||||
category?: string
|
||||
// Annotation note attached to the specific highlight
|
||||
note?: string
|
||||
// Highlight's location in the source text. Used to order the highlights
|
||||
location?: number
|
||||
// One of: page, order or time_offset
|
||||
location_type?: string
|
||||
// A datetime representing when the highlight was taken in the ISO 8601 format
|
||||
highlighted_at?: string
|
||||
// Unique url of the specific highlight
|
||||
highlight_url?: string
|
||||
}
|
||||
|
||||
export const READWISE_API_URL = 'https://readwise.io/api/v2'
|
||||
|
||||
export class ReadwiseIntegration extends IntegrationService {
|
||||
name = 'READWISE'
|
||||
validateToken = async (token: string): Promise<boolean> => {
|
||||
const authUrl = `${env.readwise.apiUrl || READWISE_API_URL}/auth`
|
||||
try {
|
||||
const response = await axios.get(authUrl, {
|
||||
headers: {
|
||||
Authorization: `Token ${token}`,
|
||||
},
|
||||
})
|
||||
return response.status === 204
|
||||
} catch (error) {
|
||||
console.log('error validating readwise token', error)
|
||||
return false
|
||||
}
|
||||
}
|
||||
exportPages = async (
|
||||
integration: Integration,
|
||||
pages: Page[]
|
||||
): Promise<boolean> => {
|
||||
let result = true
|
||||
|
||||
const highlights = pages.flatMap(this.pageToReadwiseHighlight)
|
||||
// If there are no highlights, we will skip the sync
|
||||
if (highlights.length > 0) {
|
||||
result = await this.syncWithReadwise(integration.token, highlights)
|
||||
}
|
||||
|
||||
// update integration syncedAt if successful
|
||||
if (result) {
|
||||
console.log('updating integration syncedAt')
|
||||
await getRepository(Integration).update(integration.id, {
|
||||
syncedAt: new Date(),
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
pageToReadwiseHighlight = (page: Page): ReadwiseHighlight[] => {
|
||||
if (!page.highlights) return []
|
||||
return page.highlights.map((highlight) => {
|
||||
return {
|
||||
text: highlight.quote,
|
||||
title: page.title,
|
||||
author: page.author || undefined,
|
||||
highlight_url: getHighlightUrl(page.slug, highlight.id),
|
||||
highlighted_at: new Date(highlight.createdAt).toISOString(),
|
||||
category: 'articles',
|
||||
image_url: page.image || undefined,
|
||||
location: highlight.highlightPositionPercent || undefined,
|
||||
location_type: 'order',
|
||||
note: highlight.annotation || undefined,
|
||||
source_type: 'omnivore',
|
||||
source_url: page.url,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
syncWithReadwise = async (
|
||||
token: string,
|
||||
highlights: ReadwiseHighlight[],
|
||||
retryCount = 0
|
||||
): Promise<boolean> => {
|
||||
const url = `${env.readwise.apiUrl || READWISE_API_URL}/highlights`
|
||||
try {
|
||||
const response = await axios.post(
|
||||
url,
|
||||
{
|
||||
highlights,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Token ${token}`,
|
||||
ContentType: 'application/json',
|
||||
},
|
||||
}
|
||||
)
|
||||
return response.status === 200
|
||||
} catch (error) {
|
||||
if (
|
||||
axios.isAxiosError(error) &&
|
||||
error.response?.status === 429 &&
|
||||
retryCount < 3
|
||||
) {
|
||||
console.log('Readwise API rate limit exceeded, retrying...')
|
||||
// wait for Retry-After seconds in the header if rate limited
|
||||
// max retry count is 3
|
||||
const retryAfter = error.response?.headers['retry-after'] || '10' // default to 10 seconds
|
||||
await wait(parseInt(retryAfter, 10) * 1000)
|
||||
return this.syncWithReadwise(token, highlights, retryCount + 1)
|
||||
}
|
||||
console.log('Error creating highlights in Readwise', error)
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -279,7 +279,7 @@ export const enqueueReminder = async (
|
||||
|
||||
export const enqueueSyncWithIntegration = async (
|
||||
userId: string,
|
||||
integrationType: string
|
||||
integrationName: string
|
||||
): Promise<string> => {
|
||||
const { GOOGLE_CLOUD_PROJECT, PUBSUB_VERIFICATION_TOKEN } = process.env
|
||||
// use pubsub data format to send the userId to the task handler
|
||||
@ -304,7 +304,7 @@ export const enqueueSyncWithIntegration = async (
|
||||
payload,
|
||||
taskHandlerUrl: `${
|
||||
env.queue.integrationTaskHandlerUrl
|
||||
}/${integrationType.toLowerCase()}/sync_all?token=${PUBSUB_VERIFICATION_TOKEN}`,
|
||||
}/${integrationName.toLowerCase()}/sync_all?token=${PUBSUB_VERIFICATION_TOKEN}`,
|
||||
priority: 'low',
|
||||
})
|
||||
|
||||
|
||||
@ -7,7 +7,7 @@ import { expect } from 'chai'
|
||||
import { getRepository } from '../../src/entity/utils'
|
||||
import { Integration } from '../../src/entity/integration'
|
||||
import nock from 'nock'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations/readwise'
|
||||
|
||||
describe('Integrations resolvers', () => {
|
||||
let loginUser: User
|
||||
@ -38,7 +38,7 @@ describe('Integrations resolvers', () => {
|
||||
mutation {
|
||||
setIntegration(input: {
|
||||
id: "${id}",
|
||||
name: ${name},
|
||||
name: "${name}",
|
||||
token: "${token}",
|
||||
enabled: ${enabled},
|
||||
}) {
|
||||
@ -68,6 +68,7 @@ describe('Integrations resolvers', () => {
|
||||
.get('/auth')
|
||||
.reply(204)
|
||||
.persist()
|
||||
integrationName = 'READWISE'
|
||||
})
|
||||
|
||||
after(() => {
|
||||
@ -79,80 +80,51 @@ describe('Integrations resolvers', () => {
|
||||
integrationId = ''
|
||||
})
|
||||
|
||||
context('when integration exists', () => {
|
||||
let existingIntegration: Integration
|
||||
|
||||
before(async () => {
|
||||
existingIntegration = await getRepository(Integration).save({
|
||||
user: { id: loginUser.id },
|
||||
name: 'READWISE',
|
||||
token: 'fakeToken',
|
||||
})
|
||||
integrationName = existingIntegration.name
|
||||
context('when token is invalid', () => {
|
||||
before(() => {
|
||||
token = 'invalid token'
|
||||
})
|
||||
|
||||
after(async () => {
|
||||
await deleteTestIntegrations(loginUser.id, [existingIntegration.id])
|
||||
})
|
||||
|
||||
it('returns AlreadyExists error code', async () => {
|
||||
it('returns InvalidToken error code', async () => {
|
||||
const res = await graphqlRequest(
|
||||
query(integrationId, integrationName),
|
||||
query(integrationId, integrationName, token),
|
||||
authToken
|
||||
)
|
||||
expect(res.body.data.setIntegration.errorCodes).to.eql([
|
||||
SetIntegrationErrorCode.AlreadyExists,
|
||||
SetIntegrationErrorCode.InvalidToken,
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
context('when integration does not exist', () => {
|
||||
context('when token is invalid', () => {
|
||||
before(() => {
|
||||
token = 'invalid token'
|
||||
})
|
||||
context('when token is valid', () => {
|
||||
before(() => {
|
||||
token = validToken
|
||||
})
|
||||
|
||||
it('returns InvalidToken error code', async () => {
|
||||
const res = await graphqlRequest(
|
||||
query(integrationId, integrationName, token),
|
||||
authToken
|
||||
)
|
||||
expect(res.body.data.setIntegration.errorCodes).to.eql([
|
||||
SetIntegrationErrorCode.InvalidToken,
|
||||
])
|
||||
afterEach(async () => {
|
||||
await deleteTestIntegrations(loginUser.id, {
|
||||
user: { id: loginUser.id },
|
||||
name: integrationName,
|
||||
})
|
||||
})
|
||||
|
||||
context('when token is valid', () => {
|
||||
before(() => {
|
||||
token = validToken
|
||||
})
|
||||
it('creates new integration', async () => {
|
||||
const res = await graphqlRequest(
|
||||
query(integrationId, integrationName, token),
|
||||
authToken
|
||||
)
|
||||
expect(res.body.data.setIntegration.integration.enabled).to.be.true
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
await deleteTestIntegrations(loginUser.id, {
|
||||
user: { id: loginUser.id },
|
||||
name: integrationName,
|
||||
})
|
||||
})
|
||||
|
||||
it('creates new integration', async () => {
|
||||
const res = await graphqlRequest(
|
||||
query(integrationId, integrationName, token),
|
||||
authToken
|
||||
)
|
||||
expect(res.body.data.setIntegration.integration.enabled).to.be.true
|
||||
})
|
||||
|
||||
it('creates new cloud task to sync all existing articles and highlights', async () => {
|
||||
const res = await graphqlRequest(
|
||||
query(integrationId, integrationName, token),
|
||||
authToken
|
||||
)
|
||||
const integration = await getRepository(Integration).findOneBy({
|
||||
id: res.body.data.setIntegration.integration.id,
|
||||
})
|
||||
expect(integration?.taskName).not.to.be.null
|
||||
it('creates new cloud task to sync all existing articles and highlights', async () => {
|
||||
const res = await graphqlRequest(
|
||||
query(integrationId, integrationName, token),
|
||||
authToken
|
||||
)
|
||||
const integration = await getRepository(Integration).findOneBy({
|
||||
id: res.body.data.setIntegration.integration.id,
|
||||
})
|
||||
expect(integration?.taskName).not.to.be.null
|
||||
})
|
||||
})
|
||||
})
|
||||
@ -192,7 +164,7 @@ describe('Integrations resolvers', () => {
|
||||
|
||||
after(async () => {
|
||||
await deleteTestUser(otherUser.id)
|
||||
await deleteTestIntegrations(loginUser.id, [existingIntegration.id])
|
||||
await deleteTestIntegrations(otherUser.id, [existingIntegration.id])
|
||||
})
|
||||
|
||||
it('returns Unauthorized error code', async () => {
|
||||
|
||||
@ -17,10 +17,10 @@ import {
|
||||
PageContext,
|
||||
} from '../../src/elastic/types'
|
||||
import nock from 'nock'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations'
|
||||
import { addHighlightToPage } from '../../src/elastic/highlights'
|
||||
import { getHighlightUrl } from '../../src/services/highlights'
|
||||
import { deletePage } from '../../src/elastic/pages'
|
||||
import { READWISE_API_URL } from '../../src/services/integrations/readwise'
|
||||
|
||||
describe('Integrations routers', () => {
|
||||
let token: string
|
||||
|
||||
Reference in New Issue
Block a user