From fe993f1f41a0cacf38cfb3a4cbb898fc3a07d9cf Mon Sep 17 00:00:00 2001 From: Jackson Harper Date: Wed, 4 Jan 2023 14:48:03 +0800 Subject: [PATCH] Use Cloud Task instead of pubsub for intra service communication --- packages/import-handler/package.json | 4 +- packages/import-handler/src/index.ts | 25 ++++++------ packages/import-handler/src/task.ts | 60 ++++++++++++++++++++++++++++ yarn.lock | 7 ++++ 4 files changed, 81 insertions(+), 15 deletions(-) create mode 100644 packages/import-handler/src/task.ts diff --git a/packages/import-handler/package.json b/packages/import-handler/package.json index b256bfce4..a4567dfb2 100644 --- a/packages/import-handler/package.json +++ b/packages/import-handler/package.json @@ -26,12 +26,12 @@ "dependencies": { "@fast-csv/parse": "^4.3.6", "@google-cloud/functions-framework": "3.1.2", - "@google-cloud/pubsub": "^2.16.3", "@google-cloud/storage": "^5.18.1", + "@google-cloud/tasks": "^3.0.5", "@types/express": "^4.17.13", "axios": "^0.27.2", "concurrently": "^7.0.0", "csv-parser": "^3.0.0", "nodemon": "^2.0.15" } -} +} \ No newline at end of file diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 65ff113ed..6815f8ff3 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -3,15 +3,14 @@ import { CloudFunctionsContext, } from '@google-cloud/functions-framework/build/src/functions' import { Storage } from '@google-cloud/storage' -import { PubSub } from '@google-cloud/pubsub' import { importCsv, UrlHandler } from './csv' import * as path from 'path' import { importMatterHistory } from './matterHistory' import { Stream } from 'node:stream' +import { v4 as uuid } from 'uuid' +import { createCloudTask } from './task' -const pubsub = new PubSub() const storage = new Storage() -const IMPORT_URL_UPDATE_TOPIC = 'importURL' interface StorageEventData { bucket: string @@ -43,13 +42,12 @@ const importURL = async ( url: URL, source: string ): Promise => { - return pubsub - .topic(IMPORT_URL_UPDATE_TOPIC) - .publish(Buffer.from(JSON.stringify({ userId, url, source }))) - .catch((err) => { - console.error('error publishing url:', err) - return undefined - }) + return createCloudTask({ + userId, + source, + url: url.toString(), + saveRequestId: uuid(), + }) } const handlerForFile = (name: string): importHandlerFunc | undefined => { @@ -84,12 +82,13 @@ export const importHandler: EventFunction = async (event, context) => { await handler(stream, async (url): Promise => { try { // Imports are stored in the format imports//-.csv - const group = path.parse(data.name).name.match(/(?<=-).*/gi) - if (!group || group.length < 1) { + const regex = new RegExp('imports/(.*?)/') + const groups = regex.exec(data.name) + if (!groups || groups.length < 2) { console.log('could not match file pattern: ', data.name) return } - const userId = [...group][0] + const userId = [...groups][1] const result = await importURL(userId, url, 'csv-importer') console.log('import url result', result) } catch (err) { diff --git a/packages/import-handler/src/task.ts b/packages/import-handler/src/task.ts new file mode 100644 index 000000000..9267499ad --- /dev/null +++ b/packages/import-handler/src/task.ts @@ -0,0 +1,60 @@ +import { CloudTasksClient, protos } from '@google-cloud/tasks' +import { google } from '@google-cloud/tasks/build/protos/protos' + +type TaskPayload = { + url: string + userId: string + saveRequestId: string + source: string +} + +const cloudTask = new CloudTasksClient() + +export const createCloudTask = async (payload: TaskPayload) => { + const queue = 'omnivore-import-queue' + const location = process.env.GCP_LOCATION + const project = process.env.GCP_PROJECT_ID + const taskHandlerUrl = process.env.CONTENT_FETCH_GCF_URL + + if (!project || !location || !queue || !taskHandlerUrl) { + throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}` + } + + const serviceAccountEmail = `${project}@appspot.gserviceaccount.com` + + const parent = cloudTask.queuePath(project, location, queue) + console.log(`Task creation options: `, { + project, + location, + queue, + taskHandlerUrl, + serviceAccountEmail, + payload, + }) + + let convertedPayload: string | ArrayBuffer + convertedPayload = JSON.stringify(payload) + + const body = Buffer.from(convertedPayload).toString('base64') + const task: protos.google.cloud.tasks.v2.ITask = { + httpRequest: { + httpMethod: 'POST', + url: taskHandlerUrl, + headers: { + 'Content-Type': 'application/json', + }, + body, + ...(serviceAccountEmail + ? { + oidcToken: { + serviceAccountEmail, + }, + } + : null), + }, + } + + return cloudTask.createTask({ parent, task }).then((result) => { + return result[0].name ?? undefined + }) +} diff --git a/yarn.lock b/yarn.lock index 592b91ca7..9d836228a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2746,6 +2746,13 @@ dependencies: google-gax "^2.24.1" +"@google-cloud/tasks@^3.0.5": + version "3.0.5" + resolved "https://registry.yarnpkg.com/@google-cloud/tasks/-/tasks-3.0.5.tgz#4c0c648d10c7cf10b2e1599dd5a48bf8cc78cbd6" + integrity sha512-fC7afAV2d+zz0A2TxJWBZyBTHlB8nWBkNr/7BH6ZM/uAgT6gmSLfDho3aQNttnumj7IiI0vwUx31Zu1wG3m7yw== + dependencies: + google-gax "^3.5.2" + "@graphql-codegen/cli@^2.6.2": version "2.6.2" resolved "https://registry.yarnpkg.com/@graphql-codegen/cli/-/cli-2.6.2.tgz#a9aa4656141ee0998cae8c7ad7d0bf9ca8e0c9ae"