Skip to content

Commit 22fdaff

Browse files
committed
fix worker queue depth enforcement
1 parent acece57 commit 22fdaff

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,9 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11191119
if (depth >= this.workerQueueMaxDepth) {
11201120
return 0;
11211121
}
1122+
// Cap claim size to remaining capacity so we don't overshoot the depth limit
1123+
const remainingCapacity = this.workerQueueMaxDepth - depth;
1124+
maxClaimCount = Math.min(maxClaimCount, remainingCapacity);
11221125
}
11231126

11241127
// Claim batch of messages with visibility timeout

packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,19 @@ describe("FairQueue", () => {
13331333
});
13341334
}
13351335

1336-
// Start processing
1336+
// Start processing and track peak worker queue depth
1337+
let peakDepth = 0;
1338+
let polling = true;
1339+
const depthPoller = (async () => {
1340+
while (polling) {
1341+
const depth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID);
1342+
if (depth > peakDepth) {
1343+
peakDepth = depth;
1344+
}
1345+
await new Promise((resolve) => setTimeout(resolve, 25));
1346+
}
1347+
})();
1348+
13371349
queue.start();
13381350

13391351
// Verify all messages eventually get processed (depth cap doesn't permanently block)
@@ -1344,6 +1356,12 @@ describe("FairQueue", () => {
13441356
{ timeout: 25000 }
13451357
);
13461358

1359+
polling = false;
1360+
await depthPoller;
1361+
1362+
// Verify the depth cap was respected during processing
1363+
expect(peakDepth).toBeLessThanOrEqual(maxDepth);
1364+
13471365
// Verify the worker queue is drained
13481366
const finalDepth = await workerQueueManager.getLength(TEST_WORKER_QUEUE_ID);
13491367
expect(finalDepth).toBe(0);

0 commit comments

Comments
 (0)