From 708ba41ace28430fc857d3824887cab75d6549af Mon Sep 17 00:00:00 2001 From: JerrettDavis Date: Fri, 22 May 2026 22:28:31 -0500 Subject: [PATCH 1/4] feat(messaging-transformation): add KeyedNormalizer Key-based O(1) dispatch variant of Normalizer. Builder throws on duplicate key registration (safer than last-writer-wins). Dictionary snapshot taken at Build() so the built instance is fully immutable after construction. 8 test scenarios cover: happy path, multiple keys, default fallback, missing-key-throws, async handler, cancellation, duplicate-key guard, and build-twice semantics. Co-Authored-By: Claude Opus 4.6 --- .../Transformation/KeyedNormalizer.cs | 116 +++++++++++++ .../Transformation/KeyedNormalizerTests.cs | 154 ++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs create mode 100644 test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs diff --git a/src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs b/src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs new file mode 100644 index 0000000..deb5c52 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs @@ -0,0 +1,116 @@ +namespace PatternKit.Messaging.Transformation; + +/// +/// Key-based dispatcher that normalizes a raw value into a canonical form using a dictionary lookup. +/// Unlike (predicate-first), dispatch here is O(1) keyed +/// lookup — useful for format-tagged messages where the discriminator is known at call time +/// (e.g., content-type strings, routing keys, source system identifiers). +/// +/// The key type used to select the handler. Must be non-nullable. +/// The incoming raw type (e.g., , ). +/// The target canonical type produced after normalization. +public sealed class KeyedNormalizer + where TKey : notnull +{ + /// Async normalizer handler for a specific key. + public delegate ValueTask AsyncNormalizerHandler(TRaw raw, CancellationToken cancellationToken); + + private readonly string _name; + private readonly Dictionary _handlers; + private readonly AsyncNormalizerHandler? _default; + + private KeyedNormalizer( + string name, + Dictionary handlers, + AsyncNormalizerHandler? @default) + { + _name = name; + _handlers = handlers; + _default = @default; + } + + /// Creates a new keyed normalizer builder. + public static Builder Create(string name = "keyed-normalizer") => new(name); + + /// + /// Normalizes by dispatching to the handler registered for + /// . Falls back to the default handler when no key match is found. + /// + /// + /// Thrown when has no registered handler and no default handler + /// was configured. + /// + public ValueTask NormalizeAsync(TKey key, TRaw raw, CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + + if (_handlers.TryGetValue(key, out var handler)) + return handler(raw, ct); + + if (_default is not null) + return _default(raw, ct); + + throw new KeyNotFoundException( + $"No format handler matched the raw input for normalizer '{_name}'."); + } + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private readonly Dictionary _entries; + private AsyncNormalizerHandler? _default; + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Keyed normalizer name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + _entries = new Dictionary(); + } + + /// + /// Registers a handler for . Throws + /// when the same key is registered more than once — + /// duplicate registrations are almost always a configuration mistake. + /// + /// + /// Thrown when is already registered. + /// + public Builder When(TKey key, Func> handler) + { + if (key is null) + throw new ArgumentNullException(nameof(key)); + if (handler is null) + throw new ArgumentNullException(nameof(handler)); + + if (_entries.ContainsKey(key)) + throw new ArgumentException( + $"A handler for key '{key}' is already registered. Each key may only have one handler.", + nameof(key)); + + _entries[key] = (raw, ct) => handler(raw, ct); + return this; + } + + /// Sets the default handler used when no key matches. + public Builder Default(Func> handler) + { + _default = (raw, ct) => (handler ?? throw new ArgumentNullException(nameof(handler)))(raw, ct); + return this; + } + + /// Builds an immutable keyed normalizer. + public KeyedNormalizer Build() + { + if (_entries.Count == 0 && _default is null) + throw new InvalidOperationException( + "KeyedNormalizer requires at least one key handler or a default handler."); + + // Snapshot the dictionary so the built instance is immutable-after-build. + var snapshot = new Dictionary(_entries); + return new KeyedNormalizer(_name, snapshot, _default); + } + } +} diff --git a/test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs b/test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs new file mode 100644 index 0000000..d64ca4b --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs @@ -0,0 +1,154 @@ +using PatternKit.Messaging.Transformation; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Transformation; + +public sealed class KeyedNormalizerTests +{ + // ─── Happy path ─────────────────────────────────────────────────────────── + + [Scenario("NormalizeAsync RegisteredKey DispatchesToCorrectHandler")] + [Fact] + public async Task NormalizeAsync_RegisteredKey_DispatchesToCorrectHandler() + { + var normalizer = KeyedNormalizer.Create() + .When("json", (raw, _) => new ValueTask(new Order(raw, "json"))) + .When("xml", (raw, _) => new ValueTask(new Order(raw, "xml"))) + .Build(); + + var result = await normalizer.NormalizeAsync("json", "payload-1"); + + ScenarioExpect.Equal("json", result.Format); + ScenarioExpect.Equal("payload-1", result.Id); + } + + [Scenario("NormalizeAsync MultipleKeys EachRoutesCorrectly")] + [Fact] + public async Task NormalizeAsync_MultipleKeys_EachRoutesCorrectly() + { + var normalizer = KeyedNormalizer.Create() + .When("csv", (raw, _) => new ValueTask(new Order(raw, "csv"))) + .When("avro", (raw, _) => new ValueTask(new Order(raw, "avro"))) + .When("proto", (raw, _) => new ValueTask(new Order(raw, "proto"))) + .Build(); + + var csv = await normalizer.NormalizeAsync("csv", "c"); + var avro = await normalizer.NormalizeAsync("avro", "a"); + var proto = await normalizer.NormalizeAsync("proto", "p"); + + ScenarioExpect.Equal("csv", csv.Format); + ScenarioExpect.Equal("avro", avro.Format); + ScenarioExpect.Equal("proto", proto.Format); + } + + // ─── Default handler ────────────────────────────────────────────────────── + + [Scenario("NormalizeAsync UnregisteredKeyWithDefault DispatchesToDefault")] + [Fact] + public async Task NormalizeAsync_UnregisteredKeyWithDefault_DispatchesToDefault() + { + var normalizer = KeyedNormalizer.Create() + .When("json", (raw, _) => new ValueTask(new Order(raw, "json"))) + .Default((raw, _) => new ValueTask(new Order(raw, "unknown"))) + .Build(); + + var result = await normalizer.NormalizeAsync("csv", "anything"); + + ScenarioExpect.Equal("unknown", result.Format); + } + + // ─── Missing key, no default ────────────────────────────────────────────── + + [Scenario("NormalizeAsync UnregisteredKeyWithoutDefault ThrowsKeyNotFoundException")] + [Fact] + public async Task NormalizeAsync_UnregisteredKeyWithoutDefault_ThrowsKeyNotFoundException() + { + var normalizer = KeyedNormalizer.Create() + .When("json", (raw, _) => new ValueTask(new Order(raw, "json"))) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => normalizer.NormalizeAsync("xml", "data").AsTask()); + } + + // ─── Async handler completes asynchronously ─────────────────────────────── + + [Scenario("NormalizeAsync AsyncHandlerCompletesAsynchronously")] + [Fact] + public async Task NormalizeAsync_AsyncHandlerCompletesAsynchronously() + { + var normalizer = KeyedNormalizer.Create() + .When("slow", async (raw, ct) => + { + await Task.Yield(); + return new Order(raw, "slow"); + }) + .Build(); + + var result = await normalizer.NormalizeAsync("slow", "data"); + + ScenarioExpect.Equal("slow", result.Format); + } + + // ─── Cancellation propagates ────────────────────────────────────────────── + + [Scenario("NormalizeAsync CancellationPropagates")] + [Fact] + public async Task NormalizeAsync_CancellationPropagates() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var normalizer = KeyedNormalizer.Create() + .When("json", async (raw, ct) => + { + await Task.Delay(1000, ct); + return new Order(raw, "json"); + }) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => normalizer.NormalizeAsync("json", "data", cts.Token).AsTask()); + } + + // ─── Duplicate key registration ─────────────────────────────────────────── + + [Scenario("Builder DuplicateKey ThrowsArgumentException")] + [Fact] + public void Builder_DuplicateKey_ThrowsArgumentException() + { + ScenarioExpect.Throws(() => + KeyedNormalizer.Create() + .When("json", (raw, _) => new ValueTask(new Order(raw, "json"))) + .When("json", (raw, _) => new ValueTask(new Order(raw, "json2"))) + .Build()); + } + + // ─── Build semantics ────────────────────────────────────────────────────── + + [Scenario("Builder BuildCalledTwice ReturnsDistinctInstances")] + [Fact] + public void Builder_BuildCalledTwice_ReturnsDistinctInstances() + { + var builder = KeyedNormalizer.Create() + .When("json", (raw, _) => new ValueTask(new Order(raw, "json"))); + + var a = builder.Build(); + var b = builder.Build(); + + // Each call produces an independent snapshot — not the same reference. + ScenarioExpect.False(ReferenceEquals(a, b)); + } + + // ─── Builder validation ─────────────────────────────────────────────────── + + [Scenario("Builder NoHandlers ThrowsInvalidOperationException")] + [Fact] + public void Builder_NoHandlers_ThrowsInvalidOperationException() + { + ScenarioExpect.Throws(() => + KeyedNormalizer.Create().Build()); + } + + private sealed record Order(string Id, string Format); +} From 0e1ebf4434855de570e3f9d98b09dd69fa303985 Mon Sep 17 00:00:00 2001 From: JerrettDavis Date: Fri, 22 May 2026 22:28:41 -0500 Subject: [PATCH 2/4] feat(messaging-consumers): add AsyncPollingConsumer.PollOnceAsync MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single-shot poll cycle that skips the run loop, interval, jitter, and back-off — designed for caller-driven polling (workflow steps, cron jobs, AWS Lambda, Azure Functions). Returns the raw message from the source or null on empty poll. Does not invoke any run-loop handler. 4 new test scenarios: returns item, returns null on empty, respects cancellation, confirms handler is not invoked. Co-Authored-By: Claude Opus 4.6 --- .../Consumers/AsyncPollingConsumer.cs | 21 ++++++ .../Consumers/AsyncPollingConsumerTests.cs | 65 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs index 6a2a9b3..878c20f 100644 --- a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs +++ b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs @@ -111,6 +111,27 @@ public async ValueTask RunAsync( } } + /// + /// Executes a single poll cycle and returns the item received, or + /// if the source returned no message. + /// + /// + /// Unlike , this method does not enter a loop, does not sleep, + /// does not apply interval, jitter, or back-off, and does not invoke any registered + /// run-loop handler. It is intended for caller-driven polling where the caller owns + /// the loop (e.g., a workflow-framework step-based polling integration). + /// + /// Cancellation token propagated directly to the poll source. + /// + /// The message returned by the source, or when the source + /// returned an empty poll. + /// + public ValueTask?> PollOnceAsync(CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + return _source(MessageContext.Empty, ct); + } + private TimeSpan ComputeDelay(int consecutiveEmpty, Random rng) { TimeSpan baseDelay; diff --git a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs index 496a922..2bb5613 100644 --- a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs +++ b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs @@ -167,4 +167,69 @@ public async Task RunAsync_RejectsNullHandler() await ScenarioExpect.ThrowsAsync(() => consumer.RunAsync(null!).AsTask()); } + + // ─── PollOnceAsync ──────────────────────────────────────────────────────── + + [Scenario("PollOnceAsync ReturnsItemFromSource")] + [Fact] + public async Task PollOnceAsync_ReturnsItemFromSource() + { + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return Message.Create("hello"); }) + .Build(); + + var result = await consumer.PollOnceAsync(); + + ScenarioExpect.NotNull(result); + ScenarioExpect.Equal("hello", result!.Payload); + } + + [Scenario("PollOnceAsync ReturnsNullWhenSourceReturnsEmpty")] + [Fact] + public async Task PollOnceAsync_ReturnsNullWhenSourceReturnsEmpty() + { + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return null; }) + .Build(); + + var result = await consumer.PollOnceAsync(); + + ScenarioExpect.Null(result); + } + + [Scenario("PollOnceAsync RespectsCancellation")] + [Fact] + public async Task PollOnceAsync_RespectsCancellation() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, ct) => + { + await Task.Delay(1000, ct); + return Message.Create("never"); + }) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => consumer.PollOnceAsync(cts.Token).AsTask()); + } + + [Scenario("PollOnceAsync DoesNotInvokeRunLoopHandler")] + [Fact] + public async Task PollOnceAsync_DoesNotInvokeRunLoopHandler() + { + var handlerInvoked = false; + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return Message.Create("msg"); }) + .Build(); + + // PollOnceAsync takes no handler argument — the run-loop handler is never called. + var result = await consumer.PollOnceAsync(); + + ScenarioExpect.False(handlerInvoked); + ScenarioExpect.NotNull(result); // the source did produce a message + } } From 6a5b56a21d3ff04c8a08c3ee42116b2c2fe1fd89 Mon Sep 17 00:00:00 2001 From: JerrettDavis Date: Fri, 22 May 2026 22:28:50 -0500 Subject: [PATCH 3/4] feat(messaging-reliability): add OutboxStoreExtensions.EnqueueObjectAsync Convenience extension on IOutboxStore for callers without compile-time payload type knowledge (e.g., generic workflow orchestrators). Accepts an untyped payload + IReadOnlyDictionary headers, projects to MessageHeaders, and delegates to EnqueueAsync. Null or empty headers map to MessageHeaders.Empty (no allocation). 3 test scenarios: with payload+headers, null headers, cancellation. Co-Authored-By: Claude Opus 4.6 --- .../Reliability/OutboxStoreExtensions.cs | 55 ++++++++++++++++++ .../Reliability/OutboxStoreExtensionsTests.cs | 57 +++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs create mode 100644 test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs diff --git a/src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs b/src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs new file mode 100644 index 0000000..89d5827 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs @@ -0,0 +1,55 @@ +namespace PatternKit.Messaging.Reliability; + +/// +/// Convenience extension methods for . +/// +public static class OutboxStoreExtensions +{ + /// + /// Convenience overload that accepts an untyped payload and an optional header dictionary, + /// boxes them into a Message<object>, and enqueues + /// via the typed store. + /// + /// + /// Use when the calling layer does not have compile-time knowledge of the payload type + /// (e.g., a generic workflow-step orchestrator). Projects that want this convenience + /// should consume IOutboxStore<object> + /// as their dependency. + /// When is , the message is enqueued + /// with (no allocation). + /// + /// The outbox store to enqueue into. + /// The untyped payload to enqueue. + /// + /// Optional string-to-string headers. Pass to use empty headers. + /// + /// Cancellation token. + /// The stored OutboxMessage<object>. + public static ValueTask> EnqueueObjectAsync( + this IOutboxStore store, + object payload, + IReadOnlyDictionary? headers, + CancellationToken ct = default) + { + if (store is null) + throw new ArgumentNullException(nameof(store)); + if (payload is null) + throw new ArgumentNullException(nameof(payload)); + + MessageHeaders messageHeaders; + if (headers is null || headers.Count == 0) + { + messageHeaders = MessageHeaders.Empty; + } + else + { + // Build from the IReadOnlyDictionary by projecting to the + // object? value shape that MessageHeaders accepts. + messageHeaders = new MessageHeaders( + headers.Select(kv => new KeyValuePair(kv.Key, kv.Value))); + } + + var message = new Message(payload, messageHeaders); + return store.EnqueueAsync(message, cancellationToken: ct); + } +} diff --git a/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs b/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs new file mode 100644 index 0000000..84dae4b --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs @@ -0,0 +1,57 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Reliability; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Reliability; + +public sealed class OutboxStoreExtensionsTests +{ + [Scenario("EnqueueObjectAsync WithPayloadAndHeaders StoresCorrectly")] + [Fact] + public async Task EnqueueObjectAsync_WithPayloadAndHeaders_StoresCorrectly() + { + var store = new InMemoryOutboxStore(); + var payload = new { Type = "OrderCreated", OrderId = 42 }; + var headers = new Dictionary + { + ["content-type"] = "application/json", + ["source"] = "order-service", + }; + + var record = await store.EnqueueObjectAsync(payload, headers); + + ScenarioExpect.NotNull(record); + ScenarioExpect.False(record.Dispatched); + ScenarioExpect.Equal(1, store.Records.Count); + ScenarioExpect.Equal(payload, record.Message.Payload); + ScenarioExpect.Equal("application/json", record.Message.Headers.GetString("content-type")); + ScenarioExpect.Equal("order-service", record.Message.Headers.GetString("source")); + } + + [Scenario("EnqueueObjectAsync WithNullHeaders StoresEmptyHeadersMessage")] + [Fact] + public async Task EnqueueObjectAsync_WithNullHeaders_StoresEmptyHeadersMessage() + { + var store = new InMemoryOutboxStore(); + var payload = "plain-string-payload"; + + var record = await store.EnqueueObjectAsync(payload, headers: null); + + ScenarioExpect.NotNull(record); + ScenarioExpect.Equal(payload, record.Message.Payload); + ScenarioExpect.Equal(0, record.Message.Headers.Count); + } + + [Scenario("EnqueueObjectAsync RespectsCancellation")] + [Fact] + public async Task EnqueueObjectAsync_RespectsCancellation() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var store = new InMemoryOutboxStore(); + + await ScenarioExpect.ThrowsAsync( + () => store.EnqueueObjectAsync("payload", null, cts.Token).AsTask()); + } +} From 71c1e4a662df85fd5770ca3d665a1faa4cd7a8a1 Mon Sep 17 00:00:00 2001 From: JerrettDavis Date: Fri, 22 May 2026 22:41:31 -0500 Subject: [PATCH 4/4] fix(review): address Copilot review comments on PR #335 - PollOnceAsync: add optional context parameter so callers can supply MessageContext instead of always using Empty; cancellation flows via the explicit ct parameter as before - PollOnceAsync test DoesNotInvokeRunLoopHandler: rewrite to count source invocations (observable) instead of checking an unused flag - OutboxStoreExtensionsTests: assert ReferenceEquals(MessageHeaders.Empty) for null headers; add empty-dictionary scenario to verify same guarantee Co-Authored-By: Claude Opus 4.6 --- .../Consumers/AsyncPollingConsumer.cs | 16 +++++++++----- .../Consumers/AsyncPollingConsumerTests.cs | 21 +++++++++++++------ .../Reliability/OutboxStoreExtensionsTests.cs | 17 ++++++++++++++- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs index 878c20f..b433da4 100644 --- a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs +++ b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs @@ -117,19 +117,25 @@ public async ValueTask RunAsync( /// /// /// Unlike , this method does not enter a loop, does not sleep, - /// does not apply interval, jitter, or back-off, and does not invoke any registered - /// run-loop handler. It is intended for caller-driven polling where the caller owns - /// the loop (e.g., a workflow-framework step-based polling integration). + /// does not apply interval, jitter, or back-off, and does not invoke any run-loop handler. + /// It is intended for caller-driven polling where the caller owns the loop (e.g., a + /// workflow-framework step-based polling integration). + /// The parameter is optional; when omitted, + /// is used. /// + /// + /// Optional message context forwarded to the poll source. When , + /// is used. + /// /// Cancellation token propagated directly to the poll source. /// /// The message returned by the source, or when the source /// returned an empty poll. /// - public ValueTask?> PollOnceAsync(CancellationToken ct = default) + public ValueTask?> PollOnceAsync(MessageContext? context = null, CancellationToken ct = default) { ct.ThrowIfCancellationRequested(); - return _source(MessageContext.Empty, ct); + return _source(context ?? MessageContext.Empty, ct); } private TimeSpan ComputeDelay(int consecutiveEmpty, Random rng) diff --git a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs index 2bb5613..459129d 100644 --- a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs +++ b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs @@ -213,23 +213,32 @@ public async Task PollOnceAsync_RespectsCancellation() .Build(); await ScenarioExpect.ThrowsAsync( - () => consumer.PollOnceAsync(cts.Token).AsTask()); + () => consumer.PollOnceAsync(ct: cts.Token).AsTask()); } [Scenario("PollOnceAsync DoesNotInvokeRunLoopHandler")] [Fact] public async Task PollOnceAsync_DoesNotInvokeRunLoopHandler() { - var handlerInvoked = false; + // PollOnceAsync has no handler parameter — the only side-effect observable is + // that the source is called exactly once and the raw message is returned. + // We verify this by counting source invocations and confirming the value is + // returned directly without any additional callback layer. + var sourceCallCount = 0; var consumer = AsyncPollingConsumer.Create() - .WithSource(async (_, _) => { await Task.CompletedTask; return Message.Create("msg"); }) + .WithSource(async (_, _) => + { + Interlocked.Increment(ref sourceCallCount); + await Task.CompletedTask; + return Message.Create("msg"); + }) .Build(); - // PollOnceAsync takes no handler argument — the run-loop handler is never called. var result = await consumer.PollOnceAsync(); - ScenarioExpect.False(handlerInvoked); - ScenarioExpect.NotNull(result); // the source did produce a message + ScenarioExpect.Equal(1, sourceCallCount); // source called exactly once + ScenarioExpect.NotNull(result); // message returned directly to caller + ScenarioExpect.Equal("msg", result!.Payload); // no handler mutation } } diff --git a/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs b/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs index 84dae4b..62afdad 100644 --- a/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs +++ b/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs @@ -39,7 +39,22 @@ public async Task EnqueueObjectAsync_WithNullHeaders_StoresEmptyHeadersMessage() ScenarioExpect.NotNull(record); ScenarioExpect.Equal(payload, record.Message.Payload); - ScenarioExpect.Equal(0, record.Message.Headers.Count); + // Null headers must reuse MessageHeaders.Empty — not allocate a new empty instance. + ScenarioExpect.True(ReferenceEquals(MessageHeaders.Empty, record.Message.Headers)); + } + + [Scenario("EnqueueObjectAsync WithEmptyDictionary StoresEmptyHeadersMessage")] + [Fact] + public async Task EnqueueObjectAsync_WithEmptyDictionary_StoresEmptyHeadersMessage() + { + var store = new InMemoryOutboxStore(); + var payload = 42; + + var record = await store.EnqueueObjectAsync(payload, headers: new Dictionary()); + + ScenarioExpect.NotNull(record); + // An empty (non-null) dictionary must also map to MessageHeaders.Empty. + ScenarioExpect.True(ReferenceEquals(MessageHeaders.Empty, record.Message.Headers)); } [Scenario("EnqueueObjectAsync RespectsCancellation")]