diff --git a/src/PatternKit.Core/Messaging/Channels/AsyncWireTap.cs b/src/PatternKit.Core/Messaging/Channels/AsyncWireTap.cs new file mode 100644 index 00000000..03138867 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Channels/AsyncWireTap.cs @@ -0,0 +1,193 @@ +namespace PatternKit.Messaging.Channels; + +/// +/// Determines how a tap failure is handled by . +/// +public enum TapErrorPolicy +{ + /// Swallow the exception; the main flow is unaffected (default). + Swallow, + + /// Log the exception via the configured sink; the main flow is unaffected. + Log, + + /// Re-throw the exception, propagating it to the caller. + Propagate, +} + +/// +/// Per-tap execution outcome captured by . +/// +public sealed class TapResult +{ + private TapResult(string tapName, bool succeeded, Exception? exception) + { + TapName = tapName; + Succeeded = succeeded; + Exception = exception; + } + + /// The name of the tap that produced this result. + public string TapName { get; } + + /// Whether the tap completed without error. + public bool Succeeded { get; } + + /// The exception thrown by the tap, if any. + public Exception? Exception { get; } + + internal static TapResult Success(string tapName) => new(tapName, true, null); + internal static TapResult Failure(string tapName, Exception exception) => new(tapName, false, exception); +} + +/// +/// Result returned by . +/// +public sealed class AsyncWireTapResult +{ + internal AsyncWireTapResult(Message message, string tapName, TapResult[] tapResults) + { + Message = message; + TapName = tapName; + TapResults = tapResults; + } + + /// The unchanged message that was observed. + public Message Message { get; } + + /// The wire-tap name. + public string TapName { get; } + + /// Per-tap execution outcomes. + public IReadOnlyList TapResults { get; } +} + +/// +/// Async wire tap that observes messages with named async side-channel handlers, with per-tap error isolation. +/// The main message flow is never disrupted unless the tap policy is . +/// +/// The message payload type. +public sealed class AsyncWireTap +{ + /// Async tap handler delegate. + public delegate ValueTask AsyncTapHandler(Message message, MessageContext context, CancellationToken cancellationToken); + + private readonly string _name; + private readonly Tap[] _taps; + + private AsyncWireTap(string name, Tap[] taps) => (_name, _taps) = (name, taps); + + /// Creates a new async wire-tap builder. + public static Builder Create(string name = "async-wire-tap") => new(name); + + /// + /// Publishes to all taps and returns the unchanged message with per-tap outcomes. + /// Tap failures are handled according to each tap's configured . + /// + public async ValueTask> PublishAsync( + Message message, + MessageContext? context = null, + CancellationToken cancellationToken = default) + { + if (message is null) + throw new ArgumentNullException(nameof(message)); + + var effectiveContext = context ?? MessageContext.From(message, cancellationToken); + var results = new TapResult[_taps.Length]; + + for (var i = 0; i < _taps.Length; i++) + { + var tap = _taps[i]; + try + { + await tap.Handler(message, effectiveContext, cancellationToken).ConfigureAwait(false); + results[i] = TapResult.Success(tap.Name); + } + catch (Exception ex) + { + // Re-throw OCE when the caller requested cancellation — tap policy must not swallow it. + if (ex is OperationCanceledException && cancellationToken.IsCancellationRequested) + throw; + + results[i] = TapResult.Failure(tap.Name, ex); + switch (tap.Policy) + { + case TapErrorPolicy.Log: + tap.ErrorSink?.Invoke(ex); + break; + case TapErrorPolicy.Propagate: + throw; + case TapErrorPolicy.Swallow: + default: + break; + } + } + } + + return new AsyncWireTapResult(message, _name, results); + } + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private readonly List _taps = new(4); + private TapErrorPolicy _defaultPolicy = TapErrorPolicy.Swallow; + private Action? _defaultErrorSink; + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Wire tap name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + } + + /// Sets the default error policy applied to taps that do not specify their own. + public Builder WithDefaultPolicy(TapErrorPolicy policy, Action? sink = null) + { + _defaultPolicy = policy; + _defaultErrorSink = sink; + return this; + } + + /// Adds an async tap with the default error policy. + public Builder Tap(string name, AsyncTapHandler handler) + => Tap(name, handler, _defaultPolicy, _defaultErrorSink); + + /// Adds an async tap with an explicit error policy. + public Builder Tap(string name, AsyncTapHandler handler, TapErrorPolicy policy, Action? errorSink = null) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Tap name cannot be null, empty, or whitespace.", nameof(name)); + if (handler is null) + throw new ArgumentNullException(nameof(handler)); + + if (policy == TapErrorPolicy.Log && errorSink is null && _defaultErrorSink is null) + throw new ArgumentException("An error sink is required when the tap policy is Log.", nameof(errorSink)); + + _taps.Add(new Tap(name, handler, policy, errorSink ?? _defaultErrorSink)); + return this; + } + + /// Builds an immutable async wire tap. + public AsyncWireTap Build() + { + if (_taps.Count == 0) + throw new InvalidOperationException("AsyncWireTap must have at least one tap handler."); + + return new AsyncWireTap(_name, _taps.ToArray()); + } + } + + private sealed class Tap + { + internal Tap(string name, AsyncTapHandler handler, TapErrorPolicy policy, Action? errorSink) + => (Name, Handler, Policy, ErrorSink) = (name, handler, policy, errorSink); + + internal string Name { get; } + internal AsyncTapHandler Handler { get; } + internal TapErrorPolicy Policy { get; } + internal Action? ErrorSink { get; } + } +} diff --git a/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs new file mode 100644 index 00000000..6a2a9b3b --- /dev/null +++ b/src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs @@ -0,0 +1,212 @@ +namespace PatternKit.Messaging.Consumers; + +/// +/// Back-off policy applied when an empty poll is received. +/// +public enum BackOffPolicy +{ + /// Wait the same interval after an empty poll. + Constant, + + /// Double the wait after each consecutive empty poll, up to the configured cap. + Exponential, +} + +/// +/// Self-driving async polling consumer with configurable interval, jitter, and empty-poll back-off. +/// +/// The payload type produced by the source. +public sealed class AsyncPollingConsumer +{ + /// Async poll source delegate. Return to indicate an empty poll. + public delegate ValueTask?> AsyncPollSource(MessageContext context, CancellationToken cancellationToken); + + /// Async message handler delegate invoked for each non-empty poll result. + public delegate ValueTask AsyncMessageHandler(Message message, MessageContext context, CancellationToken cancellationToken); + + private readonly string _name; + private readonly AsyncPollSource _source; + private readonly TimeSpan _interval; + private readonly TimeSpan _jitter; + private readonly BackOffPolicy _backOffPolicy; + private readonly TimeSpan _backOffCap; + + private AsyncPollingConsumer( + string name, + AsyncPollSource source, + TimeSpan interval, + TimeSpan jitter, + BackOffPolicy backOffPolicy, + TimeSpan backOffCap) + { + _name = name; + _source = source; + _interval = interval; + _jitter = jitter; + _backOffPolicy = backOffPolicy; + _backOffCap = backOffCap; + } + + /// The consumer name. + public string Name => _name; + + /// Creates a new async polling consumer builder. + public static Builder Create(string name = "async-polling-consumer") => new(name); + + /// + /// Runs the polling loop until is cancelled. + /// Polls the source on the configured interval, invokes for each message, + /// and applies empty-poll back-off. + /// + public async ValueTask RunAsync( + AsyncMessageHandler handler, + MessageContext? context = null, + CancellationToken cancellationToken = default) + { + if (handler is null) + throw new ArgumentNullException(nameof(handler)); + + var effectiveContext = context ?? MessageContext.Empty; + var rng = new Random(); + var consecutiveEmpty = 0; + + while (!cancellationToken.IsCancellationRequested) + { + Message? message = null; + try + { + message = await _source(effectiveContext, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + + if (message is not null) + { + consecutiveEmpty = 0; + try + { + await handler(message, effectiveContext, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + } + else + { + consecutiveEmpty++; + } + + var delay = ComputeDelay(consecutiveEmpty, rng); + try + { + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + } + } + + private TimeSpan ComputeDelay(int consecutiveEmpty, Random rng) + { + TimeSpan baseDelay; + if (consecutiveEmpty == 0) + { + baseDelay = _interval; + } + else + { + baseDelay = _backOffPolicy switch + { + BackOffPolicy.Exponential => Min( + TimeSpan.FromMilliseconds(_interval.TotalMilliseconds * Math.Pow(2, consecutiveEmpty - 1)), + _backOffCap), + _ => _interval, + }; + } + + if (_jitter > TimeSpan.Zero) + { + var jitterMs = rng.NextDouble() * _jitter.TotalMilliseconds; + baseDelay = baseDelay.Add(TimeSpan.FromMilliseconds(jitterMs)); + } + + return baseDelay; + } + + private static TimeSpan Min(TimeSpan a, TimeSpan b) => a < b ? a : b; + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private AsyncPollSource? _source; + private TimeSpan _interval = TimeSpan.FromSeconds(5); + private TimeSpan _jitter = TimeSpan.Zero; + private BackOffPolicy _backOffPolicy = BackOffPolicy.Constant; + private TimeSpan _backOffCap = TimeSpan.FromMinutes(1); + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Polling consumer name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + } + + /// Sets the poll source delegate. + public Builder WithSource(AsyncPollSource source) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + return this; + } + + /// Sets the polling interval (default: 5 seconds). + public Builder WithInterval(TimeSpan interval) + { + if (interval <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(interval), "Polling interval must be positive."); + + _interval = interval; + return this; + } + + /// Sets the maximum random jitter added to each delay (default: none). + public Builder WithJitter(TimeSpan jitter) + { + if (jitter < TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(jitter), "Jitter must be non-negative."); + + _jitter = jitter; + return this; + } + + /// + /// Sets the back-off policy applied on empty polls (default: ). + /// + public Builder OnEmpty(BackOffPolicy policy, TimeSpan? cap = null) + { + _backOffPolicy = policy; + if (cap.HasValue) + { + if (cap.Value <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(cap), "Back-off cap must be positive."); + _backOffCap = cap.Value; + } + return this; + } + + /// Builds an immutable async polling consumer. + public AsyncPollingConsumer Build() + { + if (_source is null) + throw new InvalidOperationException("AsyncPollingConsumer requires a poll source."); + + return new AsyncPollingConsumer(_name, _source, _interval, _jitter, _backOffPolicy, _backOffCap); + } + } +} diff --git a/src/PatternKit.Core/Messaging/Reliability/IOutboxStore.cs b/src/PatternKit.Core/Messaging/Reliability/IOutboxStore.cs new file mode 100644 index 00000000..118c4029 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Reliability/IOutboxStore.cs @@ -0,0 +1,211 @@ +namespace PatternKit.Messaging.Reliability; + +/// +/// Pluggable backing store for the Transactional Outbox pattern. +/// Implement this interface to provide durable outbox storage (e.g., relational database, file system). +/// +/// The outbox message payload type. +public interface IOutboxStore +{ + /// Adds a message to the outbox and returns the stored record. + ValueTask> EnqueueAsync( + Message message, + string? id = null, + DateTimeOffset? createdAt = null, + CancellationToken cancellationToken = default); + + /// Returns all messages that have not yet been dispatched. + ValueTask>> SnapshotPendingAsync(CancellationToken cancellationToken = default); + + /// Marks a message as successfully dispatched. + ValueTask MarkDispatchedAsync(string id, DateTimeOffset dispatchedAt, CancellationToken cancellationToken = default); + + /// Records a failed dispatch attempt for a message. + ValueTask MarkFailedAsync(string id, string? error, CancellationToken cancellationToken = default); +} + +/// +/// Thread-safe in-memory implementation of for tests, +/// demos, and single-process applications. +/// +/// The outbox message payload type. +public sealed class InMemoryOutboxStore : IOutboxStore +{ + private readonly object _gate = new(); + private readonly List> _records = new(); + + /// All records currently held in the store. + public IReadOnlyList> Records + { + get + { + lock (_gate) + return _records.ToArray(); + } + } + + /// + public ValueTask> EnqueueAsync( + Message message, + string? id = null, + DateTimeOffset? createdAt = null, + CancellationToken cancellationToken = default) + { + if (message is null) + throw new ArgumentNullException(nameof(message)); + + cancellationToken.ThrowIfCancellationRequested(); + + var record = new OutboxMessage( + string.IsNullOrWhiteSpace(id) ? Guid.NewGuid().ToString("N") : id!, + message, + createdAt ?? DateTimeOffset.UtcNow); + + lock (_gate) + _records.Add(record); + + return new ValueTask>(record); + } + + /// + public ValueTask>> SnapshotPendingAsync(CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + lock (_gate) + { + IReadOnlyList> pending = _records.Where(r => !r.Dispatched).ToArray(); + return new ValueTask>>(pending); + } + } + + /// + public ValueTask MarkDispatchedAsync(string id, DateTimeOffset dispatchedAt, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(id)) + throw new ArgumentException("Outbox message id is required.", nameof(id)); + + cancellationToken.ThrowIfCancellationRequested(); + + lock (_gate) + { + for (var i = 0; i < _records.Count; i++) + { + if (_records[i].Id == id) + { + _records[i] = _records[i].MarkDispatched(dispatchedAt); + return default; + } + } + } + + return default; + } + + /// + public ValueTask MarkFailedAsync(string id, string? error, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(id)) + throw new ArgumentException("Outbox message id is required.", nameof(id)); + + cancellationToken.ThrowIfCancellationRequested(); + + lock (_gate) + { + for (var i = 0; i < _records.Count; i++) + { + if (_records[i].Id == id) + { + _records[i] = _records[i].WithAttempt(error); + return default; + } + } + } + + return default; + } +} + +/// +/// Pluggable dispatch loop helper for the Transactional Outbox pattern. +/// Combines an with an +/// to provide a reusable relay loop. +/// +/// The outbox message payload type. +public sealed class OutboxDispatcher +{ + private readonly IOutboxStore _store; + private readonly IOutboxDispatcher _dispatcher; + + /// + /// Creates an outbox dispatcher bound to the given store and dispatcher. + /// + public OutboxDispatcher(IOutboxStore store, IOutboxDispatcher dispatcher) + { + _store = store ?? throw new ArgumentNullException(nameof(store)); + _dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher)); + } + + /// + /// Drains all pending outbox records by dispatching each through the configured dispatcher. + /// Returns the number of records successfully dispatched. + /// + public async ValueTask DrainAsync(CancellationToken cancellationToken = default) + { + var pending = await _store.SnapshotPendingAsync(cancellationToken).ConfigureAwait(false); + var dispatched = 0; + + foreach (var record in pending) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + await _dispatcher.DispatchAsync(record, cancellationToken).ConfigureAwait(false); + await _store.MarkDispatchedAsync(record.Id, DateTimeOffset.UtcNow, cancellationToken).ConfigureAwait(false); + dispatched++; + } + catch (Exception ex) + { + await _store.MarkFailedAsync(record.Id, ex.Message, cancellationToken).ConfigureAwait(false); + throw; + } + } + + return dispatched; + } + + /// + /// Continuously drains pending outbox records until is cancelled, + /// waiting between each drain cycle. + /// + public async ValueTask RunAsync(TimeSpan pollInterval, CancellationToken cancellationToken = default) + { + if (pollInterval <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(pollInterval), "Poll interval must be positive."); + + while (!cancellationToken.IsCancellationRequested) + { + try + { + await DrainAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch + { + // Individual dispatch errors are recorded; loop continues + } + + try + { + await Task.Delay(pollInterval, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + } + } +} diff --git a/src/PatternKit.Core/Messaging/Reliability/InMemoryIdempotencyStoreWithTtl.cs b/src/PatternKit.Core/Messaging/Reliability/InMemoryIdempotencyStoreWithTtl.cs new file mode 100644 index 00000000..8760840b --- /dev/null +++ b/src/PatternKit.Core/Messaging/Reliability/InMemoryIdempotencyStoreWithTtl.cs @@ -0,0 +1,145 @@ +namespace PatternKit.Messaging.Reliability; + +/// +/// Extends with optional per-key TTL and periodic eviction. +/// +public interface IIdempotencyStoreWithTtl : IIdempotencyStore +{ + /// + /// Attempts to claim for processing with an optional time-to-live. + /// Keys expire after elapses from their creation time. + /// + ValueTask TryClaimAsync(string key, TimeSpan? ttl, CancellationToken cancellationToken = default); + + /// + /// Evicts all keys whose TTL has elapsed. + /// + ValueTask EvictExpiredAsync(CancellationToken cancellationToken = default); +} + +/// +/// Thread-safe in-memory idempotency store with optional per-key TTL and periodic eviction. +/// +public sealed class InMemoryIdempotencyStoreWithTtl : IIdempotencyStoreWithTtl +{ + private readonly object _gate = new(); + private readonly Dictionary _entries = new(StringComparer.Ordinal); + + /// The number of keys currently stored (including potentially expired ones). + public int Count + { + get + { + lock (_gate) + return _entries.Count; + } + } + + /// + public ValueTask TryClaimAsync(string key, CancellationToken cancellationToken = default) + => TryClaimAsync(key, null, cancellationToken); + + /// + public ValueTask TryClaimAsync(string key, TimeSpan? ttl, CancellationToken cancellationToken = default) + { + ValidateKey(key); + if (ttl.HasValue && ttl.Value < TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(ttl), "TTL must not be negative."); + cancellationToken.ThrowIfCancellationRequested(); + + var now = DateTimeOffset.UtcNow; + lock (_gate) + { + if (_entries.TryGetValue(key, out var existing)) + { + // Treat expired entries as if they don't exist + if (existing.ExpiresAt.HasValue && existing.ExpiresAt.Value <= now) + _entries.Remove(key); + else + return new ValueTask(IdempotencyClaim.Existing(key, existing.Status, existing.Result, existing.FailureReason)); + } + + var expiresAt = ttl.HasValue ? now + ttl.Value : (DateTimeOffset?)null; + _entries[key] = new Entry(IdempotencyEntryStatus.Processing, null, null, expiresAt); + return new ValueTask(IdempotencyClaim.ClaimedKey(key)); + } + } + + /// + public ValueTask MarkCompletedAsync(string key, object? result = null, CancellationToken cancellationToken = default) + { + ValidateKey(key); + cancellationToken.ThrowIfCancellationRequested(); + + lock (_gate) + { + var expiresAt = _entries.TryGetValue(key, out var existing) ? existing.ExpiresAt : null; + _entries[key] = new Entry(IdempotencyEntryStatus.Completed, result, null, expiresAt); + } + + return default; + } + + /// + public ValueTask MarkFailedAsync(string key, string? reason = null, CancellationToken cancellationToken = default) + { + ValidateKey(key); + cancellationToken.ThrowIfCancellationRequested(); + + lock (_gate) + { + var expiresAt = _entries.TryGetValue(key, out var existing) ? existing.ExpiresAt : null; + _entries[key] = new Entry(IdempotencyEntryStatus.Failed, null, reason, expiresAt); + } + + return default; + } + + /// + public ValueTask EvictExpiredAsync(CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + var now = DateTimeOffset.UtcNow; + var evicted = 0; + + lock (_gate) + { + var expiredKeys = new List(); + foreach (var pair in _entries) + { + if (pair.Value.ExpiresAt.HasValue && pair.Value.ExpiresAt.Value <= now) + expiredKeys.Add(pair.Key); + } + + foreach (var key in expiredKeys) + { + _entries.Remove(key); + evicted++; + } + } + + return new ValueTask(evicted); + } + + private static void ValidateKey(string key) + { + if (string.IsNullOrWhiteSpace(key)) + throw new ArgumentException("Idempotency key cannot be null, empty, or whitespace.", nameof(key)); + } + + private sealed class Entry + { + internal Entry(IdempotencyEntryStatus status, object? result, string? failureReason, DateTimeOffset? expiresAt) + { + Status = status; + Result = result; + FailureReason = failureReason; + ExpiresAt = expiresAt; + } + + internal IdempotencyEntryStatus Status { get; } + internal object? Result { get; } + internal string? FailureReason { get; } + internal DateTimeOffset? ExpiresAt { get; } + } +} diff --git a/src/PatternKit.Core/Messaging/Routing/AsyncScatterGather.cs b/src/PatternKit.Core/Messaging/Routing/AsyncScatterGather.cs new file mode 100644 index 00000000..905d1d80 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Routing/AsyncScatterGather.cs @@ -0,0 +1,403 @@ +namespace PatternKit.Messaging.Routing; + +/// +/// Defines when considers the fan-out complete. +/// +public abstract class CompletionStrategy +{ + private CompletionStrategy() { } + + /// Wait for all recipients to respond. + public static CompletionStrategy All { get; } = new AllStrategy(); + + /// + /// Wait until at least recipients have responded (success or failure). + /// + public static CompletionStrategy Quorum(int n) => new QuorumStrategy(n); + + /// Wait until at least successful responses are received. + public static CompletionStrategy FirstN(int n) => new FirstNStrategy(n); + + /// Wait up to ; use whatever responses arrived by then. + public static CompletionStrategy Timeout(TimeSpan timeout) => new TimeoutStrategy(timeout); + + /// Wait for all responses, but stop waiting after . + public static CompletionStrategy AllOrTimeout(TimeSpan timeout) => new AllOrTimeoutStrategy(timeout); + + internal abstract TimeSpan? GetTimeout(); + + private sealed class AllStrategy : CompletionStrategy + { + internal override TimeSpan? GetTimeout() => null; + } + + private sealed class QuorumStrategy : CompletionStrategy + { + private readonly int _quorum; + internal QuorumStrategy(int n) + { + if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n), "Quorum must be positive."); + _quorum = n; + } + + internal override TimeSpan? GetTimeout() => null; + + internal int Required => _quorum; + } + + private sealed class FirstNStrategy : CompletionStrategy + { + private readonly int _n; + internal FirstNStrategy(int n) + { + if (n <= 0) throw new ArgumentOutOfRangeException(nameof(n), "FirstN must be positive."); + _n = n; + } + + internal override TimeSpan? GetTimeout() => null; + + internal int Required => _n; + } + + private sealed class TimeoutStrategy : CompletionStrategy + { + private readonly TimeSpan _timeout; + internal TimeoutStrategy(TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be positive."); + _timeout = timeout; + } + + internal override TimeSpan? GetTimeout() => _timeout; + } + + private sealed class AllOrTimeoutStrategy : CompletionStrategy + { + private readonly TimeSpan _timeout; + internal AllOrTimeoutStrategy(TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be positive."); + _timeout = timeout; + } + + internal override TimeSpan? GetTimeout() => _timeout; + + internal bool RequireAll => true; + } + + internal bool IsQuorum(out int n) { if (this is QuorumStrategy q) { n = q.Required; return true; } n = 0; return false; } + internal bool IsFirstN(out int n) { if (this is FirstNStrategy f) { n = f.Required; return true; } n = 0; return false; } + internal bool IsAllOrTimeout(out TimeSpan timeout) { if (this is AllOrTimeoutStrategy a) { timeout = a.GetTimeout()!.Value; return true; } timeout = default; return false; } +} + +/// Envelope wrapping one recipient's response. +public sealed class ResponseEnvelope +{ + private ResponseEnvelope(string recipientName, TResponse? response, bool succeeded, Exception? exception) + { + RecipientName = recipientName; + Response = response; + Succeeded = succeeded; + Exception = exception; + } + + /// The recipient name. + public string RecipientName { get; } + + /// The response value, when successful. + public TResponse? Response { get; } + + /// Whether the recipient completed without error. + public bool Succeeded { get; } + + /// The exception thrown by the recipient, if any. + public Exception? Exception { get; } + + internal static ResponseEnvelope Success(string name, TResponse response) => new(name, response, true, null); + internal static ResponseEnvelope Failure(string name, Exception ex) => new(name, default, false, ex); +} + +/// +/// Async scatter-gather with pluggable completion strategy, per-branch error isolation, +/// and concurrent fan-out using Task.WhenAll. +/// +/// The fan-out request type. +/// The per-recipient response type. +/// The aggregated result type. +public sealed class AsyncScatterGather +{ + /// Async recipient delegate. + public delegate ValueTask AsyncRecipientHandler( + Message message, + MessageContext context, + CancellationToken cancellationToken); + + /// Aggregation delegate receiving all envelopes that completed before the strategy fired. + public delegate TResult ResponseAggregator( + IReadOnlyList> envelopes, + Message request, + MessageContext context); + + private readonly string _name; + private readonly Recipient[] _recipients; + private readonly CompletionStrategy _strategy; + private readonly ResponseAggregator _aggregator; + + private AsyncScatterGather( + string name, + Recipient[] recipients, + CompletionStrategy strategy, + ResponseAggregator aggregator) + { + _name = name; + _recipients = recipients; + _strategy = strategy; + _aggregator = aggregator; + } + + /// Creates a new async scatter-gather builder. + public static Builder Create(string name = "async-scatter-gather") => new(name); + + /// + /// Fans out to all recipients concurrently, waits per strategy, + /// and aggregates the results using concurrent fan-out. + /// + public async ValueTask> DispatchAsync( + Message message, + MessageContext? context = null, + CancellationToken cancellationToken = default) + { + if (message is null) + throw new ArgumentNullException(nameof(message)); + + var effectiveContext = context ?? MessageContext.From(message, cancellationToken); + var timeout = _strategy.GetTimeout(); + + using var cts = timeout.HasValue + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken) + : null; + + if (timeout.HasValue && cts != null) + cts.CancelAfter(timeout.Value); + + var linkedToken = cts?.Token ?? cancellationToken; + + // Track completed envelopes in a thread-safe list + var envelopes = new System.Collections.Concurrent.ConcurrentBag>(); + + // FirstN counts only successful responses; Quorum counts any completed response (success or failure). + _strategy.IsFirstN(out var firstN); + _strategy.IsQuorum(out var quorum); + + using var earlyCts = (firstN > 0 || quorum > 0) + ? (cts != null + ? CancellationTokenSource.CreateLinkedTokenSource(cts.Token) + : CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) + : null; + + var earlyCounter = new EarlyExitCounter(firstN, quorum, earlyCts); + + var tasks = _recipients.Select(recipient => RunRecipientAsync( + recipient, message, effectiveContext, earlyCts?.Token ?? linkedToken, + envelopes, earlyCounter, cancellationToken)).ToArray(); + + try + { + if (timeout.HasValue) + { + var timeoutTask = Task.Delay(timeout.Value, cancellationToken); + var whenAll = Task.WhenAll(tasks); + await Task.WhenAny(whenAll, timeoutTask).ConfigureAwait(false); + } + else + { + await Task.WhenAll(tasks).ConfigureAwait(false); + } + } + catch + { + // Individual task errors are captured in envelopes; swallow aggregate exception + } + + var collected = envelopes.ToArray(); + if (collected.Length == 0) + return AsyncScatterGatherResult.Rejected(_name, collected, "No scatter-gather recipients produced a result."); + + var aggregated = _aggregator(collected, message, effectiveContext); + return AsyncScatterGatherResult.Success(_name, collected, aggregated); + } + + private static async Task RunRecipientAsync( + Recipient recipient, + Message message, + MessageContext context, + CancellationToken ct, + System.Collections.Concurrent.ConcurrentBag> envelopes, + EarlyExitCounter earlyCounter, + CancellationToken callerCt) + { + try + { + var response = await recipient.Handler(message, context, ct).ConfigureAwait(false); + envelopes.Add(ResponseEnvelope.Success(recipient.Name, response)); + earlyCounter.RecordSuccess(); + } + catch (OperationCanceledException) when (!callerCt.IsCancellationRequested) + { + // Cancellation from early-exit CTS or timeout — not a recipient error and not caller-initiated. + earlyCounter.RecordCompletion(); + } + catch (OperationCanceledException) + { + // Caller-initiated cancellation — surface as a failure envelope so callers can observe it, + // then re-throw so the task propagates cancellation normally. + envelopes.Add(ResponseEnvelope.Failure(recipient.Name, new OperationCanceledException("Recipient cancelled by caller.", callerCt))); + throw; + } + catch (Exception ex) + { + envelopes.Add(ResponseEnvelope.Failure(recipient.Name, ex)); + earlyCounter.RecordCompletion(); + } + } + + private sealed class EarlyExitCounter + { + private readonly int _firstN; + private readonly int _quorum; + private readonly CancellationTokenSource? _cts; + private int _successCount; + private int _completedCount; + + internal EarlyExitCounter(int firstN, int quorum, CancellationTokenSource? cts) + => (_firstN, _quorum, _cts) = (firstN, quorum, cts); + + /// Called when a recipient succeeds. Triggers early exit for FirstN; also counts toward Quorum. + internal void RecordSuccess() + { + if (_cts is null) + return; + + if (_firstN > 0) + { + var count = System.Threading.Interlocked.Increment(ref _successCount); + if (count >= _firstN) + { + try { _cts.Cancel(); } catch (ObjectDisposedException) { } + return; + } + } + + // Success also counts as a completion toward Quorum + RecordCompletion(); + } + + /// Called when a recipient completes (success or failure). Triggers early exit for Quorum. + internal void RecordCompletion() + { + if (_quorum <= 0 || _cts is null) + return; + + var count = System.Threading.Interlocked.Increment(ref _completedCount); + if (count >= _quorum) + { + try { _cts.Cancel(); } catch (ObjectDisposedException) { } + } + } + } + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private readonly List _recipients = []; + private CompletionStrategy _strategy = CompletionStrategy.All; + private ResponseAggregator? _aggregator; + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Scatter-gather name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + } + + /// Adds a named async recipient. + public Builder Recipient(string name, AsyncRecipientHandler handler) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Recipient name cannot be null, empty, or whitespace.", nameof(name)); + if (handler is null) + throw new ArgumentNullException(nameof(handler)); + + _recipients.Add(new Recipient(name, handler)); + return this; + } + + /// Sets the completion strategy (default: ). + public Builder CompleteWith(CompletionStrategy strategy) + { + _strategy = strategy ?? throw new ArgumentNullException(nameof(strategy)); + return this; + } + + /// Sets the aggregation delegate. + public Builder WithAggregator(ResponseAggregator aggregator) + { + _aggregator = aggregator ?? throw new ArgumentNullException(nameof(aggregator)); + return this; + } + + /// Builds an immutable async scatter-gather. + public AsyncScatterGather Build() + { + if (_recipients.Count == 0) + throw new InvalidOperationException("AsyncScatterGather requires at least one recipient."); + if (_aggregator is null) + throw new InvalidOperationException("AsyncScatterGather requires an aggregator."); + + return new AsyncScatterGather(_name, _recipients.ToArray(), _strategy, _aggregator); + } + } + + private sealed class Recipient + { + internal Recipient(string name, AsyncRecipientHandler handler) => (Name, Handler) = (name, handler); + internal string Name { get; } + internal AsyncRecipientHandler Handler { get; } + } +} + +/// Aggregated async scatter-gather result. +public sealed class AsyncScatterGatherResult +{ + private AsyncScatterGatherResult(string name, ResponseEnvelope[] envelopes, TResult? result, bool succeeded, string? rejectionReason) + { + Name = name; + Envelopes = envelopes; + Result = result; + Succeeded = succeeded; + RejectionReason = rejectionReason; + } + + /// The scatter-gather name. + public string Name { get; } + + /// Per-recipient response envelopes. + public IReadOnlyList> Envelopes { get; } + + /// The aggregated result. + public TResult? Result { get; } + + /// Whether any responses were aggregated. + public bool Succeeded { get; } + + /// Reason for failure when is false. + public string? RejectionReason { get; } + + internal static AsyncScatterGatherResult Success(string name, ResponseEnvelope[] envelopes, TResult result) + => new(name, envelopes, result, true, null); + + internal static AsyncScatterGatherResult Rejected(string name, ResponseEnvelope[] envelopes, string reason) + => new(name, envelopes, default, false, reason); +} diff --git a/src/PatternKit.Core/Messaging/Transformation/AsyncContentEnricher.cs b/src/PatternKit.Core/Messaging/Transformation/AsyncContentEnricher.cs new file mode 100644 index 00000000..af4926f0 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Transformation/AsyncContentEnricher.cs @@ -0,0 +1,195 @@ +namespace PatternKit.Messaging.Transformation; + +/// +/// Determines how an enrichment step failure is handled by . +/// +public enum EnrichmentErrorPolicy +{ + /// Throw the exception; the enrichment pipeline aborts. + Throw, + + /// Skip this enrichment step; the payload is unchanged for this step. + Skip, + + /// Use a configured default value for this enrichment step. + UseDefault, +} + +/// Per-enrichment-step outcome captured by . +public sealed class EnrichmentStepResult +{ + private EnrichmentStepResult(string stepName, bool applied, bool skipped, Exception? exception) + { + StepName = stepName; + Applied = applied; + Skipped = skipped; + Exception = exception; + } + + /// The enrichment step name. + public string StepName { get; } + + /// Whether the step was applied successfully. + public bool Applied { get; } + + /// Whether the step was skipped due to an error or policy. + public bool Skipped { get; } + + /// The exception thrown by the step, if any. + public Exception? Exception { get; } + + internal static EnrichmentStepResult CreateApplied(string name) => new(name, true, false, null); + internal static EnrichmentStepResult CreateSkipped(string name, Exception? ex) => new(name, false, true, ex); +} + +/// Result returned by . +public sealed class AsyncContentEnricherResult +{ + internal AsyncContentEnricherResult(Message message, string enricherName, EnrichmentStepResult[] stepResults) + { + Message = message; + EnricherName = enricherName; + StepResults = stepResults; + } + + /// The enriched message. + public Message Message { get; } + + /// The enricher name. + public string EnricherName { get; } + + /// Per-step audit trail. + public IReadOnlyList StepResults { get; } +} + +/// +/// Augments a message payload with computed or fetched data without changing the payload type. +/// Each enrichment step is executed in registration order with per-step error isolation. +/// +/// The payload type to enrich. +public sealed class AsyncContentEnricher +{ + /// Async enrichment step delegate. Returns the enriched payload copy. + public delegate ValueTask AsyncEnrichStep(TPayload payload, MessageContext context, CancellationToken cancellationToken); + + private readonly string _name; + private readonly Step[] _steps; + + private AsyncContentEnricher(string name, Step[] steps) => (_name, _steps) = (name, steps); + + /// Creates a new content enricher builder. + public static Builder Create(string name = "content-enricher") => new(name); + + /// + /// Applies each enrichment step in order, returning the enriched message and a per-step audit trail. + /// + public async ValueTask> EnrichAsync( + Message message, + MessageContext? context = null, + CancellationToken cancellationToken = default) + { + if (message is null) + throw new ArgumentNullException(nameof(message)); + + var effectiveContext = context ?? MessageContext.From(message, cancellationToken); + var stepResults = new EnrichmentStepResult[_steps.Length]; + var currentPayload = message.Payload; + + for (var i = 0; i < _steps.Length; i++) + { + var step = _steps[i]; + try + { + currentPayload = await step.Handler(currentPayload, effectiveContext, cancellationToken).ConfigureAwait(false); + stepResults[i] = EnrichmentStepResult.CreateApplied(step.Name); + } + catch (Exception ex) + { + // Re-throw OCE when the caller requested cancellation — enrichment policy must not swallow it. + if (ex is OperationCanceledException && cancellationToken.IsCancellationRequested) + throw; + + switch (step.Policy) + { + case EnrichmentErrorPolicy.Throw: + throw; + case EnrichmentErrorPolicy.UseDefault: + if (step.DefaultFactory is not null) + currentPayload = step.DefaultFactory(currentPayload); + stepResults[i] = EnrichmentStepResult.CreateSkipped(step.Name, ex); + break; + case EnrichmentErrorPolicy.Skip: + default: + stepResults[i] = EnrichmentStepResult.CreateSkipped(step.Name, ex); + break; + } + } + } + + var enrichedMessage = new Message(currentPayload, message.Headers); + return new AsyncContentEnricherResult(enrichedMessage, _name, stepResults); + } + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private readonly List _steps = new(4); + private EnrichmentErrorPolicy _defaultPolicy = EnrichmentErrorPolicy.Throw; + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Content enricher name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + } + + /// Sets the default error policy for steps that do not specify their own. + public Builder WithDefaultPolicy(EnrichmentErrorPolicy policy) + { + _defaultPolicy = policy; + return this; + } + + /// Adds an enrichment step with the default policy. + public Builder Enrich(string name, AsyncEnrichStep handler) + => Enrich(name, handler, _defaultPolicy); + + /// Adds an enrichment step with an explicit policy. + public Builder Enrich(string name, AsyncEnrichStep handler, EnrichmentErrorPolicy policy, Func? defaultFactory = null) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Enrichment step name cannot be null, empty, or whitespace.", nameof(name)); + if (handler is null) + throw new ArgumentNullException(nameof(handler)); + if (policy == EnrichmentErrorPolicy.UseDefault && defaultFactory is null) + throw new ArgumentException( + $"A non-null {nameof(defaultFactory)} is required when policy is {nameof(EnrichmentErrorPolicy.UseDefault)}.", + nameof(defaultFactory)); + + _steps.Add(new Step(name, handler, policy, defaultFactory)); + return this; + } + + /// Builds an immutable content enricher. + public AsyncContentEnricher Build() + { + if (_steps.Count == 0) + throw new InvalidOperationException("AsyncContentEnricher must have at least one enrichment step."); + + return new AsyncContentEnricher(_name, _steps.ToArray()); + } + } + + private sealed class Step + { + internal Step(string name, AsyncEnrichStep handler, EnrichmentErrorPolicy policy, Func? defaultFactory) + => (Name, Handler, Policy, DefaultFactory) = (name, handler, policy, defaultFactory); + + internal string Name { get; } + internal AsyncEnrichStep Handler { get; } + internal EnrichmentErrorPolicy Policy { get; } + internal Func? DefaultFactory { get; } + } +} diff --git a/src/PatternKit.Core/Messaging/Transformation/InMemoryClaimCheckStoreWithTtl.cs b/src/PatternKit.Core/Messaging/Transformation/InMemoryClaimCheckStoreWithTtl.cs new file mode 100644 index 00000000..6a9ad8f7 --- /dev/null +++ b/src/PatternKit.Core/Messaging/Transformation/InMemoryClaimCheckStoreWithTtl.cs @@ -0,0 +1,109 @@ +using System.Collections.Concurrent; + +namespace PatternKit.Messaging.Transformation; + +/// +/// Extends with optional per-entry TTL and eviction. +/// +/// The payload type stored in the claim check. +public interface IClaimCheckStoreWithTtl : IClaimCheckStore +{ + /// + /// Stores under , optionally expiring after . + /// + ValueTask StoreAsync( + string claimId, + TPayload payload, + MessageHeaders headers, + TimeSpan? ttl, + CancellationToken cancellationToken = default); + + /// + /// Evicts all entries whose TTL has elapsed. + /// Returns the number of entries removed. + /// + ValueTask EvictExpiredAsync(CancellationToken cancellationToken = default); +} + +/// +/// Thread-safe in-memory claim check store with optional per-entry TTL and eviction. +/// Expired entries are removed lazily on read and proactively via . +/// +/// The payload type. +public sealed class InMemoryClaimCheckStoreWithTtl : IClaimCheckStoreWithTtl +{ + private readonly ConcurrentDictionary _items = new(StringComparer.Ordinal); + + /// + public ValueTask StoreAsync(string claimId, TPayload payload, MessageHeaders headers, CancellationToken cancellationToken = default) + => StoreAsync(claimId, payload, headers, null, cancellationToken); + + /// + public ValueTask StoreAsync( + string claimId, + TPayload payload, + MessageHeaders headers, + TimeSpan? ttl, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (string.IsNullOrWhiteSpace(claimId)) + throw new ArgumentException("Claim id is required.", nameof(claimId)); + if (headers is null) + throw new ArgumentNullException(nameof(headers)); + if (ttl.HasValue && ttl.Value < TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(ttl), "TTL must not be negative."); + + var expiresAt = ttl.HasValue ? DateTimeOffset.UtcNow + ttl.Value : (DateTimeOffset?)null; + _items[claimId] = new TimedEntry(new ClaimCheckStoredPayload(payload, headers), expiresAt); + return default; + } + + /// + public ValueTask?> TryLoadAsync(string claimId, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + if (string.IsNullOrWhiteSpace(claimId)) + throw new ArgumentException("Claim id is required.", nameof(claimId)); + + if (!_items.TryGetValue(claimId, out var entry)) + return new ValueTask?>(result: null); + + // Lazy expiry check + if (entry.ExpiresAt.HasValue && entry.ExpiresAt.Value <= DateTimeOffset.UtcNow) + { + _items.TryRemove(claimId, out _); + return new ValueTask?>(result: null); + } + + return new ValueTask?>(entry.Payload); + } + + /// + public ValueTask EvictExpiredAsync(CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + var now = DateTimeOffset.UtcNow; + var evicted = 0; + + foreach (var pair in _items) + { + if (pair.Value.ExpiresAt.HasValue && pair.Value.ExpiresAt.Value <= now) + { + if (_items.TryRemove(pair.Key, out _)) + evicted++; + } + } + + return new ValueTask(evicted); + } + + private sealed class TimedEntry + { + internal TimedEntry(ClaimCheckStoredPayload payload, DateTimeOffset? expiresAt) + => (Payload, ExpiresAt) = (payload, expiresAt); + + internal ClaimCheckStoredPayload Payload { get; } + internal DateTimeOffset? ExpiresAt { get; } + } +} diff --git a/src/PatternKit.Core/Messaging/Transformation/Normalizer.cs b/src/PatternKit.Core/Messaging/Transformation/Normalizer.cs new file mode 100644 index 00000000..4f70c49e --- /dev/null +++ b/src/PatternKit.Core/Messaging/Transformation/Normalizer.cs @@ -0,0 +1,164 @@ +namespace PatternKit.Messaging.Transformation; + +/// +/// Result returned by . +/// +public sealed class NormalizerResult +{ + private NormalizerResult(TCanonical? canonical, bool normalized, string? handlerName, string? missReason) + { + Canonical = canonical; + Normalized = normalized; + HandlerName = handlerName; + MissReason = missReason; + } + + /// The normalized canonical value when is true. + public TCanonical? Canonical { get; } + + /// Whether normalization succeeded. + public bool Normalized { get; } + + /// The name of the format handler that matched, if any. + public string? HandlerName { get; } + + /// The reason normalization did not succeed, when is false. + public string? MissReason { get; } + + internal static NormalizerResult Success(TCanonical canonical, string handlerName) + => new(canonical, true, handlerName, null); + + internal static NormalizerResult Miss(string reason) + => new(default, false, null, reason); +} + +/// +/// Content-predicate dispatcher that normalizes a raw value into a canonical form. +/// Unlike CanonicalDataModel, format detection is content-based (predicate), not CLR-type-based. +/// Registration order determines priority; first matching format wins. +/// +/// The incoming raw type (e.g., , ). +/// The target canonical type produced after normalization. +public sealed class Normalizer +{ + /// Content predicate that identifies a raw value's format. + public delegate bool FormatPredicate(TRaw raw); + + /// Async normalizer handler for a specific format. + public delegate ValueTask AsyncNormalizerHandler(TRaw raw, CancellationToken cancellationToken); + + private readonly string _name; + private readonly FormatEntry[] _entries; + private readonly AsyncNormalizerHandler? _default; + + private Normalizer(string name, FormatEntry[] entries, AsyncNormalizerHandler? @default) + => (_name, _entries, _default) = (name, entries, @default); + + /// Creates a new normalizer builder. + public static Builder Create(string name = "normalizer") => new(name); + + /// + /// Normalizes by finding the first matching format predicate + /// and invoking its async handler. + /// + public async ValueTask> NormalizeAsync( + TRaw raw, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + foreach (var entry in _entries) + { + if (entry.Predicate(raw)) + { + var canonical = await entry.Handler(raw, cancellationToken).ConfigureAwait(false); + return NormalizerResult.Success(canonical, entry.Name); + } + } + + if (_default is not null) + { + var canonical = await _default(raw, cancellationToken).ConfigureAwait(false); + return NormalizerResult.Success(canonical, "default"); + } + + return NormalizerResult.Miss($"No format handler matched the raw input for normalizer '{_name}'."); + } + + /// Fluent builder for . + public sealed class Builder + { + private readonly string _name; + private readonly List _entries = new(4); + private AsyncNormalizerHandler? _default; + + internal Builder(string name) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Normalizer name cannot be null, empty, or whitespace.", nameof(name)); + + _name = name; + } + + /// Begins a format clause with an optional name label. + public WhenClause When(FormatPredicate predicate, string? label = null) + { + if (predicate is null) + throw new ArgumentNullException(nameof(predicate)); + + return new WhenClause(this, predicate, label ?? $"format-{_entries.Count + 1}"); + } + + /// Sets the default handler used when no format predicate matches. + public Builder Default(AsyncNormalizerHandler handler) + { + _default = handler ?? throw new ArgumentNullException(nameof(handler)); + return this; + } + + /// Builds an immutable normalizer. + public Normalizer Build() + { + if (_entries.Count == 0 && _default is null) + throw new InvalidOperationException("Normalizer requires at least one format handler or a default handler."); + + return new Normalizer(_name, _entries.ToArray(), _default); + } + + internal Builder AddEntry(string name, FormatPredicate predicate, AsyncNormalizerHandler handler) + { + _entries.Add(new FormatEntry(name, predicate, handler)); + return this; + } + + /// Fluent when-clause for chaining a normalizer handler. + public sealed class WhenClause + { + private readonly Builder _builder; + private readonly FormatPredicate _predicate; + private readonly string _name; + + internal WhenClause(Builder builder, FormatPredicate predicate, string name) + => (_builder, _predicate, _name) = (builder, predicate, name); + + /// Registers the async normalization handler for this format. + public Builder Normalize(AsyncNormalizerHandler handler) + { + if (handler is null) + throw new ArgumentNullException(nameof(handler)); + + return _builder.AddEntry(_name, _predicate, handler); + } + } + } + + private sealed class FormatEntry + { + internal FormatEntry(string name, FormatPredicate predicate, AsyncNormalizerHandler handler) + => (Name, Predicate, Handler) = (name, predicate, handler); + + internal string Name { get; } + internal FormatPredicate Predicate { get; } + internal AsyncNormalizerHandler Handler { get; } + } +} diff --git a/test/PatternKit.Tests/Messaging/Channels/AsyncWireTapTests.cs b/test/PatternKit.Tests/Messaging/Channels/AsyncWireTapTests.cs new file mode 100644 index 00000000..958e1231 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Channels/AsyncWireTapTests.cs @@ -0,0 +1,173 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Channels; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Channels; + +public sealed class AsyncWireTapTests +{ + [Scenario("PublishAsync InvokesAllTapsAndReturnsOriginalMessage")] + [Fact] + public async Task PublishAsync_InvokesAllTapsAndReturnsOriginalMessage() + { + var observed = new List(); + var tap = AsyncWireTap.Create("order-observer") + .Tap("audit", async (m, _, _) => { observed.Add($"audit:{m.Payload.Id}"); await Task.CompletedTask; }) + .Tap("metrics", async (m, _, _) => { observed.Add($"metrics:{m.Payload.Total}"); await Task.CompletedTask; }) + .Build(); + var message = Message.Create(new Order("o-1", 125m)); + + var result = await tap.PublishAsync(message); + + ScenarioExpect.Equal(message, result.Message); + ScenarioExpect.Equal("order-observer", result.TapName); + ScenarioExpect.Equal(2, result.TapResults.Count); + ScenarioExpect.True(result.TapResults.All(r => r.Succeeded)); + ScenarioExpect.Equal(["audit:o-1", "metrics:125"], observed); + } + + [Scenario("PublishAsync PassesContextToTapHandlers")] + [Fact] + public async Task PublishAsync_PassesContextToTapHandlers() + { + string? seenCorrelationId = null; + var tap = AsyncWireTap.Create() + .Tap("audit", async (_, ctx, _) => { seenCorrelationId = ctx.Headers.CorrelationId; await Task.CompletedTask; }) + .Build(); + var context = new MessageContext(MessageHeaders.Empty.WithCorrelationId("corr-1")); + + _ = await tap.PublishAsync(Message.Create(new Order("o-1", 125m)), context); + + ScenarioExpect.Equal("corr-1", seenCorrelationId); + } + + [Scenario("PublishAsync SwallowPolicy DoesNotPropagateTapException")] + [Fact] + public async Task PublishAsync_SwallowPolicy_DoesNotPropagateTapException() + { + var tap = AsyncWireTap.Create() + .Tap("failing", async (_, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("tap error"); }, TapErrorPolicy.Swallow) + .Build(); + var message = Message.Create(new Order("o-1", 100m)); + + var result = await tap.PublishAsync(message); + + ScenarioExpect.Equal(message, result.Message); + ScenarioExpect.Equal(1, result.TapResults.Count); + ScenarioExpect.False(result.TapResults[0].Succeeded); + ScenarioExpect.NotNull(result.TapResults[0].Exception); + } + + [Scenario("PublishAsync LogPolicy InvokesSinkOnError")] + [Fact] + public async Task PublishAsync_LogPolicy_InvokesSinkOnError() + { + Exception? logged = null; + var tap = AsyncWireTap.Create() + .Tap("failing", async (_, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("tap error"); }, + TapErrorPolicy.Log, ex => logged = ex) + .Build(); + var message = Message.Create(new Order("o-1", 100m)); + + var result = await tap.PublishAsync(message); + + ScenarioExpect.Equal(message, result.Message); + ScenarioExpect.NotNull(logged); + ScenarioExpect.False(result.TapResults[0].Succeeded); + } + + [Scenario("PublishAsync PropagatePolicy ThrowsOnTapException")] + [Fact] + public async Task PublishAsync_PropagatePolicy_ThrowsOnTapException() + { + var tap = AsyncWireTap.Create() + .Tap("failing", async (_, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("tap error"); }, + TapErrorPolicy.Propagate) + .Build(); + var message = Message.Create(new Order("o-1", 100m)); + + await ScenarioExpect.ThrowsAsync(() => tap.PublishAsync(message).AsTask()); + } + + [Scenario("PublishAsync MainFlowUnaffectedBySwallowedTapFailure")] + [Fact] + public async Task PublishAsync_MainFlowUnaffectedBySwallowedTapFailure() + { + var secondTapInvoked = false; + var tap = AsyncWireTap.Create() + .Tap("failing", async (_, _, _) => { await Task.CompletedTask; throw new InvalidOperationException(); }, TapErrorPolicy.Swallow) + .Tap("succeeding", async (_, _, _) => { secondTapInvoked = true; await Task.CompletedTask; }) + .Build(); + + var result = await tap.PublishAsync(Message.Create(new Order("o-1", 100m))); + + ScenarioExpect.True(secondTapInvoked); + ScenarioExpect.Equal(2, result.TapResults.Count); + } + + [Scenario("Builder RejectsInvalidConfiguration")] + [Fact] + public void Builder_RejectsInvalidConfiguration() + { + ScenarioExpect.Throws(() => AsyncWireTap.Create("")); + ScenarioExpect.Throws(() => + AsyncWireTap.Create().Tap("", async (_, _, _) => await Task.CompletedTask)); + ScenarioExpect.Throws(() => + AsyncWireTap.Create().Tap("audit", null!)); + ScenarioExpect.Throws(() => + AsyncWireTap.Create().Build()); + } + + [Scenario("Builder LogPolicy RequiresSink")] + [Fact] + public void Builder_LogPolicy_RequiresSink() + { + ScenarioExpect.Throws(() => + AsyncWireTap.Create() + .Tap("audit", async (_, _, _) => await Task.CompletedTask, TapErrorPolicy.Log, null)); + } + + [Scenario("PublishAsync RejectsNullMessage")] + [Fact] + public async Task PublishAsync_RejectsNullMessage() + { + var tap = AsyncWireTap.Create() + .Tap("audit", async (_, _, _) => await Task.CompletedTask) + .Build(); + + await ScenarioExpect.ThrowsAsync(() => tap.PublishAsync(null!).AsTask()); + } + + [Scenario("PublishAsync RespectsCancellationToken")] + [Fact] + public async Task PublishAsync_RespectsCancellationToken() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // Propagate policy will propagate OperationCanceledException + var tap2 = AsyncWireTap.Create() + .Tap("audit", async (_, _, ct) => { await Task.Delay(100, ct); }, TapErrorPolicy.Propagate) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => tap2.PublishAsync(Message.Create(new Order("o-1", 100m)), cancellationToken: cts.Token).AsTask()); + } + + [Scenario("PublishAsync SwallowPolicy ReThrowsOCEOnCallerCancellation")] + [Fact] + public async Task PublishAsync_SwallowPolicy_ReThrowsOCEOnCallerCancellation() + { + // Swallow policy must NOT suppress OperationCanceledException when the caller's CT is cancelled. + using var cts = new CancellationTokenSource(); + cts.Cancel(); + var tap = AsyncWireTap.Create() + .Tap("audit", async (_, _, ct) => { await Task.Delay(100, ct); }, TapErrorPolicy.Swallow) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => tap.PublishAsync(Message.Create(new Order("o-1", 100m)), cancellationToken: cts.Token).AsTask()); + } + + private sealed record Order(string Id, decimal Total); +} diff --git a/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs new file mode 100644 index 00000000..710b2f27 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs @@ -0,0 +1,167 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Consumers; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Consumers; + +public sealed class AsyncPollingConsumerTests +{ + [Scenario("RunAsync InvokesHandlerForEachMessage")] + [Fact] + public async Task RunAsync_InvokesHandlerForEachMessage() + { + var received = new List(); + var pollCount = 0; + using var cts = new CancellationTokenSource(); + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, ct) => + { + await Task.CompletedTask; + if (Interlocked.Increment(ref pollCount) > 3) + { + cts.Cancel(); + return null; + } + return Message.Create($"msg-{pollCount}"); + }) + .WithInterval(TimeSpan.FromMilliseconds(10)) + .Build(); + + await consumer.RunAsync( + async (msg, _, _) => { received.Add(msg.Payload); await Task.CompletedTask; }, + cancellationToken: cts.Token); + + ScenarioExpect.Equal(3, received.Count); + ScenarioExpect.Equal("msg-1", received[0]); + } + + [Scenario("RunAsync StopsOnCancellation")] + [Fact] + public async Task RunAsync_StopsOnCancellation() + { + using var cts = new CancellationTokenSource(50); // cancel after 50ms + var invocations = 0; + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return null; }) + .WithInterval(TimeSpan.FromMilliseconds(10)) + .Build(); + + await consumer.RunAsync( + async (_, _, _) => { Interlocked.Increment(ref invocations); await Task.CompletedTask; }, + cancellationToken: cts.Token); + + // Should complete without throwing, just stop + ScenarioExpect.True(invocations == 0); // all polls returned null + } + + [Scenario("RunAsync EmptyPollConstantBackOff")] + [Fact] + public async Task RunAsync_EmptyPoll_ConstantBackOff() + { + var pollCount = 0; + using var cts = new CancellationTokenSource(); + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; Interlocked.Increment(ref pollCount); return null; }) + .WithInterval(TimeSpan.FromMilliseconds(20)) + .OnEmpty(BackOffPolicy.Constant) + .Build(); + + _ = Task.Run(async () => + { + await Task.Delay(150); + cts.Cancel(); + }); + + await consumer.RunAsync(async (_, _, _) => await Task.CompletedTask, cancellationToken: cts.Token); + + // Should have polled several times in 150ms with 20ms interval + ScenarioExpect.True(pollCount >= 3, $"Expected >= 3 polls but got {pollCount}"); + } + + [Scenario("RunAsync EmptyPollExponentialBackOff")] + [Fact] + public async Task RunAsync_EmptyPoll_ExponentialBackOff() + { + var pollTimestamps = new List(); + using var cts = new CancellationTokenSource(); + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => + { + await Task.CompletedTask; + pollTimestamps.Add(DateTimeOffset.UtcNow); + return null; + }) + .WithInterval(TimeSpan.FromMilliseconds(10)) + .OnEmpty(BackOffPolicy.Exponential, cap: TimeSpan.FromMilliseconds(500)) + .Build(); + + _ = Task.Run(async () => + { + await Task.Delay(300); + cts.Cancel(); + }); + + await consumer.RunAsync(async (_, _, _) => await Task.CompletedTask, cancellationToken: cts.Token); + + // With exponential backoff, later intervals should be longer + ScenarioExpect.True(pollTimestamps.Count >= 2); + } + + [Scenario("RunAsync JitterIsApplied")] + [Fact] + public async Task RunAsync_JitterIsApplied() + { + var pollTimestamps = new List(); + using var cts = new CancellationTokenSource(); + var pollCount = 0; + + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => + { + await Task.CompletedTask; + pollTimestamps.Add(DateTimeOffset.UtcNow); + if (Interlocked.Increment(ref pollCount) >= 5) cts.Cancel(); + return null; + }) + .WithInterval(TimeSpan.FromMilliseconds(10)) + .WithJitter(TimeSpan.FromMilliseconds(20)) + .Build(); + + await consumer.RunAsync(async (_, _, _) => await Task.CompletedTask, cancellationToken: cts.Token); + + ScenarioExpect.True(pollTimestamps.Count >= 2); + } + + [Scenario("Builder RejectsInvalidConfiguration")] + [Fact] + public void Builder_RejectsInvalidConfiguration() + { + ScenarioExpect.Throws(() => AsyncPollingConsumer.Create("")); + ScenarioExpect.Throws(() => AsyncPollingConsumer.Create().Build()); + ScenarioExpect.Throws(() => + AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return null; }) + .WithInterval(TimeSpan.Zero) + .Build()); + ScenarioExpect.Throws(() => + AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return null; }) + .WithJitter(TimeSpan.FromMilliseconds(-1)) + .Build()); + } + + [Scenario("RunAsync RejectsNullHandler")] + [Fact] + public async Task RunAsync_RejectsNullHandler() + { + var consumer = AsyncPollingConsumer.Create() + .WithSource(async (_, _) => { await Task.CompletedTask; return null; }) + .Build(); + + await ScenarioExpect.ThrowsAsync(() => consumer.RunAsync(null!).AsTask()); + } +} diff --git a/test/PatternKit.Tests/Messaging/Reliability/IOutboxStoreTests.cs b/test/PatternKit.Tests/Messaging/Reliability/IOutboxStoreTests.cs new file mode 100644 index 00000000..378f74b4 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Reliability/IOutboxStoreTests.cs @@ -0,0 +1,162 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Reliability; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Reliability; + +public sealed class IOutboxStoreTests +{ + [Scenario("InMemoryOutboxStore EnqueueAsync AddsRecord")] + [Fact] + public async Task InMemoryOutboxStore_EnqueueAsync_AddsRecord() + { + var store = new InMemoryOutboxStore(); + var message = Message.Create("hello"); + + var record = await store.EnqueueAsync(message); + + ScenarioExpect.NotNull(record); + ScenarioExpect.False(record.Dispatched); + ScenarioExpect.Equal(1, store.Records.Count); + } + + [Scenario("InMemoryOutboxStore SnapshotPendingAsync ReturnsOnlyUndispatched")] + [Fact] + public async Task InMemoryOutboxStore_SnapshotPendingAsync_ReturnsOnlyUndispatched() + { + var store = new InMemoryOutboxStore(); + var r1 = await store.EnqueueAsync(Message.Create("msg-1")); + var r2 = await store.EnqueueAsync(Message.Create("msg-2")); + await store.MarkDispatchedAsync(r1.Id, DateTimeOffset.UtcNow); + + var pending = await store.SnapshotPendingAsync(); + + ScenarioExpect.Equal(1, pending.Count); + ScenarioExpect.Equal(r2.Id, pending[0].Id); + } + + [Scenario("InMemoryOutboxStore MarkDispatchedAsync SetsDispatchedFlag")] + [Fact] + public async Task InMemoryOutboxStore_MarkDispatchedAsync_SetsDispatchedFlag() + { + var store = new InMemoryOutboxStore(); + var record = await store.EnqueueAsync(Message.Create("msg")); + var dispatchTime = DateTimeOffset.UtcNow; + + await store.MarkDispatchedAsync(record.Id, dispatchTime); + + var updated = store.Records.Single(r => r.Id == record.Id); + ScenarioExpect.True(updated.Dispatched); + } + + [Scenario("InMemoryOutboxStore MarkFailedAsync RecordsError")] + [Fact] + public async Task InMemoryOutboxStore_MarkFailedAsync_RecordsError() + { + var store = new InMemoryOutboxStore(); + var record = await store.EnqueueAsync(Message.Create("msg")); + + await store.MarkFailedAsync(record.Id, "network error"); + + var updated = store.Records.Single(r => r.Id == record.Id); + ScenarioExpect.Equal("network error", updated.LastError); + ScenarioExpect.Equal(1, updated.Attempts); + } + + [Scenario("OutboxDispatcher DrainAsync DispatchesAndMarksRecords")] + [Fact] + public async Task OutboxDispatcher_DrainAsync_DispatchesAndMarksRecords() + { + var store = new InMemoryOutboxStore(); + await store.EnqueueAsync(Message.Create("msg-1")); + await store.EnqueueAsync(Message.Create("msg-2")); + + var dispatched = new List(); + var mockDispatcher = new LambdaDispatcher(async (record, _) => + { + dispatched.Add(record.Message.Payload); + await Task.CompletedTask; + }); + + var dispatcher = new OutboxDispatcher(store, mockDispatcher); + var count = await dispatcher.DrainAsync(); + + ScenarioExpect.Equal(2, count); + ScenarioExpect.Equal(["msg-1", "msg-2"], dispatched); + ScenarioExpect.Empty(await store.SnapshotPendingAsync()); + } + + [Scenario("OutboxDispatcher DrainAsync PropagatesDispatcherException")] + [Fact] + public async Task OutboxDispatcher_DrainAsync_PropagatesDispatcherException() + { + var store = new InMemoryOutboxStore(); + await store.EnqueueAsync(Message.Create("msg-1")); + + var mockDispatcher = new LambdaDispatcher(async (_, _) => + { + await Task.CompletedTask; + throw new InvalidOperationException("dispatch failed"); + }); + + var dispatcher = new OutboxDispatcher(store, mockDispatcher); + + await ScenarioExpect.ThrowsAsync(() => dispatcher.DrainAsync().AsTask()); + + var record = store.Records.Single(); + ScenarioExpect.Equal("dispatch failed", record.LastError); + } + + [Scenario("OutboxDispatcher RunAsync DrainsContinuously")] + [Fact] + public async Task OutboxDispatcher_RunAsync_DrainsContinuously() + { + var store = new InMemoryOutboxStore(); + await store.EnqueueAsync(Message.Create("msg-1")); + await store.EnqueueAsync(Message.Create("msg-2")); + + using var cts = new CancellationTokenSource(); + var drainCount = 0; + + var mockDispatcher = new LambdaDispatcher(async (_, _) => + { + await Task.CompletedTask; + if (Interlocked.Increment(ref drainCount) >= 2) + cts.Cancel(); + }); + + var dispatcher = new OutboxDispatcher(store, mockDispatcher); + await dispatcher.RunAsync(TimeSpan.FromMilliseconds(10), cts.Token); + + ScenarioExpect.Equal(2, drainCount); + } + + [Scenario("OutboxDispatcher Constructor RejectsNullArguments")] + [Fact] + public void OutboxDispatcher_Constructor_RejectsNullArguments() + { + var store = new InMemoryOutboxStore(); + ScenarioExpect.Throws(() => new OutboxDispatcher(null!, new LambdaDispatcher(async (_, _) => await Task.CompletedTask))); + ScenarioExpect.Throws(() => new OutboxDispatcher(store, null!)); + } + + [Scenario("InMemoryOutboxStore EnqueueAsync RejectsNullMessage")] + [Fact] + public async Task InMemoryOutboxStore_EnqueueAsync_RejectsNullMessage() + { + var store = new InMemoryOutboxStore(); + + await ScenarioExpect.ThrowsAsync(() => store.EnqueueAsync(null!).AsTask()); + } + + private sealed class LambdaDispatcher : IOutboxDispatcher + { + private readonly Func, CancellationToken, ValueTask> _dispatch; + + internal LambdaDispatcher(Func, CancellationToken, ValueTask> dispatch) + => _dispatch = dispatch; + + public ValueTask DispatchAsync(OutboxMessage message, CancellationToken cancellationToken = default) + => _dispatch(message, cancellationToken); + } +} diff --git a/test/PatternKit.Tests/Messaging/Reliability/InMemoryIdempotencyStoreWithTtlTests.cs b/test/PatternKit.Tests/Messaging/Reliability/InMemoryIdempotencyStoreWithTtlTests.cs new file mode 100644 index 00000000..712a72c0 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Reliability/InMemoryIdempotencyStoreWithTtlTests.cs @@ -0,0 +1,135 @@ +using PatternKit.Messaging.Reliability; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Reliability; + +public sealed class InMemoryIdempotencyStoreWithTtlTests +{ + [Scenario("TryClaimAsync ClaimsNewKey")] + [Fact] + public async Task TryClaimAsync_ClaimsNewKey() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + + var claim = await store.TryClaimAsync("key-1"); + + ScenarioExpect.True(claim.Claimed); + ScenarioExpect.Equal("key-1", claim.Key); + } + + [Scenario("TryClaimAsync DoesNotClaimExistingKey")] + [Fact] + public async Task TryClaimAsync_DoesNotClaimExistingKey() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + await store.TryClaimAsync("key-1"); + + var second = await store.TryClaimAsync("key-1"); + + ScenarioExpect.False(second.Claimed); + } + + [Scenario("TryClaimAsync WithTtl ClaimsExpiredKey")] + [Fact] + public async Task TryClaimAsync_WithTtl_ClaimsExpiredKey() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + await store.TryClaimAsync("key-1", TimeSpan.FromMilliseconds(20)); + + await Task.Delay(50); // wait for TTL to expire + + var second = await store.TryClaimAsync("key-1"); + ScenarioExpect.True(second.Claimed); + } + + [Scenario("TryClaimAsync WithTtl DoesNotClaimActiveKey")] + [Fact] + public async Task TryClaimAsync_WithTtl_DoesNotClaimActiveKey() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + await store.TryClaimAsync("key-1", TimeSpan.FromMinutes(10)); + + var second = await store.TryClaimAsync("key-1"); + ScenarioExpect.False(second.Claimed); + } + + [Scenario("EvictExpiredAsync RemovesExpiredKeys")] + [Fact] + public async Task EvictExpiredAsync_RemovesExpiredKeys() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + await store.TryClaimAsync("key-expire", TimeSpan.FromMilliseconds(20)); + await store.TryClaimAsync("key-keep", TimeSpan.FromMinutes(10)); + + await Task.Delay(50); + var evicted = await store.EvictExpiredAsync(); + + ScenarioExpect.Equal(1, evicted); + ScenarioExpect.Equal(1, store.Count); + } + + [Scenario("EvictExpiredAsync NoExpiredKeys ReturnsZero")] + [Fact] + public async Task EvictExpiredAsync_NoExpiredKeys_ReturnsZero() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + await store.TryClaimAsync("key-1", TimeSpan.FromMinutes(5)); + + var evicted = await store.EvictExpiredAsync(); + + ScenarioExpect.Equal(0, evicted); + } + + [Scenario("MarkCompletedAsync KeepsExistingTtl")] + [Fact] + public async Task MarkCompletedAsync_KeepsExistingTtl() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + await store.TryClaimAsync("key-1", TimeSpan.FromMilliseconds(20)); + await store.MarkCompletedAsync("key-1", "result"); + + await Task.Delay(50); + // After TTL, should be claimable again + var third = await store.TryClaimAsync("key-1"); + ScenarioExpect.True(third.Claimed); + } + + [Scenario("TryClaimAsync RejectsEmptyKey")] + [Fact] + public async Task TryClaimAsync_RejectsEmptyKey() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + + await ScenarioExpect.ThrowsAsync(() => store.TryClaimAsync("").AsTask()); + } + + [Scenario("TryClaimAsync RejectsNegativeTtl")] + [Fact] + public async Task TryClaimAsync_RejectsNegativeTtl() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + + await ScenarioExpect.ThrowsAsync( + () => store.TryClaimAsync("key-1", TimeSpan.FromSeconds(-1)).AsTask()); + } + + [Scenario("ConcurrentClaims OnlyOneSucceeds")] + [Fact] + public async Task ConcurrentClaims_OnlyOneSucceeds() + { + var store = new InMemoryIdempotencyStoreWithTtl(); + var claimCount = 0; + + var tasks = Enumerable.Range(0, 10) + .Select(_ => Task.Run(async () => + { + var claim = await store.TryClaimAsync("shared-key"); + if (claim.Claimed) + Interlocked.Increment(ref claimCount); + })); + + await Task.WhenAll(tasks); + + ScenarioExpect.Equal(1, claimCount); + } +} diff --git a/test/PatternKit.Tests/Messaging/Routing/AsyncScatterGatherTests.cs b/test/PatternKit.Tests/Messaging/Routing/AsyncScatterGatherTests.cs new file mode 100644 index 00000000..0881812b --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Routing/AsyncScatterGatherTests.cs @@ -0,0 +1,163 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Routing; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Routing; + +public sealed class AsyncScatterGatherTests +{ + [Scenario("DispatchAsync CollectsAllRecipientResponses")] + [Fact] + public async Task DispatchAsync_CollectsAllRecipientResponses() + { + var sg = AsyncScatterGather.Create() + .Recipient("a", async (m, _, _) => { await Task.Delay(10); return 1; }) + .Recipient("b", async (m, _, _) => { await Task.Delay(5); return 2; }) + .Recipient("c", async (m, _, _) => { await Task.Delay(1); return 3; }) + .CompleteWith(CompletionStrategy.All) + .WithAggregator((envelopes, _, _) => envelopes.Where(e => e.Succeeded).Sum(e => e.Response)) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + ScenarioExpect.True(result.Succeeded); + ScenarioExpect.Equal(6, result.Result); + ScenarioExpect.Equal(3, result.Envelopes.Count); + } + + [Scenario("DispatchAsync PerBranchErrorIsolation")] + [Fact] + public async Task DispatchAsync_PerBranchErrorIsolation() + { + var sg = AsyncScatterGather.Create() + .Recipient("good", async (m, _, _) => { await Task.CompletedTask; return 42; }) + .Recipient("bad", async (m, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("branch error"); }) + .WithAggregator((envelopes, _, _) => envelopes.Where(e => e.Succeeded).Sum(e => e.Response)) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + ScenarioExpect.True(result.Succeeded); + ScenarioExpect.Equal(42, result.Result); + var failedEnvelope = ScenarioExpect.Single(result.Envelopes, e => !e.Succeeded); + ScenarioExpect.NotNull(failedEnvelope.Exception); + } + + [Scenario("DispatchAsync TimeoutStrategy PartialResults")] + [Fact] + public async Task DispatchAsync_TimeoutStrategy_PartialResults() + { + var sg = AsyncScatterGather.Create() + .Recipient("fast", async (m, _, _) => { await Task.Delay(5); return 1; }) + .Recipient("slow", async (m, _, ct) => { await Task.Delay(5000, ct); return 2; }) + .CompleteWith(CompletionStrategy.Timeout(TimeSpan.FromMilliseconds(200))) + .WithAggregator((envelopes, _, _) => envelopes.Where(e => e.Succeeded).Sum(e => e.Response)) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + // At least one result came back (fast one) + ScenarioExpect.True(result.Succeeded); + ScenarioExpect.True(result.Result >= 1); + } + + [Scenario("DispatchAsync FirstN Strategy StopsAfterN")] + [Fact] + public async Task DispatchAsync_FirstNStrategy_StopsAfterN() + { + var sg = AsyncScatterGather.Create() + .Recipient("a", async (m, _, _) => { await Task.Delay(10); return 1; }) + .Recipient("b", async (m, _, _) => { await Task.Delay(10); return 2; }) + .Recipient("c", async (m, _, _) => { await Task.Delay(10); return 3; }) + .CompleteWith(CompletionStrategy.FirstN(2)) + .WithAggregator((envelopes, _, _) => envelopes.Count(e => e.Succeeded)) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + ScenarioExpect.True(result.Succeeded); + // Should have at least 2 successful results + ScenarioExpect.True(result.Result >= 2); + } + + [Scenario("DispatchAsync QuorumStrategy WaitsForQuorum")] + [Fact] + public async Task DispatchAsync_QuorumStrategy_WaitsForQuorum() + { + var sg = AsyncScatterGather.Create() + .Recipient("a", async (m, _, _) => { await Task.Delay(5); return 10; }) + .Recipient("b", async (m, _, _) => { await Task.Delay(10); return 20; }) + .Recipient("c", async (m, _, _) => { await Task.Delay(15); return 30; }) + .CompleteWith(CompletionStrategy.Quorum(2)) + .WithAggregator((envelopes, _, _) => envelopes.Where(e => e.Succeeded).Sum(e => e.Response)) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + ScenarioExpect.True(result.Succeeded); + ScenarioExpect.True(result.Result >= 30); // at least a and b responded + } + + [Scenario("DispatchAsync QuorumStrategy CountsFailedRecipientsTowardQuorum")] + [Fact] + public async Task DispatchAsync_QuorumStrategy_CountsFailedRecipientsTowardQuorum() + { + // A failing recipient counts toward quorum — quorum means "any N responses", not "N successes". + var completedCount = 0; + var sg = AsyncScatterGather.Create() + .Recipient("failing", async (m, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("recipient error"); }) + .Recipient("slow", async (m, _, ct) => { await Task.Delay(5000, ct); Interlocked.Increment(ref completedCount); return 99; }) + .CompleteWith(CompletionStrategy.Quorum(1)) + .WithAggregator((envelopes, _, _) => envelopes.Count()) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + // Quorum(1) satisfied by the failing recipient; slow recipient was cancelled before completing. + ScenarioExpect.True(result.Succeeded); + ScenarioExpect.Equal(0, completedCount); // slow never completed + } + + [Scenario("DispatchAsync AllFail AggregatorReceivesFailedEnvelopes")] + [Fact] + public async Task DispatchAsync_AllFail_AggregatorReceivesFailedEnvelopes() + { + var sg = AsyncScatterGather.Create() + .Recipient("bad", async (m, _, _) => { await Task.CompletedTask; throw new InvalidOperationException(); }) + .WithAggregator((envelopes, _, _) => envelopes.Count(e => !e.Succeeded)) + .Build(); + + var result = await sg.DispatchAsync(Message.Create("test")); + + // Aggregator runs even if all recipients failed; it receives the failed envelopes + ScenarioExpect.True(result.Succeeded); + ScenarioExpect.Equal(1, result.Result); // one failed envelope + ScenarioExpect.Equal(1, result.Envelopes.Count(e => !e.Succeeded)); + } + + [Scenario("Builder RejectsInvalidConfiguration")] + [Fact] + public void Builder_RejectsInvalidConfiguration() + { + ScenarioExpect.Throws(() => + AsyncScatterGather.Create("")); + ScenarioExpect.Throws(() => + AsyncScatterGather.Create().WithAggregator((e, _, _) => 0).Build()); + ScenarioExpect.Throws(() => + AsyncScatterGather.Create() + .Recipient("a", async (m, _, _) => { await Task.CompletedTask; return 0; }) + .Build()); + } + + [Scenario("DispatchAsync RejectsNullMessage")] + [Fact] + public async Task DispatchAsync_RejectsNullMessage() + { + var sg = AsyncScatterGather.Create() + .Recipient("a", async (m, _, _) => { await Task.CompletedTask; return 0; }) + .WithAggregator((e, _, _) => 0) + .Build(); + + await ScenarioExpect.ThrowsAsync(() => sg.DispatchAsync(null!).AsTask()); + } +} diff --git a/test/PatternKit.Tests/Messaging/Transformation/AsyncContentEnricherTests.cs b/test/PatternKit.Tests/Messaging/Transformation/AsyncContentEnricherTests.cs new file mode 100644 index 00000000..2cdd67bf --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Transformation/AsyncContentEnricherTests.cs @@ -0,0 +1,157 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Transformation; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Transformation; + +public sealed class AsyncContentEnricherTests +{ + [Scenario("EnrichAsync AppliesAllStepsInOrder")] + [Fact] + public async Task EnrichAsync_AppliesAllStepsInOrder() + { + var enricher = AsyncContentEnricher.Create() + .Enrich("add-email", async (c, _, _) => { await Task.CompletedTask; return c with { Email = "user@example.com" }; }) + .Enrich("add-tier", async (c, _, _) => { await Task.CompletedTask; return c with { Tier = "Gold" }; }) + .Build(); + + var message = Message.Create(new Customer("Alice", null, null)); + var result = await enricher.EnrichAsync(message); + + ScenarioExpect.Equal("user@example.com", result.Message.Payload.Email); + ScenarioExpect.Equal("Gold", result.Message.Payload.Tier); + ScenarioExpect.Equal(2, result.StepResults.Count); + ScenarioExpect.True(result.StepResults.All(r => r.Applied)); + } + + [Scenario("EnrichAsync PreservesHeadersUnchanged")] + [Fact] + public async Task EnrichAsync_PreservesHeadersUnchanged() + { + var enricher = AsyncContentEnricher.Create() + .Enrich("add-email", async (c, _, _) => { await Task.CompletedTask; return c with { Email = "x@x.com" }; }) + .Build(); + + var message = new Message(new Customer("Alice", null, null), MessageHeaders.Empty.WithCorrelationId("corr-99")); + var result = await enricher.EnrichAsync(message); + + ScenarioExpect.Equal("corr-99", result.Message.Headers.CorrelationId); + } + + [Scenario("EnrichAsync ThrowPolicy PropagatesException")] + [Fact] + public async Task EnrichAsync_ThrowPolicy_PropagatesException() + { + var enricher = AsyncContentEnricher.Create() + .Enrich("failing", async (c, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("fetch error"); }, + EnrichmentErrorPolicy.Throw) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => enricher.EnrichAsync(Message.Create(new Customer("Alice", null, null))).AsTask()); + } + + [Scenario("EnrichAsync SkipPolicy ContinuesPipelineOnFailure")] + [Fact] + public async Task EnrichAsync_SkipPolicy_ContinuesPipelineOnFailure() + { + var enricher = AsyncContentEnricher.Create() + .Enrich("failing", async (c, _, _) => { await Task.CompletedTask; throw new InvalidOperationException(); }, + EnrichmentErrorPolicy.Skip) + .Enrich("succeeding", async (c, _, _) => { await Task.CompletedTask; return c with { Tier = "Bronze" }; }) + .Build(); + + var result = await enricher.EnrichAsync(Message.Create(new Customer("Alice", null, null))); + + ScenarioExpect.Equal("Bronze", result.Message.Payload.Tier); + ScenarioExpect.True(result.StepResults[0].Skipped); + ScenarioExpect.True(result.StepResults[1].Applied); + } + + [Scenario("EnrichAsync UseDefaultPolicy AppliesDefaultOnFailure")] + [Fact] + public async Task EnrichAsync_UseDefaultPolicy_AppliesDefaultOnFailure() + { + var enricher = AsyncContentEnricher.Create() + .Enrich("failing", + async (c, _, _) => { await Task.CompletedTask; throw new InvalidOperationException(); }, + EnrichmentErrorPolicy.UseDefault, + c => c with { Email = "noreply@default.com" }) + .Build(); + + var result = await enricher.EnrichAsync(Message.Create(new Customer("Alice", null, null))); + + ScenarioExpect.Equal("noreply@default.com", result.Message.Payload.Email); + ScenarioExpect.True(result.StepResults[0].Skipped); + } + + [Scenario("EnrichAsync StepAuditTrailCapturesException")] + [Fact] + public async Task EnrichAsync_StepAuditTrailCapturesException() + { + var enricher = AsyncContentEnricher.Create() + .WithDefaultPolicy(EnrichmentErrorPolicy.Skip) + .Enrich("failing", async (c, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("fetch error"); }) + .Build(); + + var result = await enricher.EnrichAsync(Message.Create(new Customer("Alice", null, null))); + + var failedStep = result.StepResults[0]; + ScenarioExpect.NotNull(failedStep.Exception); + ScenarioExpect.Equal("InvalidOperationException", failedStep.Exception!.GetType().Name); + } + + [Scenario("Builder RejectsInvalidConfiguration")] + [Fact] + public void Builder_RejectsInvalidConfiguration() + { + ScenarioExpect.Throws(() => AsyncContentEnricher.Create("")); + ScenarioExpect.Throws(() => + AsyncContentEnricher.Create().Enrich("", async (c, _, _) => { await Task.CompletedTask; return c; })); + ScenarioExpect.Throws(() => + AsyncContentEnricher.Create().Enrich("step", null!)); + ScenarioExpect.Throws(() => + AsyncContentEnricher.Create().Build()); + } + + [Scenario("Builder RejectsUseDefaultWithoutFactory")] + [Fact] + public void Builder_RejectsUseDefaultWithoutFactory() + { + // UseDefault without a defaultFactory would silently behave like Skip; require factory at build time. + ScenarioExpect.Throws(() => + AsyncContentEnricher.Create() + .Enrich("step", async (c, _, _) => { await Task.CompletedTask; return c; }, + EnrichmentErrorPolicy.UseDefault, defaultFactory: null) + .Build()); + } + + [Scenario("EnrichAsync SkipPolicy ReThrowsOCEOnCallerCancellation")] + [Fact] + public async Task EnrichAsync_SkipPolicy_ReThrowsOCEOnCallerCancellation() + { + // Skip policy must NOT suppress OperationCanceledException when the caller's CT is cancelled. + using var cts = new CancellationTokenSource(); + cts.Cancel(); + var enricher = AsyncContentEnricher.Create() + .Enrich("step", async (c, _, ct) => { await Task.Delay(100, ct); return c; }, + EnrichmentErrorPolicy.Skip) + .Build(); + + await ScenarioExpect.ThrowsAsync( + () => enricher.EnrichAsync(Message.Create(new Customer("Alice", null, null)), cancellationToken: cts.Token).AsTask()); + } + + [Scenario("EnrichAsync RejectsNullMessage")] + [Fact] + public async Task EnrichAsync_RejectsNullMessage() + { + var enricher = AsyncContentEnricher.Create() + .Enrich("step", async (c, _, _) => { await Task.CompletedTask; return c; }) + .Build(); + + await ScenarioExpect.ThrowsAsync(() => enricher.EnrichAsync(null!).AsTask()); + } + + private sealed record Customer(string Name, string? Email, string? Tier); +} diff --git a/test/PatternKit.Tests/Messaging/Transformation/InMemoryClaimCheckStoreWithTtlTests.cs b/test/PatternKit.Tests/Messaging/Transformation/InMemoryClaimCheckStoreWithTtlTests.cs new file mode 100644 index 00000000..841d40fa --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Transformation/InMemoryClaimCheckStoreWithTtlTests.cs @@ -0,0 +1,116 @@ +using PatternKit.Messaging; +using PatternKit.Messaging.Transformation; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Transformation; + +public sealed class InMemoryClaimCheckStoreWithTtlTests +{ + [Scenario("StoreAsync And TryLoadAsync RoundTrip")] + [Fact] + public async Task StoreAsync_And_TryLoadAsync_RoundTrip() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + await store.StoreAsync("claim-1", "payload-1", MessageHeaders.Empty); + + var loaded = await store.TryLoadAsync("claim-1"); + + ScenarioExpect.NotNull(loaded); + ScenarioExpect.Equal("payload-1", loaded!.Payload); + } + + [Scenario("TryLoadAsync ReturnsNullForUnknownClaim")] + [Fact] + public async Task TryLoadAsync_ReturnsNullForUnknownClaim() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + + var loaded = await store.TryLoadAsync("nonexistent"); + + ScenarioExpect.Null(loaded); + } + + [Scenario("StoreAsync WithTtl ExpiredEntryNotReturned")] + [Fact] + public async Task StoreAsync_WithTtl_ExpiredEntryNotReturned() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + await store.StoreAsync("claim-1", "payload-1", MessageHeaders.Empty, TimeSpan.FromMilliseconds(20)); + + await Task.Delay(50); + var loaded = await store.TryLoadAsync("claim-1"); + + ScenarioExpect.Null(loaded); + } + + [Scenario("StoreAsync WithTtl ActiveEntryReturned")] + [Fact] + public async Task StoreAsync_WithTtl_ActiveEntryReturned() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + await store.StoreAsync("claim-1", "payload-1", MessageHeaders.Empty, TimeSpan.FromMinutes(10)); + + var loaded = await store.TryLoadAsync("claim-1"); + + ScenarioExpect.NotNull(loaded); + } + + [Scenario("EvictExpiredAsync RemovesExpiredEntries")] + [Fact] + public async Task EvictExpiredAsync_RemovesExpiredEntries() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + await store.StoreAsync("claim-expire", "a", MessageHeaders.Empty, TimeSpan.FromMilliseconds(20)); + await store.StoreAsync("claim-keep", "b", MessageHeaders.Empty, TimeSpan.FromMinutes(10)); + + await Task.Delay(50); + var evicted = await store.EvictExpiredAsync(); + + ScenarioExpect.Equal(1, evicted); + ScenarioExpect.Null(await store.TryLoadAsync("claim-expire")); + ScenarioExpect.NotNull(await store.TryLoadAsync("claim-keep")); + } + + [Scenario("StoreAsync PreservesHeaders")] + [Fact] + public async Task StoreAsync_PreservesHeaders() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + var headers = MessageHeaders.Empty.WithCorrelationId("corr-42"); + await store.StoreAsync("claim-1", "payload", headers); + + var loaded = await store.TryLoadAsync("claim-1"); + + ScenarioExpect.Equal("corr-42", loaded!.Headers.CorrelationId); + } + + [Scenario("StoreAsync RejectsEmptyClaimId")] + [Fact] + public async Task StoreAsync_RejectsEmptyClaimId() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + + await ScenarioExpect.ThrowsAsync( + () => store.StoreAsync("", "payload", MessageHeaders.Empty).AsTask()); + } + + [Scenario("StoreAsync RejectsNegativeTtl")] + [Fact] + public async Task StoreAsync_RejectsNegativeTtl() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + + await ScenarioExpect.ThrowsAsync( + () => store.StoreAsync("claim-1", "payload", MessageHeaders.Empty, TimeSpan.FromSeconds(-1)).AsTask()); + } + + [Scenario("StoreAsync RejectsNullHeaders")] + [Fact] + public async Task StoreAsync_RejectsNullHeaders() + { + var store = new InMemoryClaimCheckStoreWithTtl(); + + await ScenarioExpect.ThrowsAsync( + () => store.StoreAsync("claim-1", "payload", null!).AsTask()); + } +} diff --git a/test/PatternKit.Tests/Messaging/Transformation/NormalizerTests.cs b/test/PatternKit.Tests/Messaging/Transformation/NormalizerTests.cs new file mode 100644 index 00000000..b67e54c8 --- /dev/null +++ b/test/PatternKit.Tests/Messaging/Transformation/NormalizerTests.cs @@ -0,0 +1,114 @@ +using PatternKit.Messaging.Transformation; +using TinyBDD; + +namespace PatternKit.Tests.Messaging.Transformation; + +public sealed class NormalizerTests +{ + [Scenario("NormalizeAsync FirstMatchWins")] + [Fact] + public async Task NormalizeAsync_FirstMatchWins() + { + var normalizer = Normalizer.Create() + .When(s => s.StartsWith("JSON:"), "json").Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s[5..], "json"); }) + .When(s => s.StartsWith("XML:"), "xml").Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s[4..], "xml"); }) + .Build(); + + var result = await normalizer.NormalizeAsync("JSON:order-1"); + + ScenarioExpect.True(result.Normalized); + ScenarioExpect.Equal("order-1", result.Canonical!.Id); + ScenarioExpect.Equal("json", result.Canonical.Format); + ScenarioExpect.Equal("json", result.HandlerName); + } + + [Scenario("NormalizeAsync SecondHandlerMatchesWhenFirstDoesNot")] + [Fact] + public async Task NormalizeAsync_SecondHandlerMatchesWhenFirstDoesNot() + { + var normalizer = Normalizer.Create() + .When(s => s.StartsWith("JSON:")).Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s, "json"); }) + .When(s => s.StartsWith("XML:")).Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s[4..], "xml"); }) + .Build(); + + var result = await normalizer.NormalizeAsync("XML:order-2"); + + ScenarioExpect.True(result.Normalized); + ScenarioExpect.Equal("xml", result.Canonical!.Format); + } + + [Scenario("NormalizeAsync DefaultHandlerUsedWhenNoMatchFound")] + [Fact] + public async Task NormalizeAsync_DefaultHandlerUsedWhenNoMatchFound() + { + var normalizer = Normalizer.Create() + .When(s => s.StartsWith("JSON:")).Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s, "json"); }) + .Default(async (s, _) => { await Task.CompletedTask; return new Order(s, "unknown"); }) + .Build(); + + var result = await normalizer.NormalizeAsync("CSV:data"); + + ScenarioExpect.True(result.Normalized); + ScenarioExpect.Equal("unknown", result.Canonical!.Format); + ScenarioExpect.Equal("default", result.HandlerName); + } + + [Scenario("NormalizeAsync NoMatchAndNoDefault ReturnsMiss")] + [Fact] + public async Task NormalizeAsync_NoMatchAndNoDefault_ReturnsMiss() + { + var normalizer = Normalizer.Create() + .When(s => s.StartsWith("JSON:")).Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s, "json"); }) + .Build(); + + var result = await normalizer.NormalizeAsync("CSV:data"); + + ScenarioExpect.False(result.Normalized); + ScenarioExpect.NotNull(result.MissReason); + } + + [Scenario("Builder RequiresAtLeastOneHandlerOrDefault")] + [Fact] + public void Builder_RequiresAtLeastOneHandlerOrDefault() + { + ScenarioExpect.Throws(() => + Normalizer.Create().Build()); + } + + [Scenario("Builder RejectsNullPredicate")] + [Fact] + public void Builder_RejectsNullPredicate() + { + ScenarioExpect.Throws(() => + Normalizer.Create().When(null!)); + } + + [Scenario("NormalizeAsync RespectsCancellation")] + [Fact] + public async Task NormalizeAsync_RespectsCancellation() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + var normalizer = Normalizer.Create() + .When(s => true).Normalize(async (s, ct) => { await Task.Delay(100, ct); return new Order(s, "x"); }) + .Build(); + + await ScenarioExpect.ThrowsAsync(() => normalizer.NormalizeAsync("data", cts.Token).AsTask()); + } + + [Scenario("NormalizeAsync ContentPredicateReceivesActualValue")] + [Fact] + public async Task NormalizeAsync_ContentPredicateReceivesActualValue() + { + string? seenRaw = null; + var normalizer = Normalizer.Create() + .When(s => { seenRaw = s; return true; }).Normalize(async (s, _) => { await Task.CompletedTask; return new Order(s, "x"); }) + .Build(); + + await normalizer.NormalizeAsync("my-raw-value"); + + ScenarioExpect.Equal("my-raw-value", seenRaw); + } + + private sealed record Order(string Id, string Format); +}