import { PubSub } from '@google-cloud/pubsub' import express from 'express' import { env } from './env' import { ReportType } from './generated/graphql' import { deepDelete } from './utils/helpers' import { buildLogger } from './utils/logger' const logger = buildLogger('pubsub') const client = new PubSub() export const createPubSubClient = (): PubsubClient => { const fieldsToDelete = ['user'] as const const publish = (topicName: string, msg: Buffer): Promise => { if (env.dev.isLocal) { logger.info(`Publishing ${topicName}: ${msg.toString()}`) return Promise.resolve() } return client .topic(topicName) .publishMessage({ data: msg }) .catch((err) => { logger.error(`[PubSub] error: ${topicName}`, err) }) .then(() => { return Promise.resolve() }) } return { userCreated: ( userId: string, email: string, name: string, username: string ): Promise => { return publish( 'userCreated', Buffer.from(JSON.stringify({ userId, email, name, username })) ) }, entityCreated: ( type: EntityType, data: T, userId: string ): Promise => { const cleanData = deepDelete( data as T & Record, [...fieldsToDelete] ) return publish( 'entityCreated', Buffer.from(JSON.stringify({ type, userId, ...cleanData })) ) }, entityUpdated: ( type: EntityType, data: T, userId: string ): Promise => { const cleanData = deepDelete( data as T & Record, [...fieldsToDelete] ) return publish( 'entityUpdated', Buffer.from(JSON.stringify({ type, userId, ...cleanData })) ) }, entityDeleted: ( type: EntityType, id: string, userId: string ): Promise => { return publish( 'entityDeleted', Buffer.from(JSON.stringify({ type, id, userId })) ) }, reportSubmitted: ( submitterId: string, itemUrl: string, reportType: ReportType[], reportComment: string ): Promise => { return publish( 'reportSubmitted', Buffer.from( JSON.stringify({ submitterId, itemUrl, reportType, reportComment }) ) ) }, } } export enum EntityType { PAGE = 'page', HIGHLIGHT = 'highlight', LABEL = 'label', } export interface PubsubClient { userCreated: ( userId: string, email: string, name: string, username: string ) => Promise entityCreated: (type: EntityType, data: T, userId: string) => Promise entityUpdated: (type: EntityType, data: T, userId: string) => Promise entityDeleted: (type: EntityType, id: string, userId: string) => Promise reportSubmitted( submitterId: string | undefined, itemUrl: string, reportType: ReportType[], reportComment: string ): Promise } interface PubSubRequestMessage { data: string publishTime: string } export interface PubSubRequestBody { message: PubSubRequestMessage } const expired = (body: PubSubRequestBody): boolean => { const now = new Date() const expiredTime = new Date(body.message.publishTime) expiredTime.setHours(expiredTime.getHours() + 1) return now > expiredTime } export const readPushSubscription = ( req: express.Request ): { message: string | undefined; expired: boolean } => { if (req.query.token !== process.env.PUBSUB_VERIFICATION_TOKEN) { logger.info('query does not include valid pubsub token') return { message: undefined, expired: false } } // GCP PubSub sends the request as a base64 encoded string if (!('message' in req.body)) { logger.info('Invalid pubsub message: message not in body') return { message: undefined, expired: false } } const body = req.body as PubSubRequestBody const message = Buffer.from(body.message.data, 'base64').toString('utf-8') return { message: message, expired: expired(body) } }