diff --git a/packages/api/src/datalayer/pubsub.ts b/packages/api/src/datalayer/pubsub.ts index f81b3210f..8ef12ba83 100644 --- a/packages/api/src/datalayer/pubsub.ts +++ b/packages/api/src/datalayer/pubsub.ts @@ -1,4 +1,4 @@ -import { CreateSubscriptionOptions, PubSub } from '@google-cloud/pubsub' +import { PubSub } from '@google-cloud/pubsub' import { env } from '../env' import { ReportType } from '../generated/graphql' import express from 'express' @@ -143,38 +143,3 @@ export const readPushSubscription = ( return { message: message, expired: expired(body) } } - -export const createPubSubSubscription = async ( - topicName: string, - subscriptionName: string, - options?: CreateSubscriptionOptions -) => { - const topic = client.topic(topicName) - const [exists] = await topic.exists() - if (!exists) { - await topic.create() - } - - const subscription = topic.subscription(subscriptionName) - const [subscriptionExists] = await subscription.exists() - if (!subscriptionExists) { - await subscription.create(options) - } -} - -export const deletePubSubSubscription = async ( - topicName: string, - subscriptionName: string -) => { - const topic = client.topic(topicName) - const [exists] = await topic.exists() - if (!exists) { - return - } - - const subscription = topic.subscription(subscriptionName) - const [subscriptionExists] = await subscription.exists() - if (subscriptionExists) { - await subscription.delete() - } -} diff --git a/packages/api/src/resolvers/rules/index.ts b/packages/api/src/resolvers/rules/index.ts index ffa732410..516e07e79 100644 --- a/packages/api/src/resolvers/rules/index.ts +++ b/packages/api/src/resolvers/rules/index.ts @@ -8,15 +8,6 @@ import { import { getRepository } from '../../entity/utils' import { User } from '../../entity/user' import { Rule } from '../../entity/rule' -import { - getPubSubSubscriptionName, - getPubSubSubscriptionOptions, - getPubSubTopicName, -} from '../../services/rules' -import { - createPubSubSubscription, - deletePubSubSubscription, -} from '../../datalayer/pubsub' export const setRuleResolver = authorized< SetRuleSuccess, @@ -40,28 +31,6 @@ export const setRuleResolver = authorized< } } - // create or delete pubsub subscription based on action and enabled state - for (const action of input.actions) { - const topicName = getPubSubTopicName(action) - const subscriptionName = getPubSubSubscriptionName( - topicName, - user.id, - input.name - ) - - if (input.enabled) { - const options = await getPubSubSubscriptionOptions( - user.id, - input.name, - input.filter, - action - ) - await createPubSubSubscription(topicName, subscriptionName, options) - } else { - await deletePubSubSubscription(topicName, subscriptionName) - } - } - const rule = await getRepository(Rule).save({ ...input, id: input.id || undefined, diff --git a/packages/api/src/services/rules.ts b/packages/api/src/services/rules.ts deleted file mode 100644 index 01f76f005..000000000 --- a/packages/api/src/services/rules.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { RuleAction, RuleActionType } from '../generated/graphql' -import { CreateSubscriptionOptions } from '@google-cloud/pubsub' -import { env } from '../env' -import { getDeviceTokensByUserId } from './user_device_tokens' - -enum RuleTrigger { - ON_PAGE_UPDATE, - CRON, -} - -export const getRuleTrigger = (action: RuleAction): RuleTrigger => { - switch (action.type) { - case RuleActionType.AddLabel: - case RuleActionType.Archive: - case RuleActionType.MarkAsRead: - case RuleActionType.SendNotification: - return RuleTrigger.ON_PAGE_UPDATE - // TODO: Add more actions, e.g. RuleActionType.SendEmail - } - - return RuleTrigger.ON_PAGE_UPDATE -} - -export const getPubSubTopicName = (action: RuleAction): string => { - const trigger = getRuleTrigger(action) - - switch (trigger) { - case RuleTrigger.ON_PAGE_UPDATE: - return 'entityUpdated' - // TODO: Add more triggers, e.g. RuleTrigger.CRON - } - - return 'entityUpdated' -} - -export const getPubSubSubscriptionName = ( - topicName: string, - userId: string, - ruleName: string -): string => { - return `${topicName}-${userId}-rule-${ruleName}` -} - -export const getPubSubSubscriptionOptions = async ( - userId: string, - ruleName: string, - filter: string, - action: RuleAction -): Promise => { - const options: CreateSubscriptionOptions = { - messageRetentionDuration: 60 * 10, // 10 minutes - expirationPolicy: { - ttl: null, // never expire - }, - ackDeadlineSeconds: 10, - retryPolicy: { - minimumBackoff: { - seconds: 10, - }, - maximumBackoff: { - seconds: 600, - }, - }, - filter, - } - - switch (action.type) { - case RuleActionType.SendNotification: { - const params = action.params - if (params.length === 0) { - throw new Error('Missing notification messages') - } - - const deviceTokens = await getDeviceTokensByUserId(userId) - if (!deviceTokens || deviceTokens.length === 0) { - throw new Error('No device tokens found') - } - - options.pushConfig = { - pushEndpoint: `${env.queue.notificationEndpoint}?token=${env.queue.verificationToken}`, - attributes: { - userId, - filter, - messages: JSON.stringify(params), - tokens: JSON.stringify(deviceTokens.map((t) => t.token)), - }, - } - break - } - // TODO: Add more actions, e.g. RuleActionType.SendEmail - } - - return options -} diff --git a/packages/notification/src/index.ts b/packages/notification/src/index.ts deleted file mode 100644 index d03bfdcb5..000000000 --- a/packages/notification/src/index.ts +++ /dev/null @@ -1,97 +0,0 @@ -import * as Sentry from '@sentry/serverless' -import { Request, Response } from 'express' -import { sendBatchPushNotifications } from './sendNotification' -import { Message } from 'firebase-admin/lib/messaging' -import * as dotenv from 'dotenv' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import - -dotenv.config() - -interface SubscriptionAttributes { - messages: string[] - tokens: string[] -} - -interface SubscriptionData { - attributes?: SubscriptionAttributes - data: string -} - -const readPushSubscription = (req: Request): SubscriptionData | null => { - console.debug('request query', req.body) - - if (req.query.token !== process.env.PUBSUB_VERIFICATION_TOKEN) { - console.log('query does not include valid pubsub token') - return null - } - - // GCP PubSub sends the request as a base64 encoded string - if (!('message' in req.body)) { - console.log('Invalid pubsub message: message not in body') - return null - } - - const body = req.body as { - message: { data: string } - attributes?: SubscriptionAttributes - } - const data = Buffer.from(body.message.data, 'base64').toString('utf-8') - - return { - data, - attributes: body.attributes, - } -} - -const getBatchMessages = (messages: string[], tokens: string[]): Message[] => { - const batchMessages: Message[] = [] - messages.forEach((message) => { - tokens.forEach((token) => { - batchMessages.push({ - token, - notification: { - body: message, - }, - }) - }) - }) - - return batchMessages -} - -export const notification = Sentry.GCPFunction.wrapHttpFunction( - async (req: Request, res: Response) => { - try { - const subscriptionData = readPushSubscription(req) - if (!subscriptionData) { - res.status(400).send('Invalid request') - return - } - - const { attributes } = subscriptionData - if (!attributes) { - res.status(400).send('Invalid request') - return - } - - const { messages, tokens } = attributes - if ( - !messages || - messages.length === 0 || - !tokens || - tokens.length === 0 - ) { - res.status(400).send('Invalid request') - return - } - - const batchMessages = getBatchMessages(messages, tokens) - - await sendBatchPushNotifications(batchMessages) - - res.status(200).send('OK') - } catch (error) { - console.error(error) - res.status(500).send('Internal server error') - } - } -) diff --git a/packages/notification/.dockerignore b/packages/rule-handler/.dockerignore similarity index 100% rename from packages/notification/.dockerignore rename to packages/rule-handler/.dockerignore diff --git a/packages/notification/.eslintignore b/packages/rule-handler/.eslintignore similarity index 100% rename from packages/notification/.eslintignore rename to packages/rule-handler/.eslintignore diff --git a/packages/notification/.eslintrc b/packages/rule-handler/.eslintrc similarity index 100% rename from packages/notification/.eslintrc rename to packages/rule-handler/.eslintrc diff --git a/packages/notification/.gcloudignore b/packages/rule-handler/.gcloudignore similarity index 100% rename from packages/notification/.gcloudignore rename to packages/rule-handler/.gcloudignore diff --git a/packages/notification/.npmignore b/packages/rule-handler/.npmignore similarity index 100% rename from packages/notification/.npmignore rename to packages/rule-handler/.npmignore diff --git a/packages/notification/Dockerfile b/packages/rule-handler/Dockerfile similarity index 56% rename from packages/notification/Dockerfile rename to packages/rule-handler/Dockerfile index b22f1317f..f0abbcc56 100644 --- a/packages/notification/Dockerfile +++ b/packages/rule-handler/Dockerfile @@ -8,19 +8,19 @@ COPY yarn.lock . COPY tsconfig.json . COPY .eslintrc . -COPY /packages/notification/package.json ./packages/notification/package.json +COPY /packages/rule-handler/package.json ./packages/rule-handler/package.json RUN yarn install --pure-lockfile -ADD /packages/notification ./packages/notification -RUN yarn workspace @omnivore/notification build +ADD /packages/rule-handler ./packages/rule-handler +RUN yarn workspace @omnivore/rule-handler build # After building, fetch the production dependencies -RUN rm -rf /app/packages/notification/node_modules +RUN rm -rf /app/packages/rule-handler/node_modules RUN rm -rf /app/node_modules RUN yarn install --pure-lockfile --production EXPOSE 8080 -CMD ["yarn", "workspace", "@omnivore/notification", "start"] +CMD ["yarn", "workspace", "@omnivore/rule-handler", "start"] diff --git a/packages/notification/mocha-config.json b/packages/rule-handler/mocha-config.json similarity index 100% rename from packages/notification/mocha-config.json rename to packages/rule-handler/mocha-config.json diff --git a/packages/notification/package.json b/packages/rule-handler/package.json similarity index 65% rename from packages/notification/package.json rename to packages/rule-handler/package.json index adc92f007..4b6b5be90 100644 --- a/packages/notification/package.json +++ b/packages/rule-handler/package.json @@ -1,5 +1,5 @@ { - "name": "@omnivore/notification", + "name": "@omnivore/rule-handler", "version": "1.0.0", "main": "build/src/index.js", "files": [ @@ -11,9 +11,9 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "compile": "tsc", "build": "tsc", - "start": "functions-framework --target=notification", + "start": "functions-framework --target=ruleHandler", "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"", - "gcloud-deploy": "gcloud functions deploy notification --gen2 --entry-point=notification --trigger-http --allow-unauthenticated --region=us-west2 --runtime nodejs14", + "gcloud-deploy": "gcloud functions deploy rule-handler --gen2 --entry-point=ruleHandler --trigger-http --allow-unauthenticated --region=us-west2 --runtime nodejs14", "deploy": "yarn build && yarn gcloud-deploy" }, "devDependencies": { @@ -23,9 +23,10 @@ }, "dependencies": { "@google-cloud/functions-framework": "3.1.2", - "@google-cloud/pubsub": "^3.2.1", "dotenv": "^16.0.1", "firebase-admin": "^10.0.2", - "@sentry/serverless": "^6.16.1" + "@sentry/serverless": "^6.16.1", + "typeorm": "^0.3.4", + "typeorm-naming-strategies": "^4.1.0" } } diff --git a/packages/rule-handler/src/db.ts b/packages/rule-handler/src/db.ts new file mode 100644 index 000000000..518c610e0 --- /dev/null +++ b/packages/rule-handler/src/db.ts @@ -0,0 +1,30 @@ +import { DataSource, EntityTarget, Repository } from 'typeorm' +import { SnakeNamingStrategy } from 'typeorm-naming-strategies' +import * as dotenv from 'dotenv' + +dotenv.config() + +const AppDataSource = new DataSource({ + type: 'postgres', + host: process.env.PG_HOST, + port: Number(process.env.PG_PORT), + schema: 'omnivore', + username: process.env.PG_USER, + password: process.env.PG_PASSWORD, + database: process.env.PG_DB, + logging: ['query', 'info'], + entities: [__dirname + '/entity/**/*{.js,.ts}'], + namingStrategy: new SnakeNamingStrategy(), +}) + +export const createDBConnection = async () => { + await AppDataSource.initialize() +} + +export const closeDBConnection = async () => { + await AppDataSource.destroy() +} + +export const getRepository = (entity: EntityTarget): Repository => { + return AppDataSource.getRepository(entity) +} diff --git a/packages/rule-handler/src/entity/rules.ts b/packages/rule-handler/src/entity/rules.ts new file mode 100644 index 000000000..d508be63f --- /dev/null +++ b/packages/rule-handler/src/entity/rules.ts @@ -0,0 +1,41 @@ +import { + Column, + CreateDateColumn, + Entity, + JoinColumn, + ManyToOne, + PrimaryGeneratedColumn, + UpdateDateColumn, +} from 'typeorm' +import { User } from './user' + +@Entity({ name: 'rules' }) +export class Rules { + @PrimaryGeneratedColumn('uuid') + id!: string + + @ManyToOne(() => User, { onDelete: 'CASCADE' }) + @JoinColumn({ name: 'user_id' }) + user!: User + + @Column('text') + name!: string + + @Column('text') + filter!: string + + @Column('simple-json') + actions!: { type: string; params: string[] }[] + + @Column('text', { nullable: true }) + description?: string | null + + @Column('boolean', { default: true }) + enabled!: boolean + + @CreateDateColumn({ default: () => 'CURRENT_TIMESTAMP' }) + createdAt!: Date + + @UpdateDateColumn({ default: () => 'CURRENT_TIMESTAMP' }) + updatedAt!: Date +} diff --git a/packages/rule-handler/src/entity/user.ts b/packages/rule-handler/src/entity/user.ts new file mode 100644 index 000000000..f3ecf261f --- /dev/null +++ b/packages/rule-handler/src/entity/user.ts @@ -0,0 +1,31 @@ +import { + Column, + CreateDateColumn, + Entity, + PrimaryGeneratedColumn, + UpdateDateColumn, +} from 'typeorm' + +@Entity() +export class User { + @PrimaryGeneratedColumn('uuid') + id!: string + + @Column('text') + name!: string + + @Column('text') + email!: string + + @Column('text') + sourceUserId!: string + + @CreateDateColumn() + createdAt!: Date + + @UpdateDateColumn() + updatedAt!: Date + + @Column('varchar', { length: 255, nullable: true }) + password?: string +} diff --git a/packages/rule-handler/src/entity/user_device_tokens.ts b/packages/rule-handler/src/entity/user_device_tokens.ts new file mode 100644 index 000000000..0f642d245 --- /dev/null +++ b/packages/rule-handler/src/entity/user_device_tokens.ts @@ -0,0 +1,25 @@ +import { + Column, + CreateDateColumn, + Entity, + JoinColumn, + ManyToOne, + PrimaryGeneratedColumn, +} from 'typeorm' +import { User } from './user' + +@Entity({ name: 'user_device_tokens' }) +export class UserDeviceToken { + @PrimaryGeneratedColumn('uuid') + id!: string + + @Column('text') + token!: string + + @ManyToOne(() => User) + @JoinColumn({ name: 'user_id' }) + user!: User + + @CreateDateColumn() + createdAt!: Date +} diff --git a/packages/rule-handler/src/index.ts b/packages/rule-handler/src/index.ts new file mode 100644 index 000000000..ee4706984 --- /dev/null +++ b/packages/rule-handler/src/index.ts @@ -0,0 +1,102 @@ +import * as Sentry from '@sentry/serverless' +import express, { Request, Response } from 'express' +import * as dotenv from 'dotenv' +import { closeDBConnection, createDBConnection, getRepository } from './db' +import { Rules } from './entity/rules' +import { triggerActions } from './rule' // see https://github.com/motdotla/dotenv#how-do-i-use-dotenv-with-import + +dotenv.config() + +interface PubSubRequestMessage { + data: string + publishTime: string +} + +interface PubSubRequestBody { + message: PubSubRequestMessage +} + +enum EntityType { + PAGE = 'page', + HIGHLIGHT = 'highlight', + LABEL = 'label', +} + +const expired = (body: PubSubRequestBody): boolean => { + const now = new Date() + const expiredTime = new Date(body.message.publishTime) + expiredTime.setHours(expiredTime.getHours() + 1) + + return now > expiredTime +} + +const readPushSubscription = ( + req: express.Request +): { message: string | undefined; expired: boolean } => { + console.debug('request query', req.body) + + if (req.query.token !== process.env.PUBSUB_VERIFICATION_TOKEN) { + console.log('query does not include valid pubsub token') + return { message: undefined, expired: false } + } + + // GCP PubSub sends the request as a base64 encoded string + if (!('message' in req.body)) { + console.log('Invalid pubsub message: message not in body') + return { message: undefined, expired: false } + } + + const body = req.body as PubSubRequestBody + const message = Buffer.from(body.message.data, 'base64').toString('utf-8') + + return { message: message, expired: expired(body) } +} + +export const ruleHandler = Sentry.GCPFunction.wrapHttpFunction( + async (req: Request, res: Response) => { + const { message: msgStr, expired } = readPushSubscription(req) + + if (!msgStr) { + res.status(400).send('Bad Request') + return + } + + if (expired) { + console.log('discarding expired message') + res.status(200).send('Expired') + return + } + + try { + const data = JSON.parse(msgStr) as { userId: string; type: EntityType } + const { userId, type } = data + if (!userId || !type) { + console.log('No userId or type found in message') + res.status(400).send('Bad Request') + return + } + + if (type !== EntityType.PAGE) { + console.log('Not a page update') + res.status(200).send('Not Page') + return + } + + await createDBConnection() + + const rules = await getRepository(Rules).findBy({ + user: { id: userId }, + enabled: true, + }) + + await triggerActions(userId, rules, data) + + await closeDBConnection() + + res.status(200).send('OK') + } catch (error) { + console.error(error) + res.status(500).send('Internal server error') + } + } +) diff --git a/packages/rule-handler/src/rule.ts b/packages/rule-handler/src/rule.ts new file mode 100644 index 000000000..97b3c7ba3 --- /dev/null +++ b/packages/rule-handler/src/rule.ts @@ -0,0 +1,48 @@ +import { Rules } from './entity/rules' +import { + getBatchMessages, + sendBatchPushNotifications, +} from './sendNotification' +import { getRepository } from './db' +import { UserDeviceToken } from './entity/user_device_tokens' + +enum RuleActionType { + AddLabel = 'ADD_LABEL', + Archive = 'ARCHIVE', + MarkAsRead = 'MARK_AS_READ', + SendNotification = 'SEND_NOTIFICATION', +} + +export const triggerActions = async ( + userId: string, + rules: Rules[], + data: any +) => { + for (const rule of rules) { + // TODO: filter out rules that don't match the trigger + + for (const action of rule.actions) { + switch (action.type) { + case RuleActionType.AddLabel: + case RuleActionType.Archive: + case RuleActionType.MarkAsRead: + continue + case RuleActionType.SendNotification: + await sendNotification(userId, action.params) + } + } + } +} + +export const sendNotification = async (userId: string, messages: string[]) => { + const tokens = await getRepository(UserDeviceToken).findBy({ + user: { id: userId }, + }) + + const batchMessages = getBatchMessages( + messages, + tokens.map((t) => t.token) + ) + + return sendBatchPushNotifications(batchMessages) +} diff --git a/packages/notification/src/sendNotification.ts b/packages/rule-handler/src/sendNotification.ts similarity index 69% rename from packages/notification/src/sendNotification.ts rename to packages/rule-handler/src/sendNotification.ts index 5f1a988d4..cd3676a00 100644 --- a/packages/notification/src/sendNotification.ts +++ b/packages/rule-handler/src/sendNotification.ts @@ -11,6 +11,25 @@ initializeApp({ credential: applicationDefault(), }) +export const getBatchMessages = ( + messages: string[], + tokens: string[] +): Message[] => { + const batchMessages: Message[] = [] + messages.forEach((message) => { + tokens.forEach((token) => { + batchMessages.push({ + token, + notification: { + body: message, + }, + }) + }) + }) + + return batchMessages +} + export const sendPushNotification = async ( message: Message ): Promise => { diff --git a/packages/notification/test/babel-register.js b/packages/rule-handler/test/babel-register.js similarity index 100% rename from packages/notification/test/babel-register.js rename to packages/rule-handler/test/babel-register.js diff --git a/packages/notification/tsconfig.json b/packages/rule-handler/tsconfig.json similarity index 78% rename from packages/notification/tsconfig.json rename to packages/rule-handler/tsconfig.json index 42c16d244..547ae79ac 100644 --- a/packages/notification/tsconfig.json +++ b/packages/rule-handler/tsconfig.json @@ -1,5 +1,5 @@ { - "extends": "@tsconfig/node14/tsconfig.json", + "extends": "./../../tsconfig.json", "compilerOptions": { "outDir": "build", "rootDir": ".",