Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,33 @@ public async ValueTask RunAsync(
}
}

/// <summary>
/// Executes a single poll cycle and returns the item received, or <see langword="null"/>
/// if the source returned no message.
/// </summary>
/// <remarks>
/// Unlike <see cref="RunAsync"/>, this method does not enter a loop, does not sleep,
/// does not apply interval, jitter, or back-off, and does not invoke any run-loop handler.
/// It is intended for caller-driven polling where the caller owns the loop (e.g., a
/// workflow-framework step-based polling integration).
/// The <paramref name="context"/> parameter is optional; when omitted,
/// <see cref="MessageContext.Empty"/> is used.
/// </remarks>
/// <param name="context">
/// Optional message context forwarded to the poll source. When <see langword="null"/>,
/// <see cref="MessageContext.Empty"/> is used.
/// </param>
/// <param name="ct">Cancellation token propagated directly to the poll source.</param>
/// <returns>
/// The message returned by the source, or <see langword="null"/> when the source
/// returned an empty poll.
/// </returns>
public ValueTask<Message<TPayload>?> PollOnceAsync(MessageContext? context = null, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();
return _source(context ?? MessageContext.Empty, ct);
}

private TimeSpan ComputeDelay(int consecutiveEmpty, Random rng)
{
TimeSpan baseDelay;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
namespace PatternKit.Messaging.Reliability;

/// <summary>
/// Convenience extension methods for <see cref="IOutboxStore{TPayload}"/>.
/// </summary>
public static class OutboxStoreExtensions
{
/// <summary>
/// Convenience overload that accepts an untyped payload and an optional header dictionary,
/// boxes them into a <see cref="Message{TPayload}">Message&lt;object&gt;</see>, and enqueues
/// via the typed store.
/// </summary>
/// <remarks>
/// Use when the calling layer does not have compile-time knowledge of the payload type
/// (e.g., a generic workflow-step orchestrator). Projects that want this convenience
/// should consume <see cref="IOutboxStore{TPayload}">IOutboxStore&lt;object&gt;</see>
/// as their dependency.
/// When <paramref name="headers"/> is <see langword="null"/>, the message is enqueued
/// with <see cref="MessageHeaders.Empty"/> (no allocation).
/// </remarks>
/// <param name="store">The outbox store to enqueue into.</param>
/// <param name="payload">The untyped payload to enqueue.</param>
/// <param name="headers">
/// Optional string-to-string headers. Pass <see langword="null"/> to use empty headers.
/// </param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The stored <see cref="OutboxMessage{TPayload}">OutboxMessage&lt;object&gt;</see>.</returns>
public static ValueTask<OutboxMessage<object>> EnqueueObjectAsync(
this IOutboxStore<object> store,
object payload,
IReadOnlyDictionary<string, string>? headers,
CancellationToken ct = default)
{
if (store is null)
throw new ArgumentNullException(nameof(store));
if (payload is null)
throw new ArgumentNullException(nameof(payload));

MessageHeaders messageHeaders;
if (headers is null || headers.Count == 0)
{
messageHeaders = MessageHeaders.Empty;
}
else
{
// Build from the IReadOnlyDictionary<string,string> by projecting to the
// object? value shape that MessageHeaders accepts.
messageHeaders = new MessageHeaders(
headers.Select(kv => new KeyValuePair<string, object?>(kv.Key, kv.Value)));
}

var message = new Message<object>(payload, messageHeaders);
return store.EnqueueAsync(message, cancellationToken: ct);
}
}
116 changes: 116 additions & 0 deletions src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
namespace PatternKit.Messaging.Transformation;

/// <summary>
/// Key-based dispatcher that normalizes a raw value into a canonical form using a dictionary lookup.
/// Unlike <see cref="Normalizer{TRaw,TCanonical}"/> (predicate-first), dispatch here is O(1) keyed
/// lookup — useful for format-tagged messages where the discriminator is known at call time
/// (e.g., content-type strings, routing keys, source system identifiers).
/// </summary>
/// <typeparam name="TKey">The key type used to select the handler. Must be non-nullable.</typeparam>
/// <typeparam name="TRaw">The incoming raw type (e.g., <see langword="string"/>, <see langword="byte[]"/>).</typeparam>
/// <typeparam name="TCanonical">The target canonical type produced after normalization.</typeparam>
public sealed class KeyedNormalizer<TKey, TRaw, TCanonical>
where TKey : notnull
{
/// <summary>Async normalizer handler for a specific key.</summary>
public delegate ValueTask<TCanonical> AsyncNormalizerHandler(TRaw raw, CancellationToken cancellationToken);

private readonly string _name;
private readonly Dictionary<TKey, AsyncNormalizerHandler> _handlers;
private readonly AsyncNormalizerHandler? _default;

private KeyedNormalizer(
string name,
Dictionary<TKey, AsyncNormalizerHandler> handlers,
AsyncNormalizerHandler? @default)
{
_name = name;
_handlers = handlers;
_default = @default;
}

/// <summary>Creates a new keyed normalizer builder.</summary>
public static Builder Create(string name = "keyed-normalizer") => new(name);

/// <summary>
/// Normalizes <paramref name="raw"/> by dispatching to the handler registered for
/// <paramref name="key"/>. Falls back to the default handler when no key match is found.
/// </summary>
/// <exception cref="KeyNotFoundException">
/// Thrown when <paramref name="key"/> has no registered handler and no default handler
/// was configured.
/// </exception>
public ValueTask<TCanonical> NormalizeAsync(TKey key, TRaw raw, CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

if (_handlers.TryGetValue(key, out var handler))
return handler(raw, ct);

if (_default is not null)
return _default(raw, ct);

throw new KeyNotFoundException(
$"No format handler matched the raw input for normalizer '{_name}'.");
}

/// <summary>Fluent builder for <see cref="KeyedNormalizer{TKey,TRaw,TCanonical}"/>.</summary>
public sealed class Builder
{
private readonly string _name;
private readonly Dictionary<TKey, AsyncNormalizerHandler> _entries;
private AsyncNormalizerHandler? _default;

internal Builder(string name)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Keyed normalizer name cannot be null, empty, or whitespace.", nameof(name));

_name = name;
_entries = new Dictionary<TKey, AsyncNormalizerHandler>();
}

/// <summary>
/// Registers a handler for <paramref name="key"/>. Throws
/// <see cref="ArgumentException"/> when the same key is registered more than once —
/// duplicate registrations are almost always a configuration mistake.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="key"/> is already registered.
/// </exception>
public Builder When(TKey key, Func<TRaw, CancellationToken, ValueTask<TCanonical>> handler)
{
if (key is null)
throw new ArgumentNullException(nameof(key));
if (handler is null)
throw new ArgumentNullException(nameof(handler));

if (_entries.ContainsKey(key))
throw new ArgumentException(
$"A handler for key '{key}' is already registered. Each key may only have one handler.",
nameof(key));

_entries[key] = (raw, ct) => handler(raw, ct);
return this;
}

/// <summary>Sets the default handler used when no key matches.</summary>
public Builder Default(Func<TRaw, CancellationToken, ValueTask<TCanonical>> handler)
{
_default = (raw, ct) => (handler ?? throw new ArgumentNullException(nameof(handler)))(raw, ct);
return this;
}

/// <summary>Builds an immutable keyed normalizer.</summary>
public KeyedNormalizer<TKey, TRaw, TCanonical> Build()
{
if (_entries.Count == 0 && _default is null)
throw new InvalidOperationException(
"KeyedNormalizer requires at least one key handler or a default handler.");

// Snapshot the dictionary so the built instance is immutable-after-build.
var snapshot = new Dictionary<TKey, AsyncNormalizerHandler>(_entries);
return new KeyedNormalizer<TKey, TRaw, TCanonical>(_name, snapshot, _default);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,78 @@ public async Task RunAsync_RejectsNullHandler()

await ScenarioExpect.ThrowsAsync<ArgumentNullException>(() => consumer.RunAsync(null!).AsTask());
}

// ─── PollOnceAsync ────────────────────────────────────────────────────────

[Scenario("PollOnceAsync ReturnsItemFromSource")]
[Fact]
public async Task PollOnceAsync_ReturnsItemFromSource()
{
var consumer = AsyncPollingConsumer<string>.Create()
.WithSource(async (_, _) => { await Task.CompletedTask; return Message<string>.Create("hello"); })
.Build();

var result = await consumer.PollOnceAsync();

ScenarioExpect.NotNull(result);
ScenarioExpect.Equal("hello", result!.Payload);
}

[Scenario("PollOnceAsync ReturnsNullWhenSourceReturnsEmpty")]
[Fact]
public async Task PollOnceAsync_ReturnsNullWhenSourceReturnsEmpty()
{
var consumer = AsyncPollingConsumer<string>.Create()
.WithSource(async (_, _) => { await Task.CompletedTask; return null; })
.Build();

var result = await consumer.PollOnceAsync();

ScenarioExpect.Null(result);
}

[Scenario("PollOnceAsync RespectsCancellation")]
[Fact]
public async Task PollOnceAsync_RespectsCancellation()
{
using var cts = new CancellationTokenSource();
cts.Cancel();

var consumer = AsyncPollingConsumer<string>.Create()
.WithSource(async (_, ct) =>
{
await Task.Delay(1000, ct);
return Message<string>.Create("never");
})
.Build();

await ScenarioExpect.ThrowsAsync<OperationCanceledException>(
() => consumer.PollOnceAsync(ct: cts.Token).AsTask());
}

[Scenario("PollOnceAsync DoesNotInvokeRunLoopHandler")]
[Fact]
public async Task PollOnceAsync_DoesNotInvokeRunLoopHandler()
{
// PollOnceAsync has no handler parameter — the only side-effect observable is
// that the source is called exactly once and the raw message is returned.
// We verify this by counting source invocations and confirming the value is
// returned directly without any additional callback layer.
var sourceCallCount = 0;

var consumer = AsyncPollingConsumer<string>.Create()
.WithSource(async (_, _) =>
{
Interlocked.Increment(ref sourceCallCount);
await Task.CompletedTask;
return Message<string>.Create("msg");
})
.Build();

var result = await consumer.PollOnceAsync();

ScenarioExpect.Equal(1, sourceCallCount); // source called exactly once
ScenarioExpect.NotNull(result); // message returned directly to caller
ScenarioExpect.Equal("msg", result!.Payload); // no handler mutation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using PatternKit.Messaging;
using PatternKit.Messaging.Reliability;
using TinyBDD;

namespace PatternKit.Tests.Messaging.Reliability;

public sealed class OutboxStoreExtensionsTests
{
[Scenario("EnqueueObjectAsync WithPayloadAndHeaders StoresCorrectly")]
[Fact]
public async Task EnqueueObjectAsync_WithPayloadAndHeaders_StoresCorrectly()
{
var store = new InMemoryOutboxStore<object>();
var payload = new { Type = "OrderCreated", OrderId = 42 };
var headers = new Dictionary<string, string>
{
["content-type"] = "application/json",
["source"] = "order-service",
};

var record = await store.EnqueueObjectAsync(payload, headers);

ScenarioExpect.NotNull(record);
ScenarioExpect.False(record.Dispatched);
ScenarioExpect.Equal(1, store.Records.Count);
ScenarioExpect.Equal(payload, record.Message.Payload);
ScenarioExpect.Equal("application/json", record.Message.Headers.GetString("content-type"));
ScenarioExpect.Equal("order-service", record.Message.Headers.GetString("source"));
}

[Scenario("EnqueueObjectAsync WithNullHeaders StoresEmptyHeadersMessage")]
[Fact]
public async Task EnqueueObjectAsync_WithNullHeaders_StoresEmptyHeadersMessage()
{
var store = new InMemoryOutboxStore<object>();
var payload = "plain-string-payload";

var record = await store.EnqueueObjectAsync(payload, headers: null);

ScenarioExpect.NotNull(record);
ScenarioExpect.Equal(payload, record.Message.Payload);
// Null headers must reuse MessageHeaders.Empty — not allocate a new empty instance.
ScenarioExpect.True(ReferenceEquals(MessageHeaders.Empty, record.Message.Headers));
}

[Scenario("EnqueueObjectAsync WithEmptyDictionary StoresEmptyHeadersMessage")]
[Fact]
public async Task EnqueueObjectAsync_WithEmptyDictionary_StoresEmptyHeadersMessage()
{
var store = new InMemoryOutboxStore<object>();
var payload = 42;

var record = await store.EnqueueObjectAsync(payload, headers: new Dictionary<string, string>());

ScenarioExpect.NotNull(record);
// An empty (non-null) dictionary must also map to MessageHeaders.Empty.
ScenarioExpect.True(ReferenceEquals(MessageHeaders.Empty, record.Message.Headers));
}

[Scenario("EnqueueObjectAsync RespectsCancellation")]
[Fact]
public async Task EnqueueObjectAsync_RespectsCancellation()
{
using var cts = new CancellationTokenSource();
cts.Cancel();

var store = new InMemoryOutboxStore<object>();

await ScenarioExpect.ThrowsAsync<OperationCanceledException>(
() => store.EnqueueObjectAsync("payload", null, cts.Token).AsTask());
}
}
Loading
Loading