diff --git a/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts b/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts index b234a244..8198cdfb 100644 --- a/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts +++ b/lambdas/client-transform-filter-lambda/src/__tests__/index.component.test.ts @@ -33,6 +33,7 @@ jest.mock("aws-embedded-metrics", () => ({ Count: "Count", Milliseconds: "Milliseconds", }, + StorageResolution: { High: 1, Standard: 60 }, })); import { GetObjectCommand, NoSuchKey } from "@aws-sdk/client-s3"; diff --git a/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts b/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts index bdbcc3aa..3e1a419e 100644 --- a/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts +++ b/lambdas/client-transform-filter-lambda/src/__tests__/services/metrics.test.ts @@ -1,7 +1,15 @@ -import { Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { + StorageResolution, + Unit, + createMetricsLogger, +} from "aws-embedded-metrics"; import { CallbackMetrics, createMetricLogger } from "services/metrics"; -jest.mock("aws-embedded-metrics"); +jest.mock("aws-embedded-metrics", () => ({ + Unit: { Count: "Count" }, + StorageResolution: { High: 1, Standard: 60 }, + createMetricsLogger: jest.fn(), +})); const mockPutMetric = jest.fn(); const mockSetDimensions = jest.fn(); @@ -80,6 +88,7 @@ describe("CallbackMetrics", () => { "EventsReceived", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -92,6 +101,7 @@ describe("CallbackMetrics", () => { "TransformationsSuccessful", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -104,6 +114,7 @@ describe("CallbackMetrics", () => { "TransformationsFailed", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -116,6 +127,7 @@ describe("CallbackMetrics", () => { "CallbacksInitiated", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -128,6 +140,7 @@ describe("CallbackMetrics", () => { "ValidationErrors", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -140,6 +153,7 @@ describe("CallbackMetrics", () => { "FilteringStarted", 1, Unit.Count, + StorageResolution.High, ); }); }); @@ -152,6 +166,7 @@ describe("CallbackMetrics", () => { "FilteringMatched", 1, Unit.Count, + StorageResolution.High, ); }); }); diff --git a/lambdas/client-transform-filter-lambda/src/services/metrics.ts b/lambdas/client-transform-filter-lambda/src/services/metrics.ts index 398c5ecc..fd98ec02 100644 --- a/lambdas/client-transform-filter-lambda/src/services/metrics.ts +++ b/lambdas/client-transform-filter-lambda/src/services/metrics.ts @@ -1,4 +1,8 @@ -import { Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { + StorageResolution, + Unit, + createMetricsLogger, +} from "aws-embedded-metrics"; import type { MetricsLogger } from "aws-embedded-metrics"; export const createMetricLogger = (): MetricsLogger => { @@ -21,30 +25,65 @@ export class CallbackMetrics { constructor(private readonly metrics: MetricsLogger) {} emitEventReceived(): void { - this.metrics.putMetric("EventsReceived", 1, Unit.Count); + this.metrics.putMetric( + "EventsReceived", + 1, + Unit.Count, + StorageResolution.High, + ); } emitTransformationSuccess(): void { - this.metrics.putMetric("TransformationsSuccessful", 1, Unit.Count); + this.metrics.putMetric( + "TransformationsSuccessful", + 1, + Unit.Count, + StorageResolution.High, + ); } emitTransformationFailure(): void { - this.metrics.putMetric("TransformationsFailed", 1, Unit.Count); + this.metrics.putMetric( + "TransformationsFailed", + 1, + Unit.Count, + StorageResolution.High, + ); } emitDeliveryInitiated(): void { - this.metrics.putMetric("CallbacksInitiated", 1, Unit.Count); + this.metrics.putMetric( + "CallbacksInitiated", + 1, + Unit.Count, + StorageResolution.High, + ); } emitValidationError(): void { - this.metrics.putMetric("ValidationErrors", 1, Unit.Count); + this.metrics.putMetric( + "ValidationErrors", + 1, + Unit.Count, + StorageResolution.High, + ); } emitFilteringStarted(): void { - this.metrics.putMetric("FilteringStarted", 1, Unit.Count); + this.metrics.putMetric( + "FilteringStarted", + 1, + Unit.Count, + StorageResolution.High, + ); } emitFilteringMatched(): void { - this.metrics.putMetric("FilteringMatched", 1, Unit.Count); + this.metrics.putMetric( + "FilteringMatched", + 1, + Unit.Count, + StorageResolution.High, + ); } } diff --git a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts index 43aa2fb6..7553a1b1 100644 --- a/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/admit-lua.test.ts @@ -1,7 +1,7 @@ import admitLuaSrc from "services/admit.lua"; import { createRedisStore, evalLua } from "__tests__/helpers/lua-redis-mock"; -// ARGV: [now, capacity, targetRateLimit, cooldownMs, recoveryPeriodMs, probeRateLimit, targetBatchSize] +// ARGV: [now, capacity, targetRateLimit, cooldownMs, recoveryPeriodMs, probeRateLimit, targetBatchSize, cbEnabled] // KEYS: [epKey] // Returns: [consumedTokens, reason, retryAfterMs, effectiveRate] @@ -13,6 +13,7 @@ type AdmitArgs = { recoveryPeriodMs: number; probeRateLimit: number; targetBatchSize: number; + cbEnabled: boolean; }; const defaultArgs: AdmitArgs = { @@ -23,6 +24,7 @@ const defaultArgs: AdmitArgs = { recoveryPeriodMs: 600_000, probeRateLimit: 1 / 60, targetBatchSize: 1, + cbEnabled: true, }; type AdmitResult = { @@ -49,6 +51,7 @@ function runAdmit( merged.recoveryPeriodMs.toString(), merged.probeRateLimit.toString(), merged.targetBatchSize.toString(), + merged.cbEnabled ? "1" : "0", ], store, ) as [number, string, number, number]; @@ -62,7 +65,7 @@ function runAdmit( describe("admit.lua", () => { describe("rate limiting", () => { - it("enters half-open probe on a fresh endpoint with no prior state", () => { + it("rate-limits on a fresh endpoint with no prior state", () => { const store = createRedisStore(); const now = 1_000_000; @@ -76,6 +79,21 @@ describe("admit.lua", () => { expect(effectiveRate).toBeCloseTo(1 / 60, 5); }); + it("generates a probe token on the second call after enough elapsed time", () => { + const store = createRedisStore(); + + runAdmit(store, { now: 1_000_000, targetRateLimit: 10 }); + + const { consumedTokens, effectiveRate, reason } = runAdmit(store, { + now: 1_060_001, + targetRateLimit: 10, + }); + + expect(effectiveRate).toBeCloseTo(1 / 60, 5); + expect(consumedTokens).toBe(1); + expect(reason).toBe("some_allowed"); + }); + it("does not persist circuit state on first contact", () => { const store = createRedisStore(); const now = 1_000_000; @@ -109,7 +127,7 @@ describe("admit.lua", () => { }); expect(consumedTokens).toBeGreaterThanOrEqual(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); }); it("allows a single request when bucket has tokens from refill", () => { @@ -131,7 +149,7 @@ describe("admit.lua", () => { }); expect(consumedTokens).toBe(1); - expect(reason).toBe("allowed"); + expect(reason).toBe("some_allowed"); expect(retryAfterMs).toBe(0); }); @@ -511,4 +529,121 @@ describe("admit.lua", () => { expect(store.has("ep:target-b")).toBe(true); }); }); + + describe("circuit breaker disabled (cbEnabled = false)", () => { + it("uses full targetRateLimit on a fresh endpoint with no prior state", () => { + const store = createRedisStore(); + const now = 1_000_000; + + const { effectiveRate } = runAdmit(store, { + now, + targetRateLimit: 10, + cbEnabled: false, + }); + + expect(effectiveRate).toBe(10); + }); + + it("applies initial values on fresh endpoint so first call has no tokens", () => { + const store = createRedisStore(); + const now = 1_000_000; + + const { consumedTokens, effectiveRate, reason } = runAdmit(store, { + now, + targetRateLimit: 10, + cbEnabled: false, + }); + + expect(effectiveRate).toBe(10); + expect(consumedTokens).toBe(0); + expect(reason).toBe("rate_limited"); + }); + + it("generates tokens at full rate after initial contact", () => { + const store = createRedisStore(); + + runAdmit(store, { + now: 1_000_000, + targetRateLimit: 10, + cbEnabled: false, + }); + const { consumedTokens, reason } = runAdmit(store, { + now: 1_000_100, + targetRateLimit: 10, + cbEnabled: false, + }); + + expect(consumedTokens).toBe(1); + expect(reason).toBe("some_allowed"); + }); + + it("ignores is_open state when CB is disabled", () => { + const store = createRedisStore(); + const now = 1_000_000; + store.set( + "ep:t1", + new Map([ + ["is_open", "1"], + ["switched_at", now.toString()], + ["bucket_tokens", "5"], + ["bucket_refilled_at", now.toString()], + ]), + ); + + const { consumedTokens, effectiveRate, reason } = runAdmit(store, { + now, + targetRateLimit: 10, + cbEnabled: false, + }); + + expect(effectiveRate).toBe(10); + expect(consumedTokens).toBe(1); + expect(reason).toBe("some_allowed"); + }); + + it("does not zero bucket tokens when is_open and CB disabled", () => { + const store = createRedisStore(); + const now = 1_000_000; + store.set( + "ep:t1", + new Map([ + ["is_open", "1"], + ["switched_at", now.toString()], + ["bucket_tokens", "5"], + ["bucket_refilled_at", now.toString()], + ]), + ); + + const { consumedTokens } = runAdmit(store, { + now, + targetRateLimit: 10, + cbEnabled: false, + targetBatchSize: 3, + }); + + expect(consumedTokens).toBe(3); + }); + + it("never returns circuit_open when CB is disabled", () => { + const store = createRedisStore(); + const now = 1_000_000; + store.set( + "ep:t1", + new Map([ + ["is_open", "1"], + ["switched_at", (now - 10_000).toString()], + ["bucket_tokens", "0"], + ["bucket_refilled_at", now.toString()], + ]), + ); + + const { reason } = runAdmit(store, { + now, + cooldownMs: 120_000, + cbEnabled: false, + }); + + expect(reason).not.toBe("circuit_open"); + }); + }); }); diff --git a/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts b/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts index 803c19bb..249b0148 100644 --- a/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/delivery-metrics.test.ts @@ -1,6 +1,7 @@ const mockCreateMetricsLogger = jest.fn(); jest.mock("aws-embedded-metrics", () => ({ Unit: { Count: "Count", Milliseconds: "Milliseconds" }, + StorageResolution: { High: 1, Standard: 60 }, createMetricsLogger: () => mockCreateMetricsLogger(), })); @@ -83,6 +84,7 @@ describe("delivery-metrics", () => { "DeliveryAttempt", 1, "Count", + 1, ); }); @@ -96,6 +98,7 @@ describe("delivery-metrics", () => { "DeliverySuccess", 1, "Count", + 1, ); }); @@ -109,6 +112,7 @@ describe("delivery-metrics", () => { "DeliveryFailure", 1, "Count", + 1, ); }); @@ -123,6 +127,7 @@ describe("delivery-metrics", () => { "DeliveryPermanentFailure", 1, "Count", + 1, ); }); @@ -137,6 +142,7 @@ describe("delivery-metrics", () => { "CircuitBreakerOpen", 1, "Count", + 1, ); }); @@ -151,6 +157,7 @@ describe("delivery-metrics", () => { "DeliveryRateLimited", 1, "Count", + 1, ); }); @@ -165,6 +172,7 @@ describe("delivery-metrics", () => { "CircuitBreakerClosed", 1, "Count", + 1, ); }); @@ -179,6 +187,7 @@ describe("delivery-metrics", () => { "DeliveryRetryWindowExhausted", 1, "Count", + 1, ); }); @@ -201,6 +210,7 @@ describe("delivery-metrics", () => { "AdmissionDenied", 1, "Count", + 1, ); }); @@ -215,6 +225,7 @@ describe("delivery-metrics", () => { "DeliveryDurationMs", 250, "Milliseconds", + 1, ); }); diff --git a/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts b/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts index 25e164a9..2c0c0543 100644 --- a/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/delivery-observability.test.ts @@ -39,7 +39,7 @@ describe("delivery-observability", () => { ); const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); - recordDeliveryAttempt("client-1", "target-1", "msg-123"); + recordDeliveryAttempt("client-1", "target-1", "msg-123", "sqs-msg-1", 3); expect(emitDeliveryAttempt).toHaveBeenCalledWith("target-1"); expect(logger.info).toHaveBeenCalledWith( @@ -48,6 +48,8 @@ describe("delivery-observability", () => { clientId: "client-1", targetId: "target-1", correlationId: "msg-123", + sqsMessageId: "sqs-msg-1", + receiveCount: 3, }), ); }); @@ -190,25 +192,30 @@ describe("delivery-observability", () => { ); }); - it("recordAdmissionDenied emits metric and logs", () => { + it("recordAdmissionDenied emits per-record metrics and logs", () => { const { emitAdmissionDenied } = jest.requireMock( "services/delivery-metrics", ); const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); - recordAdmissionDenied("client-1", "target-1", "rate_limited", "msg-123"); + recordAdmissionDenied("client-1", "target-1", "rate_limited", [ + "msg-a", + "msg-b", + ]); expect(emitAdmissionDenied).toHaveBeenCalledWith( "target-1", "rate_limited", + 2, ); expect(logger.warn).toHaveBeenCalledWith( "Admission denied", expect.objectContaining({ clientId: "client-1", targetId: "target-1", - correlationId: "msg-123", reason: "rate_limited", + deniedCount: 2, + correlationIds: ["msg-a", "msg-b"], }), ); }); diff --git a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts index eea9d44d..57573a3d 100644 --- a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts @@ -35,7 +35,7 @@ beforeEach(() => { describe("admit", () => { it("returns allowed with consumedTokens when tokens available", async () => { - mockSendCommand.mockResolvedValueOnce([5, "allowed", 0, 10]); + mockSendCommand.mockResolvedValueOnce([5, "some_allowed", 0, 10]); const result = await admit( mockRedis, @@ -99,7 +99,7 @@ describe("admit", () => { it("falls back to EVAL on NOSCRIPT error", async () => { mockSendCommand .mockRejectedValueOnce(new Error("NOSCRIPT No matching script")) - .mockResolvedValueOnce([1, "allowed", 0, 10]); + .mockResolvedValueOnce([1, "some_allowed", 0, 10]); const result = await admit( mockRedis, @@ -126,18 +126,18 @@ describe("admit", () => { ); }); - it("passes probeRateLimit=0 when circuit breaker is disabled", async () => { - mockSendCommand.mockResolvedValueOnce([1, "allowed", 0, 10]); + it("passes cbEnabled=0 when circuit breaker is disabled", async () => { + mockSendCommand.mockResolvedValueOnce([1, "some_allowed", 0, 10]); await admit(mockRedis, "target-1", 10, false, 1, defaultConfig); const args = mockSendCommand.mock.calls[0]![0] as string[]; - const probeRateArg = args[9]; - expect(probeRateArg).toBe("0"); + const cbEnabledArg = args[11]; + expect(cbEnabledArg).toBe("0"); }); it("passes single epKey", async () => { - mockSendCommand.mockResolvedValueOnce([1, "allowed", 0, 5]); + mockSendCommand.mockResolvedValueOnce([1, "some_allowed", 0, 5]); await admit(mockRedis, "my-target", 5, true, 1, defaultConfig); @@ -146,7 +146,7 @@ describe("admit", () => { }); it("passes targetBatchSize as ARGV", async () => { - mockSendCommand.mockResolvedValueOnce([3, "allowed", 0, 10]); + mockSendCommand.mockResolvedValueOnce([3, "some_allowed", 0, 10]); await admit(mockRedis, "target-1", 10, true, 7, defaultConfig); diff --git a/lambdas/https-client-lambda/src/__tests__/handler.test.ts b/lambdas/https-client-lambda/src/__tests__/handler.test.ts index ab5dff46..7121cbeb 100644 --- a/lambdas/https-client-lambda/src/__tests__/handler.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/handler.test.ts @@ -197,9 +197,18 @@ describe("processRecords", () => { }); it("delivers only admitted records when consumedTokens is less than batch size", async () => { - const record1 = makeRecord({ messageId: "msg-1" }); - const record2 = makeRecord({ messageId: "msg-2" }); - const record3 = makeRecord({ messageId: "msg-3" }); + const record1 = makeRecord({ + messageId: "msg-1", + receiptHandle: "receipt-1", + }); + const record2 = makeRecord({ + messageId: "msg-2", + receiptHandle: "receipt-2", + }); + const record3 = makeRecord({ + messageId: "msg-3", + receiptHandle: "receipt-3", + }); mockAdmit.mockResolvedValue({ allowed: true, @@ -207,6 +216,10 @@ describe("processRecords", () => { effectiveRate: 10, }); + const { recordAdmissionDenied } = jest.requireMock( + "services/delivery-observability", + ); + const failures = await processRecords([record1, record2, record3]); expect(mockDeliverPayload).toHaveBeenCalledTimes(1); @@ -214,6 +227,15 @@ describe("processRecords", () => { { itemIdentifier: "msg-2" }, { itemIdentifier: "msg-3" }, ]); + expect(recordAdmissionDenied).toHaveBeenCalledWith( + "client-1", + "target-1", + "rate_limited", + ["test-message-id", "test-message-id"], + ); + + expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-2", 1); + expect(mockChangeVisibility).toHaveBeenCalledWith("receipt-3", 1); }); it("an unexpected delivery error does not prevent other records in the batch", async () => { @@ -651,4 +673,68 @@ describe("processRecords", () => { expect(mockLoadTargetConfig).toHaveBeenCalledWith("client-1", "target-1"); expect(mockLoadTargetConfig).toHaveBeenCalledWith("client-1", "target-2"); }); + + it("calls recordAdmissionDenied with correlationIds when batch denied", async () => { + const record1 = makeRecord({ messageId: "msg-1" }); + const record2 = makeRecord({ messageId: "msg-2" }); + + mockAdmit.mockResolvedValue({ + allowed: false, + reason: "circuit_open", + retryAfterMs: 30_000, + effectiveRate: 0, + }); + + const { recordAdmissionDenied } = jest.requireMock( + "services/delivery-observability", + ); + + await processRecords([record1, record2]); + + expect(recordAdmissionDenied).toHaveBeenCalledWith( + "client-1", + "target-1", + "circuit_open", + ["test-message-id", "test-message-id"], + ); + }); + + it("logs deliveredCount and dlqCount in batch complete", async () => { + const record1 = makeRecord({ messageId: "msg-1" }); + const record2 = makeRecord({ messageId: "msg-2" }); + + mockDeliverPayload + .mockResolvedValueOnce({ outcome: "success" }) + .mockResolvedValueOnce({ outcome: "permanent_failure" }); + + const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); + + await processRecords([record1, record2]); + + expect(logger.info).toHaveBeenCalledWith( + "Batch complete", + expect.objectContaining({ + batchSize: 2, + deliveredCount: 1, + dlqCount: 1, + failureCount: 0, + }), + ); + }); + + it("includes correlationId in error log on unexpected delivery failure", async () => { + mockDeliverPayload.mockRejectedValue(new Error("Connection reset")); + + const { logger } = jest.requireMock("@nhs-notify-client-callbacks/logger"); + + await processRecords([makeRecord()]); + + expect(logger.error).toHaveBeenCalledWith( + "Failed to process record", + expect.objectContaining({ + messageId: "msg-1", + correlationId: "test-message-id", + }), + ); + }); }); diff --git a/lambdas/https-client-lambda/src/handler.ts b/lambdas/https-client-lambda/src/handler.ts index b129fc8d..19420462 100644 --- a/lambdas/https-client-lambda/src/handler.ts +++ b/lambdas/https-client-lambda/src/handler.ts @@ -99,14 +99,20 @@ function groupByTarget(records: SQSRecord[]): TargetBatch[] { ); } +function extractCorrelationId( + message: CallbackDeliveryMessage, +): string | undefined { + return message.payload.data[0]?.attributes?.messageId; +} + async function deliverRecord( record: SQSRecord, message: CallbackDeliveryMessage, target: Awaited>, applicationId: string, clientId: string, -): Promise<{ success: boolean }> { - const correlationId = message.payload.data[0]?.attributes?.messageId; +): Promise<{ success: boolean; dlq: boolean }> { + const correlationId = extractCorrelationId(message); const maxRetryDurationMs = target.delivery?.maxRetryDurationSeconds === undefined @@ -120,7 +126,7 @@ async function deliverRecord( if (isWindowExhausted(firstReceivedMs, maxRetryDurationMs)) { recordRetryWindowExhausted(clientId, message.targetId, correlationId); await sendToDlq(record.body); - return { success: true }; + return { success: true, dlq: true }; } const agent = await buildAgent(target); @@ -131,14 +137,20 @@ async function deliverRecord( ); const payloadJson = JSON.stringify(message.payload); - recordDeliveryAttempt(clientId, message.targetId, correlationId); + recordDeliveryAttempt( + clientId, + message.targetId, + correlationId, + record.messageId, + Number(record.attributes.ApproximateReceiveCount), + ); const deliveryStart = Date.now(); const result = await deliverPayload(target, payloadJson, signature, agent); recordDeliveryDuration(message.targetId, Date.now() - deliveryStart); if (result.outcome === OUTCOME_SUCCESS) { recordDeliverySuccess(clientId, message.targetId, correlationId); - return { success: true }; + return { success: true, dlq: false }; } if (result.outcome === OUTCOME_PERMANENT_FAILURE) { @@ -150,7 +162,7 @@ async function deliverRecord( correlationId, ); await sendToDlq(record.body, result); - return { success: true }; + return { success: true, dlq: true }; } if (result.outcome === OUTCOME_RATE_LIMITED) { @@ -163,7 +175,7 @@ async function deliverRecord( result.retryAfterHeader, receiveCount, ); - return { success: true }; + return { success: true, dlq: false }; } const receiveCount = Number(record.attributes.ApproximateReceiveCount); @@ -177,7 +189,32 @@ async function deliverRecord( correlationId, ); await changeVisibility(record.receiptHandle, backoffSec); - return { success: false }; + return { success: false, dlq: false }; +} + +type TargetBatchResult = { + failures: SQSBatchItemFailure[]; + deliveredCount: number; + dlqCount: number; +}; + +async function handleBatchDenied( + batch: TargetBatch, + clientId: string, + reason: string, + retryAfterMs: number, +): Promise { + const delaySec = Math.ceil(retryAfterMs / 1000); + const correlationIds = batch.messages.map((m) => extractCorrelationId(m)); + recordAdmissionDenied(clientId, batch.targetId, reason, correlationIds); + const failures: SQSBatchItemFailure[] = []; + for (const record of batch.records) { + // eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive + const jitterSec = Math.floor(Math.random() * 5); + await changeVisibility(record.receiptHandle, delaySec + jitterSec); + failures.push({ itemIdentifier: record.messageId }); + } + return { failures, deliveredCount: 0, dlqCount: 0 }; } async function processTargetBatch( @@ -185,7 +222,7 @@ async function processTargetBatch( redis: RedisClientType, clientId: string, concurrencyLimit: number, -): Promise { +): Promise { const target = await loadTargetConfig(clientId, batch.targetId); const cbEnabled = target.delivery?.circuitBreaker?.enabled ?? false; @@ -199,16 +236,12 @@ async function processTargetBatch( ); if (!gateResult.allowed) { - const baseDelaySec = Math.ceil(gateResult.retryAfterMs / 1000); - recordAdmissionDenied(clientId, batch.targetId, gateResult.reason); - const failures: SQSBatchItemFailure[] = []; - for (const record of batch.records) { - // eslint-disable-next-line sonarjs/pseudo-random -- jitter for backoff, not security-sensitive - const jitterSec = Math.floor(Math.random() * 5); - await changeVisibility(record.receiptHandle, baseDelaySec + jitterSec); - failures.push({ itemIdentifier: record.messageId }); - } - return failures; + return handleBatchDenied( + batch, + clientId, + gateResult.reason, + gateResult.retryAfterMs, + ); } const { consumedTokens } = gateResult; @@ -223,7 +256,10 @@ async function processTargetBatch( const deliveryResults = await pMap( admitted, - async (record, index): Promise<{ record: SQSRecord; success: boolean }> => { + async ( + record, + index, + ): Promise<{ record: SQSRecord; success: boolean; dlq: boolean }> => { try { const outcome = await deliverRecord( record, @@ -232,10 +268,12 @@ async function processTargetBatch( applicationId, clientId, ); - return { record, success: outcome.success }; + return { record, success: outcome.success, dlq: outcome.dlq }; } catch (error) { + const correlationId = extractCorrelationId(admittedMessages[index]); logger.error("Failed to process record", { messageId: record.messageId, + correlationId, err: error, }); const receiveCount = Number(record.attributes.ApproximateReceiveCount); @@ -243,7 +281,7 @@ async function processTargetBatch( record.receiptHandle, jitteredBackoffSeconds(receiveCount), ); - return { record, success: false }; + return { record, success: false, dlq: false }; } }, { concurrency: concurrencyLimit }, @@ -256,6 +294,11 @@ async function processTargetBatch( } } + const deliveredCount = deliveryResults.filter( + (r) => r.success && !r.dlq, + ).length; + const dlqCount = deliveryResults.filter((r) => r.dlq).length; + if (cbEnabled && consumedTokens > 0) { const cbOutcome = await recordResult( redis, @@ -275,11 +318,24 @@ async function processTargetBatch( } } - for (const record of rejected) { - failures.push({ itemIdentifier: record.messageId }); + if (rejected.length > 0) { + const rejectedMessages = batch.messages.slice(consumedTokens); + const rejectedCorrelationIds = rejectedMessages.map((m) => + extractCorrelationId(m), + ); + recordAdmissionDenied( + clientId, + batch.targetId, + "rate_limited", + rejectedCorrelationIds, + ); + for (const record of rejected) { + await changeVisibility(record.receiptHandle, 1); + failures.push({ itemIdentifier: record.messageId }); + } } - return failures; + return { failures, deliveredCount, dlqCount }; } export async function processRecords( @@ -302,19 +358,25 @@ export async function processRecords( const targetBatches = groupByTarget(records); const allFailures: SQSBatchItemFailure[] = []; + let totalDelivered = 0; + let totalDlq = 0; for (const batch of targetBatches) { - const batchFailures = await processTargetBatch( + const batchResult = await processTargetBatch( batch, redis, CLIENT_ID, concurrencyLimit, ); - allFailures.push(...batchFailures); + allFailures.push(...batchResult.failures); + totalDelivered += batchResult.deliveredCount; + totalDlq += batchResult.dlqCount; } logger.info("Batch complete", { batchSize: records.length, + deliveredCount: totalDelivered, + dlqCount: totalDlq, failureCount: allFailures.length, }); diff --git a/lambdas/https-client-lambda/src/services/admit.lua b/lambdas/https-client-lambda/src/services/admit.lua index 53b6977c..ed15a928 100644 --- a/lambdas/https-client-lambda/src/services/admit.lua +++ b/lambdas/https-client-lambda/src/services/admit.lua @@ -13,29 +13,35 @@ -- Returns: { consumedTokens, reason, retryAfterMs, effectiveRate } -- Keys -local epKey = KEYS[1] -- ep:{targetId} combined endpoint state hash +local epKey = KEYS[1] -- ep:{targetId} combined endpoint state hash -- Arguments -local now = tonumber(ARGV[1]) or 0 -local capacity = tonumber(ARGV[2]) or 0 -local targetRateLimit = tonumber(ARGV[3]) or 0 -local cooldownMs = tonumber(ARGV[4]) or 0 -local recoveryPeriodMs = tonumber(ARGV[5]) or 0 -local probeRateLimit = tonumber(ARGV[6]) or 0 -local targetBatchSize = tonumber(ARGV[7]) or 0 +local now = tonumber(ARGV[1]) or 0 +local capacity = tonumber(ARGV[2]) or 0 +local targetRateLimit = tonumber(ARGV[3]) or 0 +local cooldownMs = tonumber(ARGV[4]) or 0 +local recoveryPeriodMs = tonumber(ARGV[5]) or 0 +local probeRateLimit = tonumber(ARGV[6]) or 0 +local targetBatchSize = tonumber(ARGV[7]) or 0 +local cbEnabled = tonumber(ARGV[8]) == 1 -------------------------------------------------------------------------------- -- LOAD STATE -------------------------------------------------------------------------------- -local state = redis.call("HMGET", epKey, +local state = redis.call("HMGET", epKey, "is_open", "switched_at", "bucket_tokens", "bucket_refilled_at") -local isOpenRaw = state[1] -local needInit = isOpenRaw == false or isOpenRaw == nil -local isOpen = needInit or tonumber(isOpenRaw) == 1 -local switchedAt = needInit and 0 or tonumber(state[2] or "0") -local bucketTokens = tonumber(state[3] or "0") -local bucketRefilledAt = needInit and now or tonumber(state[4] or "0") +local cbNeedInit = state[1] == false or state[1] == nil +local rlNeedInit = state[4] == false or state[4] == nil +local isOpen = cbNeedInit or tonumber(state[1]) == 1 +local switchedAt = cbNeedInit and 0 or tonumber(state[2] or "0") +local bucketTokens = tonumber(state[3] or "0") +local bucketRefilledAt = rlNeedInit and now or tonumber(state[4]) + +if not cbEnabled then + isOpen = false + switchedAt = 0 +end -------------------------------------------------------------------------------- -- 1. CIRCUIT BREAKER — determine effective rate @@ -88,7 +94,7 @@ local generatedTokens = math.floor((now - bucketRefilledAt) * effectiveRate / 10 local availTokens = math.min(capacity, bucketTokens + generatedTokens) local consumedTokens = math.min(targetBatchSize, availTokens) -bucketTokens = availTokens - consumedTokens +bucketTokens = availTokens - consumedTokens if generatedTokens > 0 and effectiveRate > 0 then local generationTime = generatedTokens * 1000 / effectiveRate bucketRefilledAt = bucketRefilledAt + generationTime @@ -103,6 +109,6 @@ redis.call("HSET", epKey, "bucket_refilled_at", bucketRefilledAt ) -local reason = consumedTokens < 1 and "rate_limited" or "allowed" +local reason = consumedTokens < 1 and "rate_limited" or "some_allowed" local retryAfter = consumedTokens < 1 and 1000 or 0 return { consumedTokens, reason, retryAfter, effectiveRate } diff --git a/lambdas/https-client-lambda/src/services/delivery-metrics.ts b/lambdas/https-client-lambda/src/services/delivery-metrics.ts index 68248591..dcefaf57 100644 --- a/lambdas/https-client-lambda/src/services/delivery-metrics.ts +++ b/lambdas/https-client-lambda/src/services/delivery-metrics.ts @@ -1,4 +1,8 @@ -import { Unit, createMetricsLogger } from "aws-embedded-metrics"; +import { + StorageResolution, + Unit, + createMetricsLogger, +} from "aws-embedded-metrics"; import type { MetricsLogger } from "aws-embedded-metrics"; let metricsInstance: MetricsLogger | undefined; @@ -28,56 +32,90 @@ function getMetrics(): MetricsLogger { export function emitDeliveryAttempt(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryAttempt", 1, Unit.Count); + metrics.putMetric("DeliveryAttempt", 1, Unit.Count, StorageResolution.High); } export function emitDeliverySuccess(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliverySuccess", 1, Unit.Count); + metrics.putMetric("DeliverySuccess", 1, Unit.Count, StorageResolution.High); } export function emitDeliveryFailure(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryFailure", 1, Unit.Count); + metrics.putMetric("DeliveryFailure", 1, Unit.Count, StorageResolution.High); } export function emitDeliveryPermanentFailure(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryPermanentFailure", 1, Unit.Count); + metrics.putMetric( + "DeliveryPermanentFailure", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitRateLimited(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryRateLimited", 1, Unit.Count); + metrics.putMetric( + "DeliveryRateLimited", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitCircuitBreakerOpen(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("CircuitBreakerOpen", 1, Unit.Count); + metrics.putMetric( + "CircuitBreakerOpen", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitCircuitBreakerClosed(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("CircuitBreakerClosed", 1, Unit.Count); + metrics.putMetric( + "CircuitBreakerClosed", + 1, + Unit.Count, + StorageResolution.High, + ); } export function emitRetryWindowExhausted(targetId: string): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryRetryWindowExhausted", 1, Unit.Count); + metrics.putMetric( + "DeliveryRetryWindowExhausted", + 1, + Unit.Count, + StorageResolution.High, + ); } -export function emitAdmissionDenied(targetId: string, reason: string): void { +export function emitAdmissionDenied( + targetId: string, + reason: string, + count = 1, +): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); metrics.setProperty("reason", reason); - metrics.putMetric("AdmissionDenied", 1, Unit.Count); + metrics.putMetric( + "AdmissionDenied", + count, + Unit.Count, + StorageResolution.High, + ); } export function emitDeliveryDuration( @@ -86,7 +124,12 @@ export function emitDeliveryDuration( ): void { const metrics = getMetrics(); metrics.setProperty("targetId", targetId); - metrics.putMetric("DeliveryDurationMs", durationMs, Unit.Milliseconds); + metrics.putMetric( + "DeliveryDurationMs", + durationMs, + Unit.Milliseconds, + StorageResolution.High, + ); } export async function flushMetrics(): Promise { diff --git a/lambdas/https-client-lambda/src/services/delivery-observability.ts b/lambdas/https-client-lambda/src/services/delivery-observability.ts index ed41df8a..d722b851 100644 --- a/lambdas/https-client-lambda/src/services/delivery-observability.ts +++ b/lambdas/https-client-lambda/src/services/delivery-observability.ts @@ -16,9 +16,17 @@ export function recordDeliveryAttempt( clientId: string, targetId: string, correlationId?: string, + sqsMessageId?: string, + receiveCount?: number, ): void { emitDeliveryAttempt(targetId); - logger.info("Attempting delivery", { clientId, targetId, correlationId }); + logger.info("Attempting delivery", { + clientId, + targetId, + correlationId, + sqsMessageId, + receiveCount, + }); } export function recordDeliverySuccess( @@ -108,14 +116,15 @@ export function recordAdmissionDenied( clientId: string, targetId: string, reason: string, - correlationId?: string, + correlationIds: (string | undefined)[], ): void { - emitAdmissionDenied(targetId, reason); + emitAdmissionDenied(targetId, reason, correlationIds.length); logger.warn("Admission denied", { clientId, targetId, - correlationId, reason, + deniedCount: correlationIds.length, + correlationIds, }); } diff --git a/lambdas/https-client-lambda/src/services/endpoint-gate.ts b/lambdas/https-client-lambda/src/services/endpoint-gate.ts index 8870e5d8..b96c00eb 100644 --- a/lambdas/https-client-lambda/src/services/endpoint-gate.ts +++ b/lambdas/https-client-lambda/src/services/endpoint-gate.ts @@ -84,7 +84,6 @@ export async function admit( ): Promise { const epKey = `ep:{${targetId}}`; const now = Date.now().toString(); - const probeRate = cbEnabled ? config.probeRateLimit.toString() : "0"; const args = [ now, @@ -92,8 +91,9 @@ export async function admit( String(refillPerSec), config.cooldownPeriodMs.toString(), config.recoveryPeriodMs.toString(), - probeRate, + config.probeRateLimit.toString(), String(targetBatchSize), + cbEnabled ? "1" : "0", ]; if (!admitSha) { @@ -110,7 +110,7 @@ export async function admit( const [consumedOrFlag, reason, retryAfterMs, effectiveRate] = raw; - if (reason === "allowed" || reason === "probe") { + if (reason === "some_allowed") { return { allowed: true, consumedTokens: Number(consumedOrFlag), diff --git a/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts b/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts index 09846ed3..54c8e813 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/elasticache.test.ts @@ -1,19 +1,25 @@ -import { flushElastiCache } from "elasticache"; +import { dumpRateLimitState, flushElastiCache } from "elasticache"; import type { ElastiCacheDeps } from "types"; const mockConnect = jest.fn().mockResolvedValue(undefined); const mockFlushAll = jest.fn().mockResolvedValue("OK"); const mockDisconnect = jest.fn().mockResolvedValue(undefined); +const mockHmGet = jest.fn().mockResolvedValue([]); let mockIsOpen = true; +let mockScanKeys: string[] = []; jest.mock("@redis/client", () => ({ createClient: jest.fn(() => ({ connect: mockConnect, flushAll: mockFlushAll, disconnect: mockDisconnect, + hmGet: mockHmGet, get isOpen() { return mockIsOpen; }, + scanIterator: jest.fn(function scanIterator() { + return mockScanKeys[Symbol.iterator](); + }), })), })); @@ -46,6 +52,7 @@ const deps: ElastiCacheDeps = { beforeEach(() => { jest.clearAllMocks(); mockIsOpen = true; + mockScanKeys = []; }); describe("flushElastiCache", () => { @@ -72,3 +79,87 @@ describe("flushElastiCache", () => { expect(mockDisconnect).not.toHaveBeenCalled(); }); }); + +describe("dumpRateLimitState", () => { + it("returns empty array when no ep: keys exist", async () => { + mockScanKeys = []; + + const result = await dumpRateLimitState(deps); + + expect(result).toEqual([]); + expect(mockConnect).toHaveBeenCalledTimes(1); + expect(mockDisconnect).toHaveBeenCalledTimes(1); + }); + + it("returns state for each ep: key sorted alphabetically", async () => { + mockScanKeys = ["ep:{target-b}", "ep:{target-a}"]; + mockHmGet + .mockResolvedValueOnce([ + "1", + "500", + "0", + "400", + "20", + "15", + "5", + "3", + "1500", + ]) + .mockResolvedValueOnce([ + "0", + "1000", + "5", + "900", + "10", + "8", + "2", + "1", + "2000", + ]); + + const result = await dumpRateLimitState(deps); + + expect(result).toHaveLength(2); + expect(result[0].key).toBe("ep:{target-a}"); + expect(result[0].isOpen).toBe("1"); + expect(result[0].switchedAt).toBe("500"); + expect(result[0].bucketTokens).toBe("0"); + expect(result[0].bucketRefilledAt).toBe("400"); + expect(result[0].curAttempts).toBe("20"); + expect(result[0].prevAttempts).toBe("15"); + expect(result[0].curFailures).toBe("5"); + expect(result[0].prevFailures).toBe("3"); + expect(result[0].sampleTill).toBe("1500"); + + expect(result[1].key).toBe("ep:{target-b}"); + expect(result[1].isOpen).toBe("0"); + }); + + it("disconnects even when scan throws", async () => { + const mockClient = { + connect: mockConnect, + disconnect: mockDisconnect, + hmGet: mockHmGet, + get isOpen() { + return mockIsOpen; + }, + scanIterator: jest.fn(() => { + throw new Error("scan failed"); + }), + }; + const { createClient } = jest.requireMock("@redis/client"); + createClient.mockReturnValueOnce(mockClient); + + await expect(dumpRateLimitState(deps)).rejects.toThrow("scan failed"); + expect(mockDisconnect).toHaveBeenCalledTimes(1); + }); + + it("skips disconnect when client is not open", async () => { + mockIsOpen = false; + mockScanKeys = []; + + await dumpRateLimitState(deps); + + expect(mockDisconnect).not.toHaveBeenCalled(); + }); +}); diff --git a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts index 60347ef9..14bcf247 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/purge.test.ts @@ -88,9 +88,9 @@ describe("purgeQueues", () => { expect(mockSend).toHaveBeenCalledTimes(2); }); - it("ignores NonExistentQueue errors gracefully", async () => { + it("ignores QueueDoesNotExist errors gracefully", async () => { const nonExistentError = Object.assign(new Error("Queue does not exist"), { - name: "AWS.SimpleQueueService.NonExistentQueue", + name: "QueueDoesNotExist", }); mockSend.mockRejectedValueOnce(nonExistentError); @@ -99,7 +99,7 @@ describe("purgeQueues", () => { ).resolves.toBeUndefined(); }); - it("rethrows non-NonExistentQueue errors", async () => { + it("rethrows non-QueueDoesNotExist errors", async () => { const otherError = new Error("Access denied"); mockSend.mockRejectedValueOnce(otherError); diff --git a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts index 46a0928d..622e98a4 100644 --- a/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts +++ b/lambdas/perf-runner-lambda/src/__tests__/runner.test.ts @@ -12,7 +12,7 @@ import { defaultSleep, runPerformanceTest } from "runner"; import { generatePhaseLoad } from "sqs"; import { deriveQueueUrls, purgeQueues } from "purge"; -import { flushElastiCache } from "elasticache"; +import { dumpRateLimitState, flushElastiCache } from "elasticache"; import { verifyMockWebhook } from "webhook-verify"; import { queryCircuitBreakerSnapshot, @@ -39,6 +39,7 @@ const mockQueryPerClientRateTimeline = jest.mocked(queryPerClientRateTimeline); const mockDeriveQueueUrls = jest.mocked(deriveQueueUrls); const mockPurgeQueues = jest.mocked(purgeQueues); const mockFlushElastiCache = jest.mocked(flushElastiCache); +const mockDumpRateLimitState = jest.mocked(dumpRateLimitState); const mockVerifyMockWebhook = jest.mocked(verifyMockWebhook); const immediateSleep = jest.fn().mockResolvedValue(undefined); @@ -112,6 +113,7 @@ beforeEach(() => { ]); mockPurgeQueues.mockResolvedValue(undefined); mockFlushElastiCache.mockResolvedValue(undefined); + mockDumpRateLimitState.mockResolvedValue([]); mockVerifyMockWebhook.mockResolvedValue({ receivedCallbacks: 0, verified: false, @@ -576,6 +578,59 @@ describe("runPerformanceTest", () => { expect(mockFlushElastiCache).toHaveBeenCalledWith(elastiCacheDeps); }); + it("dumps rate limit state before and after when elasticache deps are provided", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + const elastiCacheDeps = { + endpoint: "cache.example.invalid", + cacheName: "test-cache", + iamUsername: "test-user", + region: "eu-west-2", + }; + const mockState = [ + { + key: "ep:{target-1}", + isOpen: "0", + switchedAt: "0", + bucketTokens: "10", + bucketRefilledAt: "1000", + curAttempts: "5", + prevAttempts: "3", + curFailures: "0", + prevFailures: "0", + sampleTill: "2000", + }, + ]; + mockDumpRateLimitState.mockResolvedValue(mockState); + + const result = await runPerformanceTest( + deps, + scenario, + "test-dump", + immediateSleep, + elastiCacheDeps, + ); + + expect(mockDumpRateLimitState).toHaveBeenCalledTimes(2); + expect(mockDumpRateLimitState).toHaveBeenCalledWith(elastiCacheDeps); + expect(result.rateLimitStateBefore).toEqual(mockState); + expect(result.rateLimitStateAfter).toEqual(mockState); + }); + + it("omits rate limit state when elasticache deps are not provided", async () => { + mockQueryMetricsSnapshot.mockResolvedValue(null); + + const result = await runPerformanceTest( + deps, + scenario, + "test-no-dump", + immediateSleep, + ); + + expect(mockDumpRateLimitState).not.toHaveBeenCalled(); + expect(result.rateLimitStateBefore).toBeUndefined(); + expect(result.rateLimitStateAfter).toBeUndefined(); + }); + it("skips ElastiCache flush when deps are not provided", async () => { mockQueryMetricsSnapshot.mockResolvedValue(null); diff --git a/lambdas/perf-runner-lambda/src/elasticache.ts b/lambdas/perf-runner-lambda/src/elasticache.ts index 8d0b86c6..8f41ad9e 100644 --- a/lambdas/perf-runner-lambda/src/elasticache.ts +++ b/lambdas/perf-runner-lambda/src/elasticache.ts @@ -2,7 +2,7 @@ import { type RedisClientType, createClient } from "@redis/client"; import { SignatureV4 } from "@smithy/signature-v4"; import { Sha256 } from "@aws-crypto/sha256-js"; import { fromNodeProviderChain } from "@aws-sdk/credential-providers"; -import type { ElastiCacheDeps } from "types"; +import type { ElastiCacheDeps, EndpointRateLimitState } from "types"; const TOKEN_EXPIRY_SECONDS = 900; @@ -50,3 +50,61 @@ export async function flushElastiCache(deps: ElastiCacheDeps): Promise { } } } + +const RATE_LIMIT_HASH_FIELDS = [ + "is_open", + "switched_at", + "bucket_tokens", + "bucket_refilled_at", + "cur_attempts", + "prev_attempts", + "cur_failures", + "prev_failures", + "sample_till", +] as const; + +export async function dumpRateLimitState( + deps: ElastiCacheDeps, +): Promise { + const token = await generateIamToken(deps); + + const client: RedisClientType = createClient({ + url: `rediss://${deps.endpoint}:6379`, + username: deps.iamUsername, + password: token, + }); + + try { + await client.connect(); + + const keys: string[] = []; + for await (const key of client.scanIterator({ MATCH: "ep:*" })) { + keys.push(key); + } + + // eslint-disable-next-line sonarjs/null-dereference -- false positive: keys is string[] + keys.sort((a, b) => a.localeCompare(b)); + const states: EndpointRateLimitState[] = []; + for (const key of keys) { + const values = await client.hmGet(key, [...RATE_LIMIT_HASH_FIELDS]); + states.push({ + key, + isOpen: values[0], + switchedAt: values[1], + bucketTokens: values[2], + bucketRefilledAt: values[3], + curAttempts: values[4], + prevAttempts: values[5], + curFailures: values[6], + prevFailures: values[7], + sampleTill: values[8], + }); + } + + return states; + } finally { + if (client.isOpen) { + await client.disconnect(); + } + } +} diff --git a/lambdas/perf-runner-lambda/src/purge.ts b/lambdas/perf-runner-lambda/src/purge.ts index 5743e9d2..e363e706 100644 --- a/lambdas/perf-runner-lambda/src/purge.ts +++ b/lambdas/perf-runner-lambda/src/purge.ts @@ -32,7 +32,7 @@ export async function purgeQueues( for (const result of results) { if (result.status === "rejected") { const error = result.reason as { name?: string }; - if (error.name !== "AWS.SimpleQueueService.NonExistentQueue") { + if (error.name !== "QueueDoesNotExist") { throw result.reason as Error; } } diff --git a/lambdas/perf-runner-lambda/src/runner.ts b/lambdas/perf-runner-lambda/src/runner.ts index 321abc45..7a5b5ee6 100644 --- a/lambdas/perf-runner-lambda/src/runner.ts +++ b/lambdas/perf-runner-lambda/src/runner.ts @@ -2,6 +2,7 @@ import type { CircuitBreakerSnapshot, DeliveryMetricsSnapshot, ElastiCacheDeps, + EndpointRateLimitState, MetricsSnapshot, PerClientRateTimeline, PerformanceResult, @@ -12,7 +13,7 @@ import type { } from "types"; import { generatePhaseLoad } from "sqs"; import { deriveQueueUrls, purgeQueues } from "purge"; -import { flushElastiCache } from "elasticache"; +import { dumpRateLimitState, flushElastiCache } from "elasticache"; import { verifyMockWebhook } from "webhook-verify"; import { queryCircuitBreakerSnapshot, @@ -113,6 +114,11 @@ export async function runPerformanceTest( await flushElastiCache(elastiCacheDeps); } + let rateLimitStateBefore: EndpointRateLimitState[] | undefined; + if (elastiCacheDeps) { + rateLimitStateBefore = await dumpRateLimitState(elastiCacheDeps); + } + const startedAt = new Date(testStartMs).toISOString(); const phaseResults: PhaseResult[] = []; const snapshots: MetricsSnapshot[] = []; @@ -208,6 +214,11 @@ export async function runPerformanceTest( ); } + let rateLimitStateAfter: EndpointRateLimitState[] | undefined; + if (elastiCacheDeps) { + rateLimitStateAfter = await dumpRateLimitState(elastiCacheDeps); + } + await purgeQueues(deps.sqsClient, queueUrls); if (elastiCacheDeps) { await flushElastiCache(elastiCacheDeps); @@ -224,5 +235,7 @@ export async function runPerformanceTest( circuitBreakerMetrics: cbSnapshots, perClientRateTimelines, webhookVerification, + rateLimitStateBefore, + rateLimitStateAfter, }; } diff --git a/lambdas/perf-runner-lambda/src/types.ts b/lambdas/perf-runner-lambda/src/types.ts index 24df2a50..4415ef63 100644 --- a/lambdas/perf-runner-lambda/src/types.ts +++ b/lambdas/perf-runner-lambda/src/types.ts @@ -79,6 +79,19 @@ export type PerClientRateTimeline = { entries: PerClientRateEntry[]; }; +export type EndpointRateLimitState = { + key: string; + isOpen: string | null; + switchedAt: string | null; + bucketTokens: string | null; + bucketRefilledAt: string | null; + curAttempts: string | null; + prevAttempts: string | null; + curFailures: string | null; + prevFailures: string | null; + sampleTill: string | null; +}; + export type WebhookVerificationResult = { receivedCallbacks: number; verified: boolean; @@ -95,6 +108,8 @@ export type PerformanceResult = { circuitBreakerMetrics: CircuitBreakerSnapshot[]; perClientRateTimelines?: PerClientRateTimeline[]; webhookVerification?: WebhookVerificationResult; + rateLimitStateBefore?: EndpointRateLimitState[]; + rateLimitStateAfter?: EndpointRateLimitState[]; }; export type PerfRunnerPayload = { diff --git a/scripts/tests/integration-debug.sh b/scripts/tests/integration-debug.sh index a4ebbd63..6970bd82 100755 --- a/scripts/tests/integration-debug.sh +++ b/scripts/tests/integration-debug.sh @@ -8,6 +8,7 @@ set -euo pipefail # ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-status # ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=tail-transform # ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=tail-transform LOG_FILTER= +# ENVIRONMENT= AWS_PROFILE= FOLLOW=false make test-integration-debug ACTION=tail-transform > out.log # # Actions: # queue-status Show SQS queue message counts @@ -28,6 +29,8 @@ set -euo pipefail # # Optional: # LOG_FILTER CloudWatch Logs filter pattern / text +# LOG_SINCE How far back to start tailing logs (default: 30m, e.g. 1h, 2h, 30m) +# FOLLOW Follow logs continuously (default: true, set false to dump and exit) # AWS_REGION (default: eu-west-2) if [ -z "${ENVIRONMENT:-}" ]; then @@ -49,7 +52,9 @@ fi REGION="${AWS_REGION:-eu-west-2}" LOG_FILTER="${LOG_FILTER:-}" +LOG_SINCE="${LOG_SINCE:-30m}" CLIENT_ID="${CLIENT_ID:-}" +FOLLOW="${FOLLOW:-true}" if ! aws sts get-caller-identity --profile "$AWS_PROFILE" >/dev/null 2>&1; then echo "No active AWS SSO session for profile '$AWS_PROFILE'. Running aws sso login..." @@ -140,49 +145,61 @@ log_filter_args() { fi } +follow_args() { + if [[ "$FOLLOW" == "true" ]]; then + printf '%s\n' --follow + fi +} + action_tail_transform() { local -a filter_args=() + local -a follow_arg=() mapfile -t filter_args < <(log_filter_args) + mapfile -t follow_arg < <(follow_args) print_section "Transform/Filter Lambda Logs" aws logs tail \ "/aws/lambda/${PREFIX}-client-transform-filter" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } action_tail_https_client() { require_client_id local -a filter_args=() + local -a follow_arg=() mapfile -t filter_args < <(log_filter_args) + mapfile -t follow_arg < <(follow_args) print_section "HTTPS Client Lambda Logs" aws logs tail \ "/aws/lambda/${PREFIX}-https-client-${CLIENT_ID}" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } action_tail_webhook() { local -a filter_args=() + local -a follow_arg=() mapfile -t filter_args < <(log_filter_args) + mapfile -t follow_arg < <(follow_args) print_section "Mock Webhook Lambda Logs" aws logs tail \ "/aws/lambda/${PREFIX}-mock-webhook" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } @@ -208,14 +225,17 @@ action_tail_pipe() { pipe_log_group_name="${pipe_log_group_arn#*:log-group:}" + local -a follow_arg=() + mapfile -t follow_arg < <(follow_args) + print_section "EventBridge Pipe Logs" aws logs tail \ "$pipe_log_group_name" \ --region "$REGION" \ --profile "$AWS_PROFILE" \ - --since 30m \ - --follow \ + --since "$LOG_SINCE" \ --format short \ + "${follow_arg[@]}" \ "${filter_args[@]}" } diff --git a/tests/integration/delivery-resilience.test.ts b/tests/integration/delivery-resilience.test.ts index 8b218233..e3b2218b 100644 --- a/tests/integration/delivery-resilience.test.ts +++ b/tests/integration/delivery-resilience.test.ts @@ -108,7 +108,7 @@ describe("Delivery Resilience", () => { }); describe("Rate Limiting", () => { - const BURST_SIZE = 15; + const BURST_SIZE = 30; let dlqUrl: string; let deliveryUrl: string; let httpsClientLogGroup: string; @@ -200,9 +200,27 @@ describe("Delivery Resilience", () => { it("should open the circuit breaker after repeated failures and not affect other clients", async () => { const cbConfig = getClientConfig("clientCircuitBreaker"); + const cbTargetPath = buildMockWebhookTargetPath("clientCircuitBreaker"); const singleTargetConfig = getClientConfig("clientSingleTarget"); const singleTargetPath = buildMockWebhookTargetPath("clientSingleTarget"); + // Send a successful message first so the circuit is confirmed closed (it starts half-open) + const warmupEvent = createMessageStatusPublishEvent({ + data: { + clientId: cbConfig.clientId, + messageId: `cb-warmup-${crypto.randomUUID()}`, + }, + }); + await sendSqsEvent(ctx.sqs, ctx.inboundQueueUrl, warmupEvent); + const warmupCallback = await awaitCallback( + ctx.cwLogs, + ctx.webhookLogGroup, + warmupEvent.data.messageId, + "MessageStatus", + ctx.startTime, + ); + expect(warmupCallback.path).toBe(cbTargetPath); + const cbEvents = Array.from({ length: CB_BURST_SIZE }, () => createMessageStatusPublishEvent({ data: {