diff --git a/src/NetEvolve.Pulse.Extensibility/ITimeoutRequest.cs b/src/NetEvolve.Pulse.Extensibility/ITimeoutRequest.cs new file mode 100644 index 00000000..ca7661ac --- /dev/null +++ b/src/NetEvolve.Pulse.Extensibility/ITimeoutRequest.cs @@ -0,0 +1,53 @@ +namespace NetEvolve.Pulse.Extensibility; + +/// +/// Marker interface for requests that enforce a per-request deadline using a . +/// Implement this interface alongside or to opt in to +/// built-in timeout enforcement without any external dependencies. +/// +/// +/// Usage: +/// When a request implements , the TimeoutRequestInterceptor +/// will create a linked using the effective timeout. +/// If the handler does not complete within that deadline, a is thrown. +/// Timeout Resolution: +/// +/// If is non-, that value is used as the deadline. +/// If is , the globally configured fallback timeout is used (if set). +/// If neither is set, the interceptor is a transparent pass-through. +/// +/// Requests that do not implement are always passed through without any timeout. +/// Distinguishing Timeout from User Cancellation: +/// The interceptor correctly distinguishes between a timeout-triggered cancellation and a caller-initiated +/// cancellation, re-throwing a only in the former case. +/// +/// +/// +/// // Explicit per-request timeout +/// public record ProcessOrderCommand(string OrderId) : ICommand<OrderResult>, ITimeoutRequest +/// { +/// public string? CorrelationId { get; set; } +/// public TimeSpan? Timeout => TimeSpan.FromSeconds(10); +/// } +/// +/// // Defer to the global fallback configured via AddRequestTimeout(globalTimeout: ...) +/// public record GetStatusQuery(string Id) : IQuery<Status>, ITimeoutRequest +/// { +/// public string? CorrelationId { get; set; } +/// public TimeSpan? Timeout => null; +/// } +/// +/// +/// +/// +/// +public interface ITimeoutRequest +{ + /// + /// Gets the maximum allowed duration for the handler to complete before a + /// is raised. + /// When , the globally configured fallback timeout is applied if set; + /// otherwise the interceptor is a transparent pass-through for this request. + /// + TimeSpan? Timeout { get; } +} diff --git a/src/NetEvolve.Pulse/Interceptors/TimeoutRequestInterceptor.cs b/src/NetEvolve.Pulse/Interceptors/TimeoutRequestInterceptor.cs new file mode 100644 index 00000000..c1e82899 --- /dev/null +++ b/src/NetEvolve.Pulse/Interceptors/TimeoutRequestInterceptor.cs @@ -0,0 +1,93 @@ +namespace NetEvolve.Pulse.Interceptors; + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility; + +/// +/// Built-in request interceptor that enforces a per-request deadline using a linked +/// , without any external dependencies. +/// +/// The type of request being intercepted. +/// The type of response produced by the request. +/// +/// Activation: +/// The interceptor only activates when the request implements . +/// Requests that do not implement are always passed through without any timeout. +/// For implementations the effective deadline is resolved as follows: +/// +/// — used when non-. +/// — used as fallback when is . +/// If neither is set, the interceptor is a transparent pass-through for that request. +/// +/// Cancellation Semantics: +/// The interceptor correctly distinguishes between a timeout-triggered cancellation and a +/// caller-initiated cancellation: only when the deadline is exceeded is a +/// thrown. Caller cancellations are propagated as +/// as usual. +/// Resource Management: +/// The internally created is always disposed, even when +/// the handler throws. +/// +internal sealed class TimeoutRequestInterceptor : IRequestInterceptor + where TRequest : IRequest +{ + private readonly IOptions _options; + + /// + /// Initializes a new instance of the class. + /// + /// The timeout interceptor options. + /// Thrown when is . + public TimeoutRequestInterceptor(IOptions options) + { + ArgumentNullException.ThrowIfNull(options); + _options = options; + } + + /// + /// + /// Thrown when the handler does not complete within the configured deadline and the original + /// has not been independently cancelled. + /// + public async Task HandleAsync( + TRequest request, + Func> handler, + CancellationToken cancellationToken = default + ) + { + ArgumentNullException.ThrowIfNull(handler); + + // Requests not implementing ITimeoutRequest are always passed through. + if (request is not ITimeoutRequest timeoutRequest) + { + return await handler(request, cancellationToken).ConfigureAwait(false); + } + + // Resolve effective timeout: per-request value first, global fallback second. + var timeout = timeoutRequest.Timeout ?? _options.Value.GlobalTimeout; + + // No timeout configured — transparent pass-through. + if (timeout is null) + { + return await handler(request, cancellationToken).ConfigureAwait(false); + } + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(timeout.Value); + + try + { + return await handler(request, cts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + when (!cancellationToken.IsCancellationRequested && cts.Token.IsCancellationRequested) + { + throw new TimeoutException( + $"The request '{typeof(TRequest).Name}' timed out after {timeout.Value.TotalMilliseconds}ms." + ); + } + } +} diff --git a/src/NetEvolve.Pulse/TimeoutMediatorConfiguratorExtensions.cs b/src/NetEvolve.Pulse/TimeoutMediatorConfiguratorExtensions.cs new file mode 100644 index 00000000..6873c0a6 --- /dev/null +++ b/src/NetEvolve.Pulse/TimeoutMediatorConfiguratorExtensions.cs @@ -0,0 +1,73 @@ +namespace NetEvolve.Pulse; + +using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Interceptors; + +/// +/// Provides fluent extension methods for registering the built-in request timeout interceptor +/// with the Pulse mediator. +/// +/// +/// +public static class TimeoutMediatorConfiguratorExtensions +{ + /// + /// Registers the built-in TimeoutRequestInterceptor that enforces per-request deadlines + /// using a linked . + /// + /// The mediator configurator. + /// + /// An optional global fallback timeout applied to implementations + /// that return from . + /// Requests that do not implement are always passed through + /// regardless of this value. + /// When (default), only requests implementing + /// with a non- are subject to a deadline. + /// + /// The configurator for method chaining. + /// Thrown when is . + /// + /// Timeout Resolution (for requests only): + /// + /// — used when non-. + /// — used as fallback when is . + /// If neither is set, the interceptor is a transparent pass-through. + /// + /// Requests that do not implement are always passed through. + /// Cancellation Semantics: + /// A is thrown only when the deadline is exceeded. + /// Caller-initiated cancellations propagate as as usual. + /// + /// + /// Without global timeout (only ITimeoutRequest requests with a non-null Timeout are affected): + /// + /// services.AddPulse(c => c.AddRequestTimeout()); + /// + /// With global fallback timeout (ITimeoutRequest requests with a null Timeout use this as deadline): + /// + /// services.AddPulse(c => c.AddRequestTimeout(TimeSpan.FromSeconds(30))); + /// + /// + /// + /// + public static IMediatorConfigurator AddRequestTimeout( + this IMediatorConfigurator configurator, + TimeSpan? globalTimeout = null + ) + { + ArgumentNullException.ThrowIfNull(configurator); + + _ = configurator.Services.Configure(opts => + opts.GlobalTimeout = globalTimeout + ); + + configurator.Services.TryAddEnumerable( + ServiceDescriptor.Singleton(typeof(IRequestInterceptor<,>), typeof(TimeoutRequestInterceptor<,>)) + ); + + return configurator; + } +} diff --git a/src/NetEvolve.Pulse/TimeoutRequestInterceptorOptions.cs b/src/NetEvolve.Pulse/TimeoutRequestInterceptorOptions.cs new file mode 100644 index 00000000..9785ec37 --- /dev/null +++ b/src/NetEvolve.Pulse/TimeoutRequestInterceptorOptions.cs @@ -0,0 +1,31 @@ +namespace NetEvolve.Pulse; + +/// +/// Options for the built-in request timeout interceptor registered via AddRequestTimeout(). +/// +/// +/// Global Timeout: +/// When is set, all requests that do not implement +/// are also subject to the global deadline. +/// Requests that implement always use their own +/// value, which takes precedence over +/// . +/// +/// +/// +/// services.AddPulse(c => c.AddRequestTimeout(TimeSpan.FromSeconds(30))); +/// +/// +/// +public sealed class TimeoutRequestInterceptorOptions +{ + /// + /// Gets or sets the global fallback timeout applied to + /// implementations that return from . + /// Requests that do not implement are always passed through + /// regardless of this value. + /// When (default), requests with a + /// are not subject to any deadline. + /// + public TimeSpan? GlobalTimeout { get; set; } +} diff --git a/tests/NetEvolve.Pulse.Tests.Integration/RequestTimeoutTests.cs b/tests/NetEvolve.Pulse.Tests.Integration/RequestTimeoutTests.cs new file mode 100644 index 00000000..084a68ac --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Integration/RequestTimeoutTests.cs @@ -0,0 +1,214 @@ +namespace NetEvolve.Pulse.Tests.Integration; + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using NetEvolve.Pulse.Extensibility; +using TUnit.Core; + +public sealed class RequestTimeoutTests +{ + private static ServiceCollection CreateServiceCollection() + { + var services = new ServiceCollection(); + _ = services.AddLogging().AddSingleton(TimeProvider.System); + return services; + } + + [Test] + public async Task SendAsync_WithTimeoutRequest_WhenCompletesWithinDeadline_ReturnsResult() + { + var services = CreateServiceCollection(); + _ = services + .AddPulse(config => config.AddRequestTimeout()) + .AddScoped, FastTimeoutCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + var command = new FastTimeoutCommand(TimeSpan.FromSeconds(5)); + var result = await mediator.SendAsync(command).ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("fast-result"); + } + + [Test] + public async Task SendAsync_WithTimeoutRequest_WhenExceedsDeadline_ThrowsTimeoutException() + { + var services = CreateServiceCollection(); + _ = services + .AddPulse(config => config.AddRequestTimeout()) + .AddScoped, SlowTimeoutCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + var command = new SlowTimeoutCommand(TimeSpan.FromMilliseconds(50)); + + _ = await Assert.ThrowsAsync(async () => + await mediator.SendAsync(command).ConfigureAwait(false) + ); + } + + [Test] + public async Task SendAsync_WithTimeoutRequest_WhenOriginalTokenCancelled_ThrowsOperationCanceledException() + { + var services = CreateServiceCollection(); + _ = services + .AddPulse(config => config.AddRequestTimeout()) + .AddScoped, SlowTimeoutCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + using var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(50)); + + var command = new SlowTimeoutCommand(TimeSpan.FromSeconds(5)); + + var exception = await Assert.ThrowsAsync(async () => + await mediator.SendAsync(command, cts.Token).ConfigureAwait(false) + ); + + _ = await Assert.That(exception).IsNotTypeOf(); + } + + [Test] + public async Task SendAsync_WithNonTimeoutRequest_AlwaysPassesThrough_EvenWithGlobalTimeout() + { + var services = CreateServiceCollection(); + // GlobalTimeout of 1ms — but PlainCommand doesn't implement ITimeoutRequest, so it should pass through. + _ = services + .AddPulse(config => config.AddRequestTimeout(TimeSpan.FromMilliseconds(1))) + .AddScoped, PlainCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + var command = new PlainCommand(); + var result = await mediator.SendAsync(command).ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("plain-result"); + } + + [Test] + public async Task SendAsync_WithTimeoutRequest_NullTimeout_AndGlobalTimeout_WhenCompletesWithinDeadline_ReturnsResult() + { + var services = CreateServiceCollection(); + _ = services + .AddPulse(config => config.AddRequestTimeout(TimeSpan.FromSeconds(5))) + .AddScoped, NullTimeoutCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + var command = new NullTimeoutCommand(); + var result = await mediator.SendAsync(command).ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("null-timeout-result"); + } + + [Test] + public async Task SendAsync_WithTimeoutRequest_NullTimeout_AndGlobalTimeout_WhenExceedsDeadline_ThrowsTimeoutException() + { + var services = CreateServiceCollection(); + _ = services + .AddPulse(config => config.AddRequestTimeout(TimeSpan.FromMilliseconds(50))) + .AddScoped, SlowNullTimeoutCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + var command = new SlowNullTimeoutCommand(); + + _ = await Assert.ThrowsAsync(async () => + await mediator.SendAsync(command).ConfigureAwait(false) + ); + } + + [Test] + public async Task SendAsync_WithTimeoutRequest_NullTimeout_AndNoGlobalTimeout_PassesThrough() + { + var services = CreateServiceCollection(); + _ = services + .AddPulse(config => config.AddRequestTimeout()) + .AddScoped, NullTimeoutCommandHandler>(); + + await using var provider = services.BuildServiceProvider(); + var mediator = provider.GetRequiredService(); + + var command = new NullTimeoutCommand(); + var result = await mediator.SendAsync(command).ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("null-timeout-result"); + } + + private sealed record FastTimeoutCommand(TimeSpan? Timeout) : ICommand, ITimeoutRequest + { + public string? CorrelationId { get; set; } + } + + private sealed record SlowTimeoutCommand(TimeSpan? Timeout) : ICommand, ITimeoutRequest + { + public string? CorrelationId { get; set; } + } + + private sealed record NullTimeoutCommand : ICommand, ITimeoutRequest + { + public string? CorrelationId { get; set; } + + public TimeSpan? Timeout => null; + } + + private sealed record SlowNullTimeoutCommand : ICommand, ITimeoutRequest + { + public string? CorrelationId { get; set; } + + public TimeSpan? Timeout => null; + } + + private sealed record PlainCommand : ICommand + { + public string? CorrelationId { get; set; } + } + + private sealed class FastTimeoutCommandHandler : ICommandHandler + { + public Task HandleAsync(FastTimeoutCommand command, CancellationToken cancellationToken = default) => + Task.FromResult("fast-result"); + } + + private sealed class SlowTimeoutCommandHandler : ICommandHandler + { + public async Task HandleAsync(SlowTimeoutCommand command, CancellationToken cancellationToken = default) + { + await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).ConfigureAwait(false); + return "slow-result"; + } + } + + private sealed class NullTimeoutCommandHandler : ICommandHandler + { + public Task HandleAsync(NullTimeoutCommand command, CancellationToken cancellationToken = default) => + Task.FromResult("null-timeout-result"); + } + + private sealed class SlowNullTimeoutCommandHandler : ICommandHandler + { + public async Task HandleAsync( + SlowNullTimeoutCommand command, + CancellationToken cancellationToken = default + ) + { + await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).ConfigureAwait(false); + return "slow-null-timeout-result"; + } + } + + private sealed class PlainCommandHandler : ICommandHandler + { + public Task HandleAsync(PlainCommand command, CancellationToken cancellationToken = default) => + Task.FromResult("plain-result"); + } +} diff --git a/tests/NetEvolve.Pulse.Tests.Unit/Interceptors/TimeoutRequestInterceptorTests.cs b/tests/NetEvolve.Pulse.Tests.Unit/Interceptors/TimeoutRequestInterceptorTests.cs new file mode 100644 index 00000000..04b21dad --- /dev/null +++ b/tests/NetEvolve.Pulse.Tests.Unit/Interceptors/TimeoutRequestInterceptorTests.cs @@ -0,0 +1,211 @@ +namespace NetEvolve.Pulse.Tests.Unit.Interceptors; + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using NetEvolve.Pulse.Extensibility; +using NetEvolve.Pulse.Interceptors; +using TUnit.Core; + +public sealed class TimeoutRequestInterceptorTests +{ + [Test] + public async Task HandleAsync_WithNullHandler_ThrowsArgumentNullException() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions()); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(TimeSpan.FromSeconds(5)); + + _ = await Assert.ThrowsAsync(async () => + await interceptor.HandleAsync(command, null!).ConfigureAwait(false) + ); + } + + [Test] + public async Task HandleAsync_WithTimeoutRequest_WhenCompletesWithinDeadline_ReturnsResult() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions()); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(TimeSpan.FromSeconds(5)); + + var result = await interceptor.HandleAsync(command, (_, _) => Task.FromResult("success")).ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("success"); + } + + [Test] + public async Task HandleAsync_WithTimeoutRequest_WhenExceedsDeadline_ThrowsTimeoutException() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions()); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(TimeSpan.FromMilliseconds(50)); + + var exception = await Assert.ThrowsAsync(async () => + await interceptor + .HandleAsync( + command, + async (_, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(5), ct).ConfigureAwait(false); + return "never"; + } + ) + .ConfigureAwait(false) + ); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.Message).Contains("TestTimeoutCommand"); + _ = await Assert.That(exception.Message).Contains("50"); + } + + [Test] + public async Task HandleAsync_WithOriginalTokenCancelled_ThrowsOperationCanceledException_NotTimeoutException() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions()); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(TimeSpan.FromSeconds(5)); + using var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMilliseconds(50)); + + var exception = await Assert.ThrowsAsync(async () => + await interceptor + .HandleAsync( + command, + async (_, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(5), ct).ConfigureAwait(false); + return "never"; + }, + cts.Token + ) + .ConfigureAwait(false) + ); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception).IsNotTypeOf(); + } + + [Test] + public async Task HandleAsync_WithNonTimeoutRequest_AlwaysPassesThrough_RegardlessOfGlobalTimeout() + { + var options = Options.Create( + new TimeoutRequestInterceptorOptions { GlobalTimeout = TimeSpan.FromMilliseconds(1) } + ); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestCommand(); + + // Even though GlobalTimeout is 1ms, the non-ITimeoutRequest should pass through immediately. + var result = await interceptor + .HandleAsync(command, (_, _) => Task.FromResult("passed-through")) + .ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("passed-through"); + } + + [Test] + public async Task HandleAsync_WithTimeoutRequest_NullTimeout_AndNoGlobalTimeout_PassesThrough() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions()); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(null); + + var result = await interceptor + .HandleAsync(command, (_, _) => Task.FromResult("passed-through")) + .ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("passed-through"); + } + + [Test] + public async Task HandleAsync_WithTimeoutRequest_NullTimeout_AndGlobalTimeout_WhenCompletesWithinDeadline_ReturnsResult() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions { GlobalTimeout = TimeSpan.FromSeconds(5) }); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(null); + + var result = await interceptor + .HandleAsync(command, (_, _) => Task.FromResult("global-fallback-success")) + .ConfigureAwait(false); + + _ = await Assert.That(result).IsEqualTo("global-fallback-success"); + } + + [Test] + public async Task HandleAsync_WithTimeoutRequest_NullTimeout_AndGlobalTimeout_WhenExceedsDeadline_ThrowsTimeoutException() + { + var options = Options.Create( + new TimeoutRequestInterceptorOptions { GlobalTimeout = TimeSpan.FromMilliseconds(50) } + ); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(null); + + var exception = await Assert.ThrowsAsync(async () => + await interceptor + .HandleAsync( + command, + async (_, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(5), ct).ConfigureAwait(false); + return "never"; + } + ) + .ConfigureAwait(false) + ); + + _ = await Assert.That(exception).IsNotNull(); + _ = await Assert.That(exception!.Message).Contains("TestTimeoutCommand"); + } + + [Test] + public async Task HandleAsync_WithTimeoutRequest_ExplicitTimeoutOverridesGlobalTimeout() + { + // Per-request timeout (50ms) should take precedence over global (5s), + // so the request should time out. + var options = Options.Create(new TimeoutRequestInterceptorOptions { GlobalTimeout = TimeSpan.FromSeconds(5) }); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(TimeSpan.FromMilliseconds(50)); + + var exception = await Assert.ThrowsAsync(async () => + await interceptor + .HandleAsync( + command, + async (_, ct) => + { + await Task.Delay(TimeSpan.FromSeconds(5), ct).ConfigureAwait(false); + return "never"; + } + ) + .ConfigureAwait(false) + ); + + _ = await Assert.That(exception).IsNotNull(); + } + + [Test] + public async Task HandleAsync_DisposesLinkedCts_EvenWhenHandlerThrows() + { + var options = Options.Create(new TimeoutRequestInterceptorOptions()); + var interceptor = new TimeoutRequestInterceptor(options); + var command = new TestTimeoutCommand(TimeSpan.FromSeconds(5)); + + _ = await Assert.ThrowsAsync(async () => + await interceptor + .HandleAsync(command, (_, _) => throw new InvalidOperationException("handler error")) + .ConfigureAwait(false) + ); + + // If CancellationTokenSource was not disposed, a subsequent test run might detect undisposed resources. + // This test simply verifies the interceptor completes without resource-leak exceptions. + } + + private sealed record TestTimeoutCommand(TimeSpan? Timeout) : ICommand, ITimeoutRequest + { + public string? CorrelationId { get; set; } + } + + private sealed record TestCommand : ICommand + { + public string? CorrelationId { get; set; } + } +}