diff --git a/packages/api/package.json b/packages/api/package.json index c41153433..7d5420686 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -5,7 +5,9 @@ "scripts": { "build": "tsc && yarn copy-files", "dev": "ts-node-dev --files src/server.ts", + "dev_qp": "ts-node-dev --files src/queue-processor.ts", "start": "node dist/server.js", + "start_queue_processor": "node dist/queue-processor.js", "lint": "eslint src --ext ts,js,tsx,jsx", "lint:fix": "eslint src --fix --ext ts,js,tsx,jsx", "test:typecheck": "tsc --noEmit", diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts new file mode 100644 index 000000000..b8afa9a25 --- /dev/null +++ b/packages/api/src/queue-processor.ts @@ -0,0 +1,91 @@ +/* eslint-disable @typescript-eslint/no-floating-promises */ +/* eslint-disable @typescript-eslint/restrict-template-expressions */ +/* eslint-disable @typescript-eslint/require-await */ +/* eslint-disable @typescript-eslint/no-misused-promises */ +import express, { Express } from 'express' +import { appDataSource } from './data_source' +import { env } from './env' +import { redisClient, mqRedisClient } from './redis' + +import { Worker, Job, QueueEvents } from 'bullmq' +import { refreshAllFeeds } from './jobs/rss/refreshAllFeeds' +import { refreshFeed } from './jobs/rss/refreshFeed' + +export const QUEUE_NAME = 'omnivore-backend-queue' + +const main = async () => { + console.log('calling queue-processor start') + const app: Express = express() + const port = process.env.PORT || 3002 + + await appDataSource.initialize() + + // respond healthy to auto-scaler. + app.get('/_ah/health', (req, res) => res.sendStatus(200)) + + // redis is optional + if (env.redis.url) { + redisClient.on('error', (err) => { + console.error('Redis Client Error', err) + }) + + await redisClient.connect() + console.log('Redis Client Connected:', env.redis.url) + } + + // redis for message queue + if (env.redis.url) { + mqRedisClient?.on('error', (err) => { + console.error('Redis Client Error', err) + }) + } + + const worker = new Worker( + QUEUE_NAME, + async (job: Job) => { + switch (job.name) { + case 'refresh-all-feeds': { + return await refreshAllFeeds(appDataSource, mqRedisClient) + } + case 'refresh-feed': { + return await refreshFeed(redisClient, job.data) + } + } + return true + }, + { + connection: mqRedisClient, + } + ) + + const queueEvents = new QueueEvents(QUEUE_NAME, { + connection: mqRedisClient, + }) + + queueEvents.on('added', async (job) => { + console.log('added job: ', job.jobId) + }) + + queueEvents.on('removed', async (job) => { + console.log('removed job: ', job.jobId) + }) + + queueEvents.on('completed', async (job) => { + console.log('completed job: ', job.jobId) + }) + + process.on('SIGINT', async () => { + console.log('[queue-processor]: Received SIGTERM. Shutting down.') + await redisClient.disconnect() + mqRedisClient.disconnect() + }) + + app.listen(port, () => { + console.log(`[queue-processor]: started`) + }) +} + +// only call main if the file was called from the CLI and wasn't required from another module +if (require.main === module) { + main().catch((e) => console.error(e)) +} diff --git a/packages/api/src/routers/svc/rss_feed.ts b/packages/api/src/routers/svc/rss_feed.ts index 671fbebd5..2c5be9bd7 100644 --- a/packages/api/src/routers/svc/rss_feed.ts +++ b/packages/api/src/routers/svc/rss_feed.ts @@ -12,6 +12,8 @@ import { RssSubscriptionGroup, } from '../../utils/createTask' import { logger } from '../../utils/logger' +import { queueRSSRefreshAllFeedsJob } from '../../jobs/rss/refreshAllFeeds' +import { mqRedisClient } from '../../redis' export function rssFeedRouter() { const router = express.Router() @@ -28,44 +30,46 @@ export function rssFeedRouter() { return res.status(200).send('Expired') } - // get active rss feed subscriptions scheduled for fetch and group by feed url - const subscriptionGroups = (await getRepository(Subscription).query( - ` - SELECT - url, - ARRAY_AGG(id) AS "subscriptionIds", - ARRAY_AGG(user_id) AS "userIds", - ARRAY_AGG(last_fetched_at) AS "fetchedDates", - ARRAY_AGG(coalesce(scheduled_at, NOW())) AS "scheduledDates", - ARRAY_AGG(last_fetched_checksum) AS checksums, - ARRAY_AGG(fetch_content) AS "fetchContents", - ARRAY_AGG(coalesce(folder, $3)) AS folders - FROM - omnivore.subscriptions - WHERE - type = $1 - AND status = $2 - AND (scheduled_at <= NOW() OR scheduled_at IS NULL) - GROUP BY - url - `, - [ - SubscriptionType.Rss, - SubscriptionStatus.Active, - DEFAULT_SUBSCRIPTION_FOLDER, - ] - )) as RssSubscriptionGroup[] + await queueRSSRefreshAllFeedsJob(mqRedisClient) - // create a cloud taks to fetch rss feed item for each subscription - await Promise.all( - subscriptionGroups.map((subscriptionGroup) => { - try { - return enqueueRssFeedFetch(subscriptionGroup) - } catch (error) { - logger.info('error creating rss feed fetch task', error) - } - }) - ) + // // get active rss feed subscriptions scheduled for fetch and group by feed url + // const subscriptionGroups = (await getRepository(Subscription).query( + // ` + // SELECT + // url, + // ARRAY_AGG(id) AS "subscriptionIds", + // ARRAY_AGG(user_id) AS "userIds", + // ARRAY_AGG(last_fetched_at) AS "fetchedDates", + // ARRAY_AGG(coalesce(scheduled_at, NOW())) AS "scheduledDates", + // ARRAY_AGG(last_fetched_checksum) AS checksums, + // ARRAY_AGG(fetch_content) AS "fetchContents", + // ARRAY_AGG(coalesce(folder, $3)) AS folders + // FROM + // omnivore.subscriptions + // WHERE + // type = $1 + // AND status = $2 + // AND (scheduled_at <= NOW() OR scheduled_at IS NULL) + // GROUP BY + // url + // `, + // [ + // SubscriptionType.Rss, + // SubscriptionStatus.Active, + // DEFAULT_SUBSCRIPTION_FOLDER, + // ] + // )) as RssSubscriptionGroup[] + + // // create a cloud taks to fetch rss feed item for each subscription + // await Promise.all( + // subscriptionGroups.map((subscriptionGroup) => { + // try { + // return enqueueRssFeedFetch(subscriptionGroup) + // } catch (error) { + // logger.info('error creating rss feed fetch task', error) + // } + // }) + // ) } catch (error) { logger.info('error fetching rss feeds', error) return res.status(500).send('Internal Server Error') diff --git a/yarn.lock b/yarn.lock index 4512cf5f5..2a5ec5f3d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5980,6 +5980,11 @@ resolved "https://registry.yarnpkg.com/@sqltools/formatter/-/formatter-1.2.3.tgz#1185726610acc37317ddab11c3c7f9066966bd20" integrity sha512-O3uyB/JbkAEMZaP3YqyHH7TMnex7tWyCbCI4EfJdOCoN6HIhqdJBWTM6aCCiWQ/5f5wxjgU735QAIpJbjDvmzg== +"@sqltools/formatter@^1.2.5": + version "1.2.5" + resolved "https://registry.yarnpkg.com/@sqltools/formatter/-/formatter-1.2.5.tgz#3abc203c79b8c3e90fd6c156a0c62d5403520e12" + integrity sha512-Uy0+khmZqUrUGm5dmMqVlnvufZRSK0FbYzVgp0UMstm+F5+W2/jnEEQyc9vo1ZR/E5ZI/B1WjjoTqBqwJL6Krw== + "@stitches/react@^1.2.5": version "1.2.8" resolved "https://registry.yarnpkg.com/@stitches/react/-/react-1.2.8.tgz#954f8008be8d9c65c4e58efa0937f32388ce3a38" @@ -7390,6 +7395,16 @@ "@types/qs" "*" "@types/serve-static" "*" +"@types/express@^4.17.21": + version "4.17.21" + resolved "https://registry.yarnpkg.com/@types/express/-/express-4.17.21.tgz#c26d4a151e60efe0084b23dc3369ebc631ed192d" + integrity sha512-ejlPM315qwLpaQlQDTjPdsUFSc6ZsP4AN6AlWnogPjQ7CVi7PYF3YVz+CY3jE2pwYf7E/7HlDAN0rV2GxTG0HQ== + dependencies: + "@types/body-parser" "*" + "@types/express-serve-static-core" "^4.17.33" + "@types/qs" "*" + "@types/serve-static" "*" + "@types/filesystem@*": version "0.0.32" resolved "https://registry.yarnpkg.com/@types/filesystem/-/filesystem-0.0.32.tgz#307df7cc084a2293c3c1a31151b178063e0a8edf" @@ -7791,6 +7806,13 @@ resolved "https://registry.yarnpkg.com/@types/node/-/node-16.18.68.tgz#3155f64a961b3d8d10246c80657f9a7292e3421a" integrity sha512-sG3hPIQwJLoewrN7cr0dwEy+yF5nD4D/4FxtQpFciRD/xwUzgD+G05uxZHv5mhfXo4F9Jkp13jjn0CC2q325sg== +"@types/node@^20.11.0": + version "20.11.0" + resolved "https://registry.yarnpkg.com/@types/node/-/node-20.11.0.tgz#8e0b99e70c0c1ade1a86c4a282f7b7ef87c9552f" + integrity sha512-o9bjXmDNcF7GbM4CNQpmi+TutCgap/K3w1JyKgxAjqx41zp9qlIAVFi0IhCNsJcXolEqLWhbFbEeL0PvYm4pcQ== + dependencies: + undici-types "~5.26.4" + "@types/nodemailer@^6.4.4": version "6.4.4" resolved "https://registry.yarnpkg.com/@types/nodemailer/-/nodemailer-6.4.4.tgz#c265f7e7a51df587597b3a49a023acaf0c741f4b" @@ -9315,6 +9337,11 @@ app-root-path@^3.0.0: resolved "https://registry.yarnpkg.com/app-root-path/-/app-root-path-3.0.0.tgz#210b6f43873227e18a4b810a032283311555d5ad" integrity sha512-qMcx+Gy2UZynHjOHOIXPNvpf+9cjvk3cWrBBK7zg4gH9+clobJRb9NGzcT7mQTcV/6Gm/1WelUtqxVXnNlrwcw== +app-root-path@^3.1.0: + version "3.1.0" + resolved "https://registry.yarnpkg.com/app-root-path/-/app-root-path-3.1.0.tgz#5971a2fc12ba170369a7a1ef018c71e6e47c2e86" + integrity sha512-biN3PwB2gUtjaYy/isrU3aNWI5w+fAfvHkSvCKeQGxhmYpwKFUxudR3Yya+KqVRHBmEDYh+/lTozYCFbmzX4nA== + apparatus@^0.0.10: version "0.0.10" resolved "https://registry.yarnpkg.com/apparatus/-/apparatus-0.0.10.tgz#81ea756772ada77863db54ceee8202c109bdca3e" @@ -12724,7 +12751,7 @@ dateformat@^3.0.0, dateformat@^3.0.3: resolved "https://registry.yarnpkg.com/dateformat/-/dateformat-3.0.3.tgz#a6e37499a4d9a9cf85ef5872044d62901c9889ae" integrity sha512-jyCETtSl3VMZMWeRo7iY1FL19ges1t55hMo5yaam4Jrsm5EPL89UQkoQRyiI+Yf4k8r2ZpdngkV8hr1lIdjb3Q== -dayjs@1.x, dayjs@^1.10.4, dayjs@^1.11.7: +dayjs@1.x, dayjs@^1.10.4, dayjs@^1.11.7, dayjs@^1.11.9: version "1.11.10" resolved "https://registry.yarnpkg.com/dayjs/-/dayjs-1.11.10.tgz#68acea85317a6e164457d6d6947564029a6a16a0" integrity sha512-vjAczensTgRcqDERK0SR2XMwsF/tSvnvlv6VcF2GIhg6Sx4yOIt/irsr1RDJsKiIyBzJDpCoXiWWq28MqH2cnQ== @@ -13452,16 +13479,16 @@ dotenv@^16.0.1: resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.0.1.tgz#8f8f9d94876c35dac989876a5d3a82a267fdce1d" integrity sha512-1K6hR6wtk2FviQ4kEiSjFiH5rpzEVi8WW0x96aztHVMhEspNpc4DVOUTEHtEva5VThQ8IaBX1Pe4gSzpVVUsKQ== +dotenv@^16.0.3, dotenv@~16.3.1: + version "16.3.1" + resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.3.1.tgz#369034de7d7e5b120972693352a3bf112172cc3e" + integrity sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ== + dotenv@^8.0.0, dotenv@^8.2.0: version "8.6.0" resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-8.6.0.tgz#061af664d19f7f4d8fc6e4ff9b584ce237adcb8b" integrity sha512-IrPdXQsk2BbzvCBGBOTmmSH5SodmqZNt4ERAZDmW4CT+tL8VtvinqywuANaFu4bOMWki16nqf0e4oC0QIaDr/g== -dotenv@~16.3.1: - version "16.3.1" - resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.3.1.tgz#369034de7d7e5b120972693352a3bf112172cc3e" - integrity sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ== - downshift@^6.0.15: version "6.1.7" resolved "https://registry.yarnpkg.com/downshift/-/downshift-6.1.7.tgz#fdb4c4e4f1d11587985cd76e21e8b4b3fa72e44c" @@ -14732,7 +14759,7 @@ express-rate-limit@^6.3.0: resolved "https://registry.yarnpkg.com/express-rate-limit/-/express-rate-limit-6.11.1.tgz#52e05c5d379cd5d06ae29665862436eb712e414a" integrity sha512-8+UpWtQY25lJaa4+3WxDBGDcAu4atcTruSs3QSL5VPEplYy6kmk84wutG9rUkkK5LmMQQ7TFHWLZYITwVNbbEg== -express@^4.16.4, express@^4.17.1: +express@^4.16.4, express@^4.17.1, express@^4.18.2: version "4.18.2" resolved "https://registry.yarnpkg.com/express/-/express-4.18.2.tgz#3fabe08296e930c796c19e3c516979386ba9fd59" integrity sha512-5/PsL6iGPdfQ/lKM1UuielYgv3BUoJfz1aUwU9vHZ+J7gyvwdQXFEBIEIaxeGf0GIcreATNyBExtalisDbuMqQ== @@ -16083,7 +16110,7 @@ glob@7.2.0, glob@^7.1.1, glob@^7.1.2, glob@^7.1.3, glob@^7.1.4, glob@^7.1.6, glo once "^1.3.0" path-is-absolute "^1.0.0" -glob@^10.2.2: +glob@^10.2.2, glob@^10.3.10: version "10.3.10" resolved "https://registry.yarnpkg.com/glob/-/glob-10.3.10.tgz#0351ebb809fd187fe421ab96af83d3a70715df4b" integrity sha512-fa46+tv1Ak0UPK1TOy/pZrIybNNt4HCv7SDzwyfiOZkvZLEbjsZkJBPtDHVshZjbecAoAGSC20MjLDG/qr679g== @@ -21777,6 +21804,11 @@ mkdirp@^1.0.3, mkdirp@^1.0.4: resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-1.0.4.tgz#3eb5ed62622756d79a5f0e2a221dfebad75c2f7e" integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== +mkdirp@^2.1.3: + version "2.1.6" + resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-2.1.6.tgz#964fbcb12b2d8c5d6fbc62a963ac95a273e2cc19" + integrity sha512-+hEnITedc8LAtIP9u3HJDFIdcLV2vXP33sqLLIzkv1Db1zO/1OxbvYf0Y1OC/S/Qo5dxHXepofhmxL02PsKe+A== + mkdirp@~0.3.5: version "0.3.5" resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-0.3.5.tgz#de3e5f8961c88c787ee1368df849ac4413eca8d7" @@ -25964,6 +25996,14 @@ read-pkg@^7.1.0: parse-json "^5.2.0" type-fest "^2.0.0" +read-yaml-file@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/read-yaml-file/-/read-yaml-file-2.1.0.tgz#c5866712db9ef5343b4d02c2413bada53c41c4a9" + integrity sha512-UkRNRIwnhG+y7hpqnycCL/xbTk7+ia9VuVTC0S+zVbwd65DI9eUpRMfsWIGrCWxTU/mi+JW8cHQCrv+zfCbEPQ== + dependencies: + js-yaml "^4.0.0" + strip-bom "^4.0.0" + read@1, read@^1.0.7, read@~1.0.7: version "1.0.7" resolved "https://registry.yarnpkg.com/read/-/read-1.0.7.tgz#b3da19bd052431a97671d44a42634adf710b40c4" @@ -29124,7 +29164,7 @@ tslib@^1.0.0, tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3: resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00" integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg== -tslib@^2, tslib@^2.0.0, tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.3.0, tslib@^2.3.1, tslib@^2.4.0: +tslib@^2, tslib@^2.0.0, tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.3.0, tslib@^2.3.1, tslib@^2.4.0, tslib@^2.5.0: version "2.6.2" resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.2.tgz#703ac29425e7b37cd6fd456e92404d46d1f3e4ae" integrity sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q== @@ -29340,6 +29380,27 @@ typeorm-naming-strategies@^4.1.0: resolved "https://registry.yarnpkg.com/typeorm-naming-strategies/-/typeorm-naming-strategies-4.1.0.tgz#1ec6eb296c8d7b69bb06764d5b9083ff80e814a9" integrity sha512-vPekJXzZOTZrdDvTl1YoM+w+sUIfQHG4kZTpbFYoTsufyv9NIBRe4Q+PdzhEAFA2std3D9LZHEb1EjE9zhRpiQ== +typeorm@^0.3.19: + version "0.3.19" + resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.3.19.tgz#a985ce8ae36d266018e44fed5e27a4a5da34ad2a" + integrity sha512-OGelrY5qEoAU80mR1iyvmUHiKCPUydL6xp6bebXzS7jyv/X70Gp/jBWRAfF4qGOfy2A7orMiGRfwsBUNbEL65g== + dependencies: + "@sqltools/formatter" "^1.2.5" + app-root-path "^3.1.0" + buffer "^6.0.3" + chalk "^4.1.2" + cli-highlight "^2.1.11" + dayjs "^1.11.9" + debug "^4.3.4" + dotenv "^16.0.3" + glob "^10.3.10" + mkdirp "^2.1.3" + reflect-metadata "^0.1.13" + sha.js "^2.4.11" + tslib "^2.5.0" + uuid "^9.0.0" + yargs "^17.6.2" + typeorm@^0.3.4: version "0.3.7" resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.3.7.tgz#5776ed5058f0acb75d64723b39ff458d21de64c1" @@ -29378,6 +29439,11 @@ typescript@^4.4.4: resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.9.5.tgz#095979f9bcc0d09da324d58d03ce8f8374cbe65a" integrity sha512-1FXk9E2Hm+QzZQ7z+McJiHL4NW1F2EzMu9Nq9i3zAaGqibafqYwCVU6WyWAuyQRRzOlxou8xZSyXLEN8oKj24g== +typescript@^5.3.3: + version "5.3.3" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.3.3.tgz#b3ce6ba258e72e6305ba66f5c9b452aaee3ffe37" + integrity sha512-pXWcraxM0uxAS+tN0AG/BF2TyqmHO014Z070UsJ+pFvYuRSq8KH8DmWpnbXe0pEPDHXZV3FcAbJkijJ5oNEnWw== + ua-parser-js@^0.7.30: version "0.7.33" resolved "https://registry.yarnpkg.com/ua-parser-js/-/ua-parser-js-0.7.33.tgz#1d04acb4ccef9293df6f70f2c3d22f3030d8b532" @@ -29456,6 +29522,11 @@ underscore@^1.13.4, underscore@^1.9.1: resolved "https://registry.yarnpkg.com/underscore/-/underscore-1.13.4.tgz#7886b46bbdf07f768e0052f1828e1dcab40c0dee" integrity sha512-BQFnUDuAQ4Yf/cYY5LNrK9NCJFKriaRbD9uR1fTeXnBeoa97W0i41qkZfGO9pSo8I5KzjAcSY2XYtdf0oKd7KQ== +undici-types@~5.26.4: + version "5.26.5" + resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-5.26.5.tgz#bcd539893d00b56e964fd2657a4866b221a65617" + integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA== + undici@^4.9.3: version "4.14.1" resolved "https://registry.yarnpkg.com/undici/-/undici-4.14.1.tgz#7633b143a8a10d6d63335e00511d071e8d52a1d9"