diff --git a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs
index 6a2a9b3b..b433da44 100644
--- a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs
+++ b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs
@@ -111,6 +111,33 @@ public async ValueTask RunAsync(
}
}
+ ///
+ /// Executes a single poll cycle and returns the item received, or
+ /// if the source returned no message.
+ ///
+ ///
+ /// Unlike , this method does not enter a loop, does not sleep,
+ /// does not apply interval, jitter, or back-off, and does not invoke any run-loop handler.
+ /// It is intended for caller-driven polling where the caller owns the loop (e.g., a
+ /// workflow-framework step-based polling integration).
+ /// The parameter is optional; when omitted,
+ /// is used.
+ ///
+ ///
+ /// Optional message context forwarded to the poll source. When ,
+ /// is used.
+ ///
+ /// Cancellation token propagated directly to the poll source.
+ ///
+ /// The message returned by the source, or when the source
+ /// returned an empty poll.
+ ///
+ public ValueTask?> PollOnceAsync(MessageContext? context = null, CancellationToken ct = default)
+ {
+ ct.ThrowIfCancellationRequested();
+ return _source(context ?? MessageContext.Empty, ct);
+ }
+
private TimeSpan ComputeDelay(int consecutiveEmpty, Random rng)
{
TimeSpan baseDelay;
diff --git a/src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs b/src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs
new file mode 100644
index 00000000..89d58279
--- /dev/null
+++ b/src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs
@@ -0,0 +1,55 @@
+namespace PatternKit.Messaging.Reliability;
+
+///
+/// Convenience extension methods for .
+///
+public static class OutboxStoreExtensions
+{
+ ///
+ /// Convenience overload that accepts an untyped payload and an optional header dictionary,
+ /// boxes them into a Message<object> , and enqueues
+ /// via the typed store.
+ ///
+ ///
+ /// Use when the calling layer does not have compile-time knowledge of the payload type
+ /// (e.g., a generic workflow-step orchestrator). Projects that want this convenience
+ /// should consume IOutboxStore<object>
+ /// as their dependency.
+ /// When is , the message is enqueued
+ /// with (no allocation).
+ ///
+ /// The outbox store to enqueue into.
+ /// The untyped payload to enqueue.
+ ///
+ /// Optional string-to-string headers. Pass to use empty headers.
+ ///
+ /// Cancellation token.
+ /// The stored OutboxMessage<object> .
+ public static ValueTask> EnqueueObjectAsync(
+ this IOutboxStore store,
+ object payload,
+ IReadOnlyDictionary? headers,
+ CancellationToken ct = default)
+ {
+ if (store is null)
+ throw new ArgumentNullException(nameof(store));
+ if (payload is null)
+ throw new ArgumentNullException(nameof(payload));
+
+ MessageHeaders messageHeaders;
+ if (headers is null || headers.Count == 0)
+ {
+ messageHeaders = MessageHeaders.Empty;
+ }
+ else
+ {
+ // Build from the IReadOnlyDictionary by projecting to the
+ // object? value shape that MessageHeaders accepts.
+ messageHeaders = new MessageHeaders(
+ headers.Select(kv => new KeyValuePair(kv.Key, kv.Value)));
+ }
+
+ var message = new Message(payload, messageHeaders);
+ return store.EnqueueAsync(message, cancellationToken: ct);
+ }
+}
diff --git a/src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs b/src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs
new file mode 100644
index 00000000..deb5c524
--- /dev/null
+++ b/src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs
@@ -0,0 +1,116 @@
+namespace PatternKit.Messaging.Transformation;
+
+///
+/// Key-based dispatcher that normalizes a raw value into a canonical form using a dictionary lookup.
+/// Unlike (predicate-first), dispatch here is O(1) keyed
+/// lookup — useful for format-tagged messages where the discriminator is known at call time
+/// (e.g., content-type strings, routing keys, source system identifiers).
+///
+/// The key type used to select the handler. Must be non-nullable.
+/// The incoming raw type (e.g., , ).
+/// The target canonical type produced after normalization.
+public sealed class KeyedNormalizer
+ where TKey : notnull
+{
+ /// Async normalizer handler for a specific key.
+ public delegate ValueTask AsyncNormalizerHandler(TRaw raw, CancellationToken cancellationToken);
+
+ private readonly string _name;
+ private readonly Dictionary _handlers;
+ private readonly AsyncNormalizerHandler? _default;
+
+ private KeyedNormalizer(
+ string name,
+ Dictionary handlers,
+ AsyncNormalizerHandler? @default)
+ {
+ _name = name;
+ _handlers = handlers;
+ _default = @default;
+ }
+
+ /// Creates a new keyed normalizer builder.
+ public static Builder Create(string name = "keyed-normalizer") => new(name);
+
+ ///
+ /// Normalizes by dispatching to the handler registered for
+ /// . Falls back to the default handler when no key match is found.
+ ///
+ ///
+ /// Thrown when has no registered handler and no default handler
+ /// was configured.
+ ///
+ public ValueTask NormalizeAsync(TKey key, TRaw raw, CancellationToken ct = default)
+ {
+ ct.ThrowIfCancellationRequested();
+
+ if (_handlers.TryGetValue(key, out var handler))
+ return handler(raw, ct);
+
+ if (_default is not null)
+ return _default(raw, ct);
+
+ throw new KeyNotFoundException(
+ $"No format handler matched the raw input for normalizer '{_name}'.");
+ }
+
+ /// Fluent builder for .
+ public sealed class Builder
+ {
+ private readonly string _name;
+ private readonly Dictionary _entries;
+ private AsyncNormalizerHandler? _default;
+
+ internal Builder(string name)
+ {
+ if (string.IsNullOrWhiteSpace(name))
+ throw new ArgumentException("Keyed normalizer name cannot be null, empty, or whitespace.", nameof(name));
+
+ _name = name;
+ _entries = new Dictionary();
+ }
+
+ ///
+ /// Registers a handler for . Throws
+ /// when the same key is registered more than once —
+ /// duplicate registrations are almost always a configuration mistake.
+ ///
+ ///
+ /// Thrown when is already registered.
+ ///
+ public Builder When(TKey key, Func> handler)
+ {
+ if (key is null)
+ throw new ArgumentNullException(nameof(key));
+ if (handler is null)
+ throw new ArgumentNullException(nameof(handler));
+
+ if (_entries.ContainsKey(key))
+ throw new ArgumentException(
+ $"A handler for key '{key}' is already registered. Each key may only have one handler.",
+ nameof(key));
+
+ _entries[key] = (raw, ct) => handler(raw, ct);
+ return this;
+ }
+
+ /// Sets the default handler used when no key matches.
+ public Builder Default(Func> handler)
+ {
+ _default = (raw, ct) => (handler ?? throw new ArgumentNullException(nameof(handler)))(raw, ct);
+ return this;
+ }
+
+ /// Builds an immutable keyed normalizer.
+ public KeyedNormalizer Build()
+ {
+ if (_entries.Count == 0 && _default is null)
+ throw new InvalidOperationException(
+ "KeyedNormalizer requires at least one key handler or a default handler.");
+
+ // Snapshot the dictionary so the built instance is immutable-after-build.
+ var snapshot = new Dictionary(_entries);
+ return new KeyedNormalizer(_name, snapshot, _default);
+ }
+ }
+}
diff --git a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs
index 496a922e..459129d5 100644
--- a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs
+++ b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs
@@ -167,4 +167,78 @@ public async Task RunAsync_RejectsNullHandler()
await ScenarioExpect.ThrowsAsync(() => consumer.RunAsync(null!).AsTask());
}
+
+ // ─── PollOnceAsync ────────────────────────────────────────────────────────
+
+ [Scenario("PollOnceAsync ReturnsItemFromSource")]
+ [Fact]
+ public async Task PollOnceAsync_ReturnsItemFromSource()
+ {
+ var consumer = AsyncPollingConsumer.Create()
+ .WithSource(async (_, _) => { await Task.CompletedTask; return Message.Create("hello"); })
+ .Build();
+
+ var result = await consumer.PollOnceAsync();
+
+ ScenarioExpect.NotNull(result);
+ ScenarioExpect.Equal("hello", result!.Payload);
+ }
+
+ [Scenario("PollOnceAsync ReturnsNullWhenSourceReturnsEmpty")]
+ [Fact]
+ public async Task PollOnceAsync_ReturnsNullWhenSourceReturnsEmpty()
+ {
+ var consumer = AsyncPollingConsumer.Create()
+ .WithSource(async (_, _) => { await Task.CompletedTask; return null; })
+ .Build();
+
+ var result = await consumer.PollOnceAsync();
+
+ ScenarioExpect.Null(result);
+ }
+
+ [Scenario("PollOnceAsync RespectsCancellation")]
+ [Fact]
+ public async Task PollOnceAsync_RespectsCancellation()
+ {
+ using var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var consumer = AsyncPollingConsumer.Create()
+ .WithSource(async (_, ct) =>
+ {
+ await Task.Delay(1000, ct);
+ return Message.Create("never");
+ })
+ .Build();
+
+ await ScenarioExpect.ThrowsAsync(
+ () => consumer.PollOnceAsync(ct: cts.Token).AsTask());
+ }
+
+ [Scenario("PollOnceAsync DoesNotInvokeRunLoopHandler")]
+ [Fact]
+ public async Task PollOnceAsync_DoesNotInvokeRunLoopHandler()
+ {
+ // PollOnceAsync has no handler parameter — the only side-effect observable is
+ // that the source is called exactly once and the raw message is returned.
+ // We verify this by counting source invocations and confirming the value is
+ // returned directly without any additional callback layer.
+ var sourceCallCount = 0;
+
+ var consumer = AsyncPollingConsumer.Create()
+ .WithSource(async (_, _) =>
+ {
+ Interlocked.Increment(ref sourceCallCount);
+ await Task.CompletedTask;
+ return Message.Create("msg");
+ })
+ .Build();
+
+ var result = await consumer.PollOnceAsync();
+
+ ScenarioExpect.Equal(1, sourceCallCount); // source called exactly once
+ ScenarioExpect.NotNull(result); // message returned directly to caller
+ ScenarioExpect.Equal("msg", result!.Payload); // no handler mutation
+ }
}
diff --git a/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs b/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs
new file mode 100644
index 00000000..62afdadf
--- /dev/null
+++ b/test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs
@@ -0,0 +1,72 @@
+using PatternKit.Messaging;
+using PatternKit.Messaging.Reliability;
+using TinyBDD;
+
+namespace PatternKit.Tests.Messaging.Reliability;
+
+public sealed class OutboxStoreExtensionsTests
+{
+ [Scenario("EnqueueObjectAsync WithPayloadAndHeaders StoresCorrectly")]
+ [Fact]
+ public async Task EnqueueObjectAsync_WithPayloadAndHeaders_StoresCorrectly()
+ {
+ var store = new InMemoryOutboxStore();
+ var payload = new { Type = "OrderCreated", OrderId = 42 };
+ var headers = new Dictionary
+ {
+ ["content-type"] = "application/json",
+ ["source"] = "order-service",
+ };
+
+ var record = await store.EnqueueObjectAsync(payload, headers);
+
+ ScenarioExpect.NotNull(record);
+ ScenarioExpect.False(record.Dispatched);
+ ScenarioExpect.Equal(1, store.Records.Count);
+ ScenarioExpect.Equal(payload, record.Message.Payload);
+ ScenarioExpect.Equal("application/json", record.Message.Headers.GetString("content-type"));
+ ScenarioExpect.Equal("order-service", record.Message.Headers.GetString("source"));
+ }
+
+ [Scenario("EnqueueObjectAsync WithNullHeaders StoresEmptyHeadersMessage")]
+ [Fact]
+ public async Task EnqueueObjectAsync_WithNullHeaders_StoresEmptyHeadersMessage()
+ {
+ var store = new InMemoryOutboxStore();
+ var payload = "plain-string-payload";
+
+ var record = await store.EnqueueObjectAsync(payload, headers: null);
+
+ ScenarioExpect.NotNull(record);
+ ScenarioExpect.Equal(payload, record.Message.Payload);
+ // Null headers must reuse MessageHeaders.Empty — not allocate a new empty instance.
+ ScenarioExpect.True(ReferenceEquals(MessageHeaders.Empty, record.Message.Headers));
+ }
+
+ [Scenario("EnqueueObjectAsync WithEmptyDictionary StoresEmptyHeadersMessage")]
+ [Fact]
+ public async Task EnqueueObjectAsync_WithEmptyDictionary_StoresEmptyHeadersMessage()
+ {
+ var store = new InMemoryOutboxStore();
+ var payload = 42;
+
+ var record = await store.EnqueueObjectAsync(payload, headers: new Dictionary());
+
+ ScenarioExpect.NotNull(record);
+ // An empty (non-null) dictionary must also map to MessageHeaders.Empty.
+ ScenarioExpect.True(ReferenceEquals(MessageHeaders.Empty, record.Message.Headers));
+ }
+
+ [Scenario("EnqueueObjectAsync RespectsCancellation")]
+ [Fact]
+ public async Task EnqueueObjectAsync_RespectsCancellation()
+ {
+ using var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var store = new InMemoryOutboxStore();
+
+ await ScenarioExpect.ThrowsAsync(
+ () => store.EnqueueObjectAsync("payload", null, cts.Token).AsTask());
+ }
+}
diff --git a/test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs b/test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs
new file mode 100644
index 00000000..d64ca4b2
--- /dev/null
+++ b/test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs
@@ -0,0 +1,154 @@
+using PatternKit.Messaging.Transformation;
+using TinyBDD;
+
+namespace PatternKit.Tests.Messaging.Transformation;
+
+public sealed class KeyedNormalizerTests
+{
+ // ─── Happy path ───────────────────────────────────────────────────────────
+
+ [Scenario("NormalizeAsync RegisteredKey DispatchesToCorrectHandler")]
+ [Fact]
+ public async Task NormalizeAsync_RegisteredKey_DispatchesToCorrectHandler()
+ {
+ var normalizer = KeyedNormalizer.Create()
+ .When("json", (raw, _) => new ValueTask(new Order(raw, "json")))
+ .When("xml", (raw, _) => new ValueTask(new Order(raw, "xml")))
+ .Build();
+
+ var result = await normalizer.NormalizeAsync("json", "payload-1");
+
+ ScenarioExpect.Equal("json", result.Format);
+ ScenarioExpect.Equal("payload-1", result.Id);
+ }
+
+ [Scenario("NormalizeAsync MultipleKeys EachRoutesCorrectly")]
+ [Fact]
+ public async Task NormalizeAsync_MultipleKeys_EachRoutesCorrectly()
+ {
+ var normalizer = KeyedNormalizer.Create()
+ .When("csv", (raw, _) => new ValueTask(new Order(raw, "csv")))
+ .When("avro", (raw, _) => new ValueTask(new Order(raw, "avro")))
+ .When("proto", (raw, _) => new ValueTask(new Order(raw, "proto")))
+ .Build();
+
+ var csv = await normalizer.NormalizeAsync("csv", "c");
+ var avro = await normalizer.NormalizeAsync("avro", "a");
+ var proto = await normalizer.NormalizeAsync("proto", "p");
+
+ ScenarioExpect.Equal("csv", csv.Format);
+ ScenarioExpect.Equal("avro", avro.Format);
+ ScenarioExpect.Equal("proto", proto.Format);
+ }
+
+ // ─── Default handler ──────────────────────────────────────────────────────
+
+ [Scenario("NormalizeAsync UnregisteredKeyWithDefault DispatchesToDefault")]
+ [Fact]
+ public async Task NormalizeAsync_UnregisteredKeyWithDefault_DispatchesToDefault()
+ {
+ var normalizer = KeyedNormalizer.Create()
+ .When("json", (raw, _) => new ValueTask(new Order(raw, "json")))
+ .Default((raw, _) => new ValueTask(new Order(raw, "unknown")))
+ .Build();
+
+ var result = await normalizer.NormalizeAsync("csv", "anything");
+
+ ScenarioExpect.Equal("unknown", result.Format);
+ }
+
+ // ─── Missing key, no default ──────────────────────────────────────────────
+
+ [Scenario("NormalizeAsync UnregisteredKeyWithoutDefault ThrowsKeyNotFoundException")]
+ [Fact]
+ public async Task NormalizeAsync_UnregisteredKeyWithoutDefault_ThrowsKeyNotFoundException()
+ {
+ var normalizer = KeyedNormalizer.Create()
+ .When("json", (raw, _) => new ValueTask(new Order(raw, "json")))
+ .Build();
+
+ await ScenarioExpect.ThrowsAsync(
+ () => normalizer.NormalizeAsync("xml", "data").AsTask());
+ }
+
+ // ─── Async handler completes asynchronously ───────────────────────────────
+
+ [Scenario("NormalizeAsync AsyncHandlerCompletesAsynchronously")]
+ [Fact]
+ public async Task NormalizeAsync_AsyncHandlerCompletesAsynchronously()
+ {
+ var normalizer = KeyedNormalizer.Create()
+ .When("slow", async (raw, ct) =>
+ {
+ await Task.Yield();
+ return new Order(raw, "slow");
+ })
+ .Build();
+
+ var result = await normalizer.NormalizeAsync("slow", "data");
+
+ ScenarioExpect.Equal("slow", result.Format);
+ }
+
+ // ─── Cancellation propagates ──────────────────────────────────────────────
+
+ [Scenario("NormalizeAsync CancellationPropagates")]
+ [Fact]
+ public async Task NormalizeAsync_CancellationPropagates()
+ {
+ using var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ var normalizer = KeyedNormalizer.Create()
+ .When("json", async (raw, ct) =>
+ {
+ await Task.Delay(1000, ct);
+ return new Order(raw, "json");
+ })
+ .Build();
+
+ await ScenarioExpect.ThrowsAsync(
+ () => normalizer.NormalizeAsync("json", "data", cts.Token).AsTask());
+ }
+
+ // ─── Duplicate key registration ───────────────────────────────────────────
+
+ [Scenario("Builder DuplicateKey ThrowsArgumentException")]
+ [Fact]
+ public void Builder_DuplicateKey_ThrowsArgumentException()
+ {
+ ScenarioExpect.Throws(() =>
+ KeyedNormalizer.Create()
+ .When("json", (raw, _) => new ValueTask(new Order(raw, "json")))
+ .When("json", (raw, _) => new ValueTask(new Order(raw, "json2")))
+ .Build());
+ }
+
+ // ─── Build semantics ──────────────────────────────────────────────────────
+
+ [Scenario("Builder BuildCalledTwice ReturnsDistinctInstances")]
+ [Fact]
+ public void Builder_BuildCalledTwice_ReturnsDistinctInstances()
+ {
+ var builder = KeyedNormalizer.Create()
+ .When("json", (raw, _) => new ValueTask(new Order(raw, "json")));
+
+ var a = builder.Build();
+ var b = builder.Build();
+
+ // Each call produces an independent snapshot — not the same reference.
+ ScenarioExpect.False(ReferenceEquals(a, b));
+ }
+
+ // ─── Builder validation ───────────────────────────────────────────────────
+
+ [Scenario("Builder NoHandlers ThrowsInvalidOperationException")]
+ [Fact]
+ public void Builder_NoHandlers_ThrowsInvalidOperationException()
+ {
+ ScenarioExpect.Throws(() =>
+ KeyedNormalizer.Create().Build());
+ }
+
+ private sealed record Order(string Id, string Format);
+}