Skip to content

Commit 363bb81

Browse files
committed
Bump version to v1.1.0: Adding error handling capabilities for uncaught errors from background jobs. Previously, such uncaught job errors were ignored. Now, they are captured and can be accessed by users.
1 parent 8d2a66f commit 363bb81

10 files changed

+309
-24
lines changed

README.md

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ npm i zero-backpressure-semaphore-typescript
2626
## Key Features
2727

2828
- __Backpressure Control__: Ideal for job workers and background services. Concurrency control alone isn't sufficient to ensure stability and performance if backpressure control is overlooked.
29-
- __Graceful Termination__: Achieved via the `waitTillAllExecutingJobsAreSettled` method.
29+
- __Graceful Termination__: Await the completion of all currently executing jobs via the `waitTillAllExecutingJobsAreSettled` method.
3030
- __High Efficiency__: All state-altering operations have a constant time complexity, O(1).
3131
- __Comprehensive documentation__: The class is thoroughly documented, enabling IDEs to provide helpful tooltips that enhance the coding experience.
32+
- __Robust Error Handling__: Uncaught errors from background jobs triggered by `startExecution` are captured and can be accessed using the `extractUncaughtErrors` method.
3233
- Fully covered by unit tests.
3334
- Self-explanatory method names.
3435
- No external runtime dependencies: Only development dependencies are used.
35-
- ES2020 Compatibility.
36+
- ES6 Compatibility.
3637
- TypeScript support.
3738

3839
## 1st use-case: Multiple Jobs Execution
@@ -70,15 +71,71 @@ async function aggregateSensorsData(sensorUIDs: ReadonlyArray<string>) {
7071
// Note: at this stage, jobs might be still executing, as we did not wait for
7172
// their completion.
7273

73-
// Graceful termination, if desired.
74+
// Graceful termination: await the completion of all currently executing jobs.
7475
await sensorAggregationSemaphore.waitTillAllExecutingJobsAreSettled();
7576
console.info(`Finished aggregating data from ${sensorUIDs.length} IoT sensors`);
7677
}
7778

