diff --git a/packages/api/src/jobs/update_pdf_content.ts b/packages/api/src/jobs/update_pdf_content.ts new file mode 100644 index 000000000..60850fd0a --- /dev/null +++ b/packages/api/src/jobs/update_pdf_content.ts @@ -0,0 +1,14 @@ +import { + UpdateContentMessage, + isUpdateContentMessage, + updateContentForFileItem, +} from '../services/update_pdf_content' +import { logger } from '../utils/logger' + +export const updatePDFContentJob = async (data: unknown): Promise<boolean> => { + if (isUpdateContentMessage(data)) { + return await updateContentForFileItem(data) + } + logger.log('update_pdf_content data is not a update message', { data }) + return false +} diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index dc6b27673..7ccd5e272 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -12,6 +12,7 @@ import { refreshFeed } from './jobs/rss/refreshFeed' import { savePageJob } from './jobs/save_page' import { redisDataSource } from './redis_data_source' import { CustomTypeOrmLogger } from './utils/logger' +import { updatePDFContentJob } from './jobs/update_pdf_content' export const QUEUE_NAME = 'omnivore-backend-queue' @@ -98,6 +99,9 @@ const main = async () => { case 'save-page': { return savePageJob(job.data, job.attemptsMade) } + case 'update-pdf-content': { + return updatePDFContentJob(job.data) + } } return true }, diff --git a/packages/api/src/routers/svc/content.ts b/packages/api/src/routers/svc/content.ts index 70723429b..2c1c51d25 100644 --- a/packages/api/src/routers/svc/content.ts +++ b/packages/api/src/routers/svc/content.ts @@ -2,25 +2,12 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/explicit-module-boundary-types */ import express from 'express' -import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity' -import { LibraryItem, LibraryItemState } from '../../entity/library_item' import { readPushSubscription }
from '../../pubsub' -import { authTrx } from '../../repository' -import { libraryItemRepository } from '../../repository/library_item' -import { updateLibraryItem } from '../../services/library_item' -import { - findUploadFileById, - setFileUploadComplete, -} from '../../services/upload_file' import { logger } from '../../utils/logger' - -interface UpdateContentMessage { - fileId: string - content: string - title?: string - author?: string - description?: string -} +import { + UpdateContentMessage, + updateContentForFileItem, +} from '../../services/update_pdf_content' export function contentServiceRouter() { const router = express.Router() @@ -48,75 +35,12 @@ export function contentServiceRouter() { const msg = data as UpdateContentMessage - - // First attempt to parse the file id out of the name - const parts = msg.fileId.split('/') - const fileId = parts && parts.length > 1 ? parts[1] : undefined - if (!fileId) { - logger.info('No file id found in message') - res.status(200).send('Bad Request') - return - } - - const uploadFile = await findUploadFileById(fileId) - if (!uploadFile) { - logger.info('No file found') - res.status(404).send('No file found') - return - } - - const libraryItem = await authTrx( - async (tx) => - tx - .withRepository(libraryItemRepository) - .createQueryBuilder('item') - .innerJoinAndSelect('item.uploadFile', 'file') - .where('item.user = :userId', { userId: uploadFile.user.id }) - .andWhere('file.id = :fileId', { fileId }) - .getOne(), - undefined, - uploadFile.user.id - ) - if (!libraryItem) { - logger.info(`No upload file found for id: ${fileId}`) + if (!(await updateContentForFileItem(msg))) { res.status(404).send('Bad Request') return } - - const itemToUpdate: QueryDeepPartialEntity<LibraryItem> = { - originalContent: msg.content, - } - if (msg.title) itemToUpdate.title = msg.title - if (msg.author) itemToUpdate.author = msg.author - if (msg.description) itemToUpdate.description = msg.description - - // This event is fired after the file is
fully uploaded, - // so along with updating content, we mark it as - // succeeded. - itemToUpdate.state = LibraryItemState.Succeeded - - try { - const uploadFileData = await setFileUploadComplete( - fileId, - uploadFile.user.id - ) - logger.info('updated uploadFileData', uploadFileData) - } catch (error) { - logger.info('error marking file upload as completed', error) - } - - const result = await updateLibraryItem( - libraryItem.id, - itemToUpdate, - uploadFile.user.id - ) - logger.info('Updating library item text', { - id: libraryItem.id, - result, - content: msg.content.substring(0, 20), - }) - - res.status(200).send(msg) + res.sendStatus(200) + return }) return router diff --git a/packages/api/src/services/update_pdf_content.ts b/packages/api/src/services/update_pdf_content.ts new file mode 100644 index 000000000..ed9822ce2 --- /dev/null +++ b/packages/api/src/services/update_pdf_content.ts @@ -0,0 +1,88 @@ +import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity' +import { LibraryItem, LibraryItemState } from '../entity/library_item' +import { authTrx } from '../repository' +import { libraryItemRepository } from '../repository/library_item' +import { logger } from '../utils/logger' +import { updateLibraryItem } from './library_item' +import { findUploadFileById, setFileUploadComplete } from './upload_file' + +export interface UpdateContentMessage { + fileId: string + content: string + title?: string + author?: string + description?: string +} + +export const isUpdateContentMessage = ( + data: any ): data is UpdateContentMessage => { + return data !== null && typeof data === 'object' && 'fileId' in data && 'content' in data +} + +export const updateContentForFileItem = async (msg: UpdateContentMessage) => { + const parts = msg.fileId.split('/') + const fileId = parts && parts.length > 1 ?
parts[1] : undefined + if (!fileId) { + logger.info('No file id found in message') + return true + } + + const uploadFile = await findUploadFileById(fileId) + if (!uploadFile) { + logger.info('No file found') + return false + } + + const libraryItem = await authTrx( + async (tx) => + tx + .withRepository(libraryItemRepository) + .createQueryBuilder('item') + .innerJoinAndSelect('item.uploadFile', 'file') + .where('item.user = :userId', { userId: uploadFile.user.id }) + .andWhere('file.id = :fileId', { fileId }) + .getOne(), + undefined, + uploadFile.user.id + ) + if (!libraryItem) { + logger.info(`No upload file found for id: ${fileId}`) + return false + } + + const itemToUpdate: QueryDeepPartialEntity<LibraryItem> = { + originalContent: msg.content, + } + if (msg.title) itemToUpdate.title = msg.title + if (msg.author) itemToUpdate.author = msg.author + if (msg.description) itemToUpdate.description = msg.description + + // This event is fired after the file is fully uploaded, + // so along with updating content, we mark it as + // succeeded.
+ itemToUpdate.state = LibraryItemState.Succeeded + + try { + const uploadFileData = await setFileUploadComplete( + fileId, + uploadFile.user.id + ) + logger.info('updated uploadFileData', uploadFileData) + } catch (error) { + logger.info('error marking file upload as completed', error) + } + + const result = await updateLibraryItem( + libraryItem.id, + itemToUpdate, + uploadFile.user.id + ) + logger.info('Updating library item text', { + id: libraryItem.id, + result, + content: msg.content.substring(0, 20), + }) + + return true +} diff --git a/packages/pdf-handler/package.json b/packages/pdf-handler/package.json index 80c679c4a..52cf4ac97 100644 --- a/packages/pdf-handler/package.json +++ b/packages/pdf-handler/package.json @@ -32,7 +32,9 @@ "@google-cloud/storage": "^7.0.1", "@sentry/serverless": "^7.77.0", "axios": "^0.27.2", + "bullmq": "^5.1.4", "concurrently": "^7.0.0", + "ioredis": "^5.3.2", "pdfjs-dist": "^2.9.359" }, "volta": { diff --git a/packages/pdf-handler/src/index.ts b/packages/pdf-handler/src/index.ts index 8c0aec5e1..e1623372b 100644 --- a/packages/pdf-handler/src/index.ts +++ b/packages/pdf-handler/src/index.ts @@ -1,16 +1,14 @@ -import { PubSub } from '@google-cloud/pubsub' import { GetSignedUrlConfig, Storage } from '@google-cloud/storage' import * as Sentry from '@sentry/serverless' import { parsePdf } from './pdf' +import { queueUpdatePageJob } from './job' Sentry.GCPFunction.init({ dsn: process.env.SENTRY_DSN, tracesSampleRate: 0, }) -const pubsub = new PubSub() const storage = new Storage() -const CONTENT_UPDATE_TOPIC = 'updatePageContent' interface StorageEventData { bucket: string @@ -50,24 +48,21 @@ const getDocumentUrl = async ( } } -export const updatePageContent = ( +export const updatePageContent = async ( fileId: string, content: string, title?: string, author?: string, description?: string ): Promise<string | undefined> => { - return pubsub - .topic(CONTENT_UPDATE_TOPIC) - .publish( - Buffer.from( - JSON.stringify({ fileId, content, title, author,
description }) - ) - ) - .catch((err) => { - console.error('error publishing conentUpdate:', err) - return undefined - }) + const job = await queueUpdatePageJob({ + fileId, + content, + title, + author, + description, + }) + return job.id } const getStorageEventData = ( diff --git a/packages/pdf-handler/src/job.ts b/packages/pdf-handler/src/job.ts new file mode 100644 index 000000000..3ba1c1849 --- /dev/null +++ b/packages/pdf-handler/src/job.ts @@ -0,0 +1,21 @@ +import { Queue } from 'bullmq' +import { redisDataSource } from './redis_data_source' + +const QUEUE_NAME = 'omnivore-backend-queue' +const JOB_NAME = 'update-pdf-content' + +const queue = new Queue(QUEUE_NAME, { + connection: redisDataSource.queueRedisClient, +}) + +type UpdatePageJobData = { + fileId: string + content: string + title?: string + author?: string + description?: string +} + +export const queueUpdatePageJob = async (data: UpdatePageJobData) => { + return queue.add(JOB_NAME, data) +} diff --git a/packages/pdf-handler/src/redis_data_source.ts b/packages/pdf-handler/src/redis_data_source.ts new file mode 100644 index 000000000..1a95f9d6e --- /dev/null +++ b/packages/pdf-handler/src/redis_data_source.ts @@ -0,0 +1,88 @@ +import Redis, { RedisOptions } from 'ioredis' + +export type RedisDataSourceOptions = { + REDIS_URL?: string + REDIS_CERT?: string +} + +export class RedisDataSource { + options: RedisDataSourceOptions + + cacheClient: Redis + queueRedisClient: Redis + + constructor(options: RedisDataSourceOptions) { + this.options = options + + this.cacheClient = createRedisClient('cache', this.options) + this.queueRedisClient = createRedisClient('queue', this.options) + } + + setOptions(options: RedisDataSourceOptions): void { + this.options = options + } + + async shutdown(): Promise { + try { + await this.queueRedisClient?.quit() + await this.cacheClient?.quit() + } catch (err) { + console.error('error while shutting down redis', err) + } + } +} + +const createRedisClient = (name: string, 
options: RedisDataSourceOptions) => { + const redisURL = options.REDIS_URL + const cert = options.REDIS_CERT?.replace(/\\n/g, '\n') // replace \n with new line + if (!redisURL) { + throw new Error('no redisURL supplied') + } + + const redisOptions: RedisOptions = { + name, + connectTimeout: 10000, // 10 seconds + tls: cert + ? { + cert, + rejectUnauthorized: false, // for self-signed certs + } + : undefined, + maxRetriesPerRequest: null, + offlineQueue: false, + } + + const redis = new Redis(redisURL, redisOptions) + + redis.on('connect', () => { + console.log('Redis connected', name) + }) + + redis.on('error', (err) => { + console.error('Redis error', err, name) + }) + + redis.on('close', () => { + console.log('Redis closed', name) + }) + + return redis +} + +export const redisDataSource = new RedisDataSource({ + REDIS_URL: process.env.REDIS_URL, + REDIS_CERT: process.env.REDIS_CERT, +}) + +// eslint-disable-next-line @typescript-eslint/no-misused-promises
process.on('SIGINT', async () => { + console.log('SIGINT signal received.') + + try { + await redisDataSource.shutdown() + } catch (error) { + console.error('error while shutting down redis', error) + } + + process.exit(0) +})