From 34af71e8da6f7299971ff8e02dd38a76aaa9d1d9 Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Tue, 28 Apr 2026 10:54:19 +0100 Subject: [PATCH 01/11] Rate limit/circuit breaker fixes and logging improvements Additional handler logging and observability Fix retry time for partial batch rate limiting Add follow option to debug test script Add since var to int debug script Fix issue when half open and all probes fail High resolution storage metrics --- .../src/__tests__/index.component.test.ts | 1 + .../src/__tests__/services/metrics.test.ts | 15 ++- .../src/services/metrics.ts | 55 +++++++-- .../src/__tests__/delivery-metrics.test.ts | 11 ++ .../__tests__/delivery-observability.test.ts | 15 ++- .../src/__tests__/handler.test.ts | 92 +++++++++++++- .../src/__tests__/record-result-lua.test.ts | 23 +++- lambdas/https-client-lambda/src/handler.ts | 116 ++++++++++++++---- .../src/services/delivery-metrics.ts | 67 ++++++++-- .../src/services/delivery-observability.ts | 17 ++- .../src/services/record-result.lua | 11 ++ scripts/tests/integration-debug.sh | 36 ++++-- 12 files changed, 388 insertions(+), 71 deletions(-) diff --git a/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts b/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts index b234a244..8198cdfb 100644 --- a/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts +++ b/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts @@ -33,6 +33,7 @@ jest.mock("aws-embedded-metrics", () => ({ Count: "Count", Milliseconds: "Milliseconds", }, + StorageResolution: { High: 1, Standard: 60 }, })); import { GetObjectCommand, NoSuchKey } from "@aws-sdk/client-s3"; diff --git a/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts b/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts index bdbcc3aa..405256e6 100644 --- a/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts +++ b/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts @@ -1,7 +1,11 @@ -import { Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { StorageResolution, Unit, createMetricsLogger } from "aws-embedded-metrics"; import { CallbackMetrics, createMetricLogger } from "services/metrics"; -jest.mock("aws-embedded-metrics"); +jest.mock("aws-embedded-metrics", () => ({ + Unit: { Count: "Count" }, + StorageResolution: { High: 1, Standard: 60 }, + createMetricsLogger: jest.fn(), +})); const mockPutMetric = jest.fn(); const mockSetDimensions = jest.fn(); @@ -80,6 +84,7 @@ describe("CallbackMetrics", () => { "EventsReceived", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -92,6 +97,7 @@ describe("CallbackMetrics", () => { "TransformationsSuccessful", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -104,6 +110,7 @@ describe("CallbackMetrics", () => { "TransformationsFailed", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -116,6 +123,7 @@ describe("CallbackMetrics", () => { "CallbacksInitiated", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -128,6 +136,7 @@ describe("CallbackMetrics", () => { "ValidationErrors", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -140,6 +149,7 @@ describe("CallbackMetrics", () => { "FilteringStarted", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -152,6 +162,7 @@ describe("CallbackMetrics", () => { "FilteringMatched", 1, Unit.Count, + StorageResolution.High, ); }); }); diff --git a/lambdas/client-transform-filter-lambda/src/services/metrics.ts b/lambdas/client-transform-filter-lambda/src/services/metrics.ts index 398c5ecc..fd98ec02 100644 --- a/lambdas/client-transform-filter-lambda/src/services/metrics.ts +++ b/lambdas/client-transform-filter-lambda/src/services/metrics.ts @@ -1,4 +1,8 @@ -import { Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { + StorageResolution, + Unit, + createMetricsLogger, +} from "aws-embedded-metrics"; import type { MetricsLogger } from "aws-embedded-metrics"; export const createMetricLogger = (): MetricsLogger => { @@ -21,30 +25,65 @@ export class CallbackMetrics { constructor(private readonly metrics: MetricsLogger) {} emitEventReceived(): void { - this.metrics.putMetric("EventsReceived", 1, Unit.Count); + this.metrics.putMetric( + "EventsReceived", + 1, + Unit.Count, + StorageResolution.High, + ); } emitTransformationSuccess(): void { - this.metrics.putMetric("TransformationsSuccessful", 1, Unit.Count); + this.metrics.putMetric( + "TransformationsSuccessful", + 1, + Unit.Count, + StorageResolution.High, + ); } emitTransformationFailure(): void { - this.metrics.putMetric("TransformationsFailed", 1, Unit.Count); + this.metrics.putMetric( + "TransformationsFailed", + 1, + Unit.Count, + StorageResolution.High, + ); } emitDeliveryInitiated(): void { - this.metrics.putMetric("CallbacksInitiated", 1, Unit.Count); + this.metrics.putMetric( + "CallbacksInitiated", + 1, + Unit.Count, + StorageResolution.High, + ); } emitValidationError(): void { - this.metrics.putMetric("ValidationErrors", 1, Unit.Count); + this.metrics.putMetric( + "ValidationErrors", + 1, + Unit.Count, + StorageResolution.High, + ); } emitFilteringStarted(): void { - this.metrics.putMetric("FilteringStarted", 1, Unit.Count); + this.metrics.putMetric( + "FilteringStarted", + 1, + Unit.Count, + StorageResolution.High, + ); } emitFilteringMatched(): void { - this.metrics.putMetric("FilteringMatched", 1, Unit.Count); + this.metrics.putMetric( + "FilteringMatched", + 1, + Unit.Count, + StorageResolution.High, + ); } } diff --git a/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts b/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts index 803c19bb..249b0148 100644 --- a/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts @@ -1,6 +1,7 @@ const mockCreateMetricsLogger = jest.fn(); jest.mock("aws-embedded-metrics", () => ({ Unit: { Count: "Count", Milliseconds: "Milliseconds" }, + StorageResolution: { High: 1, Standard: 60 }, createMetricsLogger: () => mockCreateMetricsLogger(), })); @@ -83,6 +84,7 @@ describe("delivery-metrics", () => { "DeliveryAttempt", 1, "Count", + 1, ); }); @@ -96,6 +98,7 @@ describe("delivery-metrics", () => { "DeliverySuccess", 1, "Count", + 1, ); }); @@ -109,6 +112,7 @@ describe("delivery-metrics", () => { "DeliveryFailure", 1, "Count", + 1, ); }); @@ -123,6 +127,7 @@ describe("delivery-metrics", () => { "DeliveryPermanentFailure", 1, "Count", + 1, ); }); @@ -137,6 +142,7 @@ describe("delivery-metrics", () => { "CircuitBreakerOpen", 1, "Count", + 1, ); }); @@ -151,6 +157,7 @@ describe("delivery-metrics", () => { "DeliveryRateLimited", 1, "Count", + 1, ); }); @@ -165,6 +172,7 @@ describe("delivery-metrics", () => { "CircuitBreakerClosed", 1, "Count", + 1, ); }); @@ -179,6 +187,7 @@ describe("delivery-metrics", () => { "DeliveryRetryWindowExhausted", 1, "Count", + 1, ); }); @@ -201,6 +210,7 @@ describe("delivery-metrics", () => { "AdmissionDenied", 1, "Count", + 1, ); }); @@ -215,6 +225,7 @@ describe("delivery-metrics", () => { "DeliveryDurationMs", 250, "Milliseconds", + 1, ); }); diff --git a/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts b/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts index 25e164a9..2c0c0543 100644 --- a/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts @@ -39,7 +39,7 @@ describe("delivery-observability", () => { ); const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); - recordDeliveryAttempt("client-1", "target-1", "msg-123"); + recordDeliveryAttempt("client-1", "target-1", "msg-123", "sqs-msg-1", 3); expect(emitDeliveryAttempt).toHaveBeenCalledWith("target-1"); expect(logger.info).toHaveBeenCalledWith( @@ -48,6 +48,8 @@ describe("delivery-observability", () => { clientId: "client-1", targetId: "target-1", correlationId: "msg-123", + sqsMessageId: "sqs-msg-1", + receiveCount: 3, }), ); }); @@ -190,25 +192,30 @@ describe("delivery-observability", () => { ); }); - it("recordAdmissionDenied emits metric and logs", () => { + it("recordAdmissionDenied emits per-record metrics and logs", () => { const { emitAdmissionDenied } = jest.requireMock( "services/delivery-metrics", ); const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); - recordAdmissionDenied("client-1", "target-1", "rate_limited", "msg-123"); + recordAdmissionDenied("client-1", "target-1", "rate_limited", [ + "msg-a", + "msg-b", + ]); expect(emitAdmissionDenied).toHaveBeenCalledWith( "target-1", "rate_limited", + 2, ); expect(logger.warn).toHaveBeenCalledWith( "Admission denied", expect.objectContaining({ clientId: "client-1", targetId: "target-1", - correlationId: "msg-123", reason: "rate_limited", + deniedCount: 2, + correlationIds: ["msg-a", "msg-b"], }), ); }); diff --git a/lambdas/https-client-lambda/src/__tests__/handler.test.ts b/lambdas/https-client-lambda/src/__tests__/handler.test.ts index ab5dff46..7121cbeb 100644 --- a/lambdas/https-client-lambda/src/__tests__/handler.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/handler.test.ts @@ -197,9 +197,18 @@ describe("processRecords", () => { }); it("delivers only admitted records when consumedTokens is less than batch size", async () => { - const record1 = makeRecord({ messageId: "msg-1" }); - const record2 = makeRecord({ messageId: "msg-2" }); - const record3 = makeRecord({ messageId: "msg-3" }); + const record1 = makeRecord({ + messageId: "msg-1", + receiptHandle: "receipt-1", + }); + const record2 = makeRecord({ + messageId: "msg-2", + receiptHandle: "receipt-2", + }); + const record3 = makeRecord({ + messageId: "msg-3", + receiptHandle: "receipt-3", + }); mockAdmit.mockResolvedValue({ allowed: true, @@ -207,6 +216,10 @@ describe("processRecords", () => { effectiveRate: 10, }); + const { recordAdmissionDenied } = jest.requireMock( + "services/delivery-observability", + ); + const failures = await processRecords([record1, record2, record3]); expect(mockDeliverPayload).toHaveBeenCalledTimes(1); @@ -214,6 +227,15 @@ describe("processRecords", () => { { itemIdentifier: "msg-2" }, { itemIdentifier: "msg-3" }, ]); + expect(recordAdmissionDenied).toHaveBeenCalledWith( + "client-1", + "target-1", + "rate_limited", + ["test-message-id", "test-message-id"], + ); + + expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-2", 1); + expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-3", 1); }); it("an unexpected delivery error does not prevent other records in the batch", async () => { @@ -651,4 +673,68 @@ describe("processRecords", () => { expect(mockLoadTargetConfig).toHaveBeenCalledWith("client-1", "target-1"); expect(mockLoadTargetConfig).toHaveBeenCalledWith("client-1", "target-2"); }); + + it("calls recordAdmissionDenied with correlationIds when batch denied", async () => { + const record1 = makeRecord({ messageId: "msg-1" }); + const record2 = makeRecord({ messageId: "msg-2" }); + + mockAdmit.mockResolvedValue({ + allowed: false, + reason: "circuit_open", + retryAfterMs: 30_000, + effectiveRate: 0, + }); + + const { recordAdmissionDenied } = jest.requireMock( + "services/delivery-observability", + ); + + await processRecords([record1, record2]); + + expect(recordAdmissionDenied).toHaveBeenCalledWith( + "client-1", + "target-1", + "circuit_open", + ["test-message-id", "test-message-id"], + ); + }); + + it("logs deliveredCount and dlqCount in batch complete", async () => { + const record1 = makeRecord({ messageId: "msg-1" }); + const record2 = makeRecord({ messageId: "msg-2" }); + + mockDeliverPayload + .mockResolvedValueOnce({ outcome: "success" }) + .mockResolvedValueOnce({ outcome: "permanent_failure" }); + + const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); + + await processRecords([record1, record2]); + + expect(logger.info).toHaveBeenCalledWith( + "Batch complete", + expect.objectContaining({ + batchSize: 2, + deliveredCount: 1, + dlqCount: 1, + failureCount: 0, + }), + ); + }); + + it("includes correlationId in error log on unexpected delivery failure", async () => { + mockDeliverPayload.mockRejectedValue(new Error("Connection reset")); + + const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); + + await processRecords([makeRecord()]); + + expect(logger.error).toHaveBeenCalledWith( + "Failed to process record", + expect.objectContaining({ + messageId: "msg-1", + correlationId: "test-message-id", + }), + ); + }); }); diff --git a/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts index 00e04707..1d1650d8 100644 --- a/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts @@ -319,10 +319,11 @@ describe("record-result.lua", () => { expect(Number(epHash.get("switched_at"))).toBe(now); }); - it("does not close when half-open but all attempts failed", () => { + it("resets to fully open when half-open and all probes fail", () => { const store = createRedisStore(); const now = 1_000_000; const switchedAt = now - 130_000; + const samplePeriodMs = 300_000; store.set( "ep:t1", @@ -330,18 +331,34 @@ describe("record-result.lua", () => { ["is_open", "1"], ["switched_at", switchedAt.toString()], ["sample_till", "9999999999"], + ["cur_attempts", "3"], + ["cur_failures", "3"], + ["prev_attempts", "2"], + ["prev_failures", "1"], ]), ); const [circuitState, stateChanged] = runRecordResult(store, { now, cooldownPeriodMs: 120_000, + samplePeriodMs, consumedTokens: 1, processingFailures: 1, }); - expect(circuitState).toBe("half_open"); - expect(stateChanged).toBe(0); + // Circuit reset to fully open; cooldown restarts + expect(circuitState).toBe("open"); + expect(stateChanged).toBe(1); + + const epHash = store.get("ep:t1")!; + expect(epHash.get("is_open")).toBe("1"); + expect(Number(epHash.get("switched_at"))).toBe(now); + // Sampling counters cleared so the next half-open window starts fresh + expect(epHash.get("cur_attempts")).toBe("0"); + expect(epHash.get("cur_failures")).toBe("0"); + expect(epHash.get("prev_attempts")).toBe("0"); + expect(epHash.get("prev_failures")).toBe("0"); + expect(Number(epHash.get("sample_till"))).toBe(now + samplePeriodMs); }); }); diff --git a/lambdas/https-client-lambda/src/handler.ts b/lambdas/https-client-lambda/src/handler.ts index b129fc8d..19420462 100644 --- a/lambdas/https-client-lambda/src/handler.ts +++ b/lambdas/https-client-lambda/src/handler.ts @@ -99,14 +99,20 @@ function groupByTarget(records: SQSRecord[]): TargetBatch[] { ); } +function extractCorrelationId( + message: CallbackDeliveryMessage, +): string | undefined { + return message.payload.data[0]?.attributes?.messageId; +} + async function deliverRecord( record: SQSRecord, message: CallbackDeliveryMessage, target: Awaited>, applicationId: string, clientId: string, -): Promise<{ success: boolean }> { - const correlationId = message.payload.data[0]?.attributes?.messageId; +): Promise<{ success: boolean; dlq: boolean }> { + const correlationId = extractCorrelationId(message); const maxRetryDurationMs = target.delivery?.maxRetryDurationSeconds === undefined @@ -120,7 +126,7 @@ async function deliverRecord( if (isWindowExhausted(firstReceivedMs, maxRetryDurationMs)) { recordRetryWindowExhausted(clientId, message.targetId, correlationId); await sendToDlq(record.body); - return { success: true }; + return { success: true, dlq: true }; } const agent = await buildAgent(target); @@ -131,14 +137,20 @@ async function deliverRecord( ); const payloadJson = JSON.stringify(message.payload); - recordDeliveryAttempt(clientId, message.targetId, correlationId); + recordDeliveryAttempt( + clientId, + message.targetId, + correlationId, + record.messageId, + Number(record.attributes.ApproximateReceiveCount), + ); const deliveryStart = Date.now(); const result = await deliverPayload(target, payloadJson, signature, agent); recordDeliveryDuration(message.targetId, Date.now() - deliveryStart); if (result.outcome === OUTCOME_SUCCESS) { recordDeliverySuccess(clientId, message.targetId, correlationId); - return { success: true }; + return { success: true, dlq: false }; } if (result.outcome === OUTCOME_PERMANENT_FAILURE) { @@ -150,7 +162,7 @@ async function deliverRecord( correlationId, ); await sendToDlq(record.body, result); - return { success: true }; + return { success: true, dlq: true }; } if (result.outcome === OUTCOME_RATE_LIMITED) { @@ -163,7 +175,7 @@ async function deliverRecord( result.retryAfterHeader, receiveCount, ); - return { success: true }; + return { success: true, dlq: false }; } const receiveCount = Number(record.attributes.ApproximateReceiveCount); @@ -177,7 +189,32 @@ async function deliverRecord( correlationId, ); await changeVisibility(record.receiptHandle, backoffSec); - return { success: false }; + return { success: false, dlq: false }; +} + +type TargetBatchResult = { + failures: SQSBatchItemFailure[]; + deliveredCount: number; + dlqCount: number; +}; + +async function handleBatchDenied( + batch: TargetBatch, + clientId: string, + reason: string, + retryAfterMs: number, +): Promise { + const delaySec = Math.ceil(retryAfterMs / 1000); + const correlationIds = batch.messages.map((m) => extractCorrelationId(m)); + recordAdmissionDenied(clientId, batch.targetId, reason, correlationIds); + const failures: SQSBatchItemFailure[] = []; + for (const record of batch.records) { + // eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive + const jitterSec = Math.floor(Math.random() * 5); + await changeVisibility(record.receiptHandle, delaySec + jitterSec); + failures.push({ itemIdentifier: record.messageId }); + } + return { failures, deliveredCount: 0, dlqCount: 0 }; } async function processTargetBatch( @@ -185,7 +222,7 @@ async function processTargetBatch( redis: RedisClientType, clientId: string, concurrencyLimit: number, -): Promise { +): Promise { const target = await loadTargetConfig(clientId, batch.targetId); const cbEnabled = target.delivery?.circuitBreaker?.enabled ?? false; @@ -199,16 +236,12 @@ async function processTargetBatch( ); if (!gateResult.allowed) { - const baseDelaySec = Math.ceil(gateResult.retryAfterMs / 1000); - recordAdmissionDenied(clientId, batch.targetId, gateResult.reason); - const failures: SQSBatchItemFailure[] = []; - for (const record of batch.records) { - // eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive - const jitterSec = Math.floor(Math.random() * 5); - await changeVisibility(record.receiptHandle, baseDelaySec + jitterSec); - failures.push({ itemIdentifier: record.messageId }); - } - return failures; + return handleBatchDenied( + batch, + clientId, + gateResult.reason, + gateResult.retryAfterMs, + ); } const { consumedTokens } = gateResult; @@ -223,7 +256,10 @@ async function processTargetBatch( const deliveryResults = await pMap( admitted, - async (record, index): Promise<{ record: SQSRecord; success: boolean }> => { + async ( + record, + index, + ): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => { try { const outcome = await deliverRecord( record, @@ -232,10 +268,12 @@ async function processTargetBatch( applicationId, clientId, ); - return { record, success: outcome.success }; + return { record, success: outcome.success, dlq: outcome.dlq }; } catch (error) { + const correlationId = extractCorrelationId(admittedMessages[index]); logger.error("Failed to process record", { messageId: record.messageId, + correlationId, err: error, }); const receiveCount = Number(record.attributes.ApproximateReceiveCount); @@ -243,7 +281,7 @@ async function processTargetBatch( record.receiptHandle, jitteredBackoffSeconds(receiveCount), ); - return { record, success: false }; + return { record, success: false, dlq: false }; } }, { concurrency: concurrencyLimit }, @@ -256,6 +294,11 @@ async function processTargetBatch( } } + const deliveredCount = deliveryResults.filter( + (r) => r.success && !r.dlq, + ).length; + const dlqCount = deliveryResults.filter((r) => r.dlq).length; + if (cbEnabled && consumedTokens > 0) { const cbOutcome = await recordResult( redis, @@ -275,11 +318,24 @@ async function processTargetBatch( } } - for (const record of rejected) { - failures.push({ itemIdentifier: record.messageId }); + if (rejected.length > 0) { + const rejectedMessages = batch.messages.slice(consumedTokens); + const rejectedCorrelationIds = rejectedMessages.map((m) => + extractCorrelationId(m), + ); + recordAdmissionDenied( + clientId, + batch.targetId, + "rate_limited", + rejectedCorrelationIds, + ); + for (const record of rejected) { + await changeVisibility(record.receiptHandle, 1); + failures.push({ itemIdentifier: record.messageId }); + } } - return failures; + return { failures, deliveredCount, dlqCount }; } export async function processRecords( @@ -302,19 +358,25 @@ export async function processRecords( const targetBatches = groupByTarget(records); const allFailures: SQSBatchItemFailure[] = []; + let totalDelivered = 0; + let totalDlq = 0; for (const batch of targetBatches) { - const batchFailures = await processTargetBatch( + const batchResult = await processTargetBatch( batch, redis, CLIENT_ID, concurrencyLimit, ); - allFailures.push(...batchFailures); + allFailures.push(...batchResult.failures); + totalDelivered += batchResult.deliveredCount; + totalDlq += batchResult.dlqCount; } logger.info("Batch complete", { batchSize: records.length, + deliveredCount: totalDelivered, + dlqCount: totalDlq, failureCount: allFailures.length, }); diff --git a/lambdas/https-client-lambda/src/services/delivery-metrics.ts b/lambdas/https-client-lambda/src/services/delivery-metrics.ts index 68248591..dcefaf57 100644 --- a/lambdas/https-client-lambda/src/services/delivery-metrics.ts +++ b/lambdas/https-client-lambda/src/services/delivery-metrics.ts @@ -1,4 +1,8 @@ -import { Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { + StorageResolution, + Unit, + createMetricsLogger, +} from "aws-embedded-metrics"; import type { MetricsLogger } from "aws-embedded-metrics"; let metricsInstance: MetricsLogger | undefined; @@ -28,56 +32,90 @@ function getMetrics(): MetricsLogger { export function emitDeliveryAttempt(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryAttempt", 1, Unit.Count); + metrics.putMetric("DeliveryAttempt", 1, Unit.Count, StorageResolution.High); } export function emitDeliverySuccess(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliverySuccess", 1, Unit.Count); + metrics.putMetric("DeliverySuccess", 1, Unit.Count, StorageResolution.High); } export function emitDeliveryFailure(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryFailure", 1, Unit.Count); + metrics.putMetric("DeliveryFailure", 1, Unit.Count, StorageResolution.High); } export function emitDeliveryPermanentFailure(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryPermanentFailure", 1, Unit.Count); + metrics.putMetric( + "DeliveryPermanentFailure", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitRateLimited(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryRateLimited", 1, Unit.Count); + metrics.putMetric( + "DeliveryRateLimited", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitCircuitBreakerOpen(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("CircuitBreakerOpen", 1, Unit.Count); + metrics.putMetric( + "CircuitBreakerOpen", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitCircuitBreakerClosed(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("CircuitBreakerClosed", 1, Unit.Count); + metrics.putMetric( + "CircuitBreakerClosed", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitRetryWindowExhausted(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryRetryWindowExhausted", 1, Unit.Count); + metrics.putMetric( + "DeliveryRetryWindowExhausted", + 1, + Unit.Count, + StorageResolution.High, + ); } -export function emitAdmissionDenied(targetId: string, reason: string): void { +export function emitAdmissionDenied( + targetId: string, + reason: string, + count = 1, +): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); metrics.setProperty("reason", reason); - metrics.putMetric("AdmissionDenied", 1, Unit.Count); + metrics.putMetric( + "AdmissionDenied", + count, + Unit.Count, + StorageResolution.High, + ); } export function emitDeliveryDuration( @@ -86,7 +124,12 @@ export function emitDeliveryDuration( ): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryDurationMs", durationMs, Unit.Milliseconds); + metrics.putMetric( + "DeliveryDurationMs", + durationMs, + Unit.Milliseconds, + StorageResolution.High, + ); } export async function flushMetrics(): Promise { diff --git a/lambdas/https-client-lambda/src/services/delivery-observability.ts b/lambdas/https-client-lambda/src/services/delivery-observability.ts index ed41df8a..d722b851 100644 --- a/lambdas/https-client-lambda/src/services/delivery-observability.ts +++ b/lambdas/https-client-lambda/src/services/delivery-observability.ts @@ -16,9 +16,17 @@ export function recordDeliveryAttempt( clientId: string, targetId: string, correlationId?: string, + sqsMessageId?: string, + receiveCount?: number, ): void { emitDeliveryAttempt(targetId); - logger.info("Attempting delivery", { clientId, targetId, correlationId }); + logger.info("Attempting delivery", { + clientId, + targetId, + correlationId, + sqsMessageId, + receiveCount, + }); } export function recordDeliverySuccess( @@ -108,14 +116,15 @@ export function recordAdmissionDenied( clientId: string, targetId: string, reason: string, - correlationId?: string, + correlationIds: (string | undefined)[], ): void { - emitAdmissionDenied(targetId, reason); + emitAdmissionDenied(targetId, reason, correlationIds.length); logger.warn("Admission denied", { clientId, targetId, - correlationId, reason, + deniedCount: correlationIds.length, + correlationIds, }); } diff --git a/lambdas/https-client-lambda/src/services/record-result.lua b/lambdas/https-client-lambda/src/services/record-result.lua index fa42efea..fd916932 100644 --- a/lambdas/https-client-lambda/src/services/record-result.lua +++ b/lambdas/https-client-lambda/src/services/record-result.lua @@ -115,6 +115,17 @@ if isHalfOpen and processingSuccesses > 0 then -- fall through, allow circuit to immediately re-open end +-- Reset to fully open when half-open probe fails entirely; restarts cooldown +if isHalfOpen and processingSuccesses == 0 then + switchedAt = now + curAttempts = 0 + curFailures = 0 + prevAttempts = 0 + prevFailures = 0 + sampleTill = now + samplePeriodMs + stateChanged = true +end + -- Open circuit when closed, enough samples, and threshold exceeded local hasSampledEnough = attempts >= minAttempts if not isOpen and hasSampledEnough and (failures / attempts) > failureThreshold then diff --git a/scripts/tests/integration-debug.sh b/scripts/tests/integration-debug.sh index a4ebbd63..6970bd82 100755 --- a/scripts/tests/integration-debug.sh +++ b/scripts/tests/integration-debug.sh @@ -8,6 +8,7 @@ set -euo pipefail # ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-status # ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=tail-transform # ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=tail-transform LOG_FILTER= +# ENVIRONMENT= AWS_PROFILE= FOLLOW=false make test-integration-debug ACTION=tail-transform > out.log # # Actions: # queue-status Show SQS queue message counts @@ -28,6 +29,8 @@ set -euo pipefail # # Optional: # LOG_FILTER CloudWatch Logs filter pattern / text +# LOG_SINCE How far back to start tailing logs (default: 30m, e.g. 1h, 2h, 30m) +# FOLLOW Follow logs continuously (default: true, set false to dump and exit) # AWS_REGION (default: eu-west-2) if [ -z "${ENVIRONMENT:-}" ]; then @@ -49,7 +52,9 @@ fi REGION="${AWS_REGION:-eu-west-2}" LOG_FILTER="${LOG_FILTER:-}" +LOG_SINCE="${LOG_SINCE:-30m}" CLIENT_ID="${CLIENT_ID:-}" +FOLLOW="${FOLLOW:-true}" if ! aws sts get-caller-identity --profile "$AWS_PROFILE" >/dev/null 2>&1; then echo "No active AWS SSO session for profile '$AWS_PROFILE'. Running aws sso login..." @@ -140,49 +145,61 @@ log_filter_args() { fi } +follow_args() { + if [[ "$FOLLOW" == "true" ]]; then + printf '%s\n' --follow + fi +} + action_tail_transform() { local -a filter_args=() + local -a follow_arg=() mapfile -t filter_args < <(log_filter_args) + mapfile -t follow_arg < <(follow_args) print_section "Transform/Filter Lambda Logs" aws logs tail \ "/aws/lambda/${PREFIX}-client-transform-filter" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } action_tail_https_client() { require_client_id local -a filter_args=() + local -a follow_arg=() mapfile -t filter_args < <(log_filter_args) + mapfile -t follow_arg < <(follow_args) print_section "HTTPS Client Lambda Logs" aws logs tail \ "/aws/lambda/${PREFIX}-https-client-${CLIENT_ID}" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } action_tail_webhook() { local -a filter_args=() + local -a follow_arg=() mapfile -t filter_args < <(log_filter_args) + mapfile -t follow_arg < <(follow_args) print_section "Mock Webhook Lambda Logs" aws logs tail \ "/aws/lambda/${PREFIX}-mock-webhook" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } @@ -208,14 +225,17 @@ action_tail_pipe() { pipe_log_group_name="${pipe_log_group_arn#*:log-group:}" + local -a follow_arg=() + mapfile -t follow_arg < <(follow_args) + print_section "EventBridge Pipe Logs" aws logs tail \ "$pipe_log_group_name" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } From b3d8dad1bf1565467dac4a6087266d11f91ec07e Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Tue, 28 Apr 2026 15:05:42 +0100 Subject: [PATCH 02/11] fixup! Rate limit/circuit breaker fixes and logging improvements --- .../src/__tests__/services/metrics.test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts b/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts index 405256e6..3e1a419e 100644 --- a/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts +++ b/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts @@ -1,4 +1,8 @@ -import { StorageResolution, Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { + StorageResolution, + Unit, + createMetricsLogger, +} from "aws-embedded-metrics"; import { CallbackMetrics, createMetricLogger } from "services/metrics"; jest.mock("aws-embedded-metrics", () => ({ From 3c03527195d51437c83406b0237f5681f56edc49 Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Tue, 28 Apr 2026 15:36:09 +0100 Subject: [PATCH 03/11] Revert "Fix issue when half open and all probes fail" This reverts commit 5d19fcca6531e3c7de3ffb4f8cd816db38f03ac1. --- .../src/__tests__/record-result-lua.test.ts | 23 +++---------------- .../src/services/record-result.lua | 11 --------- 2 files changed, 3 insertions(+), 31 deletions(-) diff --git a/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts index 1d1650d8..00e04707 100644 --- a/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/record-result-lua.test.ts @@ -319,11 +319,10 @@ describe("record-result.lua", () => { expect(Number(epHash.get("switched_at"))).toBe(now); }); - it("resets to fully open when half-open and all probes fail", () => { + it("does not close when half-open but all attempts failed", () => { const store = createRedisStore(); const now = 1_000_000; const switchedAt = now - 130_000; - const samplePeriodMs = 300_000; store.set( "ep:t1", @@ -331,34 +330,18 @@ describe("record-result.lua", () => { ["is_open", "1"], ["switched_at", switchedAt.toString()], ["sample_till", "9999999999"], - ["cur_attempts", "3"], - ["cur_failures", "3"], - ["prev_attempts", "2"], - ["prev_failures", "1"], ]), ); const [circuitState, stateChanged] = runRecordResult(store, { now, cooldownPeriodMs: 120_000, - samplePeriodMs, consumedTokens: 1, processingFailures: 1, }); - // Circuit reset to fully open; cooldown restarts - expect(circuitState).toBe("open"); - expect(stateChanged).toBe(1); - - const epHash = store.get("ep:t1")!; - expect(epHash.get("is_open")).toBe("1"); - expect(Number(epHash.get("switched_at"))).toBe(now); - // Sampling counters cleared so the next half-open window starts fresh - expect(epHash.get("cur_attempts")).toBe("0"); - expect(epHash.get("cur_failures")).toBe("0"); - expect(epHash.get("prev_attempts")).toBe("0"); - expect(epHash.get("prev_failures")).toBe("0"); - expect(Number(epHash.get("sample_till"))).toBe(now + samplePeriodMs); + expect(circuitState).toBe("half_open"); + expect(stateChanged).toBe(0); }); }); diff --git a/lambdas/https-client-lambda/src/services/record-result.lua b/lambdas/https-client-lambda/src/services/record-result.lua index fd916932..fa42efea 100644 --- a/lambdas/https-client-lambda/src/services/record-result.lua +++ b/lambdas/https-client-lambda/src/services/record-result.lua @@ -115,17 +115,6 @@ if isHalfOpen and processingSuccesses > 0 then -- fall through, allow circuit to immediately re-open end --- Reset to fully open when half-open probe fails entirely; restarts cooldown -if isHalfOpen and processingSuccesses == 0 then - switchedAt = now - curAttempts = 0 - curFailures = 0 - prevAttempts = 0 - prevFailures = 0 - sampleTill = now + samplePeriodMs - stateChanged = true -end - -- Open circuit when closed, enough samples, and threshold exceeded local hasSampledEnough = attempts >= minAttempts if not isOpen and hasSampledEnough and (failures / attempts) > failureThreshold then From 98caede5334b596a5462e6b4c1158163e3dd9024 Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Tue, 28 Apr 2026 15:42:07 +0100 Subject: [PATCH 04/11] Warm up in the circruit breaker test to ensure circuit is closed --- tests/integration/delivery-resilience.test.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/integration/delivery-resilience.test.ts b/tests/integration/delivery-resilience.test.ts index 8b218233..d3995b0e 100644 --- a/tests/integration/delivery-resilience.test.ts +++ b/tests/integration/delivery-resilience.test.ts @@ -200,9 +200,27 @@ describe("Delivery Resilience", () => { it("should open the circuit breaker after repeated failures and not affect other clients", async () => { const cbConfig = getClientConfig("clientCircuitBreaker"); + const cbTargetPath = buildMockWebhookTargetPath("clientCircuitBreaker"); const singleTargetConfig = getClientConfig("clientSingleTarget"); const singleTargetPath = buildMockWebhookTargetPath("clientSingleTarget"); + // Send a successful message first so the circuit is confirmed closed (it starts half-open) + const warmupEvent = createMessageStatusPublishEvent({ + data: { + clientId: cbConfig.clientId, + messageId: `cb-warmup-${crypto.randomUUID()}`, + }, + }); + await sendSqsEvent(ctx.sqs, ctx.inboundQueueUrl, warmupEvent); + const warmupCallback = await awaitCallback( + ctx.cwLogs, + ctx.webhookLogGroup, + warmupEvent.data.messageId, + "MessageStatus", + ctx.startTime, + ); + expect(warmupCallback.path).toBe(cbTargetPath); + const cbEvents = Array.from({ length: CB_BURST_SIZE }, () => createMessageStatusPublishEvent({ data: { From bbfc5d759e79f113b1bccb2fcfd549b19d012ffc Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Tue, 28 Apr 2026 17:24:11 +0100 Subject: [PATCH 05/11] Circuit breaker disabled fixes --- .../src/__tests__/admit-lua.test.ts | 99 +++++++++++++++++++ .../src/services/admit.lua | 21 ++-- 2 files changed, 113 insertions(+), 7 deletions(-) diff --git a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts index 43aa2fb6..c43c6f59 100644 --- a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts @@ -511,4 +511,103 @@ describe("admit.lua", () => { expect(store.has("ep:target-b")).toBe(true); }); }); + + describe("circuit breaker disabled (probeRateLimit = 0)", () => { + it("uses full targetRateLimit on a fresh endpoint with no prior state", () => { + const store = createRedisStore(); + const now = 1_000_000; + + const { effectiveRate } = runAdmit(store, { + now, + targetRateLimit: 10, + probeRateLimit: 0, + }); + + expect(effectiveRate).toBe(10); + }); + + it("allows tokens immediately on first contact", () => { + const store = createRedisStore(); + const now = 1_000_000; + + const { consumedTokens, effectiveRate, reason } = runAdmit(store, { + now, + targetRateLimit: 10, + probeRateLimit: 0, + }); + + expect(effectiveRate).toBe(10); + expect(consumedTokens).toBeGreaterThanOrEqual(1); + expect(reason).toBe("allowed"); + }); + + it("ignores is_open state when CB is disabled", () => { + const store = createRedisStore(); + const now = 1_000_000; + store.set( + "ep:t1", + new Map([ + ["is_open", "1"], + ["switched_at", now.toString()], + ["bucket_tokens", "5"], + ["bucket_refilled_at", now.toString()], + ]), + ); + + const { consumedTokens, effectiveRate, reason } = runAdmit(store, { + now, + targetRateLimit: 10, + probeRateLimit: 0, + }); + + expect(effectiveRate).toBe(10); + expect(consumedTokens).toBe(1); + expect(reason).toBe("allowed"); + }); + + it("does not zero bucket tokens when is_open and CB disabled", () => { + const store = createRedisStore(); + const now = 1_000_000; + store.set( + "ep:t1", + new Map([ + ["is_open", "1"], + ["switched_at", now.toString()], + ["bucket_tokens", "5"], + ["bucket_refilled_at", now.toString()], + ]), + ); + + const { consumedTokens } = runAdmit(store, { + now, + targetRateLimit: 10, + probeRateLimit: 0, + targetBatchSize: 3, + }); + + expect(consumedTokens).toBe(3); + }); + + it("never returns circuit_open when CB is disabled", () => { + const store = createRedisStore(); + const now = 1_000_000; + store.set( + "ep:t1", + new Map([ + ["is_open", "1"], + ["switched_at", (now - 10_000).toString()], + ["bucket_tokens", "0"], + ["bucket_refilled_at", now.toString()], + ]), + ); + + const { reason } = runAdmit(store, { + now, + cooldownMs: 120_000, + probeRateLimit: 0, + }); + + expect(reason).not.toBe("circuit_open"); + }); + }); }); diff --git a/lambdas/https-client-lambda/src/services/admit.lua b/lambdas/https-client-lambda/src/services/admit.lua index 53b6977c..767af270 100644 --- a/lambdas/https-client-lambda/src/services/admit.lua +++ b/lambdas/https-client-lambda/src/services/admit.lua @@ -28,25 +28,32 @@ local targetBatchSize = tonumber(ARGV[7]) or 0 -- LOAD STATE -------------------------------------------------------------------------------- +local cbEnabled = probeRateLimit > 0 + local state = redis.call("HMGET", epKey, "is_open", "switched_at", "bucket_tokens", "bucket_refilled_at") local isOpenRaw = state[1] local needInit = isOpenRaw == false or isOpenRaw == nil -local isOpen = needInit or tonumber(isOpenRaw) == 1 -local switchedAt = needInit and 0 or tonumber(state[2] or "0") +local isOpen = cbEnabled and (needInit or tonumber(isOpenRaw) == 1) +local switchedAt = (cbEnabled and needInit) and 0 or tonumber(state[2] or "0") local bucketTokens = tonumber(state[3] or "0") -local bucketRefilledAt = needInit and now or tonumber(state[4] or "0") +local bucketRefilledAt = (cbEnabled and needInit) and now or tonumber(state[4] or "0") -------------------------------------------------------------------------------- -- 1. CIRCUIT BREAKER — determine effective rate +-- +-- When probeRateLimit is 0 the circuit breaker is disabled; skip straight to +-- the token bucket at the full configured rate. -------------------------------------------------------------------------------- -local isHalfOpen = isOpen and now > switchedAt + cooldownMs -local isRecovering = (not isOpen) and now < switchedAt + recoveryPeriodMs +local isHalfOpen = cbEnabled and isOpen and now > switchedAt + cooldownMs +local isRecovering = cbEnabled and (not isOpen) and now < switchedAt + recoveryPeriodMs local effectiveRate -if isOpen then +if not cbEnabled then + effectiveRate = targetRateLimit +elseif isOpen then if isHalfOpen then effectiveRate = probeRateLimit else @@ -80,7 +87,7 @@ end -- generationTime, the leftover fractional time carries over to the next call. -------------------------------------------------------------------------------- -if isOpen then +if cbEnabled and isOpen then bucketTokens = 0 end From c6a36d5a95ea600425ad6938aef24df2aff47921 Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Mon, 27 Apr 2026 17:23:01 +0100 Subject: [PATCH 06/11] Fix perf lambda DLQ purge --- lambdas/perf-runner-lambda/src/__tests__/purge.test.ts | 6 +++--- lambdas/perf-runner-lambda/src/purge.ts | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts index 60347ef9..14bcf247 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts @@ -88,9 +88,9 @@ describe("purgeQueues", () => { expect(mockSend).toHaveBeenCalledTimes(2); }); - it("ignores NonExistentQueue errors gracefully", async () => { + it("ignores QueueDoesNotExist errors gracefully", async () => { const nonExistentError = Object.assign(new Error("Queue does not exist"), { - name: "AWS.SimpleQueueService.NonExistentQueue", + name: "QueueDoesNotExist", }); mockSend.mockRejectedValueOnce(nonExistentError); @@ -99,7 +99,7 @@ describe("purgeQueues", () => { ).resolves.toBeUndefined(); }); - it("rethrows non-NonExistentQueue errors", async () => { + it("rethrows non-QueueDoesNotExist errors", async () => { const otherError = new Error("Access denied"); mockSend.mockRejectedValueOnce(otherError); diff --git a/lambdas/perf-runner-lambda/src/purge.ts b/lambdas/perf-runner-lambda/src/purge.ts index 5743e9d2..e363e706 100644 --- a/lambdas/perf-runner-lambda/src/purge.ts +++ b/lambdas/perf-runner-lambda/src/purge.ts @@ -32,7 +32,7 @@ export async function purgeQueues( for (const result of results) { if (result.status === "rejected") { const error = result.reason as { name?: string }; - if (error.name !== "AWS.SimpleQueueService.NonExistentQueue") { + if (error.name !== "QueueDoesNotExist") { throw result.reason as Error; } } From 4284be552242b0c50777cede20a7491466758c19 Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Wed, 29 Apr 2026 10:49:18 +0100 Subject: [PATCH 07/11] Fix 0 tokens issue on initial state and re-work cbEnabled=false fix --- .../src/__tests__/admit-lua.test.ts | 43 +++++++++++---- .../src/__tests__/endpoint-gate.test.ts | 6 +-- .../src/services/admit.lua | 52 +++++++++++-------- .../src/services/endpoint-gate.ts | 4 +- 4 files changed, 66 insertions(+), 39 deletions(-) diff --git a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts index c43c6f59..0919222e 100644 --- a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts @@ -1,7 +1,7 @@ import admitLuaSrc from "services/admit.lua"; import { createRedisStore, evalLua } from "__tests__/helpers/lua-redis-mock"; -// ARGV: [now, capacity, targetRateLimit, cooldownMs, recoveryPeriodMs, probeRateLimit, targetBatchSize] +// ARGV: [now, capacity, targetRateLimit, cooldownMs, recoveryPeriodMs, probeRateLimit, targetBatchSize, cbEnabled] // KEYS: [epKey] // Returns: [consumedTokens, reason, retryAfterMs, effectiveRate] @@ -13,6 +13,7 @@ type AdmitArgs = { recoveryPeriodMs: number; probeRateLimit: number; targetBatchSize: number; + cbEnabled: boolean; }; const defaultArgs: AdmitArgs = { @@ -23,6 +24,7 @@ const defaultArgs: AdmitArgs = { recoveryPeriodMs: 600_000, probeRateLimit: 1 / 60, targetBatchSize: 1, + cbEnabled: true, }; type AdmitResult = { @@ -49,6 +51,7 @@ function runAdmit( merged.recoveryPeriodMs.toString(), merged.probeRateLimit.toString(), merged.targetBatchSize.toString(), + merged.cbEnabled ? "1" : "0", ], store, ) as [number, string, number, number]; @@ -71,8 +74,8 @@ describe("admit.lua", () => { targetRateLimit: 10, }); - expect(consumedTokens).toBe(0); - expect(reason).toBe("rate_limited"); + expect(consumedTokens).toBe(1); + expect(reason).toBe("allowed"); expect(effectiveRate).toBeCloseTo(1 / 60, 5); }); @@ -512,7 +515,7 @@ describe("admit.lua", () => { }); }); - describe("circuit breaker disabled (probeRateLimit = 0)", () => { + describe("circuit breaker disabled (cbEnabled = false)", () => { it("uses full targetRateLimit on a fresh endpoint with no prior state", () => { const store = createRedisStore(); const now = 1_000_000; @@ -520,24 +523,42 @@ describe("admit.lua", () => { const { effectiveRate } = runAdmit(store, { now, targetRateLimit: 10, - probeRateLimit: 0, + cbEnabled: false, }); expect(effectiveRate).toBe(10); }); - it("allows tokens immediately on first contact", () => { + it("applies initial values on fresh endpoint so first call has no tokens", () => { const store = createRedisStore(); const now = 1_000_000; const { consumedTokens, effectiveRate, reason } = runAdmit(store, { now, targetRateLimit: 10, - probeRateLimit: 0, + cbEnabled: false, }); expect(effectiveRate).toBe(10); - expect(consumedTokens).toBeGreaterThanOrEqual(1); + expect(consumedTokens).toBe(0); + expect(reason).toBe("rate_limited"); + }); + + it("generates tokens at full rate after initial contact", () => { + const store = createRedisStore(); + + runAdmit(store, { + now: 1_000_000, + targetRateLimit: 10, + cbEnabled: false, + }); + const { consumedTokens, reason } = runAdmit(store, { + now: 1_000_100, + targetRateLimit: 10, + cbEnabled: false, + }); + + expect(consumedTokens).toBe(1); expect(reason).toBe("allowed"); }); @@ -557,7 +578,7 @@ describe("admit.lua", () => { const { consumedTokens, effectiveRate, reason } = runAdmit(store, { now, targetRateLimit: 10, - probeRateLimit: 0, + cbEnabled: false, }); expect(effectiveRate).toBe(10); @@ -581,7 +602,7 @@ describe("admit.lua", () => { const { consumedTokens } = runAdmit(store, { now, targetRateLimit: 10, - probeRateLimit: 0, + cbEnabled: false, targetBatchSize: 3, }); @@ -604,7 +625,7 @@ describe("admit.lua", () => { const { reason } = runAdmit(store, { now, cooldownMs: 120_000, - probeRateLimit: 0, + cbEnabled: false, }); expect(reason).not.toBe("circuit_open"); diff --git a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts index eea9d44d..0ef17857 100644 --- a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts @@ -126,14 +126,14 @@ describe("admit", () => { ); }); - it("passes probeRateLimit=0 when circuit breaker is disabled", async () => { + it("passes cbEnabled=0 when circuit breaker is disabled", async () => { mockSendCommand.mockResolvedValueOnce([1, "allowed", 0, 10]); await admit(mockRedis, "target-1", 10, false, 1, defaultConfig); const args = mockSendCommand.mock.calls[0]![0] as string[]; - const probeRateArg = args[9]; - expect(probeRateArg).toBe("0"); + const cbEnabledArg = args[11]; + expect(cbEnabledArg).toBe("0"); }); it("passes single epKey", async () => { diff --git a/lambdas/https-client-lambda/src/services/admit.lua b/lambdas/https-client-lambda/src/services/admit.lua index 767af270..0a9526aa 100644 --- a/lambdas/https-client-lambda/src/services/admit.lua +++ b/lambdas/https-client-lambda/src/services/admit.lua @@ -13,47 +13,50 @@ -- Returns: { consumedTokens, reason, retryAfterMs, effectiveRate } -- Keys -local epKey = KEYS[1] -- ep:{targetId} combined endpoint state hash +local epKey = KEYS[1] -- ep:{targetId} combined endpoint state hash -- Arguments -local now = tonumber(ARGV[1]) or 0 -local capacity = tonumber(ARGV[2]) or 0 -local targetRateLimit = tonumber(ARGV[3]) or 0 -local cooldownMs = tonumber(ARGV[4]) or 0 -local recoveryPeriodMs = tonumber(ARGV[5]) or 0 -local probeRateLimit = tonumber(ARGV[6]) or 0 -local targetBatchSize = tonumber(ARGV[7]) or 0 +local now = tonumber(ARGV[1]) or 0 +local capacity = tonumber(ARGV[2]) or 0 +local targetRateLimit = tonumber(ARGV[3]) or 0 +local cooldownMs = tonumber(ARGV[4]) or 0 +local recoveryPeriodMs = tonumber(ARGV[5]) or 0 +local probeRateLimit = tonumber(ARGV[6]) or 0 +local targetBatchSize = tonumber(ARGV[7]) or 0 +local cbEnabled = tonumber(ARGV[8]) == 1 -------------------------------------------------------------------------------- -- LOAD STATE -------------------------------------------------------------------------------- -local cbEnabled = probeRateLimit > 0 - local state = redis.call("HMGET", epKey, "is_open", "switched_at", "bucket_tokens", "bucket_refilled_at") local isOpenRaw = state[1] local needInit = isOpenRaw == false or isOpenRaw == nil -local isOpen = cbEnabled and (needInit or tonumber(isOpenRaw) == 1) -local switchedAt = (cbEnabled and needInit) and 0 or tonumber(state[2] or "0") +local isOpen = needInit or tonumber(isOpenRaw) == 1 +local switchedAt = needInit and 0 or tonumber(state[2] or "0") local bucketTokens = tonumber(state[3] or "0") -local bucketRefilledAt = (cbEnabled and needInit) and now or tonumber(state[4] or "0") +-- checked independently of needInit because admit writes bucket_refilled_at +-- before record-result writes is_open, so needInit can be stale here +local bucketRefilledAtRaw = state[4] +local bucketRefilledAt = (bucketRefilledAtRaw == false or bucketRefilledAtRaw == nil) + and now or tonumber(bucketRefilledAtRaw) + +if not cbEnabled then + isOpen = false + switchedAt = 0 +end -------------------------------------------------------------------------------- -- 1. CIRCUIT BREAKER — determine effective rate --- --- When probeRateLimit is 0 the circuit breaker is disabled; skip straight to --- the token bucket at the full configured rate. -------------------------------------------------------------------------------- -local isHalfOpen = cbEnabled and isOpen and now > switchedAt + cooldownMs -local isRecovering = cbEnabled and (not isOpen) and now < switchedAt + recoveryPeriodMs +local isHalfOpen = isOpen and now > switchedAt + cooldownMs +local isRecovering = (not isOpen) and now < switchedAt + recoveryPeriodMs local effectiveRate -if not cbEnabled then - effectiveRate = targetRateLimit -elseif isOpen then +if isOpen then if isHalfOpen then effectiveRate = probeRateLimit else @@ -87,15 +90,18 @@ end -- generationTime, the leftover fractional time carries over to the next call. -------------------------------------------------------------------------------- -if cbEnabled and isOpen then +if isOpen then bucketTokens = 0 + if needInit and isHalfOpen then + bucketTokens = 1 + end end local generatedTokens = math.floor((now - bucketRefilledAt) * effectiveRate / 1000) local availTokens = math.min(capacity, bucketTokens + generatedTokens) local consumedTokens = math.min(targetBatchSize, availTokens) -bucketTokens = availTokens - consumedTokens +bucketTokens = availTokens - consumedTokens if generatedTokens > 0 and effectiveRate > 0 then local generationTime = generatedTokens * 1000 / effectiveRate bucketRefilledAt = bucketRefilledAt + generationTime diff --git a/lambdas/https-client-lambda/src/services/endpoint-gate.ts b/lambdas/https-client-lambda/src/services/endpoint-gate.ts index 8870e5d8..1047cb06 100644 --- a/lambdas/https-client-lambda/src/services/endpoint-gate.ts +++ b/lambdas/https-client-lambda/src/services/endpoint-gate.ts @@ -84,7 +84,6 @@ export async function admit( ): Promise { const epKey = `ep:{${targetId}}`; const now = Date.now().toString(); - const probeRate = cbEnabled ? config.probeRateLimit.toString() : "0"; const args = [ now, @@ -92,8 +91,9 @@ export async function admit( String(refillPerSec), config.cooldownPeriodMs.toString(), config.recoveryPeriodMs.toString(), - probeRate, + config.probeRateLimit.toString(), String(targetBatchSize), + cbEnabled ? "1" : "0", ]; if (!admitSha) { From 6b55b09a51ec43679521f966e007ea697159b31a Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Wed, 29 Apr 2026 12:13:54 +0100 Subject: [PATCH 08/11] Add flush and debug ability to perf runner lambda --- .../src/__tests__/elasticache.test.ts | 93 ++++++++++++++++++- .../src/__tests__/runner.test.ts | 57 +++++++++++- lambdas/perf-runner-lambda/src/elasticache.ts | 60 +++++++++++- lambdas/perf-runner-lambda/src/runner.ts | 15 ++- lambdas/perf-runner-lambda/src/types.ts | 15 +++ 5 files changed, 236 insertions(+), 4 deletions(-) diff --git a/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts b/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts index 09846ed3..54c8e813 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts @@ -1,19 +1,25 @@ -import { flushElastiCache } from "elasticache"; +import { dumpRateLimitState, flushElastiCache } from "elasticache"; import type { ElastiCacheDeps } from "types"; const mockConnect = jest.fn().mockResolvedValue(undefined); const mockFlushAll = jest.fn().mockResolvedValue("OK"); const mockDisconnect = jest.fn().mockResolvedValue(undefined); +const mockHmGet = jest.fn().mockResolvedValue([]); let mockIsOpen = true; +let mockScanKeys: string[] = []; jest.mock("@redis/client", () => ({ createClient: jest.fn(() => ({ connect: mockConnect, flushAll: mockFlushAll, disconnect: mockDisconnect, + hmGet: mockHmGet, get isOpen() { return mockIsOpen; }, + scanIterator: jest.fn(function scanIterator() { + return mockScanKeys[Symbol.iterator](); + }), })), })); @@ -46,6 +52,7 @@ const deps: ElastiCacheDeps = { beforeEach(() => { jest.clearAllMocks(); mockIsOpen = true; + mockScanKeys = []; }); describe("flushElastiCache", () => { @@ -72,3 +79,87 @@ describe("flushElastiCache", () => { expect(mockDisconnect).not.toHaveBeenCalled(); }); }); + +describe("dumpRateLimitState", () => { + it("returns empty array when no ep: keys exist", async () => { + mockScanKeys = []; + + const result = await dumpRateLimitState(deps); + + expect(result).toEqual([]); + expect(mockConnect).toHaveBeenCalledTimes(1); + expect(mockDisconnect).toHaveBeenCalledTimes(1); + }); + + it("returns state for each ep: key sorted alphabetically", async () => { + mockScanKeys = ["ep:{target-b}", "ep:{target-a}"]; + mockHmGet + .mockResolvedValueOnce([ + "1", + "500", + "0", + "400", + "20", + "15", + "5", + "3", + "1500", + ]) + .mockResolvedValueOnce([ + "0", + "1000", + "5", + "900", + "10", + "8", + "2", + "1", + "2000", + ]); + + const result = await dumpRateLimitState(deps); + + expect(result).toHaveLength(2); + expect(result[0].key).toBe("ep:{target-a}"); + expect(result[0].isOpen).toBe("1"); + expect(result[0].switchedAt).toBe("500"); + expect(result[0].bucketTokens).toBe("0"); + expect(result[0].bucketRefilledAt).toBe("400"); + expect(result[0].curAttempts).toBe("20"); + expect(result[0].prevAttempts).toBe("15"); + expect(result[0].curFailures).toBe("5"); + expect(result[0].prevFailures).toBe("3"); + expect(result[0].sampleTill).toBe("1500"); + + expect(result[1].key).toBe("ep:{target-b}"); + expect(result[1].isOpen).toBe("0"); + }); + + it("disconnects even when scan throws", async () => { + const mockClient = { + connect: mockConnect, + disconnect: mockDisconnect, + hmGet: mockHmGet, + get isOpen() { + return mockIsOpen; + }, + scanIterator: jest.fn(() => { + throw new Error("scan failed"); + }), + }; + const { createClient } = jest.requireMock("@redis/client"); + createClient.mockReturnValueOnce(mockClient); + + await expect(dumpRateLimitState(deps)).rejects.toThrow("scan failed"); + expect(mockDisconnect).toHaveBeenCalledTimes(1); + }); + + it("skips disconnect when client is not open", async () => { + mockIsOpen = false; + mockScanKeys = []; + + await dumpRateLimitState(deps); + + expect(mockDisconnect).not.toHaveBeenCalled(); + }); +}); diff --git a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts index 46a0928d..622e98a4 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts @@ -12,7 +12,7 @@ import { defaultSleep, runPerformanceTest } from "runner"; import { generatePhaseLoad } from "sqs"; import { deriveQueueUrls, purgeQueues } from "purge"; -import { flushElastiCache } from "elasticache"; +import { dumpRateLimitState, flushElastiCache } from "elasticache"; import { verifyMockWebhook } from "webhook-verify"; import { queryCircuitBreakerSnapshot, @@ -39,6 +39,7 @@ const mockQueryPerClientRateTimeline = jest.mocked(queryPerClientRateTimeline); const mockDeriveQueueUrls = jest.mocked(deriveQueueUrls); const mockPurgeQueues = jest.mocked(purgeQueues); const mockFlushElastiCache = jest.mocked(flushElastiCache); +const mockDumpRateLimitState = jest.mocked(dumpRateLimitState); const mockVerifyMockWebhook = jest.mocked(verifyMockWebhook); const immediateSleep = jest.fn().mockResolvedValue(undefined); @@ -112,6 +113,7 @@ beforeEach(() => { ]); mockPurgeQueues.mockResolvedValue(undefined); mockFlushElastiCache.mockResolvedValue(undefined); + mockDumpRateLimitState.mockResolvedValue([]); mockVerifyMockWebhook.mockResolvedValue({ receivedCallbacks: 0, verified: false, @@ -576,6 +578,59 @@ describe("runPerformanceTest", () => { expect(mockFlushElastiCache).toHaveBeenCalledWith(elastiCacheDeps); }); + it("dumps rate limit state before and after when elasticache deps are provided", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + const elastiCacheDeps = { + endpoint: "cache.example.invalid", + cacheName: "test-cache", + iamUsername: "test-user", + region: "eu-west-2", + }; + const mockState = [ + { + key: "ep:{target-1}", + isOpen: "0", + switchedAt: "0", + bucketTokens: "10", + bucketRefilledAt: "1000", + curAttempts: "5", + prevAttempts: "3", + curFailures: "0", + prevFailures: "0", + sampleTill: "2000", + }, + ]; + mockDumpRateLimitState.mockResolvedValue(mockState); + + const result = await runPerformanceTest( + deps, + scenario, + "test-dump", + immediateSleep, + elastiCacheDeps, + ); + + expect(mockDumpRateLimitState).toHaveBeenCalledTimes(2); + expect(mockDumpRateLimitState).toHaveBeenCalledWith(elastiCacheDeps); + expect(result.rateLimitStateBefore).toEqual(mockState); + expect(result.rateLimitStateAfter).toEqual(mockState); + }); + + it("omits rate limit state when elasticache deps are not provided", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + + const result = await runPerformanceTest( + deps, + scenario, + "test-no-dump", + immediateSleep, + ); + + expect(mockDumpRateLimitState).not.toHaveBeenCalled(); + expect(result.rateLimitStateBefore).toBeUndefined(); + expect(result.rateLimitStateAfter).toBeUndefined(); + }); + it("skips ElastiCache flush when deps are not provided", async () => { mockQueryMetricsSnapshot.mockResolvedValue(null); diff --git a/lambdas/perf-runner-lambda/src/elasticache.ts b/lambdas/perf-runner-lambda/src/elasticache.ts index 8d0b86c6..8f41ad9e 100644 --- a/lambdas/perf-runner-lambda/src/elasticache.ts +++ b/lambdas/perf-runner-lambda/src/elasticache.ts @@ -2,7 +2,7 @@ import { type RedisClientType, createClient } from "@redis/client"; import { SignatureV4 } from "@smithy/signature-v4"; import { Sha256 } from "@aws-crypto/sha256-js"; import { fromNodeProviderChain } from "@aws-sdk/credential-providers"; -import type { ElastiCacheDeps } from "types"; +import type { ElastiCacheDeps, EndpointRateLimitState } from "types"; const TOKEN_EXPIRY_SECONDS = 900; @@ -50,3 +50,61 @@ export async function flushElastiCache(deps: ElastiCacheDeps): Promise { } } } + +const RATE_LIMIT_HASH_FIELDS = [ + "is_open", + "switched_at", + "bucket_tokens", + "bucket_refilled_at", + "cur_attempts", + "prev_attempts", + "cur_failures", + "prev_failures", + "sample_till", +] as const; + +export async function dumpRateLimitState( + deps: ElastiCacheDeps, +): Promise { + const token = await generateIamToken(deps); + + const client: RedisClientType = createClient({ + url: `rediss://${deps.endpoint}:6379`, + username: deps.iamUsername, + password: token, + }); + + try { + await client.connect(); + + const keys: string[] = []; + for await (const key of client.scanIterator({ MATCH: "ep:*" })) { + keys.push(key); + } + + // eslint-disable-next-line sonarjs/null-dereference -- false positive: keys is string[] + keys.sort((a, b) => a.localeCompare(b)); + const states: EndpointRateLimitState[] = []; + for (const key of keys) { + const values = await client.hmGet(key, [...RATE_LIMIT_HASH_FIELDS]); + states.push({ + key, + isOpen: values[0], + switchedAt: values[1], + bucketTokens: values[2], + bucketRefilledAt: values[3], + curAttempts: values[4], + prevAttempts: values[5], + curFailures: values[6], + prevFailures: values[7], + sampleTill: values[8], + }); + } + + return states; + } finally { + if (client.isOpen) { + await client.disconnect(); + } + } +} diff --git a/lambdas/perf-runner-lambda/src/runner.ts b/lambdas/perf-runner-lambda/src/runner.ts index 321abc45..7a5b5ee6 100644 --- a/lambdas/perf-runner-lambda/src/runner.ts +++ b/lambdas/perf-runner-lambda/src/runner.ts @@ -2,6 +2,7 @@ import type { CircuitBreakerSnapshot, DeliveryMetricsSnapshot, ElastiCacheDeps, + EndpointRateLimitState, MetricsSnapshot, PerClientRateTimeline, PerformanceResult, @@ -12,7 +13,7 @@ import type { } from "types"; import { generatePhaseLoad } from "sqs"; import { deriveQueueUrls, purgeQueues } from "purge"; -import { flushElastiCache } from "elasticache"; +import { dumpRateLimitState, flushElastiCache } from "elasticache"; import { verifyMockWebhook } from "webhook-verify"; import { queryCircuitBreakerSnapshot, @@ -113,6 +114,11 @@ export async function runPerformanceTest( await flushElastiCache(elastiCacheDeps); } + let rateLimitStateBefore: EndpointRateLimitState[] | undefined; + if (elastiCacheDeps) { + rateLimitStateBefore = await dumpRateLimitState(elastiCacheDeps); + } + const startedAt = new Date(testStartMs).toISOString(); const phaseResults: PhaseResult[] = []; const snapshots: MetricsSnapshot[] = []; @@ -208,6 +214,11 @@ export async function runPerformanceTest( ); } + let rateLimitStateAfter: EndpointRateLimitState[] | undefined; + if (elastiCacheDeps) { + rateLimitStateAfter = await dumpRateLimitState(elastiCacheDeps); + } + await purgeQueues(deps.sqsClient, queueUrls); if (elastiCacheDeps) { await flushElastiCache(elastiCacheDeps); @@ -224,5 +235,7 @@ export async function runPerformanceTest( circuitBreakerMetrics: cbSnapshots, perClientRateTimelines, webhookVerification, + rateLimitStateBefore, + rateLimitStateAfter, }; } diff --git a/lambdas/perf-runner-lambda/src/types.ts b/lambdas/perf-runner-lambda/src/types.ts index 24df2a50..4415ef63 100644 --- a/lambdas/perf-runner-lambda/src/types.ts +++ b/lambdas/perf-runner-lambda/src/types.ts @@ -79,6 +79,19 @@ export type PerClientRateTimeline = { entries: PerClientRateEntry[]; }; +export type EndpointRateLimitState = { + key: string; + isOpen: string | null; + switchedAt: string | null; + bucketTokens: string | null; + bucketRefilledAt: string | null; + curAttempts: string | null; + prevAttempts: string | null; + curFailures: string | null; + prevFailures: string | null; + sampleTill: string | null; +}; + export type WebhookVerificationResult = { receivedCallbacks: number; verified: boolean; @@ -95,6 +108,8 @@ export type PerformanceResult = { circuitBreakerMetrics: CircuitBreakerSnapshot[]; perClientRateTimelines?: PerClientRateTimeline[]; webhookVerification?: WebhookVerificationResult; + rateLimitStateBefore?: EndpointRateLimitState[]; + rateLimitStateAfter?: EndpointRateLimitState[]; }; export type PerfRunnerPayload = { From d0aabe3d9db82101982f21a9af85efde4b8da48f Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Wed, 29 Apr 2026 12:59:12 +0100 Subject: [PATCH 09/11] Up the burst in rate limit test --- tests/integration/delivery-resilience.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/delivery-resilience.test.ts b/tests/integration/delivery-resilience.test.ts index d3995b0e..e3b2218b 100644 --- a/tests/integration/delivery-resilience.test.ts +++ b/tests/integration/delivery-resilience.test.ts @@ -108,7 +108,7 @@ describe("Delivery Resilience", () => { }); describe("Rate Limiting", () => { - const BURST_SIZE = 15; + const BURST_SIZE = 30; let dlqUrl: string; let deliveryUrl: string; let httpsClientLogGroup: string; From 6acda60686690e8097ddf2633f1a9d315dbacdce Mon Sep 17 00:00:00 2001 From: Mike Wild Date: Wed, 29 Apr 2026 14:24:38 +0100 Subject: [PATCH 10/11] PR feedback --- .../src/__tests__/admit-lua.test.ts | 19 ++++++++- .../src/services/admit.lua | 40 ++++++++----------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts index 0919222e..6155c065 100644 --- a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts @@ -65,7 +65,7 @@ function runAdmit( describe("admit.lua", () => { describe("rate limiting", () => { - it("enters half-open probe on a fresh endpoint with no prior state", () => { + it("rate-limits on a fresh endpoint with no prior state", () => { const store = createRedisStore(); const now = 1_000_000; @@ -74,9 +74,24 @@ describe("admit.lua", () => { targetRateLimit: 10, }); + expect(consumedTokens).toBe(0); + expect(reason).toBe("rate_limited"); + expect(effectiveRate).toBeCloseTo(1 / 60, 5); + }); + + it("generates a probe token on the second call after enough elapsed time", () => { + const store = createRedisStore(); + + runAdmit(store, { now: 1_000_000, targetRateLimit: 10 }); + + const { consumedTokens, effectiveRate, reason } = runAdmit(store, { + now: 1_060_001, + targetRateLimit: 10, + }); + + expect(effectiveRate).toBeCloseTo(1 / 60, 5); expect(consumedTokens).toBe(1); expect(reason).toBe("allowed"); - expect(effectiveRate).toBeCloseTo(1 / 60, 5); }); it("does not persist circuit state on first contact", () => { diff --git a/lambdas/https-client-lambda/src/services/admit.lua b/lambdas/https-client-lambda/src/services/admit.lua index 0a9526aa..40b6a5f1 100644 --- a/lambdas/https-client-lambda/src/services/admit.lua +++ b/lambdas/https-client-lambda/src/services/admit.lua @@ -13,34 +13,31 @@ -- Returns: { consumedTokens, reason, retryAfterMs, effectiveRate } -- Keys -local epKey = KEYS[1] -- ep:{targetId} combined endpoint state hash +local epKey = KEYS[1] -- ep:{targetId} combined endpoint state hash -- Arguments -local now = tonumber(ARGV[1]) or 0 -local capacity = tonumber(ARGV[2]) or 0 -local targetRateLimit = tonumber(ARGV[3]) or 0 -local cooldownMs = tonumber(ARGV[4]) or 0 -local recoveryPeriodMs = tonumber(ARGV[5]) or 0 -local probeRateLimit = tonumber(ARGV[6]) or 0 -local targetBatchSize = tonumber(ARGV[7]) or 0 -local cbEnabled = tonumber(ARGV[8]) == 1 +local now = tonumber(ARGV[1]) or 0 +local capacity = tonumber(ARGV[2]) or 0 +local targetRateLimit = tonumber(ARGV[3]) or 0 +local cooldownMs = tonumber(ARGV[4]) or 0 +local recoveryPeriodMs = tonumber(ARGV[5]) or 0 +local probeRateLimit = tonumber(ARGV[6]) or 0 +local targetBatchSize = tonumber(ARGV[7]) or 0 +local cbEnabled = tonumber(ARGV[8]) == 1 -------------------------------------------------------------------------------- -- LOAD STATE -------------------------------------------------------------------------------- -local state = redis.call("HMGET", epKey, +local state = redis.call("HMGET", epKey, "is_open", "switched_at", "bucket_tokens", "bucket_refilled_at") -local isOpenRaw = state[1] -local needInit = isOpenRaw == false or isOpenRaw == nil -local isOpen = needInit or tonumber(isOpenRaw) == 1 -local switchedAt = needInit and 0 or tonumber(state[2] or "0") -local bucketTokens = tonumber(state[3] or "0") --- checked independently of needInit because admit writes bucket_refilled_at --- before record-result writes is_open, so needInit can be stale here -local bucketRefilledAtRaw = state[4] -local bucketRefilledAt = (bucketRefilledAtRaw == false or bucketRefilledAtRaw == nil) - and now or tonumber(bucketRefilledAtRaw) +local isOpenRaw = state[1] +local cbNeedInit = isOpenRaw == false or isOpenRaw == nil +local rlNeedInit = state[4] == false or state[4] == nil +local isOpen = cbNeedInit or tonumber(isOpenRaw) == 1 +local switchedAt = cbNeedInit and 0 or tonumber(state[2] or "0") +local bucketTokens = tonumber(state[3] or "0") +local bucketRefilledAt = rlNeedInit and now or tonumber(state[4]) if not cbEnabled then isOpen = false @@ -92,9 +89,6 @@ end if isOpen then bucketTokens = 0 - if needInit and isHalfOpen then - bucketTokens = 1 - end end local generatedTokens = math.floor((now - bucketRefilledAt) * effectiveRate / 1000) From 44abc33e5d3eda0d747858b37dca14df1d5b8cfe Mon Sep 17 00:00:00 2001 From: Tim Marston Date: Wed, 29 Apr 2026 14:57:01 +0100 Subject: [PATCH 11/11] renamed admin.ua success reason, refactored cbNeedIni --- .../src/__tests__/admit-lua.test.ts | 10 +++++----- .../src/__tests__/endpoint-gate.test.ts | 10 +++++----- lambdas/https-client-lambda/src/services/admit.lua | 7 +++---- .../https-client-lambda/src/services/endpoint-gate.ts | 2 +- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts index 6155c065..7553a1b1 100644 --- a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts @@ -91,7 +91,7 @@ describe("admit.lua", () => { expect(effectiveRate).toBeCloseTo(1 / 60, 5); expect(consumedTokens).toBe(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); }); it("does not persist circuit state on first contact", () => { @@ -127,7 +127,7 @@ describe("admit.lua", () => { }); expect(consumedTokens).toBeGreaterThanOrEqual(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); }); it("allows a single request when bucket has tokens from refill", () => { @@ -149,7 +149,7 @@ describe("admit.lua", () => { }); expect(consumedTokens).toBe(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); expect(retryAfterMs).toBe(0); }); @@ -574,7 +574,7 @@ describe("admit.lua", () => { }); expect(consumedTokens).toBe(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); }); it("ignores is_open state when CB is disabled", () => { @@ -598,7 +598,7 @@ describe("admit.lua", () => { expect(effectiveRate).toBe(10); expect(consumedTokens).toBe(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); }); it("does not zero bucket tokens when is_open and CB disabled", () => { diff --git a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts index 0ef17857..57573a3d 100644 --- a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts @@ -35,7 +35,7 @@ beforeEach(() => { describe("admit", () => { it("returns allowed with consumedTokens when tokens available", async () => { - mockSendCommand.mockResolvedValueOnce([5, "allowed", 0, 10]); + mockSendCommand.mockResolvedValueOnce([5, "some_allowed", 0, 10]); const result = await admit( mockRedis, @@ -99,7 +99,7 @@ describe("admit", () => { it("falls back to EVAL on NOSCRIPT error", async () => { mockSendCommand .mockRejectedValueOnce(new Error("NOSCRIPT No matching script")) - .mockResolvedValueOnce([1, "allowed", 0, 10]); + .mockResolvedValueOnce([1, "some_allowed", 0, 10]); const result = await admit( mockRedis, @@ -127,7 +127,7 @@ describe("admit", () => { }); it("passes cbEnabled=0 when circuit breaker is disabled", async () => { - mockSendCommand.mockResolvedValueOnce([1, "allowed", 0, 10]); + mockSendCommand.mockResolvedValueOnce([1, "some_allowed", 0, 10]); await admit(mockRedis, "target-1", 10, false, 1, defaultConfig); @@ -137,7 +137,7 @@ describe("admit", () => { }); it("passes single epKey", async () => { - mockSendCommand.mockResolvedValueOnce([1, "allowed", 0, 5]); + mockSendCommand.mockResolvedValueOnce([1, "some_allowed", 0, 5]); await admit(mockRedis, "my-target", 5, true, 1, defaultConfig); @@ -146,7 +146,7 @@ describe("admit", () => { }); it("passes targetBatchSize as ARGV", async () => { - mockSendCommand.mockResolvedValueOnce([3, "allowed", 0, 10]); + mockSendCommand.mockResolvedValueOnce([3, "some_allowed", 0, 10]); await admit(mockRedis, "target-1", 10, true, 7, defaultConfig); diff --git a/lambdas/https-client-lambda/src/services/admit.lua b/lambdas/https-client-lambda/src/services/admit.lua index 40b6a5f1..ed15a928 100644 --- a/lambdas/https-client-lambda/src/services/admit.lua +++ b/lambdas/https-client-lambda/src/services/admit.lua @@ -31,10 +31,9 @@ local cbEnabled = tonumber(ARGV[8]) == 1 local state = redis.call("HMGET", epKey, "is_open", "switched_at", "bucket_tokens", "bucket_refilled_at") -local isOpenRaw = state[1] -local cbNeedInit = isOpenRaw == false or isOpenRaw == nil +local cbNeedInit = state[1] == false or state[1] == nil local rlNeedInit = state[4] == false or state[4] == nil -local isOpen = cbNeedInit or tonumber(isOpenRaw) == 1 +local isOpen = cbNeedInit or tonumber(state[1]) == 1 local switchedAt = cbNeedInit and 0 or tonumber(state[2] or "0") local bucketTokens = tonumber(state[3] or "0") local bucketRefilledAt = rlNeedInit and now or tonumber(state[4]) @@ -110,6 +109,6 @@ redis.call("HSET", epKey, "bucket_refilled_at", bucketRefilledAt ) -local reason = consumedTokens < 1 and "rate_limited" or "allowed" +local reason = consumedTokens < 1 and "rate_limited" or "some_allowed" local retryAfter = consumedTokens < 1 and 1000 or 0 return { consumedTokens, reason, retryAfter, effectiveRate } diff --git a/lambdas/https-client-lambda/src/services/endpoint-gate.ts b/lambdas/https-client-lambda/src/services/endpoint-gate.ts index 1047cb06..b96c00eb 100644 --- a/lambdas/https-client-lambda/src/services/endpoint-gate.ts +++ b/lambdas/https-client-lambda/src/services/endpoint-gate.ts @@ -110,7 +110,7 @@ export async function admit( const [consumedOrFlag, reason, retryAfterMs, effectiveRate] = raw; - if (reason === "allowed" || reason === "probe") { + if (reason === "some_allowed") { return { allowed: true, consumedTokens: Number(consumedOrFlag),