79+
/**
80+
* Handles the data aggregation process for a specified IoT sensor.
81+
*
82+
* @param sensorUID - The unique identifier of the IoT sensor whose data is to be aggregated.
83+
*/
7884
async function handleDataAggregation(sensorUID): Promise<void> {
79-
// Business logic for aggregating data from a single sensor.
85+
// Implementation goes here.
8086
}
8187
```
88+
89+
If the jobs might throw errors, you don't need to worry about these errors propagating up to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by `startExecution` are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics). See the following adaptation of the above example, now utilizing the semaphore's error handling capabilities:
90+
91+
```ts
92+
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
93+
94+
const maxConcurrentAggregationJobs = 24;
95+
const sensorAggregationSemaphore = // Notice the 2nd generic parameter (Error by default).
96+
new ZeroBackpressureSemaphore<void, SensorAggregationError>(
97+
maxConcurrentAggregationJobs
98+
);
99+
100+
async function aggregateSensorsData(sensorUIDs: ReadonlyArray<string>) {
101+
for (const uid of sensorUIDs) {
102+
// Until the semaphore can start aggregating data from the current sensor, it won't make
103+
// sense to add more jobs, as such will induce unnecessary backpressure.
104+
await sensorAggregationSemaphore.startExecution(
105+
(): Promise<void> => handleDataAggregation(uid)
106+
);
107+
}
108+
// Note: at this stage, jobs might be still executing, as we did not wait for
109+
// their completion.
110+
111+
// Graceful termination: await the completion of all currently executing jobs.
112+
await sensorAggregationSemaphore.waitTillAllExecutingJobsAreSettled();
113+
114+
// Post processing.
115+
const errors = sensorAggregationSemaphore.extractUncaughtErrors();
116+
if (errors.length > 0) {
117+
await updateFailedAggregationMetrics(errors);
118+
}
119+
120+
// Summary.
121+
const successfulJobsCount = sensorUIDs.length - errors.length;
122+
logger.info(
123+
`Successfully aggregated data from ${successfulJobsCount} IoT sensors, ` +
124+
`with failures in aggregating data from ${errors.length} IoT sensors`
125+
);
126+
}
127+
128+
/**
129+
* Handles the data aggregation process for a specified IoT sensor.
130+
*
131+
* @param sensorUID - The unique identifier of the IoT sensor whose data is to be aggregated.
132+
* @throws SensorAggregationError - Throws an error if the data aggregation process fails.
133+
*/
134+
async function handleDataAggregation(sensorUID): Promise<void> {
135+
// Implementation goes here.
136+
}
137+
```
138+
82139
Please note that in a real-world scenario, sensor UIDs are more likely to be consumed from a message queue (e.g., RabbitMQ, Kafka, AWS SNS) rather than from an in-memory array. This setup **highlights the benefits** of avoiding backpressure:
83140
We should avoid consuming a message if we cannot start processing it immediately. Working with message queues typically involves acknowledgements, which have timeout mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages.
84141

@@ -125,6 +182,23 @@ A key use case for this method is ensuring stable unit tests. Each test should s
125182

126183
If your component has a termination method (`stop`, `terminate`, or similar), keep that in mind.
127184

185+
## Error Handling for Background Jobs
186+
187+
Background jobs triggered by `startExecution` may throw errors. Unlike the `waitForCompletion` case, the caller has no reference to the corresponding job promise which executes in the background.
188+
189+
Therefore, errors from background jobs are captured by the semaphore and can be extracted using the `extractUncaughtErrors` method. Optionally, you can specify a custom `UncaughtErrorType` as the second generic parameter of the `ZeroBackpressureSemaphore` class. By default, the error type is `Error`.
190+
```ts
191+
const trafficAnalyzerSemaphore = new ZeroBackpressureSemaphore<void, TrafficAnalyzerError>(
192+
maxConcurrentAnalyzers
193+
);
194+
```
195+
The number of accumulated uncaught errors can be obtained via the `amountOfUncaughtErrors` getter method. This can be useful, for example, if the user wants to handle uncaught errors only after a certain threshold is reached.
196+
197+
Even if the user does not intend to perform error-handling with these uncaught errors, it is **important** to periodically call this method when using `startExecution` to prevent the accumulation of errors in memory.
198+
However, there are a few exceptional cases where the user can safely avoid extracting uncaught errors:
199+
- The number of jobs is relatively small and the process is short-lived.
200+
- The jobs never throw errors, thus no uncaught errors are possible.
201+
128202
## Unavoidable / Implicit Backpressure
129203

130204
Mitigating backpressure is primarily associated with the `startExecution` method, particularly in scenarios involving multiple jobs. However, the single-job use case may certainly inflict backpressure on the Node.js micro-tasks queue.

dist/zero-backpressure-semaphore.d.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,26 @@ export type SemaphoreJob<T> = () => Promise<T>;
3131
* post-processing logic, and ensure a clear state between unit-tests.
3232
* If your component has a termination method (`stop`, `terminate`, or similar), keep that in mind.
3333
*
34+
* ### Error Handling for Background Jobs
35+
* Background jobs triggered by `startExecution` may throw errors. Unlike the `waitForCompletion` case,
36+
* the caller has no reference to the corresponding job promise which executes in the background.
37+
* Therefore, errors from background jobs are captured by the semaphore and can be extracted using
38+
* the `extractUncaughtErrors` method. The number of accumulated uncaught errors can be obtained via
39+
* the `amountOfUncaughtErrors` getter method. This can be useful, for example, if the user wants to
40+
* handle uncaught errors only after a certain threshold is reached.
41+
*
3442
* ### Time Complexity
3543
* - **Initialization**: O(maxConcurrentJobs) for both time and space.
3644
* - **startExecution, waitForCompletion**: O(1) for both time and space, excluding the job execution itself.
3745
* - **waitTillAllExecutingJobsAreSettled**: O(maxConcurrentJobs) for both time and space, excluding job executions.
3846
* - **maxConcurrentJobs, isAvailable, amountOfCurrentlyExecutingJobs**: O(1) for both time and space.
3947
*/
40-
export declare class ZeroBackpressureSemaphore<T> {
48+
export declare class ZeroBackpressureSemaphore<T, UncaughtErrorType = Error> {
4149
private readonly _availableRoomsStack;
4250
private readonly _rooms;
4351
private _waitForAvailableRoom?;
4452
private _notifyAvailableRoomExists?;
53+
private _uncaughtErrors;
4554
constructor(maxConcurrentJobs: number);
4655
/**
4756
* maxConcurrentJobs
@@ -61,6 +70,12 @@ export declare class ZeroBackpressureSemaphore<T> {
6170
* @returns The number of jobs currently being executed by the semaphore.
6271
*/
6372
get amountOfCurrentlyExecutingJobs(): number;
73+
/**
74+
* amountOfUncaughtErrors
75+
*
76+
* @returns The number of uncaught errors from background jobs, triggered by `startExecution`.
77+
*/
78+
get amountOfUncaughtErrors(): number;
6479
/**
6580
* startExecution
6681
*
@@ -111,6 +126,26 @@ export declare class ZeroBackpressureSemaphore<T> {
111126
* @returns A promise that resolves when all currently executing jobs are settled.
112127
*/
113128
waitTillAllExecutingJobsAreSettled(): Promise<void>;
129+
/**
130+
* extractUncaughtErrors
131+
*
132+
* This method returns an array of uncaught errors, captured by the semaphore while executing
133+
* background jobs added by `startExecution`. The term `extract` implies that the semaphore
134+
* instance will no longer hold these error references once extracted, unlike `get`. In other
135+
* words, ownership of these uncaught errors shifts to the caller, while the semaphore clears
136+
* its list of uncaught errors.
137+
*
138+
* Even if the user does not intend to perform error-handling with these uncaught errors, it is
139+
* important to periodically call this method when using `startExecution` to prevent the
140+
* accumulation of errors in memory.
141+
* However, there are a few exceptional cases where the user can safely avoid extracting
142+
* uncaught errors:
143+
* - The number of jobs is relatively small and the process is short-lived.
144+
* - The jobs never throw errors, thus no uncaught errors are possible.
145+
*
146+
* @returns An array of uncaught errors from background jobs triggered by `startExecution`.
147+
*/
148+
extractUncaughtErrors(): UncaughtErrorType[];
114149
private _getAvailableRoom;
115150
/**
116151
* _handleJobExecution

dist/zero-backpressure-semaphore.js

Lines changed: 46 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/zero-backpressure-semaphore.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)