Merge pull request #4203 from omnivore-app/feature/content-fetch-queue

feat: task based content-fetch
Hongbo Wu
2024-08-22 12:57:43 +08:00
committed by GitHub
28 changed files with 618 additions and 459 deletions

View File

@ -111,7 +111,5 @@ jobs:
run: 'docker build --file packages/content-fetch/Dockerfile .'
- name: Build the inbound-email-handler docker image
run: 'docker build --file packages/inbound-email-handler/Dockerfile .'
- name: Build the content-fetch cloud function docker image
run: 'docker build --file packages/content-fetch/Dockerfile-gcf .'
- name: Build the tts docker image
run: 'docker build --file packages/text-to-speech/Dockerfile .'

View File

@ -1,5 +1,5 @@
import { BulkActionType } from '../generated/graphql'
import { getBackendQueue } from '../queue-processor'
import { getQueue } from '../queue-processor'
import { batchUpdateLibraryItems } from '../services/library_item'
import { logger } from '../utils/logger'
@ -18,7 +18,7 @@ export const BULK_ACTION_JOB_NAME = 'bulk-action'
export const bulkAction = async (data: BulkActionData) => {
const { userId, action, query, labelIds, args, batchSize, count } = data
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
throw new Error('Queue not initialized')
}

View File

@ -1,7 +1,7 @@
import { Job } from 'bullmq'
import { DataSource } from 'typeorm'
import { v4 as uuid } from 'uuid'
import { getBackendQueue, JOB_VERSION } from '../../queue-processor'
import { getQueue, JOB_VERSION } from '../../queue-processor'
import { validateUrl } from '../../services/create_page_save_request'
import { getJobPriority, RssSubscriptionGroup } from '../../utils/createTask'
import { stringToHash } from '../../utils/helpers'
@ -124,7 +124,7 @@ const updateSubscriptionGroup = async (
}
export const queueRSSRefreshAllFeedsJob = async () => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return false
}
@ -144,7 +144,7 @@ export const queueRSSRefreshFeedJob = async (
payload: any,
options = { priority: 'low' as QueuePriority }
): Promise<Job | undefined> => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}

View File

@ -14,7 +14,10 @@ import {
updateSubscriptions,
} from '../../services/update_subscription'
import { findActiveUser } from '../../services/user'
import createHttpTaskWithToken from '../../utils/createTask'
import createHttpTaskWithToken, {
enqueueFetchContentJob,
FetchContentJobData,
} from '../../utils/createTask'
import { cleanUrl } from '../../utils/helpers'
import { createThumbnailProxyUrl } from '../../utils/imageproxy'
import { logger } from '../../utils/logger'
@ -33,6 +36,7 @@ interface RefreshFeedRequest {
fetchContentTypes: FetchContentType[]
folders: FolderType[]
refreshContext?: RSSRefreshContext
priority?: 'low' | 'high'
}
export const isRefreshFeedRequest = (data: any): data is RefreshFeedRequest => {
@ -330,9 +334,10 @@ const createTask = async (
const fetchContentAndCreateItem = async (
users: UserConfig[],
feedUrl: string,
item: RssFeedItem
item: RssFeedItem,
priority = 'low' as 'low' | 'high'
) => {
const payload = {
const data: FetchContentJobData = {
users,
source: 'rss-feeder',
url: item.link.trim(),
@ -340,15 +345,24 @@ const fetchContentAndCreateItem = async (
rssFeedUrl: feedUrl,
savedAt: item.isoDate,
publishedAt: item.isoDate,
priority,
}
try {
const task = await createHttpTaskWithToken({
const contentFetchQueueEnabled =
process.env.CONTENT_FETCH_QUEUE_ENABLED === 'true'
if (contentFetchQueueEnabled) {
return await enqueueFetchContentJob(data)
}
return await createHttpTaskWithToken({
queue: 'omnivore-rss-feed-queue',
taskHandlerUrl: env.queue.contentFetchGCFUrl,
payload,
payload: {
...data,
priority: 'high', // use one queue for all RSS feeds for now
},
})
return !!task
} catch (error) {
logger.error('Error while creating task', error)
return false
@ -704,6 +718,7 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
fetchContentTypes,
folders,
refreshContext,
priority,
} = request
logger.info('Processing feed', feedUrl, { refreshContext: refreshContext })
@ -772,7 +787,8 @@ export const _refreshFeed = async (request: RefreshFeedRequest) => {
await fetchContentAndCreateItem(
Array.from(task.users.values()),
feedUrl,
task.item
task.item,
priority
)
}

View File

@ -140,7 +140,7 @@ const sendImportStatusUpdate = async (
Authorization: auth as string,
'Content-Type': 'application/json',
},
timeout: REQUEST_TIMEOUT,
timeout: 5000,
}
)
} catch (e) {
@ -288,7 +288,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
throw e
} finally {
const lastAttempt = attemptsMade + 1 === MAX_IMPORT_ATTEMPTS
const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS
if (taskId && (isSaved || lastAttempt)) {
logger.info('sending import status update')
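
Editor's note: the comparison is loosened from strict equality to `>=` so the final status update cannot be skipped when a job has already been retried past the configured maximum. A tiny illustration, with a hypothetical `MAX_IMPORT_ATTEMPTS` of 1:

```ts
const MAX_IMPORT_ATTEMPTS = 1 // hypothetical value, for illustration only
const attemptsMade = 1        // a second attempt has already run

// old check: never true once the attempt count overshoots the maximum
console.log(attemptsMade + 1 === MAX_IMPORT_ATTEMPTS) // false, update skipped
// new check: still fires, so the import status update is sent
console.log(attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS)  // true
```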

View File

@ -82,7 +82,9 @@ import { CACHED_READING_POSITION_PREFIX } from './services/cached_reading_positi
import { getJobPriority } from './utils/createTask'
import { logger } from './utils/logger'
export const QUEUE_NAME = 'omnivore-backend-queue'
export const BACKEND_QUEUE_NAME = 'omnivore-backend-queue'
export const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue'
export const JOB_VERSION = 'v001'
const jobLatency = new client.Histogram({
@ -94,8 +96,8 @@ const jobLatency = new client.Histogram({
registerMetric(jobLatency)
export const getBackendQueue = async (
name = QUEUE_NAME
export const getQueue = async (
name = BACKEND_QUEUE_NAME
): Promise<Queue | undefined> => {
if (!redisDataSource.workerRedisClient) {
throw new Error('Can not create queues, redis is not initialized')
@ -124,7 +126,7 @@ export const createJobId = (jobName: string, userId: string) =>
`${jobName}_${userId}_${JOB_VERSION}`
export const getJob = async (jobId: string, queueName?: string) => {
const queue = await getBackendQueue(queueName)
const queue = await getQueue(queueName)
if (!queue) {
return
}
@ -152,12 +154,12 @@ export const jobStateToTaskState = (
export const createWorker = (connection: ConnectionOptions) =>
new Worker(
QUEUE_NAME,
BACKEND_QUEUE_NAME,
async (job: Job) => {
const executeJob = async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
const queue = await getBackendQueue()
const queue = await getQueue()
const counts = await queue?.getJobCounts('prioritized')
if (counts && counts.wait > 1000) {
return
@ -239,7 +241,7 @@ export const createWorker = (connection: ConnectionOptions) =>
)
const setupCronJobs = async () => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
logger.error('Unable to setup cron jobs. Queue is not available.')
return
@ -278,7 +280,7 @@ const main = async () => {
})
app.get('/metrics', async (_, res) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
res.sendStatus(400)
return
@ -295,7 +297,7 @@ const main = async () => {
jobsTypes.forEach((metric, idx) => {
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
output += `omnivore_queue_messages_${metric}{queue="${QUEUE_NAME}"} ${counts[metric]}\n`
output += `omnivore_queue_messages_${metric}{queue="${BACKEND_QUEUE_NAME}"} ${counts[metric]}\n`
})
if (redisDataSource.redisClient) {
@ -311,7 +313,7 @@ const main = async () => {
)
if (cursor != '0') {
output += `# TYPE omnivore_read_position_messages gauge\n`
output += `omnivore_read_position_messages{queue="${QUEUE_NAME}"} ${10_001}\n`
output += `omnivore_read_position_messages{queue="${BACKEND_QUEUE_NAME}"} ${10_001}\n`
} else if (batch) {
output += `# TYPE omnivore_read_position_messages gauge\n`
output += `omnivore_read_position_messages{} ${batch.length}\n`
@ -324,10 +326,10 @@ const main = async () => {
const currentTime = Date.now()
const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${ageInSeconds}\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${ageInSeconds}\n`
} else {
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE_NAME}"} ${0}\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${BACKEND_QUEUE_NAME}"} ${0}\n`
}
const metrics = await getMetrics()
@ -357,7 +359,7 @@ const main = async () => {
await setupCronJobs()
const queueEvents = new QueueEvents(QUEUE_NAME, {
const queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, {
connection: workerRedisClient,
})

View File

@ -245,8 +245,9 @@ export const importFromIntegrationResolver = authorized<
authToken,
integration.importItemState || ImportItemState.Unarchived
)
// update task name in integration
await updateIntegration(integration.id, { taskName }, uid)
log.info('task created', taskName)
// // update task name in integration
// await updateIntegration(integration.id, { taskName }, uid)
analytics.capture({
distinctId: uid,

View File

@ -9,7 +9,7 @@ import {
} from '../generated/graphql'
import { createPubSubClient, PubsubClient } from '../pubsub'
import { redisDataSource } from '../redis_data_source'
import { enqueueParseRequest } from '../utils/createTask'
import { enqueueFetchContentJob } from '../utils/createTask'
import { cleanUrl, generateSlug } from '../utils/helpers'
import { logger } from '../utils/logger'
import { createOrUpdateLibraryItem } from './library_item'
@ -57,7 +57,7 @@ const addRecentSavedItem = async (userId: string) => {
// default: use normal queue
const getPriorityByRateLimit = async (
userId: string
): Promise<'low' | 'high' | undefined> => {
): Promise<'low' | 'high'> => {
const redisClient = redisDataSource.redisClient
if (redisClient) {
const oneMinuteAgo = Date.now() - 60 * 1000
@ -75,6 +75,8 @@ const getPriorityByRateLimit = async (
logger.error('Failed to get priority by rate limit', { userId, error })
}
}
return 'high'
}
export const validateUrl = (url: string): URL => {
@ -157,21 +159,24 @@ export const createPageSaveRequest = async ({
// get priority by checking rate limit if not specified
priority = priority || (await getPriorityByRateLimit(userId))
logger.debug('priority', { priority })
// enqueue task to parse item
await enqueueParseRequest({
await enqueueFetchContentJob({
url,
userId,
saveRequestId: libraryItem.id,
users: [
{
folder,
id: userId,
libraryItemId: libraryItem.id,
},
],
priority,
state,
labels,
locale,
timezone,
savedAt,
publishedAt,
folder,
savedAt: savedAt?.toISOString(),
publishedAt: publishedAt?.toISOString(),
rssFeedUrl: subscription,
})
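
Editor's note: under the new shape a single-user save is a one-element `users` array; the old top-level `userId`/`saveRequestId` pair maps onto `users[0].id`/`users[0].libraryItemId`. A sketch with hypothetical ids and folder, assuming the same relative import path the services module uses:

```ts
import { FetchContentJobData } from '../utils/createTask'

// Hypothetical single-user save expressed in the new multi-user shape.
const data: FetchContentJobData = {
  url: 'https://example.com/article',
  users: [
    {
      id: 'user-1',            // was: userId
      libraryItemId: 'item-1', // was: saveRequestId
      folder: 'inbox',         // hypothetical folder name
    },
  ],
  priority: 'high',
  savedAt: new Date().toISOString(), // Date fields now travel as ISO strings
}
```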

View File

@ -1368,7 +1368,7 @@ export const deleteLibraryItemsByUserId = async (userId: string) => {
}
export const batchDelete = async (criteria: FindOptionsWhere<LibraryItem>) => {
const batchSize = 1000
const batchSize = 20
const qb = libraryItemRepository.createQueryBuilder().where(criteria)
const countSql = queryBuilderToRawSql(qb.select('COUNT(1)'))

View File

@ -65,7 +65,7 @@ import {
UploadContentJobData,
UPLOAD_CONTENT_JOB,
} from '../jobs/upload_content'
import { getBackendQueue, JOB_VERSION } from '../queue-processor'
import { CONTENT_FETCH_QUEUE, getQueue, JOB_VERSION } from '../queue-processor'
import { redisDataSource } from '../redis_data_source'
import { writeDigest } from '../services/digest'
import { signFeatureToken } from '../services/features'
@ -78,6 +78,8 @@ import View = google.cloud.tasks.v2.Task.View
// Instantiates a client.
const client = new CloudTasksClient()
const FETCH_CONTENT_JOB = 'fetch-content'
/**
we want to prioritize jobs by the expected time to complete
* lower number means higher priority
@ -94,17 +96,21 @@ export const getJobPriority = (jobName: string): number => {
case SYNC_READ_POSITIONS_JOB_NAME:
case SEND_EMAIL_JOB:
case UPDATE_HOME_JOB:
case `${FETCH_CONTENT_JOB}_high`:
return 1
case TRIGGER_RULE_JOB_NAME:
case CALL_WEBHOOK_JOB_NAME:
case AI_SUMMARIZE_JOB_NAME:
case PROCESS_YOUTUBE_VIDEO_JOB_NAME:
case `${FETCH_CONTENT_JOB}_low`:
case `${FETCH_CONTENT_JOB}_rss_high`:
return 5
case BULK_ACTION_JOB_NAME:
case `${REFRESH_FEED_JOB_NAME}_high`:
case PROCESS_YOUTUBE_TRANSCRIPT_JOB_NAME:
case UPLOAD_CONTENT_JOB:
case SCORE_LIBRARY_ITEM_JOB:
case `${FETCH_CONTENT_JOB}_rss_low`:
return 10
case `${REFRESH_FEED_JOB_NAME}_low`:
case EXPORT_ITEM_JOB_NAME:
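
Editor's note: lower numbers win in BullMQ, so the `_high`/`_low`/`_rss` suffixes above translate directly into dequeue order. A minimal sketch of how these values are consumed (queue name real, payloads and Redis connection assumed):

```ts
import { Queue } from 'bullmq'

// Illustrative only: assumes a local Redis instance.
const queue = new Queue('omnivore-content-fetch-queue', {
  connection: { host: 'localhost', port: 6379 },
})

const demo = async () => {
  // A user-initiated fetch ('fetch-content_high' -> priority 1) is dequeued
  // ahead of a waiting RSS fetch ('fetch-content_rss_low' -> priority 10),
  // because BullMQ treats smaller priority numbers as more urgent.
  await queue.add('fetch-content', { url: 'https://example.com' }, { priority: 1 })
  await queue.add('fetch-content', { url: 'https://example.com/feed' }, { priority: 10 })
}

void demo()
```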
@ -320,6 +326,25 @@ export const deleteTask = async (
}
}
export interface FetchContentJobData {
url: string
users: Array<{
id: string
folder?: string
libraryItemId: string
}>
priority: 'low' | 'high'
state?: ArticleSavingRequestStatus
labels?: Array<CreateLabelInput>
locale?: string
timezone?: string
savedAt?: string
publishedAt?: string
folder?: string
rssFeedUrl?: string
source?: string
}
/**
* Enqueues the task for the article content parsing with Puppeteer by URL
* @param url - URL address of the article to parse
@ -329,88 +354,43 @@ export const deleteTask = async (
* @param queue - Queue name
* @returns Name of the task created
*/
export const enqueueParseRequest = async ({
url,
userId,
saveRequestId,
priority = 'high',
queue = env.queue.name,
state,
labels,
locale,
timezone,
savedAt,
publishedAt,
folder,
rssFeedUrl,
}: {
url: string
userId: string
saveRequestId: string
priority?: 'low' | 'high'
queue?: string
state?: ArticleSavingRequestStatus
labels?: CreateLabelInput[]
locale?: string
timezone?: string
savedAt?: Date
publishedAt?: Date
folder?: string
rssFeedUrl?: string
}): Promise<string> => {
const { GOOGLE_CLOUD_PROJECT } = process.env
const payload = {
url,
userId,
saveRequestId,
state,
labels,
locale,
timezone,
savedAt,
publishedAt,
folder,
rssFeedUrl,
priority,
export const enqueueFetchContentJob = async (
data: FetchContentJobData
): Promise<string> => {
const queue = await getQueue(CONTENT_FETCH_QUEUE)
if (!queue) {
throw new Error('No queue found')
}
// If there is no Google Cloud Project Id exposed, it means that we are in local environment
if (env.dev.isLocal || !GOOGLE_CLOUD_PROJECT) {
if (env.queue.contentFetchUrl) {
// Calling the handler function directly.
setTimeout(() => {
axios.post(env.queue.contentFetchUrl, payload).catch((error) => {
logError(error)
logger.error(
`Error occurred while requesting local puppeteer-parse function\nPlease, ensure your function is set up properly and running using "yarn start" from the "/pkg/gcf/puppeteer-parse" folder`
)
})
}, 0)
}
return ''
}
const jobName = `${FETCH_CONTENT_JOB}${data.rssFeedUrl ? '_rss' : ''}_${
data.priority
}`
// use GCF url for low priority tasks
const taskHandlerUrl =
priority === 'low'
? env.queue.contentFetchGCFUrl
: env.queue.contentFetchUrl
// sort the data to make sure the hash is consistent
const sortedData = JSON.stringify(data, Object.keys(data).sort())
const jobId = `${FETCH_CONTENT_JOB}_${stringToHash(
sortedData
)}_${JOB_VERSION}`
const priority = getJobPriority(jobName)
const createdTasks = await createHttpTaskWithToken({
project: GOOGLE_CLOUD_PROJECT,
payload,
const job = await queue.add(FETCH_CONTENT_JOB, data, {
jobId,
removeOnComplete: true,
removeOnFail: true,
priority,
taskHandlerUrl,
queue,
attempts: 2,
backoff: {
type: 'exponential',
delay: 2000,
},
})
if (!createdTasks || !createdTasks[0].name) {
logger.error(`Unable to get the name of the task`, {
payload,
createdTasks,
})
throw new CreateTaskError(`Unable to get the name of the task`)
if (!job || !job.id) {
logger.error('Error while enqueuing fetch-content job', data)
throw new Error('Error while enqueuing fetch-content job')
}
return createdTasks[0].name
return job.id
}
export const enqueueReminder = async (
@ -629,7 +609,7 @@ export const enqueueExportToIntegration = async (
integrationId: string,
userId: string
) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -647,7 +627,7 @@ export const enqueueThumbnailJob = async (
userId: string,
libraryItemId: string
) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -714,7 +694,7 @@ export const enqueueRssFeedFetch = async (
}
export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -726,7 +706,7 @@ export const enqueueTriggerRuleJob = async (data: TriggerRuleJobData) => {
}
export const enqueueWebhookJob = async (data: CallWebhookJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -738,7 +718,7 @@ export const enqueueWebhookJob = async (data: CallWebhookJobData) => {
}
export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -752,7 +732,7 @@ export const enqueueAISummarizeJob = async (data: AISummarizeJobData) => {
export const enqueueProcessYouTubeVideo = async (
data: ProcessYouTubeVideoJobData
) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -767,7 +747,7 @@ export const enqueueProcessYouTubeVideo = async (
export const enqueueProcessYouTubeTranscript = async (
data: ProcessYouTubeTranscriptJobData
) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -780,7 +760,7 @@ export const enqueueProcessYouTubeTranscript = async (
}
export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return []
}
@ -806,7 +786,7 @@ export const bulkEnqueueUpdateLabels = async (data: UpdateLabelsData[]) => {
}
export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -825,7 +805,7 @@ export const enqueueUpdateHighlight = async (data: UpdateHighlightData) => {
}
export const enqueueBulkAction = async (data: BulkActionData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -846,7 +826,7 @@ export const enqueueBulkAction = async (data: BulkActionData) => {
}
export const enqueueExportItem = async (jobData: ExportItemJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -862,7 +842,7 @@ export const enqueueExportItem = async (jobData: ExportItemJobData) => {
}
export const enqueueSendEmail = async (jobData: SendEmailJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -881,7 +861,7 @@ export const scheduledDigestJobOptions = (
})
export const removeDigestJobs = async (userId: string) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
throw new Error('No queue found')
}
@ -911,7 +891,7 @@ export const enqueueCreateDigest = async (
data: CreateDigestData,
schedule?: CreateDigestJobSchedule
): Promise<CreateDigestJobResponse> => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
throw new Error('No queue found')
}
@ -974,7 +954,7 @@ export const enqueueCreateDigest = async (
export const enqueueBulkUploadContentJob = async (
data: UploadContentJobData[]
) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return ''
}
@ -998,7 +978,7 @@ export const updateHomeJobId = (userId: string) =>
`${UPDATE_HOME_JOB}_${userId}_${JOB_VERSION}`
export const enqueueUpdateHomeJob = async (data: UpdateHomeJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -1016,7 +996,7 @@ export const updateScoreJobId = (userId: string) =>
`${SCORE_LIBRARY_ITEM_JOB}_${userId}_${JOB_VERSION}`
export const enqueueScoreJob = async (data: ScoreLibraryItemJobData) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -1034,7 +1014,7 @@ export const enqueueGeneratePreviewContentJob = async (
libraryItemId: string,
userId: string
) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -1056,7 +1036,7 @@ export const enqueueGeneratePreviewContentJob = async (
}
export const enqueuePruneTrashJob = async (numDays: number) => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}
@ -1075,7 +1055,7 @@ export const enqueuePruneTrashJob = async (numDays: number) => {
}
export const enqueueExpireFoldersJob = async () => {
const queue = await getBackendQueue()
const queue = await getQueue()
if (!queue) {
return undefined
}

View File

@ -646,8 +646,8 @@ describe('Article API', () => {
context('when the source is rss-feeder and url is from youtube.com', () => {
const source = 'rss-feeder'
const stub = sinon.stub(createTask, 'enqueueParseRequest')
const stub2 = sinon.stub(createTask, 'enqueueProcessYouTubeVideo')
const stub = sinon.stub(createTask, 'enqueueFetchContentJob')
sinon.stub(createTask, 'enqueueProcessYouTubeVideo')
before(() => {
url = 'https://www.youtube.com/watch?v=123'
@ -678,7 +678,11 @@ describe('Article API', () => {
const url = 'https://blog.omnivore.app/new-url-1'
before(() => {
sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves(''))
sinon.replace(
createTask,
'enqueueFetchContentJob',
sinon.fake.resolves('')
)
})
beforeEach(() => {

View File

@ -13,17 +13,9 @@ import * as createTask from '../../src/utils/createTask'
import { createTestUser } from '../db'
import { graphqlRequest, request } from '../util'
const articleSavingRequestQuery = ({
id,
url,
}: {
id?: string
url?: string
}) => `
query {
articleSavingRequest(id: ${id ? `"${id}"` : null}, url: ${
url ? `"${url}"` : null
}) {
const articleSavingRequestQuery = `
query ArticleSavingRequest($id: ID, $url: String) {
articleSavingRequest(id: $id, url: $url) {
... on ArticleSavingRequestSuccess {
articleSavingRequest {
id
@ -74,9 +66,9 @@ describe('ArticleSavingRequest API', () => {
.post('/local/debug/fake-user-login')
.send({ fakeEmail: user.email })
authToken = res.body.authToken
authToken = res.body.authToken as string
sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves(''))
sinon.replace(createTask, 'enqueueFetchContentJob', sinon.fake.resolves(''))
})
after(async () => {
@ -131,14 +123,14 @@ describe('ArticleSavingRequest API', () => {
createArticleSavingRequestMutation(url),
authToken
).expect(200)
id = res.body.data.createArticleSavingRequest.articleSavingRequest.id
id = res.body.data.createArticleSavingRequest.articleSavingRequest
.id as string
})
it('returns the article saving request if exists', async () => {
const res = await graphqlRequest(
articleSavingRequestQuery({ url }),
authToken
).expect(200)
const res = await graphqlRequest(articleSavingRequestQuery, authToken, {
id,
}).expect(200)
expect(
res.body.data.articleSavingRequest.articleSavingRequest.status
@ -146,10 +138,9 @@ describe('ArticleSavingRequest API', () => {
})
it('returns the user profile info', async () => {
const res = await graphqlRequest(
articleSavingRequestQuery({ url }),
authToken
).expect(200)
const res = await graphqlRequest(articleSavingRequestQuery, authToken, {
url,
}).expect(200)
expect(
res.body.data.articleSavingRequest.articleSavingRequest.user.profile
@ -158,10 +149,9 @@ describe('ArticleSavingRequest API', () => {
})
it('returns the article saving request by id', async () => {
const res = await graphqlRequest(
articleSavingRequestQuery({ id }),
authToken
).expect(200)
const res = await graphqlRequest(articleSavingRequestQuery, authToken, {
id,
}).expect(200)
expect(
res.body.data.articleSavingRequest.articleSavingRequest.status
@ -169,10 +159,9 @@ describe('ArticleSavingRequest API', () => {
})
it('returns not_found if not exists', async () => {
const res = await graphqlRequest(
articleSavingRequestQuery({ id: 'invalid-id' }),
authToken
).expect(200)
const res = await graphqlRequest(articleSavingRequestQuery, authToken, {
id: 'invalid-id',
}).expect(200)
expect(res.body.data.articleSavingRequest.errorCodes).to.eql([
ArticleSavingRequestErrorCode.NotFound,

View File

@ -19,7 +19,7 @@ chai.use(sinonChai)
describe('Integrations resolvers', () => {
const READWISE_API_URL = 'https://readwise.io/api/v2'
let loginUser: User
let authToken: string
@ -30,7 +30,7 @@ describe('Integrations resolvers', () => {
.post('/local/debug/fake-user-login')
.send({ fakeEmail: loginUser.email })
authToken = res.body.authToken
authToken = res.body.authToken as string
})
after(async () => {
@ -39,19 +39,9 @@ describe('Integrations resolvers', () => {
describe('setIntegration API', () => {
const validToken = 'valid-token'
const query = (
id = '',
name = 'READWISE',
token: string = 'test token',
enabled = true
) => `
mutation {
setIntegration(input: {
id: "${id}",
name: "${name}",
token: "${token}",
enabled: ${enabled},
}) {
const query = `
mutation SetIntegration($input: SetIntegrationInput!) {
setIntegration(input: $input) {
... on SetIntegrationSuccess {
integration {
id
@ -79,6 +69,8 @@ describe('Integrations resolvers', () => {
.reply(204)
.persist()
integrationName = 'READWISE'
enabled = true
token = 'test token'
})
after(() => {
@ -101,10 +93,14 @@ describe('Integrations resolvers', () => {
})
it('returns InvalidToken error code', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName, token),
authToken
)
const res = await graphqlRequest(query, authToken, {
input: {
id: integrationId,
name: integrationName,
token,
enabled,
},
})
expect(res.body.data.setIntegration.errorCodes).to.eql([
SetIntegrationErrorCode.InvalidToken,
])
@ -124,10 +120,14 @@ describe('Integrations resolvers', () => {
})
it('creates new integration', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName, token),
authToken
)
const res = await graphqlRequest(query, authToken, {
input: {
id: integrationId,
name: integrationName,
token,
enabled,
},
})
expect(res.body.data.setIntegration.integration.enabled).to.be.true
})
})
@ -142,10 +142,9 @@ describe('Integrations resolvers', () => {
})
it('returns NotFound error code', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName),
authToken
)
const res = await graphqlRequest(query, authToken, {
input: { id: integrationId, name: integrationName, enabled, token },
})
expect(res.body.data.setIntegration.errorCodes).to.eql([
SetIntegrationErrorCode.NotFound,
])
@ -163,6 +162,7 @@ describe('Integrations resolvers', () => {
user: { id: otherUser.id },
name: 'READWISE',
token: 'fakeToken',
enabled,
},
otherUser.id
)
@ -175,10 +175,14 @@ describe('Integrations resolvers', () => {
})
it('returns Unauthorized error code', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName),
authToken
)
const res = await graphqlRequest(query, authToken, {
input: {
id: integrationId,
name: integrationName,
enabled,
token,
},
})
expect(res.body.data.setIntegration.errorCodes).to.eql([
SetIntegrationErrorCode.NotFound,
])
@ -192,6 +196,7 @@ describe('Integrations resolvers', () => {
user: { id: loginUser.id },
name: 'READWISE',
token: 'fakeToken',
enabled,
},
loginUser.id
)
@ -208,17 +213,25 @@ describe('Integrations resolvers', () => {
})
afterEach(async () => {
await updateIntegration(existingIntegration.id, {
taskName: 'some task name',
enabled: true,
}, loginUser.id)
await updateIntegration(
existingIntegration.id,
{
taskName: 'some task name',
enabled: true,
},
loginUser.id
)
})
it('disables integration', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName, token, enabled),
authToken
)
const res = await graphqlRequest(query, authToken, {
input: {
id: integrationId,
name: integrationName,
token,
enabled,
},
})
expect(res.body.data.setIntegration.integration.enabled).to.be
.false
})
@ -230,17 +243,25 @@ describe('Integrations resolvers', () => {
})
afterEach(async () => {
await updateIntegration(existingIntegration.id, {
taskName: null,
enabled: false,
}, loginUser.id)
await updateIntegration(
existingIntegration.id,
{
taskName: null,
enabled: false,
},
loginUser.id
)
})
it('enables integration', async () => {
const res = await graphqlRequest(
query(integrationId, integrationName, token, enabled),
authToken
)
const res = await graphqlRequest(query, authToken, {
input: {
id: integrationId,
name: integrationName,
token,
enabled,
},
})
expect(res.body.data.setIntegration.integration.enabled).to.be
.true
})
@ -333,9 +354,12 @@ describe('Integrations resolvers', () => {
query(existingIntegration.id),
authToken
)
const integration = await findIntegration({
id: existingIntegration.id,
}, loginUser.id)
const integration = await findIntegration(
{
id: existingIntegration.id,
},
loginUser.id
)
expect(res.body.data.deleteIntegration.integration).to.be.an('object')
expect(res.body.data.deleteIntegration.integration.id).to.eql(
@ -383,10 +407,6 @@ describe('Integrations resolvers', () => {
authToken
).expect(200)
expect(res.body.data.importFromIntegration.success).to.be.true
const integration = await findIntegration({
id: existingIntegration.id,
}, loginUser.id)
expect(integration?.taskName).not.to.be.null
})
})

View File

@ -27,7 +27,7 @@ describe('/article/save API', () => {
.post('/local/debug/fake-user-login')
.send({ fakeEmail: user.email })
authToken = res.body.authToken
authToken = res.body.authToken as string
})
after(async () => {
@ -39,7 +39,11 @@ describe('/article/save API', () => {
const url = 'https://blog.omnivore.app'
before(() => {
sinon.replace(createTask, 'enqueueParseRequest', sinon.fake.resolves(''))
sinon.replace(
createTask,
'enqueueFetchContentJob',
sinon.fake.resolves('')
)
})
after(() => {

View File

@ -4,7 +4,7 @@ import { nanoid } from 'nanoid'
import supertest from 'supertest'
import { v4 } from 'uuid'
import { makeApolloServer } from '../src/apollo'
import { createWorker, QUEUE_NAME } from '../src/queue-processor'
import { BACKEND_QUEUE_NAME, createWorker } from '../src/queue-processor'
import { createApp } from '../src/server'
import { corsConfig } from '../src/utils/corsConfig'
@ -26,7 +26,7 @@ export const stopApolloServer = async () => {
export const startWorker = (connection: ConnectionOptions) => {
worker = createWorker(connection)
queueEvents = new QueueEvents(QUEUE_NAME, {
queueEvents = new QueueEvents(BACKEND_QUEUE_NAME, {
connection,
})
}

View File

@ -1,53 +0,0 @@
FROM node:18.16-alpine
# Installs latest Chromium (92) package.
RUN apk add --no-cache \
chromium \
nss \
freetype \
harfbuzz \
ca-certificates \
ttf-freefont \
nodejs \
yarn \
g++ \
make \
python3
WORKDIR /app
ENV CHROMIUM_PATH /usr/bin/chromium-browser
ENV LAUNCH_HEADLESS=true
ENV PORT 9090
COPY package.json .
COPY yarn.lock .
COPY tsconfig.json .
COPY .prettierrc .
COPY .eslintrc .
COPY /packages/content-fetch/package.json ./packages/content-fetch/package.json
COPY /packages/content-handler/package.json ./packages/content-handler/package.json
COPY /packages/puppeteer-parse/package.json ./packages/puppeteer-parse/package.json
COPY /packages/utils/package.json ./packages/utils/package.json
RUN yarn install --pure-lockfile
ADD /packages/content-handler ./packages/content-handler
ADD /packages/puppeteer-parse ./packages/puppeteer-parse
ADD /packages/content-fetch ./packages/content-fetch
ADD /packages/utils ./packages/utils
RUN yarn workspace @omnivore/utils build
RUN yarn workspace @omnivore/content-handler build
RUN yarn workspace @omnivore/puppeteer-parse build
RUN yarn workspace @omnivore/content-fetch build
# After building, fetch the production dependencies
RUN rm -rf /app/packages/content-fetch/node_modules
RUN rm -rf /app/node_modules
RUN yarn install --pure-lockfile --production
EXPOSE 9090
# USER pptruser
ENTRYPOINT ["yarn", "workspace", "@omnivore/content-fetch", "start_gcf"]

View File

@ -7,17 +7,20 @@
"build/src"
],
"dependencies": {
"bullmq": "^5.1.1",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"posthog-node": "^3.6.3",
"@google-cloud/functions-framework": "^3.0.0",
"@google-cloud/storage": "^7.0.1",
"@omnivore/puppeteer-parse": "^1.0.0",
"@omnivore/utils": "1.0.0",
"@sentry/serverless": "^7.77.0"
"axios": "^0.27.2",
"bullmq": "^5.1.1",
"dotenv": "^8.2.0",
"express": "^4.17.1",
"express-async-handler": "^1.2.0",
"jsonwebtoken": "^8.5.1",
"posthog-node": "^3.6.3"
},
"devDependencies": {
"@types/express": "^4.17.1",
"@types/jsonwebtoken": "^8.5.0",
"chai": "^4.3.6",
"mocha": "^10.0.0"
},
@ -26,8 +29,7 @@
"test:typecheck": "tsc --noEmit",
"lint": "eslint src --ext ts,js,tsx,jsx",
"build": "tsc",
"start": "node build/src/app.js",
"start_gcf": "functions-framework --port=9090 --target=puppeteer"
"start": "node build/src/app.js"
},
"volta": {
"extends": "../../package.json"

View File

@ -1,34 +1,124 @@
import 'dotenv/config'
import express from 'express'
import { contentFetchRequestHandler } from './request_handler'
import { RedisDataSource } from '@omnivore/utils'
import { JobType } from 'bullmq'
import express, { Express } from 'express'
import asyncHandler from 'express-async-handler'
import { createWorker, getQueue, QUEUE } from './worker'
const app = express()
const main = () => {
console.log('Starting worker...')
app.use(express.json())
app.use(express.urlencoded({ extended: true }))
const app: Express = express()
const port = process.env.PORT || 3002
if (!process.env.VERIFICATION_TOKEN) {
throw new Error('VERIFICATION_TOKEN environment variable is not set')
// create redis source
const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
const worker = createWorker(redisDataSource)
// respond healthy to auto-scaler.
app.get('/_ah/health', (req, res) => res.sendStatus(200))
app.get(
'/lifecycle/prestop',
asyncHandler(async (_req, res) => {
console.log('Prestop lifecycle hook called.')
await worker.close()
console.log('Worker closed')
res.sendStatus(200)
})
)
app.get(
'/metrics',
asyncHandler(async (_, res) => {
let output = ''
const queue = await getQueue(redisDataSource.queueRedisClient)
const jobsTypes: Array<JobType> = [
'active',
'failed',
'completed',
'prioritized',
]
const counts = await queue.getJobCounts(...jobsTypes)
jobsTypes.forEach((metric) => {
output += `# TYPE omnivore_queue_messages_${metric} gauge\n`
output += `omnivore_queue_messages_${metric}{queue="${QUEUE}"} ${counts[metric]}\n`
})
// Export the age of the oldest prioritized job in the queue
const oldestJobs = await queue.getJobs(['prioritized'], 0, 1, true)
if (oldestJobs.length > 0) {
const currentTime = Date.now()
const ageInSeconds = (currentTime - oldestJobs[0].timestamp) / 1000
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${ageInSeconds}\n`
} else {
output += `# TYPE omnivore_queue_messages_oldest_job_age_seconds gauge\n`
output += `omnivore_queue_messages_oldest_job_age_seconds{queue="${QUEUE}"} ${0}\n`
}
res.status(200).setHeader('Content-Type', 'text/plain').send(output)
})
)
const server = app.listen(port, () => {
console.log('Worker started')
})
const gracefulShutdown = async (signal: string) => {
console.log(`Received ${signal}, closing server...`)
await new Promise<void>((resolve) => {
server.close((err) => {
console.log('Express server closed')
if (err) {
console.log('Error stopping server', { err })
}
resolve()
})
})
await worker.close()
console.log('Worker closed')
await redisDataSource.shutdown()
console.log('Redis connection closed')
process.exit(0)
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises
process.on('SIGINT', () => gracefulShutdown('SIGINT'))
// eslint-disable-next-line @typescript-eslint/no-misused-promises
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
process.on('uncaughtException', function (err) {
// Handle the error safely
console.error(err, 'Uncaught exception')
})
process.on('unhandledRejection', (reason, promise) => {
// Handle the error safely
console.error({ promise, reason }, 'Unhandled Rejection at: Promise')
})
}
app.get('/_ah/health', (req, res) => res.sendStatus(200))
app.all('/', (req, res, next) => {
if (req.method !== 'GET' && req.method !== 'POST') {
console.error('request method is not GET or POST')
return res.sendStatus(405)
}
if (req.query.token !== process.env.VERIFICATION_TOKEN) {
console.error('query does not include valid token')
return res.sendStatus(403)
}
return contentFetchRequestHandler(req, res, next)
})
const PORT = process.env.PORT ? parseInt(process.env.PORT) : 8080
app.listen(PORT, () => {
console.log(`App listening on port ${PORT}`)
console.log('Press Ctrl+C to quit.')
})
// only call main if the file was called from the CLI and wasn't required from another module
if (require.main === module) {
main()
}

View File

@ -1,31 +0,0 @@
import { HttpFunction } from '@google-cloud/functions-framework'
import * as Sentry from '@sentry/serverless'
import 'dotenv/config'
import { contentFetchRequestHandler } from './request_handler'
Sentry.GCPFunction.init({
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 0,
})
/**
* Cloud Function entry point, HTTP trigger.
* Loads the requested URL via Puppeteer, captures page content and sends it to backend
*
* @param {Object} req Cloud Function request context.
* @param {Object} res Cloud Function response context.
*/
export const puppeteer = Sentry.GCPFunction.wrapHttpFunction(
contentFetchRequestHandler as HttpFunction
)
/**
* Cloud Function entry point, HTTP trigger.
* Loads the requested URL via Puppeteer and captures a screenshot of the provided element
*
* @param {Object} req Cloud Function request context.
* Includes:
* * url - URL address of the page to open
* @param {Object} res Cloud Function response context.
*/
// exports.preview = Sentry.GCPFunction.wrapHttpFunction(preview);

View File

@ -38,23 +38,24 @@ const getPriority = (job: SavePageJob): number => {
// priority 5: jobs that are expected to finish in less than 10 second
// priority 10: jobs that are expected to finish in less than 10 minutes
// priority 100: jobs that are expected to finish in less than 1 hour
if (job.isRss) {
return 10
}
if (job.isImport) {
return 100
}
return job.priority === 'low' ? 10 : 1
if (job.isRss) {
return job.priority === 'low' ? 10 : 5
}
return job.priority === 'low' ? 5 : 1
}
const getAttempts = (job: SavePageJob): number => {
if (job.isRss || job.isImport) {
// we don't want to retry rss or import jobs
if (job.isImport) {
// we don't want to retry import jobs
return 1
}
return 3
return job.isRss ? 2 : 3
}
const getOpts = (job: SavePageJob): BulkJobOptions => {

View File

@ -1,8 +1,10 @@
import { Storage } from '@google-cloud/storage'
import { fetchContent } from '@omnivore/puppeteer-parse'
import { RedisDataSource } from '@omnivore/utils'
import axios from 'axios'
import 'dotenv/config'
import { RequestHandler } from 'express'
import jwt from 'jsonwebtoken'
import { promisify } from 'util'
import { analytics } from './analytics'
import { queueSavePageJob } from './job'
@ -12,7 +14,7 @@ interface UserConfig {
folder?: string
}
interface RequestBody {
export interface JobData {
url: string
userId?: string
saveRequestId: string
@ -65,8 +67,17 @@ const bucketName = process.env.GCS_UPLOAD_BUCKET || 'omnivore-files'
const NO_CACHE_URLS = [
'https://deviceandbrowserinfo.com/are_you_a_bot',
'https://deviceandbrowserinfo.com/info_device',
'https://jacksonh.org',
]
const signToken = promisify(jwt.sign)
const IMPORTER_METRICS_COLLECTOR_URL =
process.env.IMPORTER_METRICS_COLLECTOR_URL
const JWT_SECRET = process.env.JWT_SECRET
const MAX_IMPORT_ATTEMPTS = 1
const uploadToBucket = async (filePath: string, data: string) => {
await storage
.bucket(bucketName)
@ -175,36 +186,71 @@ const incrementContentFetchFailure = async (
}
}
export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
const sendImportStatusUpdate = async (
userId: string,
taskId: string,
isImported?: boolean
) => {
try {
if (!JWT_SECRET || !IMPORTER_METRICS_COLLECTOR_URL) {
console.error('JWT_SECRET or IMPORTER_METRICS_COLLECTOR_URL is not set')
return
}
console.log('sending import status update')
const auth = await signToken({ uid: userId }, JWT_SECRET)
await axios.post(
IMPORTER_METRICS_COLLECTOR_URL,
{
taskId,
status: isImported ? 'imported' : 'failed',
},
{
headers: {
Authorization: auth as string,
'Content-Type': 'application/json',
},
timeout: 5000,
}
)
} catch (e) {
console.error('Failed to send import status update', e)
}
}
export const processFetchContentJob = async (
redisDataSource: RedisDataSource,
data: JobData,
attemptsMade: number
) => {
const functionStartTime = Date.now()
const body = <RequestBody>req.body
// users is used when saving article for multiple users
let users = body.users || []
const userId = body.userId
let users = data.users || []
const userId = data.userId
// userId is used when saving article for a single user
if (userId) {
users = [
{
id: userId,
folder: body.folder,
libraryItemId: body.saveRequestId,
folder: data.folder,
libraryItemId: data.saveRequestId,
},
]
}
const articleSavingRequestId = body.saveRequestId
const state = body.state
const labels = body.labels
const source = body.source || 'puppeteer-parse'
const taskId = body.taskId // taskId is used to update import status
const url = body.url
const locale = body.locale
const timezone = body.timezone
const rssFeedUrl = body.rssFeedUrl
const savedAt = body.savedAt
const publishedAt = body.publishedAt
const priority = body.priority
const articleSavingRequestId = data.saveRequestId
const state = data.state
const labels = data.labels
const source = data.source || 'puppeteer-parse'
const taskId = data.taskId // taskId is used to update import status
const url = data.url
const locale = data.locale
const timezone = data.timezone
const rssFeedUrl = data.rssFeedUrl
const savedAt = data.savedAt
const publishedAt = data.publishedAt
const priority = data.priority
const logRecord: LogRecord = {
url,
@ -214,7 +260,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
},
state,
labelsToAdd: labels,
taskId: taskId,
taskId,
locale,
timezone,
rssFeedUrl,
@ -225,25 +271,14 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
console.log(`Article parsing request`, logRecord)
// create redis source
const redisDataSource = new RedisDataSource({
cache: {
url: process.env.REDIS_URL,
cert: process.env.REDIS_CERT,
},
mq: {
url: process.env.MQ_REDIS_URL,
cert: process.env.MQ_REDIS_CERT,
},
})
try {
const domain = new URL(url).hostname
const isBlocked = await isDomainBlocked(redisDataSource, domain)
if (isBlocked) {
console.log('domain is blocked', domain)
logRecord.error = 'domain is blocked'
return res.sendStatus(200)
return
}
const key = cacheKey(url, locale, timezone)
@ -312,7 +347,7 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
logRecord.error = 'unknown error'
}
return res.sendStatus(500)
throw error
} finally {
logRecord.totalTime = Date.now() - functionStartTime
console.log(`parse-page result`, logRecord)
@ -331,8 +366,11 @@ export const contentFetchRequestHandler: RequestHandler = async (req, res) => {
}
)
await redisDataSource.shutdown()
const lastAttempt = attemptsMade + 1 >= MAX_IMPORT_ATTEMPTS
if (logRecord.error && taskId && lastAttempt) {
console.log('sending import status update')
// send failed to import status to update the metrics for importer
await sendImportStatusUpdate(users[0].id, taskId, false)
}
}
res.sendStatus(200)
}
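
Editor's note: replacing `res.sendStatus(500)` with a `throw` hands error handling to BullMQ. A throwing processor marks the job failed, and the `attempts: 2` with exponential backoff set at enqueue time retries transient fetch errors once, instead of just returning an HTTP status. A minimal sketch of that contract (queue name real, connection assumed):

```ts
import { Worker } from 'bullmq'

// Sketch: any throw inside the processor moves the job to the failed
// state; BullMQ then applies the backoff/attempts configured on the job.
const worker = new Worker(
  'omnivore-content-fetch-queue',
  async (job) => {
    if (!job.data.url) {
      throw new Error('missing url') // replaces the old res.sendStatus(500)
    }
  },
  { connection: { host: 'localhost', port: 6379 } } // assumed local Redis
)
```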

View File

@ -0,0 +1,77 @@
import { RedisDataSource } from '@omnivore/utils'
import { Job, Queue, QueueEvents, RedisClient, Worker } from 'bullmq'
import { JobData, processFetchContentJob } from './request_handler'
export const QUEUE = 'omnivore-content-fetch-queue'
export const getQueue = async (
connection: RedisClient,
queueName = QUEUE
): Promise<Queue> => {
const queue = new Queue(queueName, {
connection,
defaultJobOptions: {
backoff: {
type: 'exponential',
delay: 2000, // 2 seconds
},
removeOnComplete: {
age: 24 * 3600, // keep up to 24 hours
},
removeOnFail: {
age: 7 * 24 * 3600, // keep up to 7 days
},
},
})
await queue.waitUntilReady()
return queue
}
export const createWorker = (
redisDataSource: RedisDataSource,
queueName = QUEUE
) => {
const worker = new Worker(
queueName,
async (job: Job<JobData>) => {
// process the job
await processFetchContentJob(redisDataSource, job.data, job.attemptsMade)
},
{
connection: redisDataSource.queueRedisClient,
autorun: true, // start processing jobs immediately
// process up to 10 jobs in a second
limiter: {
max: 10,
duration: 1000,
},
concurrency: 2, // process up to 2 jobs concurrently
}
)
worker.on('error', (err) => {
console.error('worker error:', err)
})
const queueEvents = new QueueEvents(queueName, {
connection: redisDataSource.queueRedisClient,
})
queueEvents.on('added', (job) => {
console.log('added job:', job.jobId, job.name)
})
queueEvents.on('removed', (job) => {
console.log('removed job:', job.jobId)
})
queueEvents.on('completed', (job) => {
console.log('completed job:', job.jobId)
})
queueEvents.on('failed', (job) => {
console.log('failed job:', job.jobId)
})
return worker
}
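
Editor's note: for local testing the new module wires together in a few lines. A sketch assuming a local Redis and the names defined above (ids are hypothetical):

```ts
import { RedisDataSource } from '@omnivore/utils'
import { createWorker, getQueue } from './worker'

const main = async () => {
  // Assumes a local Redis serving both the cache and the message queue.
  const redisDataSource = new RedisDataSource({
    cache: { url: 'redis://localhost:6379' },
    mq: { url: 'redis://localhost:6379' },
  })

  const worker = createWorker(redisDataSource)

  const queue = await getQueue(redisDataSource.queueRedisClient)
  await queue.add('fetch-content', {
    url: 'https://example.com/article',
    users: [{ id: 'user-1', libraryItemId: 'item-1' }], // hypothetical ids
    priority: 'high',
  })

  // ...once the job has drained:
  await worker.close()
  await redisDataSource.shutdown()
}

void main()
```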

View File

@ -36,7 +36,6 @@
"@fast-csv/parse": "^5.0.0",
"@google-cloud/functions-framework": "3.1.2",
"@google-cloud/storage": "^7.0.1",
"@google-cloud/tasks": "^4.0.0",
"@omnivore/readability": "1.0.0",
"@omnivore/utils": "1.0.0",
"@sentry/serverless": "^7.77.0",

View File

@ -10,10 +10,9 @@ import * as path from 'path'
import { promisify } from 'util'
import { v4 as uuid } from 'uuid'
import { importCsv } from './csv'
import { queueEmailJob } from './job'
import { enqueueFetchContentJob, queueEmailJob } from './job'
import { importMatterArchive } from './matterHistory'
import { ImportStatus, updateMetrics } from './metrics'
import { CONTENT_FETCH_URL, createCloudTask } from './task'
export enum ArticleSavingRequestStatus {
Failed = 'FAILED',
@ -94,6 +93,7 @@ const shouldHandle = (data: StorageEvent) => {
}
const importURL = async (
redisDataSource: RedisDataSource,
userId: string,
url: URL,
source: string,
@ -103,18 +103,17 @@ const importURL = async (
savedAt?: Date,
publishedAt?: Date
): Promise<string | undefined> => {
return createCloudTask(CONTENT_FETCH_URL, {
userId,
source,
return enqueueFetchContentJob(redisDataSource, {
url: url.toString(),
saveRequestId: '',
users: [{ id: userId, libraryItemId: '' }],
source,
taskId,
state,
labels: labels?.map((l) => {
return { name: l }
}),
taskId,
savedAt,
publishedAt,
savedAt: savedAt?.toISOString(),
publishedAt: publishedAt?.toISOString(),
})
}
@ -194,6 +193,7 @@ const urlHandler = async (
try {
// Imports are stored in the format imports/<user id>/<type>-<uuid>.csv
const result = await importURL(
ctx.redisDataSource,
ctx.userId,
url,
ctx.source,

View File

@ -1,8 +1,14 @@
import { RedisDataSource } from '@omnivore/utils'
import { Queue } from 'bullmq'
import { ArticleSavingRequestStatus } from '.'
import crypto from 'crypto'
const BACKEND_QUEUE = 'omnivore-backend-queue'
const CONTENT_FETCH_QUEUE = 'omnivore-content-fetch-queue'
const QUEUE_NAME = 'omnivore-backend-queue'
export const SEND_EMAIL_JOB = 'send-email'
const FETCH_CONTENT_JOB = 'fetch-content'
const JOB_VERSION = 'v001'
interface SendEmailJobData {
userId: string
@ -11,13 +17,61 @@ interface SendEmailJobData {
html?: string
}
interface FetchContentJobData {
url: string
users: Array<{
id: string
folder?: string
libraryItemId: string
}>
source: string
taskId: string
state?: ArticleSavingRequestStatus
labels?: Array<{ name: string }>
savedAt?: string
publishedAt?: string
}
export const stringToHash = (str: string): string => {
return crypto.createHash('md5').update(str).digest('hex')
}
export const queueEmailJob = async (
redisDataSource: RedisDataSource,
data: SendEmailJobData
) => {
const queue = new Queue(QUEUE_NAME, {
const queue = new Queue(BACKEND_QUEUE, {
connection: redisDataSource.queueRedisClient,
})
await queue.add(SEND_EMAIL_JOB, data)
}
export const enqueueFetchContentJob = async (
redisDataSource: RedisDataSource,
data: FetchContentJobData
): Promise<string> => {
const queue = new Queue(CONTENT_FETCH_QUEUE, {
connection: redisDataSource.queueRedisClient,
})
// sort the data to make sure the hash is consistent
const sortedData = JSON.stringify(data, Object.keys(data).sort())
const jobId = `${FETCH_CONTENT_JOB}_${stringToHash(
sortedData
)}_${JOB_VERSION}`
const job = await queue.add(FETCH_CONTENT_JOB, data, {
jobId,
removeOnComplete: true,
removeOnFail: true,
priority: 100,
attempts: 1,
})
if (!job || !job.id) {
console.error('Error while enqueuing fetch-content job', data)
throw new Error('Error while enqueuing fetch-content job')
}
return job.id
}

View File

@ -1,48 +0,0 @@
/* eslint-disable @typescript-eslint/restrict-template-expressions */
import { CloudTasksClient, protos } from '@google-cloud/tasks'
const cloudTask = new CloudTasksClient()
export const CONTENT_FETCH_URL = process.env.CONTENT_FETCH_GCF_URL
export const createCloudTask = async (
taskHandlerUrl: string | undefined,
payload: unknown,
requestHeaders?: Record<string, string>,
queue = 'omnivore-import-queue'
) => {
const location = process.env.GCP_LOCATION
const project = process.env.GCP_PROJECT_ID
if (!project || !location || !queue || !taskHandlerUrl) {
throw `Environment not configured: ${project}, ${location}, ${queue}, ${taskHandlerUrl}`
}
const serviceAccountEmail = `${project}@appspot.gserviceaccount.com`
const parent = cloudTask.queuePath(project, location, queue)
const convertedPayload = JSON.stringify(payload)
const body = Buffer.from(convertedPayload).toString('base64')
const task: protos.google.cloud.tasks.v2.ITask = {
httpRequest: {
httpMethod: 'POST',
url: taskHandlerUrl,
headers: {
'Content-Type': 'application/json',
...requestHeaders,
},
body,
...(serviceAccountEmail
? {
oidcToken: {
serviceAccountEmail,
},
}
: null),
},
}
return cloudTask.createTask({ parent, task }).then((result) => {
return result[0].name ?? undefined
})
}

View File

@ -59,7 +59,10 @@ const run = async () => {
const connection = new Redis(secrets.REDIS_URL, redisOptions(secrets))
console.log('set connection: ', connection)
const rssRefreshFeed = new Queue('omnivore-backend-queue', {
const backendQueue = new Queue('omnivore-backend-queue', {
connection: connection,
})
const contentFetchQueue = new Queue('omnivore-content-fetch-queue', {
connection: connection,
})
@ -67,7 +70,10 @@ const run = async () => {
serverAdapter.setBasePath('/ui')
createBullBoard({
queues: [new BullMQAdapter(rssRefreshFeed)],
queues: [
new BullMQAdapter(backendQueue),
new BullMQAdapter(contentFetchQueue),
],
serverAdapter,
})

View File

@ -2799,7 +2799,7 @@
google-gax "^3.5.7"
protobufjs "^7.2.5"
"@google-cloud/functions-framework@3.1.2", "@google-cloud/functions-framework@^3.0.0":
"@google-cloud/functions-framework@3.1.2":
version "3.1.2"
resolved "https://registry.yarnpkg.com/@google-cloud/functions-framework/-/functions-framework-3.1.2.tgz#2cd92ce4307bf7f32555d028dca22e398473b410"
integrity sha512-pYvEH65/Rqh1JNPdcBmorcV7Xoom2/iOSmbtYza8msro7Inl+qOYxbyMiQfySD2gwAyn38WyWPRqsDRcf/BFLg==
@ -8067,7 +8067,7 @@
dependencies:
"@types/express" "*"
"@types/express@*", "@types/express@^4.17.13", "@types/express@^4.17.14", "@types/express@^4.17.21", "@types/express@^4.17.7":
"@types/express@*", "@types/express@^4.17.1", "@types/express@^4.17.13", "@types/express@^4.17.14", "@types/express@^4.17.21", "@types/express@^4.17.7":
version "4.17.21"
resolved "https://registry.yarnpkg.com/@types/express/-/express-4.17.21.tgz#c26d4a151e60efe0084b23dc3369ebc631ed192d"
integrity sha512-ejlPM315qwLpaQlQDTjPdsUFSc6ZsP4AN6AlWnogPjQ7CVi7PYF3YVz+CY3jE2pwYf7E/7HlDAN0rV2GxTG0HQ==
@ -15721,6 +15721,11 @@ expr-eval@^2.0.2:
resolved "https://registry.yarnpkg.com/expr-eval/-/expr-eval-2.0.2.tgz#fa6f044a7b0c93fde830954eb9c5b0f7fbc7e201"
integrity sha512-4EMSHGOPSwAfBiibw3ndnP0AvjDWLsMvGOvWEZ2F96IGk0bIVdjQisOHxReSkE13mHcfbuCiXw+G4y0zv6N8Eg==
express-async-handler@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/express-async-handler/-/express-async-handler-1.2.0.tgz#ffc9896061d90f8d2e71a2d2b8668db5b0934391"
integrity sha512-rCSVtPXRmQSW8rmik/AIb2P0op6l7r1fMW538yyvTMltCO4xQEWMmobfrIxN2V1/mVrgxB8Az3reYF6yUZw37w==
express-http-context2@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/express-http-context2/-/express-http-context2-1.0.0.tgz#58cd9fb0d233739e0dcd7aabb766d1dc74522d77"