Add queue-processor endpoint to the api

This commit is contained in:
Jackson Harper
2024-01-15 09:50:32 +08:00
parent b4914f5346
commit 819052f51f
4 changed files with 214 additions and 46 deletions

View File

@ -5,7 +5,9 @@
"scripts": {
"build": "tsc && yarn copy-files",
"dev": "ts-node-dev --files src/server.ts",
"dev_qp": "ts-node-dev --files src/queue-processor.ts",
"start": "node dist/server.js",
"start_queue_processor": "node dist/queue-processor.js",
"lint": "eslint src --ext ts,js,tsx,jsx",
"lint:fix": "eslint src --fix --ext ts,js,tsx,jsx",
"test:typecheck": "tsc --noEmit",

View File

@ -0,0 +1,91 @@
/* eslint-disable @typescript-eslint/no-floating-promises */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/require-await */
/* eslint-disable @typescript-eslint/no-misused-promises */
import express, { Express } from 'express'
import { appDataSource } from './data_source'
import { env } from './env'
import { redisClient, mqRedisClient } from './redis'
import { Worker, Job, QueueEvents } from 'bullmq'
import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds'
import { refreshFeed } from './jobs/rss/refreshFeed'
export const QUEUE_NAME = 'omnivore-backend-queue'
const main = async () => {
console.log('calling queue-processor start')
const app: Express = express()
const port = process.env.PORT || 3002
await appDataSource.initialize()
// respond healthy to auto-scaler.
app.get('/_ah/health', (req, res) => res.sendStatus(200))
// redis is optional
if (env.redis.url) {
redisClient.on('error', (err) => {
console.error('Redis Client Error', err)
})
await redisClient.connect()
console.log('Redis Client Connected:', env.redis.url)
}
// redis for message queue
if (env.redis.url) {
mqRedisClient?.on('error', (err) => {
console.error('Redis Client Error', err)
})
}
const worker = new Worker(
QUEUE_NAME,
async (job: Job) => {
switch (job.name) {
case 'refresh-all-feeds': {
return await refreshAllFeeds(appDataSource, mqRedisClient)
}
case 'refresh-feed': {
return await refreshFeed(redisClient, job.data)
}
}
return true
},
{
connection: mqRedisClient,
}
)
const queueEvents = new QueueEvents(QUEUE_NAME, {
connection: mqRedisClient,
})
queueEvents.on('added', async (job) => {
console.log('added job: ', job.jobId)
})
queueEvents.on('removed', async (job) => {
console.log('removed job: ', job.jobId)
})
queueEvents.on('completed', async (job) => {
console.log('completed job: ', job.jobId)
})
process.on('SIGINT', async () => {
console.log('[queue-processor]: Received SIGTERM. Shutting down.')
await redisClient.disconnect()
mqRedisClient.disconnect()
})
app.listen(port, () => {
console.log(`[queue-processor]: started`)
})
}
// only call main if the file was called from the CLI and wasn't required from another module
if (require.main === module) {
main().catch((e) => console.error(e))
}

View File

@ -12,6 +12,8 @@ import {
RssSubscriptionGroup,
} from '../../utils/createTask'
import { logger } from '../../utils/logger'
import { queueRSSRefreshAllFeedsJob } from '../../jobs/rss/refreshAllFeeds'
import { mqRedisClient } from '../../redis'
export function rssFeedRouter() {
const router = express.Router()
@ -28,44 +30,46 @@ export function rssFeedRouter() {
return res.status(200).send('Expired')
}
// get active rss feed subscriptions scheduled for fetch and group by feed url
const subscriptionGroups = (await getRepository(Subscription).query(
`
SELECT
url,
ARRAY_AGG(id) AS "subscriptionIds",
ARRAY_AGG(user_id) AS "userIds",
ARRAY_AGG(last_fetched_at) AS "fetchedDates",
ARRAY_AGG(coalesce(scheduled_at, NOW())) AS "scheduledDates",
ARRAY_AGG(last_fetched_checksum) AS checksums,
ARRAY_AGG(fetch_content) AS "fetchContents",
ARRAY_AGG(coalesce(folder, $3)) AS folders
FROM
omnivore.subscriptions
WHERE
type = $1
AND status = $2
AND (scheduled_at <= NOW() OR scheduled_at IS NULL)
GROUP BY
url
`,
[
SubscriptionType.Rss,
SubscriptionStatus.Active,
DEFAULT_SUBSCRIPTION_FOLDER,
]
)) as RssSubscriptionGroup[]
await queueRSSRefreshAllFeedsJob(mqRedisClient)
// create a cloud taks to fetch rss feed item for each subscription
await Promise.all(
subscriptionGroups.map((subscriptionGroup) => {
try {
return enqueueRssFeedFetch(subscriptionGroup)
} catch (error) {
logger.info('error creating rss feed fetch task', error)
}
})
)
// // get active rss feed subscriptions scheduled for fetch and group by feed url
// const subscriptionGroups = (await getRepository(Subscription).query(
// `
// SELECT
// url,
// ARRAY_AGG(id) AS "subscriptionIds",
// ARRAY_AGG(user_id) AS "userIds",
// ARRAY_AGG(last_fetched_at) AS "fetchedDates",
// ARRAY_AGG(coalesce(scheduled_at, NOW())) AS "scheduledDates",
// ARRAY_AGG(last_fetched_checksum) AS checksums,
// ARRAY_AGG(fetch_content) AS "fetchContents",
// ARRAY_AGG(coalesce(folder, $3)) AS folders
// FROM
// omnivore.subscriptions
// WHERE
// type = $1
// AND status = $2
// AND (scheduled_at <= NOW() OR scheduled_at IS NULL)
// GROUP BY
// url
// `,
// [
// SubscriptionType.Rss,
// SubscriptionStatus.Active,
// DEFAULT_SUBSCRIPTION_FOLDER,
// ]
// )) as RssSubscriptionGroup[]
// // create a cloud taks to fetch rss feed item for each subscription
// await Promise.all(
// subscriptionGroups.map((subscriptionGroup) => {
// try {
// return enqueueRssFeedFetch(subscriptionGroup)
// } catch (error) {
// logger.info('error creating rss feed fetch task', error)
// }
// })
// )
} catch (error) {
logger.info('error fetching rss feeds', error)
return res.status(500).send('Internal Server Error')

View File

@ -5980,6 +5980,11 @@
resolved "https://registry.yarnpkg.com/@sqltools/formatter/-/formatter-1.2.3.tgz#1185726610acc37317ddab11c3c7f9066966bd20"
integrity sha512-O3uyB/JbkAEMZaP3YqyHH7TMnex7tWyCbCI4EfJdOCoN6HIhqdJBWTM6aCCiWQ/5f5wxjgU735QAIpJbjDvmzg==
"@sqltools/formatter@^1.2.5":
version "1.2.5"
resolved "https://registry.yarnpkg.com/@sqltools/formatter/-/formatter-1.2.5.tgz#3abc203c79b8c3e90fd6c156a0c62d5403520e12"
integrity sha512-Uy0+khmZqUrUGm5dmMqVlnvufZRSK0FbYzVgp0UMstm+F5+W2/jnEEQyc9vo1ZR/E5ZI/B1WjjoTqBqwJL6Krw==
"@stitches/react@^1.2.5":
version "1.2.8"
resolved "https://registry.yarnpkg.com/@stitches/react/-/react-1.2.8.tgz#954f8008be8d9c65c4e58efa0937f32388ce3a38"
@ -7390,6 +7395,16 @@
"@types/qs" "*"
"@types/serve-static" "*"
"@types/express@^4.17.21":
version "4.17.21"
resolved "https://registry.yarnpkg.com/@types/express/-/express-4.17.21.tgz#c26d4a151e60efe0084b23dc3369ebc631ed192d"
integrity sha512-ejlPM315qwLpaQlQDTjPdsUFSc6ZsP4AN6AlWnogPjQ7CVi7PYF3YVz+CY3jE2pwYf7E/7HlDAN0rV2GxTG0HQ==
dependencies:
"@types/body-parser" "*"
"@types/express-serve-static-core" "^4.17.33"
"@types/qs" "*"
"@types/serve-static" "*"
"@types/filesystem@*":
version "0.0.32"
resolved "https://registry.yarnpkg.com/@types/filesystem/-/filesystem-0.0.32.tgz#307df7cc084a2293c3c1a31151b178063e0a8edf"
@ -7791,6 +7806,13 @@
resolved "https://registry.yarnpkg.com/@types/node/-/node-16.18.68.tgz#3155f64a961b3d8d10246c80657f9a7292e3421a"
integrity sha512-sG3hPIQwJLoewrN7cr0dwEy+yF5nD4D/4FxtQpFciRD/xwUzgD+G05uxZHv5mhfXo4F9Jkp13jjn0CC2q325sg==
"@types/node@^20.11.0":
version "20.11.0"
resolved "https://registry.yarnpkg.com/@types/node/-/node-20.11.0.tgz#8e0b99e70c0c1ade1a86c4a282f7b7ef87c9552f"
integrity sha512-o9bjXmDNcF7GbM4CNQpmi+TutCgap/K3w1JyKgxAjqx41zp9qlIAVFi0IhCNsJcXolEqLWhbFbEeL0PvYm4pcQ==
dependencies:
undici-types "~5.26.4"
"@types/nodemailer@^6.4.4":
version "6.4.4"
resolved "https://registry.yarnpkg.com/@types/nodemailer/-/nodemailer-6.4.4.tgz#c265f7e7a51df587597b3a49a023acaf0c741f4b"
@ -9315,6 +9337,11 @@ app-root-path@^3.0.0:
resolved "https://registry.yarnpkg.com/app-root-path/-/app-root-path-3.0.0.tgz#210b6f43873227e18a4b810a032283311555d5ad"
integrity sha512-qMcx+Gy2UZynHjOHOIXPNvpf+9cjvk3cWrBBK7zg4gH9+clobJRb9NGzcT7mQTcV/6Gm/1WelUtqxVXnNlrwcw==
app-root-path@^3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/app-root-path/-/app-root-path-3.1.0.tgz#5971a2fc12ba170369a7a1ef018c71e6e47c2e86"
integrity sha512-biN3PwB2gUtjaYy/isrU3aNWI5w+fAfvHkSvCKeQGxhmYpwKFUxudR3Yya+KqVRHBmEDYh+/lTozYCFbmzX4nA==
apparatus@^0.0.10:
version "0.0.10"
resolved "https://registry.yarnpkg.com/apparatus/-/apparatus-0.0.10.tgz#81ea756772ada77863db54ceee8202c109bdca3e"
@ -12724,7 +12751,7 @@ dateformat@^3.0.0, dateformat@^3.0.3:
resolved "https://registry.yarnpkg.com/dateformat/-/dateformat-3.0.3.tgz#a6e37499a4d9a9cf85ef5872044d62901c9889ae"
integrity sha512-jyCETtSl3VMZMWeRo7iY1FL19ges1t55hMo5yaam4Jrsm5EPL89UQkoQRyiI+Yf4k8r2ZpdngkV8hr1lIdjb3Q==
dayjs@1.x, dayjs@^1.10.4, dayjs@^1.11.7:
dayjs@1.x, dayjs@^1.10.4, dayjs@^1.11.7, dayjs@^1.11.9:
version "1.11.10"
resolved "https://registry.yarnpkg.com/dayjs/-/dayjs-1.11.10.tgz#68acea85317a6e164457d6d6947564029a6a16a0"
integrity sha512-vjAczensTgRcqDERK0SR2XMwsF/tSvnvlv6VcF2GIhg6Sx4yOIt/irsr1RDJsKiIyBzJDpCoXiWWq28MqH2cnQ==
@ -13452,16 +13479,16 @@ dotenv@^16.0.1:
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.0.1.tgz#8f8f9d94876c35dac989876a5d3a82a267fdce1d"
integrity sha512-1K6hR6wtk2FviQ4kEiSjFiH5rpzEVi8WW0x96aztHVMhEspNpc4DVOUTEHtEva5VThQ8IaBX1Pe4gSzpVVUsKQ==
dotenv@^16.0.3, dotenv@~16.3.1:
version "16.3.1"
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.3.1.tgz#369034de7d7e5b120972693352a3bf112172cc3e"
integrity sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==
dotenv@^8.0.0, dotenv@^8.2.0:
version "8.6.0"
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-8.6.0.tgz#061af664d19f7f4d8fc6e4ff9b584ce237adcb8b"
integrity sha512-IrPdXQsk2BbzvCBGBOTmmSH5SodmqZNt4ERAZDmW4CT+tL8VtvinqywuANaFu4bOMWki16nqf0e4oC0QIaDr/g==
dotenv@~16.3.1:
version "16.3.1"
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.3.1.tgz#369034de7d7e5b120972693352a3bf112172cc3e"
integrity sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==
downshift@^6.0.15:
version "6.1.7"
resolved "https://registry.yarnpkg.com/downshift/-/downshift-6.1.7.tgz#fdb4c4e4f1d11587985cd76e21e8b4b3fa72e44c"
@ -14732,7 +14759,7 @@ express-rate-limit@^6.3.0:
resolved "https://registry.yarnpkg.com/express-rate-limit/-/express-rate-limit-6.11.1.tgz#52e05c5d379cd5d06ae29665862436eb712e414a"
integrity sha512-8+UpWtQY25lJaa4+3WxDBGDcAu4atcTruSs3QSL5VPEplYy6kmk84wutG9rUkkK5LmMQQ7TFHWLZYITwVNbbEg==
express@^4.16.4, express@^4.17.1:
express@^4.16.4, express@^4.17.1, express@^4.18.2:
version "4.18.2"
resolved "https://registry.yarnpkg.com/express/-/express-4.18.2.tgz#3fabe08296e930c796c19e3c516979386ba9fd59"
integrity sha512-5/PsL6iGPdfQ/lKM1UuielYgv3BUoJfz1aUwU9vHZ+J7gyvwdQXFEBIEIaxeGf0GIcreATNyBExtalisDbuMqQ==
@ -16083,7 +16110,7 @@ glob@7.2.0, glob@^7.1.1, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4, glob@^7.1.6, glo
once "^1.3.0"
path-is-absolute "^1.0.0"
glob@^10.2.2:
glob@^10.2.2, glob@^10.3.10:
version "10.3.10"
resolved "https://registry.yarnpkg.com/glob/-/glob-10.3.10.tgz#0351ebb809fd187fe421ab96af83d3a70715df4b"
integrity sha512-fa46+tv1Ak0UPK1TOy/pZrIybNNt4HCv7SDzwyfiOZkvZLEbjsZkJBPtDHVshZjbecAoAGSC20MjLDG/qr679g==
@ -21777,6 +21804,11 @@ mkdirp@^1.0.3, mkdirp@^1.0.4:
resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-1.0.4.tgz#3eb5ed62622756d79a5f0e2a221dfebad75c2f7e"
integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==
mkdirp@^2.1.3:
version "2.1.6"
resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-2.1.6.tgz#964fbcb12b2d8c5d6fbc62a963ac95a273e2cc19"
integrity sha512-+hEnITedc8LAtIP9u3HJDFIdcLV2vXP33sqLLIzkv1Db1zO/1OxbvYf0Y1OC/S/Qo5dxHXepofhmxL02PsKe+A==
mkdirp@~0.3.5:
version "0.3.5"
resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-0.3.5.tgz#de3e5f8961c88c787ee1368df849ac4413eca8d7"
@ -25964,6 +25996,14 @@ read-pkg@^7.1.0:
parse-json "^5.2.0"
type-fest "^2.0.0"
read-yaml-file@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/read-yaml-file/-/read-yaml-file-2.1.0.tgz#c5866712db9ef5343b4d02c2413bada53c41c4a9"
integrity sha512-UkRNRIwnhG+y7hpqnycCL/xbTk7+ia9VuVTC0S+zVbwd65DI9eUpRMfsWIGrCWxTU/mi+JW8cHQCrv+zfCbEPQ==
dependencies:
js-yaml "^4.0.0"
strip-bom "^4.0.0"
read@1, read@^1.0.7, read@~1.0.7:
version "1.0.7"
resolved "https://registry.yarnpkg.com/read/-/read-1.0.7.tgz#b3da19bd052431a97671d44a42634adf710b40c4"
@ -29124,7 +29164,7 @@ tslib@^1.0.0, tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==
tslib@^2, tslib@^2.0.0, tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.3.0, tslib@^2.3.1, tslib@^2.4.0:
tslib@^2, tslib@^2.0.0, tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.3.0, tslib@^2.3.1, tslib@^2.4.0, tslib@^2.5.0:
version "2.6.2"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.2.tgz#703ac29425e7b37cd6fd456e92404d46d1f3e4ae"
integrity sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==
@ -29340,6 +29380,27 @@ typeorm-naming-strategies@^4.1.0:
resolved "https://registry.yarnpkg.com/typeorm-naming-strategies/-/typeorm-naming-strategies-4.1.0.tgz#1ec6eb296c8d7b69bb06764d5b9083ff80e814a9"
integrity sha512-vPekJXzZOTZrdDvTl1YoM+w+sUIfQHG4kZTpbFYoTsufyv9NIBRe4Q+PdzhEAFA2std3D9LZHEb1EjE9zhRpiQ==
typeorm@^0.3.19:
version "0.3.19"
resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.3.19.tgz#a985ce8ae36d266018e44fed5e27a4a5da34ad2a"
integrity sha512-OGelrY5qEoAU80mR1iyvmUHiKCPUydL6xp6bebXzS7jyv/X70Gp/jBWRAfF4qGOfy2A7orMiGRfwsBUNbEL65g==
dependencies:
"@sqltools/formatter" "^1.2.5"
app-root-path "^3.1.0"
buffer "^6.0.3"
chalk "^4.1.2"
cli-highlight "^2.1.11"
dayjs "^1.11.9"
debug "^4.3.4"
dotenv "^16.0.3"
glob "^10.3.10"
mkdirp "^2.1.3"
reflect-metadata "^0.1.13"
sha.js "^2.4.11"
tslib "^2.5.0"
uuid "^9.0.0"
yargs "^17.6.2"
typeorm@^0.3.4:
version "0.3.7"
resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.3.7.tgz#5776ed5058f0acb75d64723b39ff458d21de64c1"
@ -29378,6 +29439,11 @@ typescript@^4.4.4:
resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.9.5.tgz#095979f9bcc0d09da324d58d03ce8f8374cbe65a"
integrity sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g==
typescript@^5.3.3:
version "5.3.3"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.3.3.tgz#b3ce6ba258e72e6305ba66f5c9b452aaee3ffe37"
integrity sha512-pXWcraxM0uxAS+tN0AG/BF2TyqmHO014Z070UsJ+pFvYuRSq8KH8DmWpnbXe0pEPDHXZV3FcAbJkijJ5oNEnWw==
ua-parser-js@^0.7.30:
version "0.7.33"
resolved "https://registry.yarnpkg.com/ua-parser-js/-/ua-parser-js-0.7.33.tgz#1d04acb4ccef9293df6f70f2c3d22f3030d8b532"
@ -29456,6 +29522,11 @@ underscore@^1.13.4, underscore@^1.9.1:
resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.13.4.tgz#7886b46bbdf07f768e0052f1828e1dcab40c0dee"
integrity sha512-BQFnUDuAQ4Yf/cYY5LNrK9NCJFKriaRbD9uR1fTeXnBeoa97W0i41qkZfGO9pSo8I5KzjAcSY2XYtdf0oKd7KQ==
undici-types@~5.26.4:
version "5.26.5"
resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-5.26.5.tgz#bcd539893d00b56e964fd2657a4866b221a65617"
integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==
undici@^4.9.3:
version "4.14.1"
resolved "https://registry.yarnpkg.com/undici/-/undici-4.14.1.tgz#7633b143a8a10d6d63335e00511d071e8d52a1d9"