Merge pull request #2295 from omnivore-app/fix/twitter-handler-timeout
Feature: importer metrics collector and fix puppeteer timeout issue
This commit is contained in:
10
.github/workflows/run-tests.yaml
vendored
10
.github/workflows/run-tests.yaml
vendored
@ -42,6 +42,15 @@ jobs:
|
||||
--health-retries 10
|
||||
ports:
|
||||
- 9200
|
||||
redis:
|
||||
image: redis
|
||||
options: >-
|
||||
--health-cmd "redis-cli ping"
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 10
|
||||
ports:
|
||||
- 6379
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
@ -89,6 +98,7 @@ jobs:
|
||||
PG_DB: omnivore_test
|
||||
PG_POOL_MAX: 10
|
||||
ELASTIC_URL: http://localhost:${{ job.services.elastic.ports[9200] }}/
|
||||
REDIS_URL: redis://localhost:${{ job.services.redis.ports[6379] }}
|
||||
build-docker-images:
|
||||
name: Build docker images
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@ -207,6 +207,16 @@ export const parsePreparedContent = async (
|
||||
let highlightData = undefined
|
||||
const { document, pageInfo } = preparedDocument
|
||||
|
||||
if (!document) {
|
||||
console.log('No document')
|
||||
return {
|
||||
canonicalUrl: url,
|
||||
parsedContent: null,
|
||||
domContent: '',
|
||||
pageType: PageType.Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
// Checking for content type acceptance or if there are no contentType
|
||||
// at all (backward extension versions compatibility)
|
||||
if (
|
||||
@ -222,14 +232,15 @@ export const parsePreparedContent = async (
|
||||
}
|
||||
}
|
||||
|
||||
let dom = parseHTML(document).document
|
||||
let dom: Document | null = null
|
||||
|
||||
try {
|
||||
dom = parseHTML(document).document
|
||||
|
||||
if (!article) {
|
||||
// Attempt to parse the article
|
||||
// preParse content
|
||||
const preParsedDom = await preParseContent(url, dom)
|
||||
preParsedDom && (dom = preParsedDom)
|
||||
dom = (await preParseContent(url, dom)) || dom
|
||||
|
||||
article = await getReadabilityResult(url, document, dom, isNewsletter)
|
||||
}
|
||||
@ -260,7 +271,7 @@ export const parsePreparedContent = async (
|
||||
codeBlocks.forEach((e) => {
|
||||
if (e.textContent) {
|
||||
const att = hljs.highlightAuto(e.textContent)
|
||||
const code = dom.createElement('code')
|
||||
const code = articleDom.createElement('code')
|
||||
const langClass =
|
||||
`hljs language-${att.language}` +
|
||||
(att.second_best?.language
|
||||
@ -356,7 +367,7 @@ export const parsePreparedContent = async (
|
||||
domContent: document,
|
||||
parsedContent: article,
|
||||
canonicalUrl,
|
||||
pageType: parseOriginalContent(dom),
|
||||
pageType: dom ? parseOriginalContent(dom) : PageType.Unknown,
|
||||
highlightData,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import { ContentHandler, PreHandleResult } from '../content-handler'
|
||||
import axios from 'axios'
|
||||
import { DateTime } from 'luxon'
|
||||
import _ from 'underscore'
|
||||
import { truncate } from 'lodash'
|
||||
import { DateTime } from 'luxon'
|
||||
import { Browser, BrowserContext } from 'puppeteer-core'
|
||||
import _ from 'underscore'
|
||||
import { ContentHandler, PreHandleResult } from '../content-handler'
|
||||
|
||||
interface TweetIncludes {
|
||||
users: {
|
||||
@ -219,6 +219,7 @@ const getTweetIds = async (
|
||||
|
||||
await page.goto(pageURL, {
|
||||
waitUntil: 'networkidle0',
|
||||
timeout: 60000, // 60 seconds
|
||||
})
|
||||
|
||||
return (await page.evaluate(async (author) => {
|
||||
@ -287,7 +288,8 @@ const getTweetIds = async (
|
||||
return Array.from(ids)
|
||||
}, author)) as string[]
|
||||
} catch (error) {
|
||||
console.log(error)
|
||||
console.error('Error getting tweets', error)
|
||||
|
||||
return []
|
||||
} finally {
|
||||
if (context) {
|
||||
|
||||
31
packages/import-handler/Dockerfile-collector
Normal file
31
packages/import-handler/Dockerfile-collector
Normal file
@ -0,0 +1,31 @@
|
||||
FROM node:14.18-alpine
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
ENV PUPPETEER_SKIP_CHROMIUM_DOWNLOAD true
|
||||
RUN apk add g++ make python3
|
||||
|
||||
ENV PORT 8080
|
||||
|
||||
COPY package.json .
|
||||
COPY yarn.lock .
|
||||
COPY tsconfig.json .
|
||||
COPY .eslintrc .
|
||||
|
||||
COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json
|
||||
COPY /packages/import-handler/package.json ./packages/import-handler/package.json
|
||||
|
||||
RUN yarn install --pure-lockfile
|
||||
|
||||
ADD /packages/import-handler ./packages/import-handler
|
||||
ADD /packages/readabilityjs ./packages/readabilityjs
|
||||
RUN yarn workspace @omnivore/import-handler build
|
||||
|
||||
# After building, fetch the production dependencies
|
||||
RUN rm -rf /app/packages/import-handler/node_modules
|
||||
RUN rm -rf /app/node_modules
|
||||
RUN yarn install --pure-lockfile --production
|
||||
|
||||
EXPOSE 8080
|
||||
|
||||
ENTRYPOINT ["yarn", "workspace", "@omnivore/import-handler", "start:collector"]
|
||||
@ -12,9 +12,11 @@
|
||||
"test": "yarn mocha -r ts-node/register --config mocha-config.json",
|
||||
"lint": "eslint src --ext ts,js,tsx,jsx",
|
||||
"compile": "tsc",
|
||||
"build": "tsc",
|
||||
"build": "tsc && yarn copy-files",
|
||||
"start": "functions-framework --target=importHandler",
|
||||
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
|
||||
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"",
|
||||
"copy-files": "copyfiles src/luaScripts/*.lua build/",
|
||||
"start:collector": "functions-framework --target=importMetricsCollector"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/chai": "^4.3.4",
|
||||
@ -28,6 +30,7 @@
|
||||
"@types/unzip-stream": "^0.3.1",
|
||||
"@types/urlsafe-base64": "^1.0.28",
|
||||
"@types/uuid": "^9.0.0",
|
||||
"copyfiles": "^2.4.1",
|
||||
"eslint-plugin-prettier": "^4.0.0"
|
||||
},
|
||||
"dependencies": {
|
||||
@ -46,6 +49,7 @@
|
||||
"jsonwebtoken": "^8.5.1",
|
||||
"linkedom": "^0.14.21",
|
||||
"nodemon": "^2.0.15",
|
||||
"redis": "^4.3.1",
|
||||
"unzip-stream": "^0.3.1",
|
||||
"urlsafe-base64": "^1.0.0",
|
||||
"uuid": "^9.0.0"
|
||||
|
||||
@ -6,8 +6,12 @@
|
||||
import { parse } from '@fast-csv/parse'
|
||||
import { Stream } from 'stream'
|
||||
import { ImportContext } from '.'
|
||||
import { createMetrics, ImportStatus, updateMetrics } from './metrics'
|
||||
|
||||
export const importCsv = async (ctx: ImportContext, stream: Stream) => {
|
||||
// create metrics in redis
|
||||
await createMetrics(ctx.redisClient, ctx.userId, ctx.taskId, 'csv-importer')
|
||||
|
||||
const parser = parse()
|
||||
stream.pipe(parser)
|
||||
for await (const row of parser) {
|
||||
@ -23,11 +27,36 @@ export const importCsv = async (ctx: ImportContext, stream: Stream) => {
|
||||
.map((l) => l.trim())
|
||||
.filter((l) => l !== '')
|
||||
: undefined
|
||||
|
||||
// update total counter
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.TOTAL
|
||||
)
|
||||
|
||||
await ctx.urlHandler(ctx, url, state, labels)
|
||||
|
||||
ctx.countImported += 1
|
||||
// update started counter
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.STARTED
|
||||
)
|
||||
} catch (error) {
|
||||
console.log('invalid url', row, error)
|
||||
|
||||
ctx.countFailed += 1
|
||||
// update invalid counter
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.INVALID
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,12 +5,18 @@ import axios from 'axios'
|
||||
import * as jwt from 'jsonwebtoken'
|
||||
import { Stream } from 'node:stream'
|
||||
import * as path from 'path'
|
||||
import { createClient } from 'redis'
|
||||
import { promisify } from 'util'
|
||||
import { v4 as uuid } from 'uuid'
|
||||
import { importCsv } from './csv'
|
||||
import { importMatterArchive } from './matterHistory'
|
||||
import { ImportStatus, updateMetrics } from './metrics'
|
||||
import { createRedisClient } from './redis'
|
||||
import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task'
|
||||
|
||||
// explicitly create the return type of RedisClient
|
||||
type RedisClient = ReturnType<typeof createClient>
|
||||
|
||||
export enum ArticleSavingRequestStatus {
|
||||
Failed = 'FAILED',
|
||||
Processing = 'PROCESSING',
|
||||
@ -51,10 +57,21 @@ export type ImportContext = {
|
||||
countFailed: number
|
||||
urlHandler: UrlHandler
|
||||
contentHandler: ContentHandler
|
||||
redisClient: RedisClient
|
||||
taskId: string
|
||||
}
|
||||
|
||||
type importHandlerFunc = (ctx: ImportContext, stream: Stream) => Promise<void>
|
||||
|
||||
interface UpdateMetricsRequest {
|
||||
taskId: string
|
||||
status: ImportStatus
|
||||
}
|
||||
|
||||
function isUpdateMetricsRequest(body: any): body is UpdateMetricsRequest {
|
||||
return 'taskId' in body && 'status' in body
|
||||
}
|
||||
|
||||
interface StorageEvent {
|
||||
name: string
|
||||
bucket: string
|
||||
@ -79,6 +96,7 @@ const importURL = async (
|
||||
userId: string,
|
||||
url: URL,
|
||||
source: string,
|
||||
taskId: string,
|
||||
state?: ArticleSavingRequestStatus,
|
||||
labels?: string[]
|
||||
): Promise<string | undefined> => {
|
||||
@ -118,14 +136,27 @@ const sendImportFailedEmail = async (userId: string) => {
|
||||
})
|
||||
}
|
||||
|
||||
const sendImportCompletedEmail = async (
|
||||
export const sendImportStartedEmail = async (
|
||||
userId: string,
|
||||
urlsEnqueued: number,
|
||||
urlsFailed: number
|
||||
) => {
|
||||
return createEmailCloudTask(userId, {
|
||||
subject: 'Your Omnivore import has completed processing',
|
||||
body: `${urlsEnqueued} URLs have been processed and should be available in your library. ${urlsFailed} URLs failed to be parsed.`,
|
||||
subject: 'Your Omnivore import has started',
|
||||
body: `We have started processing ${urlsEnqueued} URLs. ${urlsFailed} URLs are invalid.`,
|
||||
})
|
||||
}
|
||||
|
||||
export const sendImportCompletedEmail = async (
|
||||
userId: string,
|
||||
urlsImported: number,
|
||||
urlsFailed: number
|
||||
) => {
|
||||
return createEmailCloudTask(userId, {
|
||||
subject: 'Your Omnivore import has finished',
|
||||
body: `We have finished processing ${
|
||||
urlsImported + urlsFailed
|
||||
} URLs. ${urlsImported} URLs have been added to your library. ${urlsFailed} URLs failed to be parsed.`,
|
||||
})
|
||||
}
|
||||
|
||||
@ -152,6 +183,7 @@ const urlHandler = async (
|
||||
ctx.userId,
|
||||
url,
|
||||
'csv-importer',
|
||||
ctx.taskId,
|
||||
state,
|
||||
labels && labels.length > 0 ? labels : undefined
|
||||
)
|
||||
@ -222,7 +254,7 @@ const contentHandler = async (
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
const handleEvent = async (data: StorageEvent) => {
|
||||
const handleEvent = async (data: StorageEvent, redisClient: RedisClient) => {
|
||||
if (shouldHandle(data)) {
|
||||
const handler = handlerForFile(data.name)
|
||||
if (!handler) {
|
||||
@ -253,12 +285,14 @@ const handleEvent = async (data: StorageEvent) => {
|
||||
countFailed: 0,
|
||||
urlHandler,
|
||||
contentHandler,
|
||||
redisClient,
|
||||
taskId: data.name,
|
||||
}
|
||||
|
||||
await handler(ctx, stream)
|
||||
|
||||
if (ctx.countImported > 0) {
|
||||
await sendImportCompletedEmail(userId, ctx.countImported, ctx.countFailed)
|
||||
await sendImportStartedEmail(userId, ctx.countImported, ctx.countFailed)
|
||||
} else {
|
||||
await sendImportFailedEmail(userId)
|
||||
}
|
||||
@ -285,11 +319,19 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
const pubSubMessage = req.body.message.data as string
|
||||
const obj = getStorageEvent(pubSubMessage)
|
||||
if (obj) {
|
||||
// create redis client
|
||||
const redisClient = await createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
try {
|
||||
await handleEvent(obj)
|
||||
await handleEvent(obj, redisClient)
|
||||
} catch (err) {
|
||||
console.log('error handling event', { err, obj })
|
||||
throw err
|
||||
} finally {
|
||||
// close redis client
|
||||
await redisClient.quit()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -298,3 +340,44 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction(
|
||||
res.send('ok')
|
||||
}
|
||||
)
|
||||
|
||||
export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction(
|
||||
async (req, res) => {
|
||||
if (!process.env.JWT_SECRET) {
|
||||
console.error('JWT_SECRET not exists')
|
||||
return res.status(500).send({ errorCodes: 'JWT_SECRET_NOT_EXISTS' })
|
||||
}
|
||||
const token = req.headers.authorization
|
||||
if (!token) {
|
||||
return res.status(401).send({ errorCode: 'INVALID_TOKEN' })
|
||||
}
|
||||
|
||||
let userId: string
|
||||
|
||||
try {
|
||||
const decoded = jwt.verify(token, process.env.JWT_SECRET) as {
|
||||
uid: string
|
||||
}
|
||||
userId = decoded.uid
|
||||
} catch (e) {
|
||||
console.error('Authentication error:', e)
|
||||
return res.status(401).send({ errorCode: 'UNAUTHENTICATED' })
|
||||
}
|
||||
|
||||
if (!isUpdateMetricsRequest(req.body)) {
|
||||
console.log('Invalid request body')
|
||||
return res.status(400).send('Bad Request')
|
||||
}
|
||||
|
||||
const redisClient = await createRedisClient(
|
||||
process.env.REDIS_URL,
|
||||
process.env.REDIS_CERT
|
||||
)
|
||||
// update metrics
|
||||
await updateMetrics(redisClient, userId, req.body.taskId, req.body.status)
|
||||
|
||||
await redisClient.quit()
|
||||
|
||||
res.send('ok')
|
||||
}
|
||||
)
|
||||
|
||||
34
packages/import-handler/src/luaScripts/updateMetrics.lua
Normal file
34
packages/import-handler/src/luaScripts/updateMetrics.lua
Normal file
@ -0,0 +1,34 @@
|
||||
local key = tostring(KEYS[1]);
|
||||
local status = tostring(ARGV[1]);
|
||||
local timestamp = tonumber(ARGV[2]);
|
||||
|
||||
-- increment the status counter
|
||||
redis.call('HINCRBY', key, status, 1);
|
||||
|
||||
if (status == "imported" or status == "failed") then
|
||||
-- get the current metrics
|
||||
local bulk = redis.call('HGETALL', key);
|
||||
-- get the total, imported and failed counters
|
||||
local result = {}
|
||||
local nextkey
|
||||
for i, v in ipairs(bulk) do
|
||||
if i % 2 == 1 then
|
||||
nextkey = v
|
||||
else
|
||||
result[nextkey] = v
|
||||
end
|
||||
end
|
||||
|
||||
local imported = tonumber(result['imported']) or 0;
|
||||
local failed = tonumber(result['failed']) or 0;
|
||||
local total = tonumber(result['total']) or 0;
|
||||
local state = tonumber(result['state']) or 0;
|
||||
if (state == 0 and imported + failed >= total) then
|
||||
-- all the records have been processed
|
||||
-- update the metrics
|
||||
redis.call('HSET', key, 'end_time', timestamp, 'state', 1);
|
||||
return 1
|
||||
end
|
||||
end
|
||||
|
||||
return 0;
|
||||
@ -4,20 +4,19 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
|
||||
import { parse } from '@fast-csv/parse'
|
||||
import { Stream } from 'stream'
|
||||
import unzip from 'unzip-stream'
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import crypto from 'crypto'
|
||||
import createDOMPurify, { SanitizeElementHookEvent } from 'dompurify'
|
||||
import fs from 'fs'
|
||||
import path from 'path'
|
||||
import * as fsExtra from 'fs-extra'
|
||||
import glob from 'glob'
|
||||
|
||||
import { parseHTML } from 'linkedom'
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import createDOMPurify, { SanitizeElementHookEvent } from 'dompurify'
|
||||
|
||||
import path from 'path'
|
||||
import { Stream } from 'stream'
|
||||
import unzip from 'unzip-stream'
|
||||
import { encode } from 'urlsafe-base64'
|
||||
import crypto from 'crypto'
|
||||
import { ImportContext } from '.'
|
||||
import { createMetrics, ImportStatus, updateMetrics } from './metrics'
|
||||
|
||||
export type UrlHandler = (url: URL) => Promise<void>
|
||||
|
||||
@ -36,8 +35,22 @@ export const importMatterHistoryCsv = async (
|
||||
for await (const row of parser) {
|
||||
try {
|
||||
const url = new URL(row['URL'])
|
||||
// update total counter
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.TOTAL
|
||||
)
|
||||
await ctx.urlHandler(ctx, url)
|
||||
ctx.countImported += 1
|
||||
// update started counter
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.STARTED
|
||||
)
|
||||
} catch (error) {
|
||||
console.log('invalid url', row, error)
|
||||
ctx.countFailed += 1
|
||||
@ -204,6 +217,13 @@ const handleMatterHistoryRow = async (
|
||||
|
||||
if (!url) {
|
||||
ctx.countFailed += 1
|
||||
// update failed counter
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.FAILED
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
@ -232,6 +252,14 @@ export const importMatterArchive = async (
|
||||
const archiveDir = await unarchive(stream)
|
||||
|
||||
try {
|
||||
// create metrics in redis
|
||||
await createMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
'matter-importer'
|
||||
)
|
||||
|
||||
const historyFile = path.join(archiveDir, '_matter_history.csv')
|
||||
|
||||
const parser = parse({
|
||||
@ -243,11 +271,34 @@ export const importMatterArchive = async (
|
||||
|
||||
for await (const row of parser) {
|
||||
try {
|
||||
// update total metrics
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.TOTAL
|
||||
)
|
||||
|
||||
await handleMatterHistoryRow(ctx, archiveDir, row)
|
||||
|
||||
ctx.countImported += 1
|
||||
// update started metrics
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.STARTED
|
||||
)
|
||||
} catch (error) {
|
||||
console.log('invalid url', row, error)
|
||||
ctx.countFailed += 1
|
||||
// update failed metrics
|
||||
await updateMetrics(
|
||||
ctx.redisClient,
|
||||
ctx.userId,
|
||||
ctx.taskId,
|
||||
ImportStatus.FAILED
|
||||
)
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
|
||||
104
packages/import-handler/src/metrics.ts
Normal file
104
packages/import-handler/src/metrics.ts
Normal file
@ -0,0 +1,104 @@
|
||||
import { createClient } from 'redis'
|
||||
import { sendImportCompletedEmail } from '.'
|
||||
import { lua } from './redis'
|
||||
|
||||
// explicitly create the return type of RedisClient
|
||||
type RedisClient = ReturnType<typeof createClient>
|
||||
|
||||
export enum ImportStatus {
|
||||
STARTED = 'started',
|
||||
INVALID = 'invalid',
|
||||
IMPORTED = 'imported',
|
||||
FAILED = 'failed',
|
||||
TOTAL = 'total',
|
||||
}
|
||||
|
||||
enum ImportTaskState {
|
||||
STARTED,
|
||||
FINISHED,
|
||||
}
|
||||
|
||||
interface ImportMetrics {
|
||||
started: number
|
||||
invalid: number
|
||||
imported: number
|
||||
failed: number
|
||||
total: number
|
||||
source: string
|
||||
state: ImportTaskState
|
||||
startTime: number
|
||||
endTime: number
|
||||
}
|
||||
|
||||
export const createMetrics = async (
|
||||
redisClient: RedisClient,
|
||||
userId: string,
|
||||
taskId: string,
|
||||
source: string
|
||||
) => {
|
||||
const key = `import:${userId}:${taskId}`
|
||||
try {
|
||||
// set multiple fields
|
||||
await redisClient.hSet(key, {
|
||||
['start_time']: Date.now(),
|
||||
['source']: source,
|
||||
['state']: ImportTaskState.STARTED,
|
||||
})
|
||||
} catch (error) {
|
||||
console.error('Redis Error', error)
|
||||
}
|
||||
}
|
||||
|
||||
export const updateMetrics = async (
|
||||
redisClient: RedisClient,
|
||||
userId: string,
|
||||
taskId: string,
|
||||
status: ImportStatus
|
||||
) => {
|
||||
const key = `import:${userId}:${taskId}`
|
||||
|
||||
try {
|
||||
// use lua script to increment hash field
|
||||
const state = await redisClient.evalSha(lua.sha, {
|
||||
keys: [key],
|
||||
arguments: [status, Date.now().toString()],
|
||||
})
|
||||
|
||||
// if the task is finished, send email
|
||||
if (state == ImportTaskState.FINISHED) {
|
||||
const metrics = await getMetrics(redisClient, userId, taskId)
|
||||
if (metrics) {
|
||||
await sendImportCompletedEmail(userId, metrics.imported, metrics.failed)
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Redis Error', error)
|
||||
}
|
||||
}
|
||||
|
||||
export const getMetrics = async (
|
||||
redisClient: RedisClient,
|
||||
userId: string,
|
||||
taskId: string
|
||||
): Promise<ImportMetrics | null> => {
|
||||
const key = `import:${userId}:${taskId}`
|
||||
try {
|
||||
const metrics = await redisClient.hGetAll(key)
|
||||
|
||||
return {
|
||||
// convert to integer
|
||||
started: parseInt(metrics.started, 10),
|
||||
invalid: parseInt(metrics.invalid, 10),
|
||||
imported: parseInt(metrics.imported, 10),
|
||||
failed: parseInt(metrics.failed, 10),
|
||||
total: parseInt(metrics.total, 10),
|
||||
source: metrics.source,
|
||||
state: parseInt(metrics.state, 10),
|
||||
startTime: parseInt(metrics.start_time, 10),
|
||||
endTime: parseInt(metrics.end_time, 10),
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Redis Error', error)
|
||||
return null
|
||||
}
|
||||
}
|
||||
41
packages/import-handler/src/redis.ts
Normal file
41
packages/import-handler/src/redis.ts
Normal file
@ -0,0 +1,41 @@
|
||||
import { readFileSync } from 'fs'
|
||||
import path from 'path'
|
||||
import { createClient } from 'redis'
|
||||
|
||||
// load lua script
|
||||
export const lua = {
|
||||
script: readFileSync(
|
||||
path.resolve(__dirname, 'luaScripts/updateMetrics.lua'),
|
||||
'utf8'
|
||||
),
|
||||
sha: '',
|
||||
}
|
||||
|
||||
export const createRedisClient = async (url?: string, cert?: string) => {
|
||||
const redisClient = createClient({
|
||||
url,
|
||||
socket: {
|
||||
tls: url?.startsWith('rediss://'), // rediss:// is the protocol for TLS
|
||||
cert: cert?.replace(/\\n/g, '\n'), // replace \n with new line
|
||||
rejectUnauthorized: false, // for self-signed certs
|
||||
connectTimeout: 10000, // 10 seconds
|
||||
reconnectStrategy(retries: number): number | Error {
|
||||
if (retries > 10) {
|
||||
return new Error('Retries exhausted')
|
||||
}
|
||||
return 1000
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
redisClient.on('error', (err) => console.error('Redis Client Error', err))
|
||||
|
||||
await redisClient.connect()
|
||||
console.log('Redis Client Connected:', url)
|
||||
|
||||
// load script to redis
|
||||
lua.sha = await redisClient.scriptLoad(lua.script)
|
||||
console.log('Redis Lua Script Loaded', lua.sha)
|
||||
|
||||
return redisClient
|
||||
}
|
||||
@ -9,129 +9,134 @@ import { stubImportCtx } from '../util'
|
||||
|
||||
chai.use(chaiString)
|
||||
|
||||
describe('Load a simple CSV file', () => {
|
||||
it('should call the handler for each URL', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/simple.csv')
|
||||
const stub = stubImportCtx()
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
}
|
||||
describe('Test csv importer', () => {
|
||||
let stub: ImportContext
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(2)
|
||||
expect(urls).to.eql([
|
||||
new URL('https://omnivore.app'),
|
||||
new URL('https://google.com'),
|
||||
])
|
||||
beforeEach(async () => {
|
||||
stub = await stubImportCtx()
|
||||
})
|
||||
|
||||
it('increments the failed count when the URL is invalid', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/simple.csv')
|
||||
const stub = stubImportCtx()
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.reject('Failed to import url')
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(2)
|
||||
expect(stub.countImported).to.equal(0)
|
||||
afterEach(async () => {
|
||||
await stub.redisClient.quit()
|
||||
})
|
||||
})
|
||||
|
||||
describe('Load a complex CSV file', () => {
|
||||
it('should call the handler for each URL, state and labels', async () => {
|
||||
const results: {
|
||||
url: URL
|
||||
state?: ArticleSavingRequestStatus
|
||||
labels?: string[]
|
||||
}[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/complex.csv')
|
||||
const stub = stubImportCtx()
|
||||
stub.urlHandler = (
|
||||
ctx: ImportContext,
|
||||
url,
|
||||
state,
|
||||
labels
|
||||
): Promise<void> => {
|
||||
results.push({
|
||||
describe('Load a simple CSV file', () => {
|
||||
it('should call the handler for each URL', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/simple.csv')
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(2)
|
||||
expect(urls).to.eql([
|
||||
new URL('https://omnivore.app'),
|
||||
new URL('https://google.com'),
|
||||
])
|
||||
})
|
||||
|
||||
it('increments the failed count when the URL is invalid', async () => {
|
||||
const stream = fs.createReadStream('./test/csv/data/simple.csv')
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
return Promise.reject('Failed to import url')
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(2)
|
||||
expect(stub.countImported).to.equal(0)
|
||||
})
|
||||
})
|
||||
|
||||
describe('Load a complex CSV file', () => {
|
||||
it('should call the handler for each URL, state and labels', async () => {
|
||||
const results: {
|
||||
url: URL
|
||||
state?: ArticleSavingRequestStatus
|
||||
labels?: string[]
|
||||
}[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/complex.csv')
|
||||
stub.urlHandler = (
|
||||
ctx: ImportContext,
|
||||
url,
|
||||
state,
|
||||
labels,
|
||||
})
|
||||
return Promise.resolve()
|
||||
}
|
||||
labels
|
||||
): Promise<void> => {
|
||||
results.push({
|
||||
url,
|
||||
state,
|
||||
labels,
|
||||
})
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(3)
|
||||
expect(results).to.eql([
|
||||
{
|
||||
url: new URL('https://omnivore.app'),
|
||||
state: 'ARCHIVED',
|
||||
labels: ['test'],
|
||||
},
|
||||
{
|
||||
url: new URL('https://google.com'),
|
||||
state: 'SUCCEEDED',
|
||||
labels: ['test', 'development'],
|
||||
},
|
||||
{
|
||||
url: new URL('https://test.com'),
|
||||
state: 'SUCCEEDED',
|
||||
labels: ['test', 'development'],
|
||||
},
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
describe('A file with no status set', () => {
|
||||
it('should not try to set status', async () => {
|
||||
const states: (ArticleSavingRequestStatus | undefined)[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/unset-status.csv')
|
||||
const stub = stubImportCtx()
|
||||
stub.urlHandler = (
|
||||
ctx: ImportContext,
|
||||
url,
|
||||
state?: ArticleSavingRequestStatus
|
||||
): Promise<void> => {
|
||||
states.push(state)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(2)
|
||||
expect(states).to.eql([undefined, ArticleSavingRequestStatus.Archived])
|
||||
})
|
||||
})
|
||||
|
||||
describe('A file with some labels', () => {
|
||||
it('gets the labels, handles empty, and trims extra whitespace', async () => {
|
||||
const importedLabels: (string[] | undefined)[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/labels.csv')
|
||||
const stub = stubImportCtx()
|
||||
stub.urlHandler = (
|
||||
ctx: ImportContext,
|
||||
url,
|
||||
state?: ArticleSavingRequestStatus,
|
||||
labels?: string[]
|
||||
): Promise<void> => {
|
||||
importedLabels.push(labels)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(3)
|
||||
expect(importedLabels).to.eql([
|
||||
['Label1', 'Label2', 'Label 3', 'Label 4'],
|
||||
[],
|
||||
[],
|
||||
])
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(3)
|
||||
expect(results).to.eql([
|
||||
{
|
||||
url: new URL('https://omnivore.app'),
|
||||
state: 'ARCHIVED',
|
||||
labels: ['test'],
|
||||
},
|
||||
{
|
||||
url: new URL('https://google.com'),
|
||||
state: 'SUCCEEDED',
|
||||
labels: ['test', 'development'],
|
||||
},
|
||||
{
|
||||
url: new URL('https://test.com'),
|
||||
state: 'SUCCEEDED',
|
||||
labels: ['test', 'development'],
|
||||
},
|
||||
])
|
||||
})
|
||||
})
|
||||
|
||||
describe('A file with no status set', () => {
|
||||
it('should not try to set status', async () => {
|
||||
const states: (ArticleSavingRequestStatus | undefined)[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/unset-status.csv')
|
||||
stub.urlHandler = (
|
||||
ctx: ImportContext,
|
||||
url,
|
||||
state?: ArticleSavingRequestStatus
|
||||
): Promise<void> => {
|
||||
states.push(state)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(2)
|
||||
expect(states).to.eql([undefined, ArticleSavingRequestStatus.Archived])
|
||||
})
|
||||
})
|
||||
|
||||
describe('A file with some labels', () => {
|
||||
it('gets the labels, handles empty, and trims extra whitespace', async () => {
|
||||
const importedLabels: (string[] | undefined)[] = []
|
||||
const stream = fs.createReadStream('./test/csv/data/labels.csv')
|
||||
stub.urlHandler = (
|
||||
ctx: ImportContext,
|
||||
url,
|
||||
state?: ArticleSavingRequestStatus,
|
||||
labels?: string[]
|
||||
): Promise<void> => {
|
||||
importedLabels.push(labels)
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
await importCsv(stub, stream)
|
||||
expect(stub.countFailed).to.equal(0)
|
||||
expect(stub.countImported).to.equal(3)
|
||||
expect(importedLabels).to.eql([
|
||||
['Label1', 'Label2', 'Label 3', 'Label 4'],
|
||||
[],
|
||||
[],
|
||||
])
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@ -1,15 +1,15 @@
|
||||
import 'mocha'
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import * as chai from 'chai'
|
||||
import { expect } from 'chai'
|
||||
import chaiString from 'chai-string'
|
||||
import * as fs from 'fs'
|
||||
import 'mocha'
|
||||
import { ImportContext } from '../../src'
|
||||
import {
|
||||
importMatterArchive,
|
||||
importMatterHistoryCsv,
|
||||
} from '../../src/matterHistory'
|
||||
import { stubImportCtx } from '../util'
|
||||
import { ImportContext } from '../../src'
|
||||
import { Readability } from '@omnivore/readability'
|
||||
|
||||
chai.use(chaiString)
|
||||
|
||||
@ -17,7 +17,7 @@ describe('Load a simple _matter_history file', () => {
|
||||
it('should find the URL of each row', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/matter/data/_matter_history.csv')
|
||||
const stub = stubImportCtx()
|
||||
const stub = await stubImportCtx()
|
||||
stub.urlHandler = (ctx: ImportContext, url): Promise<void> => {
|
||||
urls.push(url)
|
||||
return Promise.resolve()
|
||||
@ -29,6 +29,8 @@ describe('Load a simple _matter_history file', () => {
|
||||
expect(urls).to.eql([
|
||||
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
|
||||
])
|
||||
|
||||
await stub.redisClient.quit()
|
||||
})
|
||||
})
|
||||
|
||||
@ -36,7 +38,7 @@ describe('Load archive file', () => {
|
||||
it('should find the URL of each row', async () => {
|
||||
const urls: URL[] = []
|
||||
const stream = fs.createReadStream('./test/matter/data/Archive.zip')
|
||||
const stub = stubImportCtx()
|
||||
const stub = await stubImportCtx()
|
||||
stub.contentHandler = (
|
||||
ctx: ImportContext,
|
||||
url: URL,
|
||||
@ -54,5 +56,7 @@ describe('Load archive file', () => {
|
||||
expect(urls).to.eql([
|
||||
new URL('https://www.bloomberg.com/features/2022-the-crypto-story/'),
|
||||
])
|
||||
|
||||
await stub.redisClient.quit()
|
||||
})
|
||||
})
|
||||
|
||||
@ -1,7 +1,10 @@
|
||||
import { Readability } from '@omnivore/readability'
|
||||
import { ArticleSavingRequestStatus, ImportContext } from '../src'
|
||||
import { createRedisClient } from '../src/redis'
|
||||
|
||||
export const stubImportCtx = async (): Promise<ImportContext> => {
|
||||
const redisClient = await createRedisClient(process.env.REDIS_URL)
|
||||
|
||||
export const stubImportCtx = () => {
|
||||
return {
|
||||
userId: '',
|
||||
countImported: 0,
|
||||
@ -23,5 +26,7 @@ export const stubImportCtx = () => {
|
||||
): Promise<void> => {
|
||||
return Promise.resolve()
|
||||
},
|
||||
redisClient,
|
||||
taskId: '',
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
{
|
||||
"extends": "./../../tsconfig.json",
|
||||
"ts-node": { "files": true },
|
||||
"ts-node": { "files": true },
|
||||
"compilerOptions": {
|
||||
"outDir": "build",
|
||||
"rootDir": ".",
|
||||
@ -8,5 +8,5 @@
|
||||
// Generate d.ts files
|
||||
"declaration": true
|
||||
},
|
||||
"include": ["src", "test"]
|
||||
"include": ["src/**/*", "test/**/*"]
|
||||
}
|
||||
|
||||
@ -44,6 +44,8 @@ const NON_SCRIPT_HOSTS= ['medium.com', 'fastcompany.com'];
|
||||
|
||||
const ALLOWED_CONTENT_TYPES = ['text/html', 'application/octet-stream', 'text/plain', 'application/pdf'];
|
||||
|
||||
const IMPORTER_METRICS_COLLECTOR_URL = process.env.IMPORTER_METRICS_COLLECTOR_URL;
|
||||
|
||||
const userAgentForUrl = (url) => {
|
||||
try {
|
||||
const u = new URL(url);
|
||||
@ -59,18 +61,24 @@ const userAgentForUrl = (url) => {
|
||||
};
|
||||
|
||||
const fetchContentWithScrapingBee = async (url) => {
|
||||
const response = await axios.get('https://app.scrapingbee.com/api/v1', {
|
||||
params: {
|
||||
'api_key': process.env.SCRAPINGBEE_API_KEY,
|
||||
'url': url,
|
||||
'render_js': 'false',
|
||||
'premium_proxy': 'true',
|
||||
'country_code':'us'
|
||||
}
|
||||
})
|
||||
try {
|
||||
const response = await axios.get('https://app.scrapingbee.com/api/v1', {
|
||||
params: {
|
||||
'api_key': process.env.SCRAPINGBEE_API_KEY,
|
||||
'url': url,
|
||||
'render_js': 'false',
|
||||
'premium_proxy': 'true',
|
||||
'country_code':'us'
|
||||
}
|
||||
})
|
||||
|
||||
const dom = parseHTML(response.data).document;
|
||||
return { title: dom.title, domContent: dom.documentElement.outerHTML, url }
|
||||
} catch (e) {
|
||||
console.error('error fetching with scrapingbee', e.message)
|
||||
|
||||
const dom = parseHTML(response.data).document;
|
||||
return { title: dom.title, domContent: dom.documentElement.outerHTML, url: url }
|
||||
return { title: url, domContent: '', url }
|
||||
}
|
||||
}
|
||||
|
||||
const enableJavascriptForUrl = (url) => {
|
||||
@ -242,6 +250,27 @@ const saveUploadedPdf = async (userId, url, uploadFileId, articleSavingRequestId
|
||||
);
|
||||
};
|
||||
|
||||
const sendImportStatusUpdate = async (userId, taskId, status) => {
|
||||
try {
|
||||
const auth = await signToken({ uid: userId }, process.env.JWT_SECRET);
|
||||
|
||||
await axios.post(
|
||||
IMPORTER_METRICS_COLLECTOR_URL,
|
||||
{
|
||||
taskId,
|
||||
status,
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
'Authorization': auth,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
console.error('Error while sending import status update', e);
|
||||
}
|
||||
};
|
||||
|
||||
async function fetchContent(req, res) {
|
||||
let functionStartTime = Date.now();
|
||||
|
||||
@ -250,16 +279,19 @@ async function fetchContent(req, res) {
|
||||
const articleSavingRequestId = (req.query ? req.query.saveRequestId : undefined) || (req.body ? req.body.saveRequestId : undefined);
|
||||
const state = req.body.state
|
||||
const labels = req.body.labels
|
||||
const source = req.body.source || 'parseContent';
|
||||
const taskId = req.body.taskId; // taskId is used to update import status
|
||||
|
||||
let logRecord = {
|
||||
url,
|
||||
userId,
|
||||
articleSavingRequestId,
|
||||
labels: {
|
||||
source: 'parseContent',
|
||||
source,
|
||||
},
|
||||
state,
|
||||
labelsToAdd: labels
|
||||
labelsToAdd: labels,
|
||||
taskId: taskId,
|
||||
};
|
||||
|
||||
console.info(`Article parsing request`, logRecord);
|
||||
@ -271,7 +303,7 @@ async function fetchContent(req, res) {
|
||||
}
|
||||
|
||||
// pre handle url with custom handlers
|
||||
let title, content, contentType;
|
||||
let title, content, contentType, importStatus;
|
||||
try {
|
||||
const browser = await getBrowserPromise;
|
||||
const result = await preHandleContent(url, browser);
|
||||
@ -320,19 +352,16 @@ async function fetchContent(req, res) {
|
||||
|
||||
let readabilityResult = null;
|
||||
if (content) {
|
||||
let document = parseHTML(content).document;
|
||||
const document = parseHTML(content).document;
|
||||
|
||||
// preParse content
|
||||
const preParsedDom = await preParseContent(url, document)
|
||||
if (preParsedDom) {
|
||||
document = preParsedDom
|
||||
}
|
||||
const preParsedDom = (await preParseContent(url, document)) || document;
|
||||
|
||||
readabilityResult = await getReadabilityResult(url, document);
|
||||
readabilityResult = await getReadabilityResult(url, preParsedDom);
|
||||
}
|
||||
|
||||
const apiResponse = await sendSavePageMutation(userId, {
|
||||
url: finalUrl,
|
||||
url,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent: content,
|
||||
@ -344,14 +373,16 @@ async function fetchContent(req, res) {
|
||||
logRecord.totalTime = Date.now() - functionStartTime;
|
||||
logRecord.result = apiResponse.createArticle;
|
||||
}
|
||||
|
||||
importStatus = 'imported';
|
||||
} catch (e) {
|
||||
logRecord.error = e.message;
|
||||
console.error(`Error while retrieving page`, logRecord);
|
||||
|
||||
// fallback to scrapingbee
|
||||
const sbResult = await fetchContentWithScrapingBee(url);
|
||||
const sbUrl = finalUrl || sbResult.url;
|
||||
const content = sbResult.domContent;
|
||||
const title = sbResult.title;
|
||||
logRecord.fetchContentTime = Date.now() - functionStartTime;
|
||||
|
||||
let readabilityResult = null;
|
||||
@ -359,7 +390,7 @@ async function fetchContent(req, res) {
|
||||
let document = parseHTML(content).document;
|
||||
|
||||
// preParse content
|
||||
const preParsedDom = await preParseContent(sbUrl, document)
|
||||
const preParsedDom = await preParseContent(url, document)
|
||||
if (preParsedDom) {
|
||||
document = preParsedDom
|
||||
}
|
||||
@ -368,7 +399,7 @@ async function fetchContent(req, res) {
|
||||
}
|
||||
|
||||
const apiResponse = await sendSavePageMutation(userId, {
|
||||
url: finalUrl,
|
||||
url,
|
||||
clientRequestId: articleSavingRequestId,
|
||||
title,
|
||||
originalContent: content,
|
||||
@ -379,14 +410,21 @@ async function fetchContent(req, res) {
|
||||
|
||||
logRecord.totalTime = Date.now() - functionStartTime;
|
||||
logRecord.result = apiResponse.createArticle;
|
||||
|
||||
importStatus = 'failed';
|
||||
} finally {
|
||||
if (context) {
|
||||
await context.close();
|
||||
}
|
||||
console.info(`parse-page`, logRecord);
|
||||
}
|
||||
|
||||
return res.sendStatus(200);
|
||||
// send import status to update the metrics
|
||||
if (taskId) {
|
||||
await sendImportStatusUpdate(userId, taskId, importStatus);
|
||||
}
|
||||
|
||||
res.sendStatus(200);
|
||||
}
|
||||
}
|
||||
|
||||
function validateUrlString(url) {
|
||||
|
||||
Reference in New Issue
Block a user