diff --git a/packages/integration-handler/package.json b/packages/integration-handler/package.json index c25454038..cf8a5fe0b 100644 --- a/packages/integration-handler/package.json +++ b/packages/integration-handler/package.json @@ -13,41 +13,25 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "compile": "tsc", "build": "tsc", - "start_exporter": "functions-framework --target=exporter" + "start_exporter": "functions-framework --target=exporter", + "start_importer": "functions-framework --target=importer" }, "devDependencies": { "@types/chai": "^4.3.4", - "@types/chai-string": "^1.4.2", - "@types/dompurify": "^2.4.0", - "@types/fs-extra": "^11.0.1", - "@types/glob": "^8.0.1", "@types/jsonwebtoken": "^8.5.0", "@types/mocha": "^10.0.1", "@types/node": "^14.11.2", - "@types/unzip-stream": "^0.3.1", - "@types/urlsafe-base64": "^1.0.28", "@types/uuid": "^9.0.0", - "copyfiles": "^2.4.1", "eslint-plugin-prettier": "^4.0.0" }, "dependencies": { - "@fast-csv/parse": "^4.3.6", "@google-cloud/functions-framework": "3.1.2", "@google-cloud/storage": "^7.0.1", - "@google-cloud/tasks": "^4.0.0", - "@omnivore/readability": "1.0.0", "@sentry/serverless": "^7.30.0", - "@types/express": "^4.17.13", "axios": "^1.2.2", - "dompurify": "^2.4.3", - "fs-extra": "^11.1.0", - "glob": "^8.1.0", + "csv-stringify": "^6.4.0", "jsonwebtoken": "^8.5.1", - "linkedom": "^0.14.21", - "nodemon": "^2.0.15", - "redis": "^4.3.1", - "unzip-stream": "^0.3.1", - "urlsafe-base64": "^1.0.0", + "luxon": "^3.2.1", "uuid": "^9.0.0" }, "volta": { diff --git a/packages/integration-handler/src/index.ts b/packages/integration-handler/src/index.ts index 349c7c65d..73a3c23e2 100644 --- a/packages/integration-handler/src/index.ts +++ b/packages/integration-handler/src/index.ts @@ -2,8 +2,12 @@ import * as Sentry from '@sentry/serverless' import * as jwt from 'jsonwebtoken' import { getIntegrationClient, updateIntegration } from './integrations' import { search } from './item' +import { stringify } from 'csv-stringify' +import { DateTime } from 'luxon' +import { v4 as uuidv4 } from 'uuid' +import { File, Storage } from '@google-cloud/storage' -interface ExportRequest { +interface IntegrationRequest { integrationId: string syncAt: number // unix timestamp in milliseconds integrationName: string @@ -19,18 +23,24 @@ Sentry.GCPFunction.init({ tracesSampleRate: 0, }) +const storage = new Storage() + export const wait = (ms: number): Promise => { return new Promise((resolve) => { setTimeout(resolve, ms) }) } -function isExportRequest(body: any): body is ExportRequest { +function isIntegrationRequest(body: any): body is IntegrationRequest { return ( 'integrationId' in body && 'syncAt' in body && 'integrationName' in body ) } +const createGCSFile = (bucket: string, filename: string): File => { + return storage.bucket(bucket).file(filename) +} + export const exporter = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { const JWT_SECRET = process.env.JWT_SECRET @@ -40,7 +50,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( return res.status(500).send('Environment not configured correctly') } - const token = (req.query.token || req.headers.authorization) as string + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const token = (req.cookies?.token || req.headers.authorization) as string if (!token) { return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) } @@ -54,7 +65,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( } try { - if (!isExportRequest(req.body)) { + if (!isIntegrationRequest(req.body)) { console.error('Invalid message') return res.status(200).send('Bad Request') } @@ -125,3 +136,118 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( res.sendStatus(200) } ) + +export const importer = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + const JWT_SECRET = process.env.JWT_SECRET + const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT + const GCS_BUCKET = process.env.GCS_BUCKET + + if (!JWT_SECRET || !REST_BACKEND_ENDPOINT || !GCS_BUCKET) { + return res.status(500).send('Environment not configured correctly') + } + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const token = (req.cookies?.token || req.headers.authorization) as string + if (!token) { + return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) + } + + let claims: Claims + try { + claims = jwt.verify(token, JWT_SECRET) as Claims + } catch (e) { + console.error(e) + return res.status(401).send('UNAUTHORIZED') + } + + if (!isIntegrationRequest(req.body)) { + console.error('Invalid message') + return res.status(200).send('Bad Request') + } + + let writeStream: NodeJS.WritableStream | undefined + try { + const userId = claims.uid + const integrationClient = getIntegrationClient(req.body.integrationName) + + let offset = 0 + let syncedAt = req.body.syncAt + const since = syncedAt + + // get pages from integration + const retrieved = await integrationClient.retrieve({ + token: claims.token, + since, + offset, + }) + syncedAt = retrieved.since || Date.now() + + let retrievedData = retrieved.data + // if there are pages to import + if (retrievedData.length > 0) { + // write the list of urls to a csv file and upload it to gcs + // path style: imports///-.csv + const dateStr = DateTime.now().toISODate() + const fileUuid = uuidv4() + const fullPath = `imports/${userId}/${dateStr}/${integrationClient.name}-${fileUuid}.csv` + // open a write_stream to the file + const file = createGCSFile(GCS_BUCKET, fullPath) + writeStream = file.createWriteStream({ + contentType: 'text/csv', + }) + // stringify the data and pipe it to the write_stream + const stringifier = stringify({ + header: true, + columns: ['url', 'state', 'labels'], + }) + stringifier.pipe(writeStream) + + // paginate api calls to the integration + do { + // write the list of urls, state and labels to the stream + retrievedData.forEach((row) => stringifier.write(row)) + + // get next pages from the integration + offset += retrievedData.length + + const retrieved = await integrationClient.retrieve({ + token: claims.token, + since, + offset, + }) + syncedAt = retrieved.since || Date.now() + retrievedData = retrieved.data + + console.log('retrieved data', { + total: offset, + size: retrievedData.length, + }) + + // update the integration's syncedAt and remove taskName + const result = await updateIntegration( + REST_BACKEND_ENDPOINT, + req.body.integrationId, + new Date(syncedAt), + req.body.integrationName, + claims.token, + token + ) + if (!result) { + console.error('failed to update integration', { + integrationId: req.body.integrationId, + }) + return res.status(400).send('Failed to update integration') + } + } while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages + } + } catch (err) { + console.error('import pages from integration failed', err) + return res.status(500).send(err) + } finally { + writeStream?.end() + } + + res.sendStatus(200) + } +)