diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml index 0086a086e..08bc418cb 100644 --- a/.github/workflows/run-tests.yaml +++ b/.github/workflows/run-tests.yaml @@ -42,6 +42,15 @@ jobs: --health-retries 10 ports: - 9200 + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 10 + ports: + - 6379 steps: - uses: actions/checkout@v2 with: @@ -89,6 +98,7 @@ jobs: PG_DB: omnivore_test PG_POOL_MAX: 10 ELASTIC_URL: http://localhost:${{ job.services.elastic.ports[9200] }}/ + REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }} build-docker-images: name: Build docker images runs-on: ubuntu-latest diff --git a/packages/api/src/utils/parser.ts b/packages/api/src/utils/parser.ts index 6d255677a..e48ca7326 100644 --- a/packages/api/src/utils/parser.ts +++ b/packages/api/src/utils/parser.ts @@ -207,6 +207,16 @@ export const parsePreparedContent = async ( let highlightData = undefined const { document, pageInfo } = preparedDocument + if (!document) { + console.log('No document') + return { + canonicalUrl: url, + parsedContent: null, + domContent: '', + pageType: PageType.Unknown, + } + } + // Checking for content type acceptance or if there are no contentType // at all (backward extension versions compatibility) if ( @@ -222,14 +232,15 @@ export const parsePreparedContent = async ( } } - let dom = parseHTML(document).document + let dom: Document | null = null try { + dom = parseHTML(document).document + if (!article) { // Attempt to parse the article // preParse content - const preParsedDom = await preParseContent(url, dom) - preParsedDom && (dom = preParsedDom) + dom = (await preParseContent(url, dom)) || dom article = await getReadabilityResult(url, document, dom, isNewsletter) } @@ -260,7 +271,7 @@ export const parsePreparedContent = async ( codeBlocks.forEach((e) => { if (e.textContent) { const att = hljs.highlightAuto(e.textContent) - const code = dom.createElement('code') + const code = articleDom.createElement('code') const langClass = `hljs language-${att.language}` + (att.second_best?.language @@ -356,7 +367,7 @@ export const parsePreparedContent = async ( domContent: document, parsedContent: article, canonicalUrl, - pageType: parseOriginalContent(dom), + pageType: dom ? parseOriginalContent(dom) : PageType.Unknown, highlightData, } } diff --git a/packages/content-handler/src/websites/twitter-handler.ts b/packages/content-handler/src/websites/twitter-handler.ts index 16fa054c2..c1878e498 100644 --- a/packages/content-handler/src/websites/twitter-handler.ts +++ b/packages/content-handler/src/websites/twitter-handler.ts @@ -1,9 +1,9 @@ -import { ContentHandler, PreHandleResult } from '../content-handler' import axios from 'axios' -import { DateTime } from 'luxon' -import _ from 'underscore' import { truncate } from 'lodash' +import { DateTime } from 'luxon' import { Browser, BrowserContext } from 'puppeteer-core' +import _ from 'underscore' +import { ContentHandler, PreHandleResult } from '../content-handler' interface TweetIncludes { users: { @@ -219,6 +219,7 @@ const getTweetIds = async ( await page.goto(pageURL, { waitUntil: 'networkidle0', + timeout: 60000, // 60 seconds }) return (await page.evaluate(async (author) => { @@ -287,7 +288,8 @@ const getTweetIds = async ( return Array.from(ids) }, author)) as string[] } catch (error) { - console.log(error) + console.error('Error getting tweets', error) + return [] } finally { if (context) { diff --git a/packages/import-handler/Dockerfile-collector b/packages/import-handler/Dockerfile-collector new file mode 100644 index 000000000..fba468482 --- /dev/null +++ b/packages/import-handler/Dockerfile-collector @@ -0,0 +1,31 @@ +FROM node:14.18-alpine + +WORKDIR /app + +ENV PUPPETEER_SKIP_CHROMIUM_DOWNLOAD true +RUN apk add g++ make python3 + +ENV PORT 8080 + +COPY package.json . +COPY yarn.lock . +COPY tsconfig.json . +COPY .eslintrc . + +COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json +COPY /packages/import-handler/package.json ./packages/import-handler/package.json + +RUN yarn install --pure-lockfile + +ADD /packages/import-handler ./packages/import-handler +ADD /packages/readabilityjs ./packages/readabilityjs +RUN yarn workspace @omnivore/import-handler build + +# After building, fetch the production dependencies +RUN rm -rf /app/packages/import-handler/node_modules +RUN rm -rf /app/node_modules +RUN yarn install --pure-lockfile --production + +EXPOSE 8080 + +ENTRYPOINT ["yarn", "workspace", "@omnivore/import-handler", "start:collector"] diff --git a/packages/import-handler/package.json b/packages/import-handler/package.json index c425e3764..ec14ef577 100644 --- a/packages/import-handler/package.json +++ b/packages/import-handler/package.json @@ -12,9 +12,11 @@ "test": "yarn mocha -r ts-node/register --config mocha-config.json", "lint": "eslint src --ext ts,js,tsx,jsx", "compile": "tsc", - "build": "tsc", + "build": "tsc && yarn copy-files", "start": "functions-framework --target=importHandler", - "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" + "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"", + "copy-files": "copyfiles src/luaScripts/*.lua build/", + "start:collector": "functions-framework --target=importMetricsCollector" }, "devDependencies": { "@types/chai": "^4.3.4", @@ -28,6 +30,7 @@ "@types/unzip-stream": "^0.3.1", "@types/urlsafe-base64": "^1.0.28", "@types/uuid": "^9.0.0", + "copyfiles": "^2.4.1", "eslint-plugin-prettier": "^4.0.0" }, "dependencies": { @@ -46,6 +49,7 @@ "jsonwebtoken": "^8.5.1", "linkedom": "^0.14.21", "nodemon": "^2.0.15", + "redis": "^4.3.1", "unzip-stream": "^0.3.1", "urlsafe-base64": "^1.0.0", "uuid": "^9.0.0" diff --git a/packages/import-handler/src/csv.ts b/packages/import-handler/src/csv.ts index 140cf00f0..70bb8b955 100644 --- a/packages/import-handler/src/csv.ts +++ b/packages/import-handler/src/csv.ts @@ -6,8 +6,12 @@ import { parse } from '@fast-csv/parse' import { Stream } from 'stream' import { ImportContext } from '.' +import { createMetrics, ImportStatus, updateMetrics } from './metrics' export const importCsv = async (ctx: ImportContext, stream: Stream) => { + // create metrics in redis + await createMetrics(ctx.redisClient, ctx.userId, ctx.taskId, 'csv-importer') + const parser = parse() stream.pipe(parser) for await (const row of parser) { @@ -23,11 +27,36 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => { .map((l) => l.trim()) .filter((l) => l !== '') : undefined + + // update total counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.TOTAL + ) + await ctx.urlHandler(ctx, url, state, labels) + ctx.countImported += 1 + // update started counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.STARTED + ) } catch (error) { console.log('invalid url', row, error) + ctx.countFailed += 1 + // update invalid counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.INVALID + ) } } } diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index 89b469ad1..269ad463c 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -5,12 +5,18 @@ import axios from 'axios' import * as jwt from 'jsonwebtoken' import { Stream } from 'node:stream' import * as path from 'path' +import { createClient } from 'redis' import { promisify } from 'util' import { v4 as uuid } from 'uuid' import { importCsv } from './csv' import { importMatterArchive } from './matterHistory' +import { ImportStatus, updateMetrics } from './metrics' +import { createRedisClient } from './redis' import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task' +// explicitly create the return type of RedisClient +type RedisClient = ReturnType + export enum ArticleSavingRequestStatus { Failed = 'FAILED', Processing = 'PROCESSING', @@ -51,10 +57,21 @@ export type ImportContext = { countFailed: number urlHandler: UrlHandler contentHandler: ContentHandler + redisClient: RedisClient + taskId: string } type importHandlerFunc = (ctx: ImportContext, stream: Stream) => Promise +interface UpdateMetricsRequest { + taskId: string + status: ImportStatus +} + +function isUpdateMetricsRequest(body: any): body is UpdateMetricsRequest { + return 'taskId' in body && 'status' in body +} + interface StorageEvent { name: string bucket: string @@ -79,6 +96,7 @@ const importURL = async ( userId: string, url: URL, source: string, + taskId: string, state?: ArticleSavingRequestStatus, labels?: string[] ): Promise => { @@ -118,14 +136,27 @@ const sendImportFailedEmail = async (userId: string) => { }) } -const sendImportCompletedEmail = async ( +export const sendImportStartedEmail = async ( userId: string, urlsEnqueued: number, urlsFailed: number ) => { return createEmailCloudTask(userId, { - subject: 'Your Omnivore import has completed processing', - body: `${urlsEnqueued} URLs have been processed and should be available in your library. ${urlsFailed} URLs failed to be parsed.`, + subject: 'Your Omnivore import has started', + body: `We have started processing ${urlsEnqueued} URLs. ${urlsFailed} URLs are invalid.`, + }) +} + +export const sendImportCompletedEmail = async ( + userId: string, + urlsImported: number, + urlsFailed: number +) => { + return createEmailCloudTask(userId, { + subject: 'Your Omnivore import has finished', + body: `We have finished processing ${ + urlsImported + urlsFailed + } URLs. ${urlsImported} URLs have been added to your library. ${urlsFailed} URLs failed to be parsed.`, }) } @@ -152,6 +183,7 @@ const urlHandler = async ( ctx.userId, url, 'csv-importer', + ctx.taskId, state, labels && labels.length > 0 ? labels : undefined ) @@ -222,7 +254,7 @@ const contentHandler = async ( return Promise.resolve() } -const handleEvent = async (data: StorageEvent) => { +const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => { if (shouldHandle(data)) { const handler = handlerForFile(data.name) if (!handler) { @@ -253,12 +285,14 @@ const handleEvent = async (data: StorageEvent) => { countFailed: 0, urlHandler, contentHandler, + redisClient, + taskId: data.name, } await handler(ctx, stream) if (ctx.countImported > 0) { - await sendImportCompletedEmail(userId, ctx.countImported, ctx.countFailed) + await sendImportStartedEmail(userId, ctx.countImported, ctx.countFailed) } else { await sendImportFailedEmail(userId) } @@ -285,11 +319,19 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction( const pubSubMessage = req.body.message.data as string const obj = getStorageEvent(pubSubMessage) if (obj) { + // create redis client + const redisClient = await createRedisClient( + process.env.REDIS_URL, + process.env.REDIS_CERT + ) try { - await handleEvent(obj) + await handleEvent(obj, redisClient) } catch (err) { console.log('error handling event', { err, obj }) throw err + } finally { + // close redis client + await redisClient.quit() } } } else { @@ -298,3 +340,44 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction( res.send('ok') } ) + +export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction( + async (req, res) => { + if (!process.env.JWT_SECRET) { + console.error('JWT_SECRET not exists') + return res.status(500).send({ errorCodes: 'JWT_SECRET_NOT_EXISTS' }) + } + const token = req.headers.authorization + if (!token) { + return res.status(401).send({ errorCode: 'INVALID_TOKEN' }) + } + + let userId: string + + try { + const decoded = jwt.verify(token, process.env.JWT_SECRET) as { + uid: string + } + userId = decoded.uid + } catch (e) { + console.error('Authentication error:', e) + return res.status(401).send({ errorCode: 'UNAUTHENTICATED' }) + } + + if (!isUpdateMetricsRequest(req.body)) { + console.log('Invalid request body') + return res.status(400).send('Bad Request') + } + + const redisClient = await createRedisClient( + process.env.REDIS_URL, + process.env.REDIS_CERT + ) + // update metrics + await updateMetrics(redisClient, userId, req.body.taskId, req.body.status) + + await redisClient.quit() + + res.send('ok') + } +) diff --git a/packages/import-handler/src/luaScripts/updateMetrics.lua b/packages/import-handler/src/luaScripts/updateMetrics.lua new file mode 100644 index 000000000..4e7b08f26 --- /dev/null +++ b/packages/import-handler/src/luaScripts/updateMetrics.lua @@ -0,0 +1,34 @@ +local key = tostring(KEYS[1]); +local status = tostring(ARGV[1]); +local timestamp = tonumber(ARGV[2]); + +-- increment the status counter +redis.call('HINCRBY', key, status, 1); + +if (status == "imported" or status == "failed") then + -- get the current metrics + local bulk = redis.call('HGETALL', key); + -- get the total, imported and failed counters + local result = {} + local nextkey + for i, v in ipairs(bulk) do + if i % 2 == 1 then + nextkey = v + else + result[nextkey] = v + end + end + + local imported = tonumber(result['imported']) or 0; + local failed = tonumber(result['failed']) or 0; + local total = tonumber(result['total']) or 0; + local state = tonumber(result['state']) or 0; + if (state == 0 and imported + failed >= total) then + -- all the records have been processed + -- update the metrics + redis.call('HSET', key, 'end_time', timestamp, 'state', 1); + return 1 + end +end + +return 0; diff --git a/packages/import-handler/src/matterHistory.ts b/packages/import-handler/src/matterHistory.ts index 6b626d9bc..378f9d51e 100644 --- a/packages/import-handler/src/matterHistory.ts +++ b/packages/import-handler/src/matterHistory.ts @@ -4,20 +4,19 @@ /* eslint-disable @typescript-eslint/no-unsafe-argument */ import { parse } from '@fast-csv/parse' -import { Stream } from 'stream' -import unzip from 'unzip-stream' +import { Readability } from '@omnivore/readability' +import crypto from 'crypto' +import createDOMPurify, { SanitizeElementHookEvent } from 'dompurify' import fs from 'fs' -import path from 'path' import * as fsExtra from 'fs-extra' import glob from 'glob' - import { parseHTML } from 'linkedom' -import { Readability } from '@omnivore/readability' -import createDOMPurify, { SanitizeElementHookEvent } from 'dompurify' - +import path from 'path' +import { Stream } from 'stream' +import unzip from 'unzip-stream' import { encode } from 'urlsafe-base64' -import crypto from 'crypto' import { ImportContext } from '.' +import { createMetrics, ImportStatus, updateMetrics } from './metrics' export type UrlHandler = (url: URL) => Promise @@ -36,8 +35,22 @@ export const importMatterHistoryCsv = async ( for await (const row of parser) { try { const url = new URL(row['URL']) + // update total counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.TOTAL + ) await ctx.urlHandler(ctx, url) ctx.countImported += 1 + // update started counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.STARTED + ) } catch (error) { console.log('invalid url', row, error) ctx.countFailed += 1 @@ -204,6 +217,13 @@ const handleMatterHistoryRow = async ( if (!url) { ctx.countFailed += 1 + // update failed counter + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.FAILED + ) return } @@ -232,6 +252,14 @@ export const importMatterArchive = async ( const archiveDir = await unarchive(stream) try { + // create metrics in redis + await createMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + 'matter-importer' + ) + const historyFile = path.join(archiveDir, '_matter_history.csv') const parser = parse({ @@ -243,11 +271,34 @@ export const importMatterArchive = async ( for await (const row of parser) { try { + // update total metrics + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.TOTAL + ) + await handleMatterHistoryRow(ctx, archiveDir, row) + ctx.countImported += 1 + // update started metrics + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.STARTED + ) } catch (error) { console.log('invalid url', row, error) ctx.countFailed += 1 + // update failed metrics + await updateMetrics( + ctx.redisClient, + ctx.userId, + ctx.taskId, + ImportStatus.FAILED + ) } } } catch (err) { diff --git a/packages/import-handler/src/metrics.ts b/packages/import-handler/src/metrics.ts new file mode 100644 index 000000000..1fef1f7f8 --- /dev/null +++ b/packages/import-handler/src/metrics.ts @@ -0,0 +1,104 @@ +import { createClient } from 'redis' +import { sendImportCompletedEmail } from '.' +import { lua } from './redis' + +// explicitly create the return type of RedisClient +type RedisClient = ReturnType + +export enum ImportStatus { + STARTED = 'started', + INVALID = 'invalid', + IMPORTED = 'imported', + FAILED = 'failed', + TOTAL = 'total', +} + +enum ImportTaskState { + STARTED, + FINISHED, +} + +interface ImportMetrics { + started: number + invalid: number + imported: number + failed: number + total: number + source: string + state: ImportTaskState + startTime: number + endTime: number +} + +export const createMetrics = async ( + redisClient: RedisClient, + userId: string, + taskId: string, + source: string +) => { + const key = `import:${userId}:${taskId}` + try { + // set multiple fields + await redisClient.hSet(key, { + ['start_time']: Date.now(), + ['source']: source, + ['state']: ImportTaskState.STARTED, + }) + } catch (error) { + console.error('Redis Error', error) + } +} + +export const updateMetrics = async ( + redisClient: RedisClient, + userId: string, + taskId: string, + status: ImportStatus +) => { + const key = `import:${userId}:${taskId}` + + try { + // use lua script to increment hash field + const state = await redisClient.evalSha(lua.sha, { + keys: [key], + arguments: [status, Date.now().toString()], + }) + + // if the task is finished, send email + if (state == ImportTaskState.FINISHED) { + const metrics = await getMetrics(redisClient, userId, taskId) + if (metrics) { + await sendImportCompletedEmail(userId, metrics.imported, metrics.failed) + } + } + } catch (error) { + console.error('Redis Error', error) + } +} + +export const getMetrics = async ( + redisClient: RedisClient, + userId: string, + taskId: string +): Promise => { + const key = `import:${userId}:${taskId}` + try { + const metrics = await redisClient.hGetAll(key) + + return { + // convert to integer + started: parseInt(metrics.started, 10), + invalid: parseInt(metrics.invalid, 10), + imported: parseInt(metrics.imported, 10), + failed: parseInt(metrics.failed, 10), + total: parseInt(metrics.total, 10), + source: metrics.source, + state: parseInt(metrics.state, 10), + startTime: parseInt(metrics.start_time, 10), + endTime: parseInt(metrics.end_time, 10), + } + } catch (error) { + console.error('Redis Error', error) + return null + } +} diff --git a/packages/import-handler/src/redis.ts b/packages/import-handler/src/redis.ts new file mode 100644 index 000000000..481f136b9 --- /dev/null +++ b/packages/import-handler/src/redis.ts @@ -0,0 +1,41 @@ +import { readFileSync } from 'fs' +import path from 'path' +import { createClient } from 'redis' + +// load lua script +export const lua = { + script: readFileSync( + path.resolve(__dirname, 'luaScripts/updateMetrics.lua'), + 'utf8' + ), + sha: '', +} + +export const createRedisClient = async (url?: string, cert?: string) => { + const redisClient = createClient({ + url, + socket: { + tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS + cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line + rejectUnauthorized: false, // for self-signed certs + connectTimeout: 10000, // 10 seconds + reconnectStrategy(retries: number): number | Error { + if (retries > 10) { + return new Error('Retries exhausted') + } + return 1000 + }, + }, + }) + + redisClient.on('error', (err) => console.error('Redis Client Error', err)) + + await redisClient.connect() + console.log('Redis Client Connected:', url) + + // load script to redis + lua.sha = await redisClient.scriptLoad(lua.script) + console.log('Redis Lua Script Loaded', lua.sha) + + return redisClient +} diff --git a/packages/import-handler/test/csv/csv.test.ts b/packages/import-handler/test/csv/csv.test.ts index f045a0c96..82798fa89 100644 --- a/packages/import-handler/test/csv/csv.test.ts +++ b/packages/import-handler/test/csv/csv.test.ts @@ -9,129 +9,134 @@ import { stubImportCtx } from '../util' chai.use(chaiString) -describe('Load a simple CSV file', () => { - it('should call the handler for each URL', async () => { - const urls: URL[] = [] - const stream = fs.createReadStream('./test/csv/data/simple.csv') - const stub = stubImportCtx() - stub.urlHandler = (ctx: ImportContext, url): Promise => { - urls.push(url) - return Promise.resolve() - } +describe('Test csv importer', () => { + let stub: ImportContext - await importCsv(stub, stream) - expect(stub.countFailed).to.equal(0) - expect(stub.countImported).to.equal(2) - expect(urls).to.eql([ - new URL('https://omnivore.app'), - new URL('https://google.com'), - ]) + beforeEach(async () => { + stub = await stubImportCtx() }) - it('increments the failed count when the URL is invalid', async () => { - const urls: URL[] = [] - const stream = fs.createReadStream('./test/csv/data/simple.csv') - const stub = stubImportCtx() - stub.urlHandler = (ctx: ImportContext, url): Promise => { - urls.push(url) - return Promise.reject('Failed to import url') - } - - await importCsv(stub, stream) - expect(stub.countFailed).to.equal(2) - expect(stub.countImported).to.equal(0) + afterEach(async () => { + await stub.redisClient.quit() }) -}) -describe('Load a complex CSV file', () => { - it('should call the handler for each URL, state and labels', async () => { - const results: { - url: URL - state?: ArticleSavingRequestStatus - labels?: string[] - }[] = [] - const stream = fs.createReadStream('./test/csv/data/complex.csv') - const stub = stubImportCtx() - stub.urlHandler = ( - ctx: ImportContext, - url, - state, - labels - ): Promise => { - results.push({ + describe('Load a simple CSV file', () => { + it('should call the handler for each URL', async () => { + const urls: URL[] = [] + const stream = fs.createReadStream('./test/csv/data/simple.csv') + stub.urlHandler = (ctx: ImportContext, url): Promise => { + urls.push(url) + return Promise.resolve() + } + + await importCsv(stub, stream) + expect(stub.countFailed).to.equal(0) + expect(stub.countImported).to.equal(2) + expect(urls).to.eql([ + new URL('https://omnivore.app'), + new URL('https://google.com'), + ]) + }) + + it('increments the failed count when the URL is invalid', async () => { + const stream = fs.createReadStream('./test/csv/data/simple.csv') + stub.urlHandler = (ctx: ImportContext, url): Promise => { + return Promise.reject('Failed to import url') + } + + await importCsv(stub, stream) + expect(stub.countFailed).to.equal(2) + expect(stub.countImported).to.equal(0) + }) + }) + + describe('Load a complex CSV file', () => { + it('should call the handler for each URL, state and labels', async () => { + const results: { + url: URL + state?: ArticleSavingRequestStatus + labels?: string[] + }[] = [] + const stream = fs.createReadStream('./test/csv/data/complex.csv') + stub.urlHandler = ( + ctx: ImportContext, url, state, - labels, - }) - return Promise.resolve() - } + labels + ): Promise => { + results.push({ + url, + state, + labels, + }) + return Promise.resolve() + } - await importCsv(stub, stream) - expect(stub.countFailed).to.equal(0) - expect(stub.countImported).to.equal(3) - expect(results).to.eql([ - { - url: new URL('https://omnivore.app'), - state: 'ARCHIVED', - labels: ['test'], - }, - { - url: new URL('https://google.com'), - state: 'SUCCEEDED', - labels: ['test', 'development'], - }, - { - url: new URL('https://test.com'), - state: 'SUCCEEDED', - labels: ['test', 'development'], - }, - ]) - }) -}) - -describe('A file with no status set', () => { - it('should not try to set status', async () => { - const states: (ArticleSavingRequestStatus | undefined)[] = [] - const stream = fs.createReadStream('./test/csv/data/unset-status.csv') - const stub = stubImportCtx() - stub.urlHandler = ( - ctx: ImportContext, - url, - state?: ArticleSavingRequestStatus - ): Promise => { - states.push(state) - return Promise.resolve() - } - - await importCsv(stub, stream) - expect(stub.countFailed).to.equal(0) - expect(stub.countImported).to.equal(2) - expect(states).to.eql([undefined, ArticleSavingRequestStatus.Archived]) - }) -}) - -describe('A file with some labels', () => { - it('gets the labels, handles empty, and trims extra whitespace', async () => { - const importedLabels: (string[] | undefined)[] = [] - const stream = fs.createReadStream('./test/csv/data/labels.csv') - const stub = stubImportCtx() - stub.urlHandler = ( - ctx: ImportContext, - url, - state?: ArticleSavingRequestStatus, - labels?: string[] - ): Promise => { - importedLabels.push(labels) - return Promise.resolve() - } - - await importCsv(stub, stream) - expect(stub.countFailed).to.equal(0) - expect(stub.countImported).to.equal(3) - expect(importedLabels).to.eql([ - ['Label1', 'Label2', 'Label 3', 'Label 4'], - [], - [], - ]) + await importCsv(stub, stream) + expect(stub.countFailed).to.equal(0) + expect(stub.countImported).to.equal(3) + expect(results).to.eql([ + { + url: new URL('https://omnivore.app'), + state: 'ARCHIVED', + labels: ['test'], + }, + { + url: new URL('https://google.com'), + state: 'SUCCEEDED', + labels: ['test', 'development'], + }, + { + url: new URL('https://test.com'), + state: 'SUCCEEDED', + labels: ['test', 'development'], + }, + ]) + }) + }) + + describe('A file with no status set', () => { + it('should not try to set status', async () => { + const states: (ArticleSavingRequestStatus | undefined)[] = [] + const stream = fs.createReadStream('./test/csv/data/unset-status.csv') + stub.urlHandler = ( + ctx: ImportContext, + url, + state?: ArticleSavingRequestStatus + ): Promise => { + states.push(state) + return Promise.resolve() + } + + await importCsv(stub, stream) + expect(stub.countFailed).to.equal(0) + expect(stub.countImported).to.equal(2) + expect(states).to.eql([undefined, ArticleSavingRequestStatus.Archived]) + }) + }) + + describe('A file with some labels', () => { + it('gets the labels, handles empty, and trims extra whitespace', async () => { + const importedLabels: (string[] | undefined)[] = [] + const stream = fs.createReadStream('./test/csv/data/labels.csv') + stub.urlHandler = ( + ctx: ImportContext, + url, + state?: ArticleSavingRequestStatus, + labels?: string[] + ): Promise => { + importedLabels.push(labels) + return Promise.resolve() + } + + await importCsv(stub, stream) + expect(stub.countFailed).to.equal(0) + expect(stub.countImported).to.equal(3) + expect(importedLabels).to.eql([ + ['Label1', 'Label2', 'Label 3', 'Label 4'], + [], + [], + ]) + }) }) }) diff --git a/packages/import-handler/test/matter/matter_importer.test.ts b/packages/import-handler/test/matter/matter_importer.test.ts index 90600b16a..94017e634 100644 --- a/packages/import-handler/test/matter/matter_importer.test.ts +++ b/packages/import-handler/test/matter/matter_importer.test.ts @@ -1,15 +1,15 @@ -import 'mocha' +import { Readability } from '@omnivore/readability' import * as chai from 'chai' import { expect } from 'chai' import chaiString from 'chai-string' import * as fs from 'fs' +import 'mocha' +import { ImportContext } from '../../src' import { importMatterArchive, importMatterHistoryCsv, } from '../../src/matterHistory' import { stubImportCtx } from '../util' -import { ImportContext } from '../../src' -import { Readability } from '@omnivore/readability' chai.use(chaiString) @@ -17,7 +17,7 @@ describe('Load a simple _matter_history file', () => { it('should find the URL of each row', async () => { const urls: URL[] = [] const stream = fs.createReadStream('./test/matter/data/_matter_history.csv') - const stub = stubImportCtx() + const stub = await stubImportCtx() stub.urlHandler = (ctx: ImportContext, url): Promise => { urls.push(url) return Promise.resolve() @@ -29,6 +29,8 @@ describe('Load a simple _matter_history file', () => { expect(urls).to.eql([ new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'), ]) + + await stub.redisClient.quit() }) }) @@ -36,7 +38,7 @@ describe('Load archive file', () => { it('should find the URL of each row', async () => { const urls: URL[] = [] const stream = fs.createReadStream('./test/matter/data/Archive.zip') - const stub = stubImportCtx() + const stub = await stubImportCtx() stub.contentHandler = ( ctx: ImportContext, url: URL, @@ -54,5 +56,7 @@ describe('Load archive file', () => { expect(urls).to.eql([ new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'), ]) + + await stub.redisClient.quit() }) }) diff --git a/packages/import-handler/test/util.ts b/packages/import-handler/test/util.ts index a60564ae2..54a8b5082 100644 --- a/packages/import-handler/test/util.ts +++ b/packages/import-handler/test/util.ts @@ -1,7 +1,10 @@ import { Readability } from '@omnivore/readability' import { ArticleSavingRequestStatus, ImportContext } from '../src' +import { createRedisClient } from '../src/redis' + +export const stubImportCtx = async (): Promise => { + const redisClient = await createRedisClient(process.env.REDIS_URL) -export const stubImportCtx = () => { return { userId: '', countImported: 0, @@ -23,5 +26,7 @@ export const stubImportCtx = () => { ): Promise => { return Promise.resolve() }, + redisClient, + taskId: '', } } diff --git a/packages/import-handler/tsconfig.json b/packages/import-handler/tsconfig.json index ea8c4d3ef..912ebd323 100644 --- a/packages/import-handler/tsconfig.json +++ b/packages/import-handler/tsconfig.json @@ -1,6 +1,6 @@ { "extends": "./../../tsconfig.json", - "ts-node": { "files": true }, + "ts-node": { "files": true }, "compilerOptions": { "outDir": "build", "rootDir": ".", @@ -8,5 +8,5 @@ // Generate d.ts files "declaration": true }, - "include": ["src", "test"] + "include": ["src/**/*", "test/**/*"] } diff --git a/packages/puppeteer-parse/index.js b/packages/puppeteer-parse/index.js index 960a8e735..7c5bda58a 100644 --- a/packages/puppeteer-parse/index.js +++ b/packages/puppeteer-parse/index.js @@ -44,6 +44,8 @@ const NON_SCRIPT_HOSTS= ['medium.com', 'fastcompany.com']; const ALLOWED_CONTENT_TYPES = ['text/html', 'application/octet-stream', 'text/plain', 'application/pdf']; +const IMPORTER_METRICS_COLLECTOR_URL = process.env.IMPORTER_METRICS_COLLECTOR_URL; + const userAgentForUrl = (url) => { try { const u = new URL(url); @@ -59,18 +61,24 @@ const userAgentForUrl = (url) => { }; const fetchContentWithScrapingBee = async (url) => { - const response = await axios.get('https://app.scrapingbee.com/api/v1', { - params: { - 'api_key': process.env.SCRAPINGBEE_API_KEY, - 'url': url, - 'render_js': 'false', - 'premium_proxy': 'true', - 'country_code':'us' - } - }) + try { + const response = await axios.get('https://app.scrapingbee.com/api/v1', { + params: { + 'api_key': process.env.SCRAPINGBEE_API_KEY, + 'url': url, + 'render_js': 'false', + 'premium_proxy': 'true', + 'country_code':'us' + } + }) + + const dom = parseHTML(response.data).document; + return { title: dom.title, domContent: dom.documentElement.outerHTML, url } + } catch (e) { + console.error('error fetching with scrapingbee', e.message) - const dom = parseHTML(response.data).document; - return { title: dom.title, domContent: dom.documentElement.outerHTML, url: url } + return { title: url, domContent: '', url } + } } const enableJavascriptForUrl = (url) => { @@ -242,6 +250,27 @@ const saveUploadedPdf = async (userId, url, uploadFileId, articleSavingRequestId ); }; +const sendImportStatusUpdate = async (userId, taskId, status) => { + try { + const auth = await signToken({ uid: userId }, process.env.JWT_SECRET); + + await axios.post( + IMPORTER_METRICS_COLLECTOR_URL, + { + taskId, + status, + }, + { + headers: { + 'Authorization': auth, + 'Content-Type': 'application/json', + }, + }); + } catch (e) { + console.error('Error while sending import status update', e); + } +}; + async function fetchContent(req, res) { let functionStartTime = Date.now(); @@ -250,16 +279,19 @@ async function fetchContent(req, res) { const articleSavingRequestId = (req.query ? req.query.saveRequestId : undefined) || (req.body ? req.body.saveRequestId : undefined); const state = req.body.state const labels = req.body.labels + const source = req.body.source || 'parseContent'; + const taskId = req.body.taskId; // taskId is used to update import status let logRecord = { url, userId, articleSavingRequestId, labels: { - source: 'parseContent', + source, }, state, - labelsToAdd: labels + labelsToAdd: labels, + taskId: taskId, }; console.info(`Article parsing request`, logRecord); @@ -271,7 +303,7 @@ async function fetchContent(req, res) { } // pre handle url with custom handlers - let title, content, contentType; + let title, content, contentType, importStatus; try { const browser = await getBrowserPromise; const result = await preHandleContent(url, browser); @@ -320,19 +352,16 @@ async function fetchContent(req, res) { let readabilityResult = null; if (content) { - let document = parseHTML(content).document; + const document = parseHTML(content).document; // preParse content - const preParsedDom = await preParseContent(url, document) - if (preParsedDom) { - document = preParsedDom - } + const preParsedDom = (await preParseContent(url, document)) || document; - readabilityResult = await getReadabilityResult(url, document); + readabilityResult = await getReadabilityResult(url, preParsedDom); } const apiResponse = await sendSavePageMutation(userId, { - url: finalUrl, + url, clientRequestId: articleSavingRequestId, title, originalContent: content, @@ -344,14 +373,16 @@ async function fetchContent(req, res) { logRecord.totalTime = Date.now() - functionStartTime; logRecord.result = apiResponse.createArticle; } + + importStatus = 'imported'; } catch (e) { logRecord.error = e.message; console.error(`Error while retrieving page`, logRecord); // fallback to scrapingbee const sbResult = await fetchContentWithScrapingBee(url); - const sbUrl = finalUrl || sbResult.url; const content = sbResult.domContent; + const title = sbResult.title; logRecord.fetchContentTime = Date.now() - functionStartTime; let readabilityResult = null; @@ -359,7 +390,7 @@ async function fetchContent(req, res) { let document = parseHTML(content).document; // preParse content - const preParsedDom = await preParseContent(sbUrl, document) + const preParsedDom = await preParseContent(url, document) if (preParsedDom) { document = preParsedDom } @@ -368,7 +399,7 @@ async function fetchContent(req, res) { } const apiResponse = await sendSavePageMutation(userId, { - url: finalUrl, + url, clientRequestId: articleSavingRequestId, title, originalContent: content, @@ -379,14 +410,21 @@ async function fetchContent(req, res) { logRecord.totalTime = Date.now() - functionStartTime; logRecord.result = apiResponse.createArticle; + + importStatus = 'failed'; } finally { if (context) { await context.close(); } console.info(`parse-page`, logRecord); - } - return res.sendStatus(200); + // send import status to update the metrics + if (taskId) { + await sendImportStatusUpdate(userId, taskId, importStatus); + } + + res.sendStatus(200); + } } function validateUrlString(url) {