Skip to content

Commit 8e142be

Browse files
committed
fix rate limiter loop
1 parent 8d2b9ca commit 8e142be

File tree

1 file changed

+24
-5
lines changed
  • internal-packages/run-engine/src/batch-queue

1 file changed

+24
-5
lines changed

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -645,15 +645,34 @@ export class BatchQueue {
645645
}
646646

647647
try {
648-
// Rate limit per-item at the processing level (1 token per message)
648+
// Rate limit per-item at the processing level (1 token per message).
649+
// Loop until allowed so multiple consumers don't all rush through after one sleep.
649650
if (this.globalRateLimiter) {
650-
const result = await this.globalRateLimiter.limit();
651-
if (!result.allowed && result.resetAt) {
652-
const waitMs = Math.max(0, result.resetAt - Date.now());
651+
while (this.isRunning) {
652+
const result = await this.globalRateLimiter.limit();
653+
if (result.allowed) {
654+
break;
655+
}
656+
const waitMs = Math.max(0, (result.resetAt ?? Date.now()) - Date.now());
653657
if (waitMs > 0) {
654-
await new Promise((resolve) => setTimeout(resolve, waitMs));
658+
await new Promise<void>((resolve, reject) => {
659+
const timer = setTimeout(resolve, waitMs);
660+
const onAbort = () => {
661+
clearTimeout(timer);
662+
reject(this.abortController.signal.reason);
663+
};
664+
if (this.abortController.signal.aborted) {
665+
clearTimeout(timer);
666+
reject(this.abortController.signal.reason);
667+
return;
668+
}
669+
this.abortController.signal.addEventListener("abort", onAbort, { once: true });
670+
});
655671
}
656672
}
673+
if (!this.isRunning) {
674+
break;
675+
}
657676
}
658677

659678
await this.batchedSpanManager.withBatchedSpan(

0 commit comments

Comments
 (0)