Use Cloud Task instead of pubsub for intra service communication

This commit is contained in:
Jackson Harper
2023-01-04 14:48:03 +08:00
parent 87c73adebb
commit fe993f1f41
4 changed files with 81 additions and 15 deletions

View File

@ -26,12 +26,12 @@
"dependencies": {
"@fast-csv/parse": "^4.3.6",
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/pubsub": "^2.16.3",
"@google-cloud/storage": "^5.18.1",
"@google-cloud/tasks": "^3.0.5",
"@types/express": "^4.17.13",
"axios": "^0.27.2",
"concurrently": "^7.0.0",
"csv-parser": "^3.0.0",
"nodemon": "^2.0.15"
}
}
}

View File

@ -3,15 +3,14 @@ import {
CloudFunctionsContext,
} from '@google-cloud/functions-framework/build/src/functions'
import { Storage } from '@google-cloud/storage'
import { PubSub } from '@google-cloud/pubsub'
import { importCsv, UrlHandler } from './csv'
import * as path from 'path'
import { importMatterHistory } from './matterHistory'
import { Stream } from 'node:stream'
import { v4 as uuid } from 'uuid'
import { createCloudTask } from './task'
const pubsub = new PubSub()
const storage = new Storage()
const IMPORT_URL_UPDATE_TOPIC = 'importURL'
interface StorageEventData {
bucket: string
@ -43,13 +42,12 @@ const importURL = async (
url: URL,
source: string
): Promise<string | undefined> => {
return pubsub
.topic(IMPORT_URL_UPDATE_TOPIC)
.publish(Buffer.from(JSON.stringify({ userId, url, source })))
.catch((err) => {
console.error('error publishing url:', err)
return undefined
})
return createCloudTask({
userId,
source,
url: url.toString(),
saveRequestId: uuid(),
})
}
const handlerForFile = (name: string): importHandlerFunc | undefined => {
@ -84,12 +82,13 @@ export const importHandler: EventFunction = async (event, context) => {
await handler(stream, async (url): Promise<void> => {
try {
// Imports are stored in the format imports/<user id>/<type>-<uuid>.csv
const group = path.parse(data.name).name.match(/(?<=-).*/gi)
if (!group || group.length < 1) {
const regex = new RegExp('imports/(.*?)/')
const groups = regex.exec(data.name)
if (!groups || groups.length < 2) {
console.log('could not match file pattern: ', data.name)
return
}
const userId = [...group][0]
const userId = [...groups][1]
const result = await importURL(userId, url, 'csv-importer')
console.log('import url result', result)
} catch (err) {

View File

@ -0,0 +1,60 @@
import { CloudTasksClient, protos } from '@google-cloud/tasks'
import { google } from '@google-cloud/tasks/build/protos/protos'
type TaskPayload = {
url: string
userId: string
saveRequestId: string
source: string
}
const cloudTask = new CloudTasksClient()
export const createCloudTask = async (payload: TaskPayload) => {
const queue = 'omnivore-import-queue'
const location = process.env.GCP_LOCATION
const project = process.env.GCP_PROJECT_ID
const taskHandlerUrl = process.env.CONTENT_FETCH_GCF_URL
if (!project || !location || !queue || !taskHandlerUrl) {
throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}`
}
const serviceAccountEmail = `${project}@appspot.gserviceaccount.com`
const parent = cloudTask.queuePath(project, location, queue)
console.log(`Task creation options: `, {
project,
location,
queue,
taskHandlerUrl,
serviceAccountEmail,
payload,
})
let convertedPayload: string | ArrayBuffer
convertedPayload = JSON.stringify(payload)
const body = Buffer.from(convertedPayload).toString('base64')
const task: protos.google.cloud.tasks.v2.ITask = {
httpRequest: {
httpMethod: 'POST',
url: taskHandlerUrl,
headers: {
'Content-Type': 'application/json',
},
body,
...(serviceAccountEmail
? {
oidcToken: {
serviceAccountEmail,
},
}
: null),
},
}
return cloudTask.createTask({ parent, task }).then((result) => {
return result[0].name ?? undefined
})
}

View File

@ -2746,6 +2746,13 @@
dependencies:
google-gax "^2.24.1"
"@google-cloud/tasks@^3.0.5":
version "3.0.5"
resolved "https://registry.yarnpkg.com/@google-cloud/tasks/-/tasks-3.0.5.tgz#4c0c648d10c7cf10b2e1599dd5a48bf8cc78cbd6"
integrity sha512-fC7afAV2d+zz0A2TxJWBZyBTHlB8nWBkNr/7BH6ZM/uAgT6gmSLfDho3aQNttnumj7IiI0vwUx31Zu1wG3m7yw==
dependencies:
google-gax "^3.5.2"
"@graphql-codegen/cli@^2.6.2":
version "2.6.2"
resolved "https://registry.yarnpkg.com/@graphql-codegen/cli/-/cli-2.6.2.tgz#a9aa4656141ee0998cae8c7ad7d0bf9ca8e0c9ae"