Skip to content

Commit 26549e8

Browse files
committed
README improvements: Clarifying that waitForAvailability is optional
1 parent 63961e2 commit 26549e8

File tree

1 file changed

+48
-14
lines changed

1 file changed

+48
-14
lines changed

README.md

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,14 @@ async function handleDataAggregation(sensorUID): Promise<void> {
150150
```
151151

152152
Please note that in a real-world scenario, sensor UIDs may be consumed from a message queue (e.g., RabbitMQ, Kafka, AWS SNS) rather than from an in-memory array. This setup **highlights the benefits** of avoiding backpressure:
153-
Working with message queues typically involves acknowledgements, which have **timeout** mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages. Backpressure on the semaphore means that messages experience longer delays before their corresponding jobs start execution. The `waitForAvailability` method addresses this need by checking availability as a preliminary action, **before** consuming a message.
153+
Working with message queues typically involves acknowledgements, which have **timeout** mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages. Backpressure on the semaphore means that messages experience longer delays before their corresponding jobs start execution.
154154
Refer to the following adaptation of the previous example, where sensor UIDs are consumed from a message queue. This example overlooks error handling and message validation, for simplicity.
155155

156156
```ts
157-
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
157+
import {
158+
ZeroBackpressureSemaphore,
159+
SemaphoreJob
160+
} from 'zero-backpressure-semaphore-typescript';
158161

159162
const maxConcurrentAggregationJobs = 24;
160163
const sensorAggregationSemaphore =
@@ -167,26 +170,29 @@ const mqClient = new MessageQueueClient(SENSOR_UIDS_TOPIC);
167170

168171
async function processConsumedMessages(): Promise<void> {
169172
let numberOfProcessedMessages = 0;
173+
let isQueueEmpty = false;
174+
175+
const processOneMessage: SemaphoreJob<void> = async (): Promise<void> => {
176+
if (isQueueEmpty) {
177+
return;
178+
}
170179

171-
do {
172-
await sensorAggregationSemaphore.waitForAvailability();
173180
const message = await mqClient.receiveOneMessage();
174181
if (!message) {
175182
// Consider the queue as empty.
176-
break;
183+
isQueueEmpty = true;
184+
return;
177185
}
178186

179187
++numberOfProcessedMessages;
180188
const { uid } = message.data;
181-
182-
// At this point, `startExecution` will begin immediately, due to the
183-
// preliminary `waitForAvailability` action.
184-
await sensorAggregationSemaphore.startExecution(
185-
(): Promise<void> => handleDataAggregation(uid);
186-
);
187-
189+
await handleDataAggregation(uid);
188190
await mqClient.removeMessageFromQueue(message);
189-
} while (true);
191+
};
192+
193+
do {
194+
await sensorAggregationSemaphore.startExecution(processOneMessage);
195+
} while (!isQueueEmpty);
190196
// Note: at this stage, jobs might be still executing, as we did not wait for
191197
// their completion.
192198

@@ -208,11 +214,39 @@ async function processConsumedMessages(): Promise<void> {
208214
}
209215
```
210216

217+
Alternatively, the `waitForAvailability` method can address this need by checking availability as a preliminary action, **before** consuming a message.
218+
219+
```ts
220+
async function processConsumedMessages(): Promise<void> {
221+
let numberOfProcessedMessages = 0;
222+
223+
do {
224+
await sensorAggregationSemaphore.waitForAvailability();
225+
const message = await mqClient.receiveOneMessage();
226+
if (!message) {
227+
// Consider the queue as empty.
228+
break;
229+
}
230+
231+
++numberOfProcessedMessages;
232+
const { uid } = message.data;
233+
234+
// At this point, `startExecution` will begin immediately, due to the
235+
// preliminary `waitForAvailability` action.
236+
await sensorAggregationSemaphore.startExecution(
237+
(): Promise<void> => handleDataAggregation(uid);
238+
);
239+
240+
await mqClient.removeMessageFromQueue(message);
241+
} while (true);
242+
}
243+
```
244+
211245
In reference to the above example, please note that `waitForAvailability` may be considered overkill or redundant if the job's duration is significantly shorter than the message timeout.
212246
For example, if the message queue's timeout for acknowledging a message is 1 minute and a typical job duration is 1 second, the 59 second gap provides a substantial safety margin. In such cases, the preliminary `waitForAvailability` action can be omitted.
213247
On the other hand, given that the timeout is 30 seconds and a typical job duration is 20 seconds, using `waitForAvailability` is sensible. This is because `startExecution` might have to wait 20 seconds before the job can begin, resulting in a total of 40 seconds from the invocation of `startExecution` until the job completes.
214248

215-
As a general rule, `waitForAvailability` is advisable whenever a timeout mechanism is involved, and the timeout period begins **before** the job starts execution.
249+
As a general rule, `waitForAvailability` is advisable whenever a timeout mechanism is involved, and the timeout period begins **before** the job starts execution. Note that the same effect can be achieved with `startExecution` alone, if the timeout-triggering logic is included in the job itself (such as, consuming a message). Both approaches are valid.
216250

217251
## 2nd use-case: Single Job Execution
218252

0 commit comments

Comments
 (0)