From bbc7b5e6000943c288a31f640df7b531cca72111 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Wed, 3 Jul 2024 22:20:27 +0800 Subject: [PATCH] use @omnivore/utils in import-handler --- packages/content-fetch/package.json | 3 +- packages/content-handler/package.json | 2 +- packages/content-handler/src/redis.ts | 34 -- .../src/websites/nitter-handler.ts | 424 ------------------ packages/import-handler/Dockerfile | 3 + packages/import-handler/Dockerfile-collector | 3 + packages/import-handler/package.json | 2 + packages/import-handler/src/index.ts | 56 ++- packages/import-handler/src/metrics.ts | 1 + packages/import-handler/src/redis.ts | 35 +- packages/import-handler/test/util.ts | 15 +- 11 files changed, 65 insertions(+), 513 deletions(-) delete mode 100644 packages/content-handler/src/redis.ts delete mode 100644 packages/content-handler/src/websites/nitter-handler.ts diff --git a/packages/content-fetch/package.json b/packages/content-fetch/package.json index 3bd83e706..28f70ed19 100644 --- a/packages/content-fetch/package.json +++ b/packages/content-fetch/package.json @@ -27,8 +27,7 @@ "lint": "eslint src --ext ts,js,tsx,jsx", "build": "tsc", "start": "node build/src/app.js", - "start_gcf": "functions-framework --port=9090 --target=puppeteer", - "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" + "start_gcf": "functions-framework --port=9090 --target=puppeteer" }, "volta": { "extends": "../../package.json" diff --git a/packages/content-handler/package.json b/packages/content-handler/package.json index 832b2b4b4..e1f4604ee 100644 --- a/packages/content-handler/package.json +++ b/packages/content-handler/package.json @@ -30,9 +30,9 @@ "nock": "^13.2.9" }, "dependencies": { + "@omnivore/utils": "1.0.0", "addressparser": "^1.0.1", "axios": "^0.27.2", - "ioredis": "^5.3.2", "linkedom": "^0.14.16", "lodash": "^4.17.21", "luxon": "^3.0.4", diff --git a/packages/content-handler/src/redis.ts b/packages/content-handler/src/redis.ts deleted file mode 100644 index 5bc90decc..000000000 --- a/packages/content-handler/src/redis.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { Redis } from 'ioredis' - -export const createRedisClient = (url?: string, cert?: string) => { - return new Redis(url || 'redis://localhost:6379', { - connectTimeout: 10000, // 10 seconds - tls: cert - ? { - cert: cert.replace(/\\n/g, '\n'), // replace \n with new line - rejectUnauthorized: false, // for self-signed certs - } - : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, - }) -} diff --git a/packages/content-handler/src/websites/nitter-handler.ts b/packages/content-handler/src/websites/nitter-handler.ts deleted file mode 100644 index f8d927d6f..000000000 --- a/packages/content-handler/src/websites/nitter-handler.ts +++ /dev/null @@ -1,424 +0,0 @@ -import axios from 'axios' -import Redis from 'ioredis' -import { parseHTML } from 'linkedom' -import _, { truncate } from 'lodash' -import { DateTime } from 'luxon' -import { ContentHandler, PreHandleResult } from '../content-handler' -import { createRedisClient } from '../redis' - -interface Tweet { - url: string - author: { - username: string - name: string - profileImageUrl: string - } - text: string - entities: { - urls: { - url: string - displayUrl: string - }[] - } - attachments: { - type: string - url: string - previewUrl: string - }[] - createdAt: string -} - -export class NitterHandler extends ContentHandler { - // matches twitter.com and nitter.net urls - URL_MATCH = - /((x\.com)|(twitter\.com)|(nitter\.net))\/(?:#!\/)?(\w+)\/status(?:es)?\/(\d+)(?:\/.*)?/ - INSTANCES = [ - { value: 'https://nitter.moomoo.me', score: 0 }, - { value: 'https://nitter.net', score: 1 }, // the official instance - { value: 'https://nitter.lacontrevoie.fr', score: 2 }, - { value: 'https://nitter.kavin.rocks', score: 3 }, - { value: 'https://notabird.site', score: 4 }, - { value: 'https://singapore.unofficialbird.com', score: 5 }, - { value: 'https://nitter.fly.dev', score: 6 }, - ] - REDIS_KEY = 'nitter-instances' - - private instance: string - - constructor() { - super() - this.name = 'Nitter' - this.instance = '' - } - - async getInstances(redisClient: Redis) { - // get instances by score in ascending order - const instances = await redisClient.zrange( - this.REDIS_KEY, - '-inf', - '+inf', - 'BYSCORE' - ) - console.debug('instances', instances) - - // if no instance is found, save the default instances - if (instances.length === 0) { - // only add if the key does not exist - const result = await redisClient.zadd( - this.REDIS_KEY, - 'NX', - ...this.INSTANCES.map((i) => [i.score, i.value]).flat() - ) - console.debug('add instances', result) - - // expire the key after 1 day - const exp = await redisClient.expire(this.REDIS_KEY, 60 * 60 * 24) - console.debug('instances expire in 1 day', exp) - - return this.INSTANCES.map((i) => i.value) - } - - return instances - } - - async incrementInstanceScore( - redisClient: Redis, - instance: string, - score = 1 - ) { - await redisClient.zincrby(this.REDIS_KEY, score, instance) - } - - async getTweets(username: string, tweetId: string) { - function authorParser(header: Element) { - const profileImageUrl = - header.querySelector('.tweet-avatar img')?.getAttribute('src') ?? '' - const name = - header.querySelector('.fullname')?.getAttribute('title') ?? '' - const username = - header.querySelector('.username')?.getAttribute('title') ?? '' - - return { - profileImageUrl, - name, - username: username.replace('@', ''), // remove @ from username - } - } - - function dateParser(date: Element) { - const validDateTime = - date.getAttribute('title')?.replace(' · ', ' ') ?? Date.now() - - return new Date(validDateTime).toISOString() - } - - function urlParser(date: Element) { - return date.getAttribute('href') ?? '' - } - - function attachmentParser(attachments: Element | null) { - if (!attachments) return [] - - const photos = Array.from(attachments.querySelectorAll('img')).map( - (i) => ({ - url: i.getAttribute('src') ?? '', - type: 'photo', - previewUrl: i.getAttribute('src') ?? '', - }) - ) - const videos = Array.from(attachments.querySelectorAll('video')).map( - (i) => ({ - url: i.getAttribute('data-url') ?? '', - type: 'video', - previewUrl: i.getAttribute('poster') ?? '', - }) - ) - - return [...photos, ...videos] - } - - function parseTweet(tweet: Element): Tweet | null { - const header = tweet.querySelector('.tweet-header') - if (!header) { - console.error('no header found', tweet) - return null - } - const author = authorParser(header) - - const body = tweet.querySelector('.tweet-body') - if (!body) { - console.error('no body found', tweet) - return null - } - - const tweetDateElement = body.querySelector('.tweet-date a') - if (!tweetDateElement) { - console.error('no tweet date found', tweet) - return null - } - const createdAt = dateParser(tweetDateElement) - const url = urlParser(tweetDateElement) - - const content = body.querySelector('.tweet-content') - if (!content) { - console.error('no content found', tweet) - return null - } - const text = content.textContent ?? '' - const urls = Array.from(content.querySelectorAll('a')).map((a) => ({ - url: a.getAttribute('href') ?? '', - displayUrl: a.textContent ?? '', - })) - - const attachments = attachmentParser(body.querySelector('.attachments')) - - return { - author, - createdAt, - text, - url, - entities: { - urls, - }, - attachments, - } - } - - const redisClient = createRedisClient( - process.env.REDIS_URL, - process.env.REDIS_CERT - ) - - try { - const tweets: Tweet[] = [] - const option = { - timeout: 20000, // 20 seconds - } - let html = '' - // get instances from redis - const instances = await this.getInstances(redisClient) - for (const instance of instances) { - try { - const url = `${instance}/${username}/status/${tweetId}` - const startTime = Date.now() - const response = await axios.get(url, option) - const latency = Math.floor(Date.now() - startTime) - console.debug('latency', latency) - - html = response.data as string - this.instance = instance - - await this.incrementInstanceScore(redisClient, instance, latency) - break - } catch (error) { - await this.incrementInstanceScore( - redisClient, - instance, - option.timeout - ) - - if (axios.isAxiosError(error)) { - console.info(`Error getting tweets from ${instance}`, error.message) - } else { - console.info(`Error getting tweets from ${instance}`, error) - } - } - } - if (!this.instance || !html) { - console.error('no instance or html found') - return [] - } - - const document = parseHTML(html).document - - // get the main thread including tweets and threads - const mainThread = document.querySelector('.main-thread') - if (!mainThread) { - console.error('no main thread found') - return [] - } - const timelineItems = Array.from( - mainThread.querySelectorAll('.timeline-item') - ) - if (timelineItems.length === 0) { - console.error('no timeline items found') - return [] - } - for (let i = 0; i < timelineItems.length; i++) { - const item = timelineItems[i] - const classList = item.classList - // skip unavailable tweets and earlier replies - if ( - classList.contains('unavailable') || - classList.contains('earlier-replies') - ) { - console.info('skip unavailable tweets and earlier replies') - continue - } - // if there are more replies, get them - if (classList.contains('more-replies')) { - const newUrl = item.querySelector('a')?.getAttribute('href') - if (!newUrl) { - console.error('no new url', newUrl) - break - } - - let html = '' - try { - // go to new url and wait for it to load - const response = await axios.get( - `${this.instance}${newUrl}`, - option - ) - - html = response.data as string - } catch (error) { - console.error('Error getting tweets', error) - break - } - - const document = parseHTML(html).document - const nextThread = document.querySelector('.main-thread .after-tweet') - if (!nextThread) { - console.error('no next thread found') - break - } - - // get the new timeline items and add them to the list - const newTimelineItems = Array.from( - nextThread.querySelectorAll('.timeline-item') - ) - - timelineItems.push(...newTimelineItems) - continue - } - - const tweet = parseTweet(item) - // filter out replies - if ( - tweet && - tweet.author.username.toLowerCase() === username.toLowerCase() - ) { - tweets.push(tweet) - } - } - - return tweets - } catch (error) { - console.error('Error getting tweets', error) - - return [] - } finally { - await redisClient?.quit() - } - } - - parseTweetUrl = (url: string) => { - const match = url.match(this.URL_MATCH) - return { - domain: match?.[1]?.replace('x', 'twitter'), - username: match?.[4], - tweetId: match?.[5], - } - } - - titleForTweet = (author: { name: string }, text: string) => { - return `${author.name} on Twitter: ${truncate(text.replace(/http\S+/, ''), { - length: 100, - })}` - } - - formatTimestamp = (timestamp: string) => { - return DateTime.fromJSDate(new Date(timestamp)).toLocaleString( - DateTime.DATETIME_FULL - ) - } - - shouldPreHandle(url: string): boolean { - return this.URL_MATCH.test(url.toString()) - } - - async preHandle(url: string): Promise { - const { tweetId, username, domain } = this.parseTweetUrl(url) - if (!tweetId || !username || !domain) { - throw new Error('could not parse tweet url') - } - const tweets = await this.getTweets(username, tweetId) - if (tweets.length === 0) { - throw new Error('could not get tweets') - } - - const tweet = tweets[0] - const author = tweet.author - // escape html entities in title - const title = this.titleForTweet(author, tweet.text) - const escapedTitle = _.escape(title) - const authorImage = `${this.instance}${author.profileImageUrl.replace( - '_normal', - '_400x400' - )}` - const description = _.escape(tweet.text) || escapedTitle - const imageDomain = - domain.toLowerCase() === 'twitter.com' - ? 'https://pbs.twimg.com' - : 'https://nitter.net/pic' - - let tweetsContent = '' - for (const tweet of tweets) { - let text = tweet.text - for (const urlObj of tweet.entities.urls) { - text = text.replace( - urlObj.displayUrl, - `${urlObj.displayUrl}` - ) - } - - const includesHtml = tweet.attachments - .map( - (attachment) => - ` - - - - ` - ) - .join('\n') - - tweetsContent += `

${text}

${includesHtml}` - } - - const tweetUrl = ` - — ${ - author.username - } ${this.formatTimestamp(tweet.createdAt)}` - - const content = ` - - - - - - - - - - - - - -
- ${tweetsContent} - ${tweetUrl} -
- - ` - - return { content, url, title } - } -} diff --git a/packages/import-handler/Dockerfile b/packages/import-handler/Dockerfile index 9fc10597d..1b0dda2b5 100644 --- a/packages/import-handler/Dockerfile +++ b/packages/import-handler/Dockerfile @@ -14,11 +14,14 @@ COPY .eslintrc . COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/import-handler/package.json ./packages/import-handler/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile +ADD /packages/utils ./packages/utils ADD /packages/import-handler ./packages/import-handler ADD /packages/readabilityjs ./packages/readabilityjs +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/import-handler build # After building, fetch the production dependencies diff --git a/packages/import-handler/Dockerfile-collector b/packages/import-handler/Dockerfile-collector index 8a30021ec..0abe00d6a 100644 --- a/packages/import-handler/Dockerfile-collector +++ b/packages/import-handler/Dockerfile-collector @@ -14,11 +14,14 @@ COPY .eslintrc . COPY /packages/readabilityjs/package.json ./packages/readabilityjs/package.json COPY /packages/import-handler/package.json ./packages/import-handler/package.json +COPY /packages/utils/package.json ./packages/utils/package.json RUN yarn install --pure-lockfile ADD /packages/import-handler ./packages/import-handler ADD /packages/readabilityjs ./packages/readabilityjs +ADD /packages/utils ./packages/utils +RUN yarn workspace @omnivore/utils build RUN yarn workspace @omnivore/import-handler build # After building, fetch the production dependencies diff --git a/packages/import-handler/package.json b/packages/import-handler/package.json index f62fae3f8..7641a598b 100644 --- a/packages/import-handler/package.json +++ b/packages/import-handler/package.json @@ -38,9 +38,11 @@ "@google-cloud/storage": "^7.0.1", "@google-cloud/tasks": "^4.0.0", "@omnivore/readability": "1.0.0", + "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", "@types/express": "^4.17.13", "axios": "^1.2.2", + "dotenv": "^16.0.1", "dompurify": "^2.4.3", "fs-extra": "^11.1.0", "glob": "^8.1.0", diff --git a/packages/import-handler/src/index.ts b/packages/import-handler/src/index.ts index d95751b47..de3379a26 100644 --- a/packages/import-handler/src/index.ts +++ b/packages/import-handler/src/index.ts @@ -1,7 +1,9 @@ import { Storage } from '@google-cloud/storage' import { Readability } from '@omnivore/readability' +import { RedisDataSource } from '@omnivore/utils' import * as Sentry from '@sentry/serverless' import axios from 'axios' +import 'dotenv/config' import Redis from 'ioredis' import * as jwt from 'jsonwebtoken' import { Stream } from 'node:stream' @@ -11,7 +13,6 @@ import { v4 as uuid } from 'uuid' import { importCsv } from './csv' import { importMatterArchive } from './matterHistory' import { ImportStatus, updateMetrics } from './metrics' -import { createRedisClient } from './redis' import { CONTENT_FETCH_URL, createCloudTask, emailUserUrl } from './task' export enum ArticleSavingRequestStatus { @@ -363,19 +364,26 @@ export const importHandler = Sentry.GCPFunction.wrapHttpFunction( const pubSubMessage = req.body.message.data as string const obj = getStorageEvent(pubSubMessage) if (obj) { - // create redis client - const redisClient = createRedisClient( - process.env.REDIS_URL, - process.env.REDIS_CERT - ) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) + try { - await handleEvent(obj, redisClient) + await handleEvent(obj, redisDataSource.cacheClient) } catch (err) { console.log('error handling event', { err, obj }) throw err } finally { // close redis client - await redisClient.quit() + await redisDataSource.shutdown() } } } else { @@ -413,14 +421,32 @@ export const importMetricsCollector = Sentry.GCPFunction.wrapHttpFunction( return res.status(400).send('Bad Request') } - const redisClient = createRedisClient( - process.env.REDIS_URL, - process.env.REDIS_CERT - ) - // update metrics - await updateMetrics(redisClient, userId, req.body.taskId, req.body.status) + // create redis source + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) - await redisClient.quit() + try { + // update metrics + await updateMetrics( + redisDataSource.cacheClient, + userId, + req.body.taskId, + req.body.status + ) + } catch (error) { + console.error('Error updating metrics', error) + return res.status(500).send('Error updating metrics') + } finally { + await redisDataSource.shutdown() + } res.send('ok') } diff --git a/packages/import-handler/src/metrics.ts b/packages/import-handler/src/metrics.ts index b197356a0..41e4c734c 100644 --- a/packages/import-handler/src/metrics.ts +++ b/packages/import-handler/src/metrics.ts @@ -1,3 +1,4 @@ +import { RedisDataSource } from '@omnivore/utils' import Redis from 'ioredis' import { sendImportCompletedEmail } from '.' diff --git a/packages/import-handler/src/redis.ts b/packages/import-handler/src/redis.ts index ad765dcee..4419e6096 100644 --- a/packages/import-handler/src/redis.ts +++ b/packages/import-handler/src/redis.ts @@ -1,37 +1,4 @@ -import { Redis, Result } from 'ioredis' - -export const createRedisClient = (url?: string, cert?: string) => { - return new Redis(url || 'redis://localhost:6379', { - connectTimeout: 10000, // 10 seconds - tls: cert - ? { - cert: cert.replace(/\\n/g, '\n'), // replace \n with new line - rejectUnauthorized: false, // for self-signed certs - } - : undefined, - reconnectOnError: (err) => { - const targetErrors = [/READONLY/, /ETIMEDOUT/] - - targetErrors.forEach((targetError) => { - if (targetError.test(err.message)) { - // Only reconnect when the error contains the keyword - return true - } - }) - - return false - }, - retryStrategy: (times) => { - if (times > 10) { - // End reconnecting after a specific number of tries and flush all commands with a individual error - return null - } - - // reconnect after - return Math.min(times * 50, 2000) - }, - }) -} +import { Result } from 'ioredis' // Add declarations declare module 'ioredis' { diff --git a/packages/import-handler/test/util.ts b/packages/import-handler/test/util.ts index 04f7f7117..33cdb6342 100644 --- a/packages/import-handler/test/util.ts +++ b/packages/import-handler/test/util.ts @@ -1,9 +1,18 @@ import { Readability } from '@omnivore/readability' +import { RedisDataSource } from '@omnivore/utils' import { ArticleSavingRequestStatus, ImportContext } from '../src' -import { createRedisClient } from '../src/redis' export const stubImportCtx = (): ImportContext => { - const redisClient = createRedisClient(process.env.REDIS_URL) + const redisDataSource = new RedisDataSource({ + cache: { + url: process.env.REDIS_URL, + cert: process.env.REDIS_CERT, + }, + mq: { + url: process.env.MQ_REDIS_URL, + cert: process.env.MQ_REDIS_CERT, + }, + }) return { userId: '', @@ -26,7 +35,7 @@ export const stubImportCtx = (): ImportContext => { ): Promise => { return Promise.resolve() }, - redisClient, + redisClient: redisDataSource.cacheClient, taskId: '', source: 'csv-importer', }