Create an integration handler for importing from Pocket

This commit is contained in:
Hongbo Wu
2023-10-26 23:14:48 +08:00
parent 58d5eb6bc0
commit 5cc9474b73
2 changed files with 134 additions and 24 deletions

View File

@ -13,41 +13,25 @@
"lint": "eslint src --ext ts,js,tsx,jsx",
"compile": "tsc",
"build": "tsc",
"start_exporter": "functions-framework --target=exporter"
"start_exporter": "functions-framework --target=exporter",
"start_importer": "functions-framework --target=importer"
},
"devDependencies": {
"@types/chai": "^4.3.4",
"@types/chai-string": "^1.4.2",
"@types/dompurify": "^2.4.0",
"@types/fs-extra": "^11.0.1",
"@types/glob": "^8.0.1",
"@types/jsonwebtoken": "^8.5.0",
"@types/mocha": "^10.0.1",
"@types/node": "^14.11.2",
"@types/unzip-stream": "^0.3.1",
"@types/urlsafe-base64": "^1.0.28",
"@types/uuid": "^9.0.0",
"copyfiles": "^2.4.1",
"eslint-plugin-prettier": "^4.0.0"
},
"dependencies": {
"@fast-csv/parse": "^4.3.6",
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/storage": "^7.0.1",
"@google-cloud/tasks": "^4.0.0",
"@omnivore/readability": "1.0.0",
"@sentry/serverless": "^7.30.0",
"@types/express": "^4.17.13",
"axios": "^1.2.2",
"dompurify": "^2.4.3",
"fs-extra": "^11.1.0",
"glob": "^8.1.0",
"csv-stringify": "^6.4.0",
"jsonwebtoken": "^8.5.1",
"linkedom": "^0.14.21",
"nodemon": "^2.0.15",
"redis": "^4.3.1",
"unzip-stream": "^0.3.1",
"urlsafe-base64": "^1.0.0",
"luxon": "^3.2.1",
"uuid": "^9.0.0"
},
"volta": {

View File

@ -2,8 +2,12 @@ import * as Sentry from '@sentry/serverless'
import * as jwt from 'jsonwebtoken'
import { getIntegrationClient, updateIntegration } from './integrations'
import { search } from './item'
import { stringify } from 'csv-stringify'
import { DateTime } from 'luxon'
import { v4 as uuidv4 } from 'uuid'
import { File, Storage } from '@google-cloud/storage'
interface ExportRequest {
interface IntegrationRequest {
integrationId: string
syncAt: number // unix timestamp in milliseconds
integrationName: string
@ -19,18 +23,24 @@ Sentry.GCPFunction.init({
tracesSampleRate: 0,
})
const storage = new Storage()
// Resolve after `ms` milliseconds; handy for throttling paginated API calls.
export const wait = (ms: number): Promise<void> =>
  new Promise<void>((done) => void setTimeout(done, ms))
function isExportRequest(body: any): body is ExportRequest {
// Type guard for the body shared by the exporter/importer endpoints.
// Returns true only when `body` is a non-null object carrying all three
// required keys. The original version applied `in` directly, which throws a
// TypeError when `body` is null or a primitive; guard the shape first so a
// malformed request is reported as invalid instead of crashing the handler.
function isIntegrationRequest(body: any): body is IntegrationRequest {
  if (typeof body !== 'object' || body === null) {
    return false
  }
  return (
    'integrationId' in body && 'syncAt' in body && 'integrationName' in body
  )
}
// Build a handle to `filename` within the given GCS bucket.
const createGCSFile = (bucket: string, filename: string): File => {
  const target = storage.bucket(bucket)
  return target.file(filename)
}
export const exporter = Sentry.GCPFunction.wrapHttpFunction(
async (req, res) => {
const JWT_SECRET = process.env.JWT_SECRET
@ -40,7 +50,8 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction(
return res.status(500).send('Environment not configured correctly')
}
const token = (req.query.token || req.headers.authorization) as string
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const token = (req.cookies?.token || req.headers.authorization) as string
if (!token) {
return res.status(401).send({ errorCode: 'INVALID_TOKEN' })
}
@ -54,7 +65,7 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction(
}
try {
if (!isExportRequest(req.body)) {
if (!isIntegrationRequest(req.body)) {
console.error('Invalid message')
return res.status(200).send('Bad Request')
}
@ -125,3 +136,118 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction(
res.sendStatus(200)
}
)
// HTTP entry point for importing saved pages from a third-party integration
// (e.g. Pocket) into the user's library. Authenticates via JWT (cookie or
// Authorization header), pulls pages from the integration API in batches,
// and writes them as a CSV (url, state, labels) to GCS for downstream
// processing.
export const importer = Sentry.GCPFunction.wrapHttpFunction(
async (req, res) => {
// Required configuration; fail fast if any piece is missing.
const JWT_SECRET = process.env.JWT_SECRET
const REST_BACKEND_ENDPOINT = process.env.REST_BACKEND_ENDPOINT
const GCS_BUCKET = process.env.GCS_BUCKET
if (!JWT_SECRET || !REST_BACKEND_ENDPOINT || !GCS_BUCKET) {
return res.status(500).send('Environment not configured correctly')
}
// Accept the auth token from either the session cookie or the
// Authorization header.
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const token = (req.cookies?.token || req.headers.authorization) as string
if (!token) {
return res.status(401).send({ errorCode: 'INVALID_TOKEN' })
}
let claims: Claims
try {
claims = jwt.verify(token, JWT_SECRET) as Claims
} catch (e) {
console.error(e)
return res.status(401).send('UNAUTHORIZED')
}
// NOTE(review): a 200 status for a malformed body mirrors the exporter
// handler above — presumably so the caller/task queue does not retry;
// confirm this is intentional.
if (!isIntegrationRequest(req.body)) {
console.error('Invalid message')
return res.status(200).send('Bad Request')
}
// Held outside the try so the finally block can close it on any exit path.
let writeStream: NodeJS.WritableStream | undefined
try {
const userId = claims.uid
const integrationClient = getIntegrationClient(req.body.integrationName)
let offset = 0
let syncedAt = req.body.syncAt
// `since` stays fixed at the last sync point for every page request;
// `syncedAt` advances to the newest timestamp seen so far.
const since = syncedAt
// get pages from integration
const retrieved = await integrationClient.retrieve({
token: claims.token,
since,
offset,
})
syncedAt = retrieved.since || Date.now()
let retrievedData = retrieved.data
// if there are pages to import
if (retrievedData.length > 0) {
// write the list of urls to a csv file and upload it to gcs
// path style: imports/<uid>/<date>/<type>-<uuid>.csv
const dateStr = DateTime.now().toISODate()
const fileUuid = uuidv4()
const fullPath = `imports/${userId}/${dateStr}/${integrationClient.name}-${fileUuid}.csv`
// open a write_stream to the file
const file = createGCSFile(GCS_BUCKET, fullPath)
writeStream = file.createWriteStream({
contentType: 'text/csv',
})
// stringify the data and pipe it to the write_stream
const stringifier = stringify({
header: true,
columns: ['url', 'state', 'labels'],
})
stringifier.pipe(writeStream)
// paginate api calls to the integration; each iteration flushes the
// current batch to the CSV, fetches the next page, then persists the
// new sync point.
do {
// write the list of urls, state and labels to the stream
retrievedData.forEach((row) => stringifier.write(row))
// get next pages from the integration
offset += retrievedData.length
const retrieved = await integrationClient.retrieve({
token: claims.token,
since,
offset,
})
syncedAt = retrieved.since || Date.now()
retrievedData = retrieved.data
console.log('retrieved data', {
total: offset,
size: retrievedData.length,
})
// update the integration's syncedAt and remove taskName
// NOTE(review): this runs once per page; a failure mid-pagination
// returns 400 and leaves a partially written CSV in GCS — confirm
// downstream consumers tolerate partial files.
const result = await updateIntegration(
REST_BACKEND_ENDPOINT,
req.body.integrationId,
new Date(syncedAt),
req.body.integrationName,
claims.token,
token
)
if (!result) {
console.error('failed to update integration', {
integrationId: req.body.integrationId,
})
return res.status(400).send('Failed to update integration')
}
} while (retrievedData.length > 0 && offset < 20000) // limit to 20k pages
}
} catch (err) {
console.error('import pages from integration failed', err)
return res.status(500).send(err)
} finally {
// Close the GCS stream on every exit path.
// NOTE(review): `stringifier` is never .end()ed and the stream's finish
// event is not awaited, so the upload may finalize after the response is
// sent (or be truncated on error) — verify this is acceptable.
writeStream?.end()
}
res.sendStatus(200)
}
)