Skip to content

Commit

Permalink
export worker queue (#217)
Browse files Browse the repository at this point in the history
Co-authored-by: Max Gruenfelder <maximilian.gruenfelder@sap.com>
  • Loading branch information
soccermax and Max Gruenfelder committed Jul 31, 2024
1 parent 0cd1ea1 commit 1290df6
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Export `WorkerQueue` for monitoring purposes to provide insights into the running load of the application.
- JSDocs: added addEntryToProcessingMap for EventQueueProcessorBase
- Enhanced Event Processing: Events will continue to be processed even if the initial processing time is exceeded. Events are now broadcast, allowing different application instances to pick them up. The existing worker queue is used to ensure proper load balancing.

Expand Down
35 changes: 35 additions & 0 deletions src/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,38 @@ declare class Config {
}

export const config: Config;

export const workerQueue: WorkerQueue;

declare class WorkerQueue {
constructor(concurrency: number);

addToQueue(load: number, label: string, priority?: Priorities, cb?: () => any): Promise<any>;

_executeFunction(
load: number,
label: string,
cb: () => any,
resolve: (value?: unknown) => void,
reject: (reason?: any) => void,
startTime: bigint,
priority: string
): void;

get runningPromises(): Array<Promise<any>>;
get runningLoad(): number;

static get instance(): WorkerQueue;

get queue(): Record<
string,
Array<[number, string, () => any, (value?: unknown) => void, (reason?: any) => void, bigint]>
>;
}

interface Priorities {
Low: string;
Medium: string;
High: string;
VeryHigh: string;
}
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ module.exports = {
...require("./constants"),
...require("./publishEvent"),
EventQueueProcessorBase: require("./EventQueueProcessorBase"),
WorkerQueue: require("./shared/WorkerQueue"),
};
8 changes: 6 additions & 2 deletions src/shared/WorkerQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const { Priorities } = require("../constants");
const SetIntervalDriftSafe = require("./SetIntervalDriftSafe");

const PRIORITIES = Object.values(Priorities).reverse();
const PRIORITY_MULTIPLICATOR = PRIORITIES.reduce((result, element, index) => {
const PRIORITY_MULTIPLICATION = PRIORITIES.reduce((result, element, index) => {
result[element] = index + 1;
return result;
}, {});
Expand Down Expand Up @@ -152,14 +152,18 @@ class WorkerQueue {
return this.#queue;
}

get runningLoad() {
return this.#runningLoad;
}

#checkAndLogWaitingTime(startTime, label, priority) {
const ts = Date.now();
if (ts - lastLogTs <= 1000) {
return;
}
lastLogTs = ts;
const diffMs = Math.round(Number(process.hrtime.bigint() - startTime) / NANO_TO_MS);
const priorityMultiplication = PRIORITY_MULTIPLICATOR[priority];
const priorityMultiplication = PRIORITY_MULTIPLICATION[priority];
let logLevel;
if (diffMs >= THRESHOLD.ERROR * priorityMultiplication) {
logLevel = "error";
Expand Down

0 comments on commit 1290df6

Please sign in to comment.