diff --git a/CHANGELOG.md b/CHANGELOG.md index cf09111..fd6d24a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Export `WorkerQueue` for monitoring purposes to provide insights into the running load of the application. - JSDocs: added addEntryToProcessingMap for EventQueueProcessorBase - Enhanced Event Processing: Events will continue to be processed even if the initial processing time is exceeded. Events are now broadcast, allowing different application instances to pick them up. The existing worker queue is used to ensure proper load balancing. diff --git a/src/index.d.ts b/src/index.d.ts index 21b946b..d019bf2 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -215,3 +215,38 @@ declare class Config { } export const config: Config; + +export const workerQueue: WorkerQueue; + +declare class WorkerQueue { + constructor(concurrency: number); + + addToQueue(load: number, label: string, priority?: Priorities, cb?: () => any): Promise; + + _executeFunction( + load: number, + label: string, + cb: () => any, + resolve: (value?: unknown) => void, + reject: (reason?: any) => void, + startTime: bigint, + priority: string + ): void; + + get runningPromises(): Array>; + get runningLoad(): number; + + static get instance(): WorkerQueue; + + get queue(): Record< + string, + Array<[number, string, () => any, (value?: unknown) => void, (reason?: any) => void, bigint]> + >; +} + +interface Priorities { + Low: string; + Medium: string; + High: string; + VeryHigh: string; +} diff --git a/src/index.js b/src/index.js index 1d5e4d3..97022b1 100644 --- a/src/index.js +++ b/src/index.js @@ -10,4 +10,5 @@ module.exports = { ...require("./constants"), ...require("./publishEvent"), EventQueueProcessorBase: require("./EventQueueProcessorBase"), + WorkerQueue: require("./shared/WorkerQueue"), }; diff --git a/src/shared/WorkerQueue.js b/src/shared/WorkerQueue.js index 5ab25f1..d319af9 100644 --- a/src/shared/WorkerQueue.js +++ b/src/shared/WorkerQueue.js @@ -8,7 +8,7 @@ const { Priorities } = require("../constants"); const SetIntervalDriftSafe = require("./SetIntervalDriftSafe"); const PRIORITIES = Object.values(Priorities).reverse(); -const PRIORITY_MULTIPLICATOR = PRIORITIES.reduce((result, element, index) => { +const PRIORITY_MULTIPLICATION = PRIORITIES.reduce((result, element, index) => { result[element] = index + 1; return result; }, {}); @@ -152,6 +152,10 @@ class WorkerQueue { return this.#queue; } + get runningLoad() { + return this.#runningLoad; + } + #checkAndLogWaitingTime(startTime, label, priority) { const ts = Date.now(); if (ts - lastLogTs <= 1000) { @@ -159,7 +163,7 @@ class WorkerQueue { } lastLogTs = ts; const diffMs = Math.round(Number(process.hrtime.bigint() - startTime) / NANO_TO_MS); - const priorityMultiplication = PRIORITY_MULTIPLICATOR[priority]; + const priorityMultiplication = PRIORITY_MULTIPLICATION[priority]; let logLevel; if (diffMs >= THRESHOLD.ERROR * priorityMultiplication) { logLevel = "error";