Skip to content

Commit bb17174

Browse files
committed
Enforce core runtime concurrency semantics
1 parent 7232550 commit bb17174

6 files changed

Lines changed: 336 additions & 6 deletions

File tree

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net10.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<IsPackable>false</IsPackable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="10.0.2" />
12+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.0.0" />
13+
<PackageReference Include="xunit" Version="2.9.3" />
14+
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4">
15+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
16+
<PrivateAssets>all</PrivateAssets>
17+
</PackageReference>
18+
</ItemGroup>
19+
20+
<ItemGroup>
21+
<ProjectReference Include="..\..\src\ModularityKit.Mutator.csproj" />
22+
</ItemGroup>
23+
24+
</Project>
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
using Microsoft.Extensions.DependencyInjection;
2+
using System.Collections.Concurrent;
3+
using ModularityKit.Mutator.Abstractions;
4+
using ModularityKit.Mutator.Abstractions.Changes;
5+
using ModularityKit.Mutator.Abstractions.Context;
6+
using ModularityKit.Mutator.Abstractions.Engine;
7+
using ModularityKit.Mutator.Abstractions.Intent;
8+
using ModularityKit.Mutator.Abstractions.Results;
9+
using ModularityKit.Mutator.Runtime;
10+
using Xunit;
11+
12+
namespace ModularityKit.Mutator.Tests.Runtime;
13+
14+
public sealed class MutationEngineConcurrencyTests
15+
{
16+
[Fact]
17+
public async Task ExecuteAsync_serializes_mutations_that_target_the_same_state_id()
18+
{
19+
var services = new ServiceCollection();
20+
services.AddMutators(configure: options =>
21+
{
22+
options.MaxConcurrentMutations = 4;
23+
options.EnableDetailedMetrics = false;
24+
});
25+
26+
await using var provider = services.BuildServiceProvider();
27+
var engine = provider.GetRequiredService<IMutationEngine>();
28+
using var gate = new BlockingMutationGate();
29+
var state = new OrderedState("initial");
30+
31+
var first = new BlockingMutation(gate, "shared-state", "first");
32+
var second = new BlockingMutation(gate, "shared-state", "second");
33+
34+
var firstTask = Task.Run(() => engine.ExecuteAsync(first, state));
35+
var secondTask = Task.Run(() => engine.ExecuteAsync(second, state));
36+
37+
Assert.True(await gate.WaitForEntriesAsync(1, TimeSpan.FromSeconds(5)));
38+
Assert.Equal(1, gate.PeakConcurrency);
39+
40+
gate.Release();
41+
42+
var results = await Task.WhenAll(firstTask, secondTask);
43+
44+
Assert.All(results, result => Assert.True(result.IsSuccess));
45+
Assert.Equal(1, gate.PeakConcurrency);
46+
}
47+
48+
[Fact]
49+
public async Task ExecuteAsync_honors_max_concurrent_mutations_for_different_states()
50+
{
51+
var services = new ServiceCollection();
52+
services.AddMutators(configure: options =>
53+
{
54+
options.MaxConcurrentMutations = 2;
55+
options.EnableDetailedMetrics = false;
56+
});
57+
58+
await using var provider = services.BuildServiceProvider();
59+
var engine = provider.GetRequiredService<IMutationEngine>();
60+
using var gate = new BlockingMutationGate();
61+
var states = new[]
62+
{
63+
new OrderedState("one"),
64+
new OrderedState("two"),
65+
new OrderedState("three"),
66+
new OrderedState("four")
67+
};
68+
69+
var tasks = new[]
70+
{
71+
Task.Run(() => engine.ExecuteAsync(new BlockingMutation(gate, "state-1", "one"), states[0])),
72+
Task.Run(() => engine.ExecuteAsync(new BlockingMutation(gate, "state-2", "two"), states[1])),
73+
Task.Run(() => engine.ExecuteAsync(new BlockingMutation(gate, "state-3", "three"), states[2])),
74+
Task.Run(() => engine.ExecuteAsync(new BlockingMutation(gate, "state-4", "four"), states[3]))
75+
};
76+
77+
Assert.True(await gate.WaitForEntriesAsync(2, TimeSpan.FromSeconds(5)));
78+
Assert.Equal(2, gate.PeakConcurrency);
79+
80+
gate.Release();
81+
82+
var results = await Task.WhenAll(tasks);
83+
84+
Assert.All(results, result => Assert.True(result.IsSuccess));
85+
Assert.Equal(2, gate.PeakConcurrency);
86+
}
87+
88+
[Fact]
89+
public async Task ExecuteBatchAsync_remains_ordered_while_respecting_runtime_concurrency_gates()
90+
{
91+
var services = new ServiceCollection();
92+
services.AddMutators(configure: options =>
93+
{
94+
options.MaxConcurrentMutations = 2;
95+
options.EnableDetailedMetrics = false;
96+
});
97+
98+
await using var provider = services.BuildServiceProvider();
99+
var engine = provider.GetRequiredService<IMutationEngine>();
100+
var observed = new ConcurrentQueue<string>();
101+
102+
var batch = new[]
103+
{
104+
new OrderedMutation("state-1", "first", observed),
105+
new OrderedMutation("state-2", "second", observed),
106+
new OrderedMutation("state-1", "third", observed)
107+
};
108+
109+
var result = await engine.ExecuteBatchAsync(batch, new OrderedState("initial"));
110+
111+
Assert.True(result.IsSuccess);
112+
Assert.Equal(3, result.Results.Count);
113+
Assert.Equal(new[] { "first", "second", "third" }, observed);
114+
}
115+
116+
[Fact]
117+
public void AddMutators_rejects_non_positive_max_concurrent_mutations()
118+
{
119+
var services = new ServiceCollection();
120+
services.AddMutators(configure: options => options.MaxConcurrentMutations = 0);
121+
122+
Assert.Throws<ArgumentOutOfRangeException>(() => services.BuildServiceProvider().GetRequiredService<IMutationEngine>());
123+
}
124+
125+
private sealed record OrderedState(string Value);
126+
127+
private sealed class OrderedMutation(string stateId, string value, ConcurrentQueue<string> observed)
128+
: IMutation<OrderedState>
129+
{
130+
public MutationIntent Intent { get; } = new()
131+
{
132+
OperationName = "Order",
133+
Category = "Test",
134+
Description = "Observe execution order"
135+
};
136+
137+
public MutationContext Context { get; } = MutationContext.User("tester", "Tester", "Order test")
138+
with { StateId = stateId };
139+
140+
public MutationResult<OrderedState> Apply(OrderedState state)
141+
{
142+
observed.Enqueue(value);
143+
return MutationResult<OrderedState>.Success(state with { Value = value }, ChangeSet.Empty);
144+
}
145+
146+
public ValidationResult Validate(OrderedState state) => ValidationResult.Success();
147+
148+
public MutationResult<OrderedState> Simulate(OrderedState state) => Apply(state);
149+
}
150+
151+
private sealed class BlockingMutationGate : IDisposable
152+
{
153+
private readonly ManualResetEventSlim _release = new(false);
154+
private int _entered;
155+
private int _active;
156+
private int _peak;
157+
158+
public int PeakConcurrency => Volatile.Read(ref _peak);
159+
160+
public async Task<bool> WaitForEntriesAsync(int expectedEntries, TimeSpan timeout)
161+
{
162+
var started = DateTimeOffset.UtcNow;
163+
164+
while (Volatile.Read(ref _entered) < expectedEntries)
165+
{
166+
if (DateTimeOffset.UtcNow - started > timeout)
167+
return false;
168+
169+
await Task.Delay(10);
170+
}
171+
172+
return true;
173+
}
174+
175+
public void Enter()
176+
{
177+
Interlocked.Increment(ref _entered);
178+
var active = Interlocked.Increment(ref _active);
179+
180+
while (true)
181+
{
182+
var peak = Volatile.Read(ref _peak);
183+
if (active <= peak || Interlocked.CompareExchange(ref _peak, active, peak) == peak)
184+
break;
185+
}
186+
187+
_release.Wait();
188+
Interlocked.Decrement(ref _active);
189+
}
190+
191+
public void Release() => _release.Set();
192+
193+
public void Dispose() => _release.Dispose();
194+
}
195+
196+
private sealed class BlockingMutation(
197+
BlockingMutationGate gate,
198+
string stateId,
199+
string value) : IMutation<OrderedState>
200+
{
201+
public MutationIntent Intent { get; } = new()
202+
{
203+
OperationName = "Block",
204+
Category = "Test",
205+
Description = "Block until released"
206+
};
207+
208+
public MutationContext Context { get; } = MutationContext.User($"{stateId}-actor", $"{stateId}-actor", "Concurrency test")
209+
with { StateId = stateId };
210+
211+
public MutationResult<OrderedState> Apply(OrderedState state)
212+
{
213+
gate.Enter();
214+
return MutationResult<OrderedState>.Success(state with { Value = value }, ChangeSet.Empty);
215+
}
216+
217+
public ValidationResult Validate(OrderedState state) => ValidationResult.Success();
218+
219+
public MutationResult<OrderedState> Simulate(OrderedState state) => Apply(state);
220+
}
221+
}

src/Abstractions/Engine/IMutationEngine.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ namespace ModularityKit.Mutator.Abstractions.Engine;
2525
/// <item><description>History persistence</description></item>
2626
/// </list>
2727
/// <para>
28+
/// Core runtime concurrency is governed by <see cref="ModularityKit.Mutator.Abstractions.MutationEngineOptions.MaxConcurrentMutations"/>.
29+
/// Mutations that target the same <see cref="MutationContext.StateId"/> are serialized by the runtime so shared-state workloads
30+
/// remain deterministic. This is separate from governance request storage concurrency, which protects request lifecycle writes
31+
/// in the governance package.
32+
/// </para>
33+
/// <para>
2834
/// The engine acts as the primary governance boundary for all state mutations.
2935
/// </para>
3036
/// </remarks>
@@ -57,8 +63,9 @@ Task<MutationResult<TState>> ExecuteAsync<TState>(
5763
/// A <see cref="BatchMutationResult{TState}"/> describing the outcome of the batch execution.
5864
/// </returns>
5965
/// <remarks>
60-
/// Batch execution semantics (e.g. fail-fast vs best-effort) are controlled
61-
/// by <see cref="MutationEngineOptions"/>.
66+
/// Batch execution is ordered and sequential. Each batch step passes through the same core concurrency controls as a
67+
/// single execution, including the maximum concurrent execution limit and any state-specific serialization.
68+
/// Fail-fast vs best-effort behavior is controlled by <see cref="MutationEngineOptions"/>.
6269
/// </remarks>
6370
Task<BatchMutationResult<TState>> ExecuteBatchAsync<TState>(
6471
IEnumerable<IMutation<TState>> mutations,

src/Abstractions/MutationEngineOptions.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,14 @@ public sealed class MutationEngineOptions
5555
public bool EnableDetailedMetrics { get; set; } = false;
5656

5757
/// <summary>
58-
/// The maximum number of mutations that may be executed concurrently.
58+
/// The maximum number of mutations that may be executed concurrently by the core runtime.
5959
/// </summary>
6060
/// <remarks>
61-
/// This setting controls parallelism and can be used to limit resource usage
62-
/// or avoid contention.
61+
/// This setting limits concurrent core execution across the engine.
62+
/// Mutations that carry the same <see cref="Context.MutationContext.StateId"/>
63+
/// are serialized so shared-state workloads remain deterministic.
64+
/// Batch execution remains ordered; the limit applies to each batch step as it
65+
/// passes through the runtime.
6366
/// </remarks>
6467
public int MaxConcurrentMutations { get; set; } = 10;
6568

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using System.Collections.Concurrent;
2+
3+
namespace ModularityKit.Mutator.Runtime.Internal;
4+
5+
/// <summary>
6+
/// Coordinates core mutation execution concurrency across the engine.
7+
/// </summary>
8+
internal sealed class MutationExecutionConcurrencyGate(int maxConcurrentMutations)
9+
{
10+
private readonly SemaphoreSlim _globalGate = new(maxConcurrentMutations, maxConcurrentMutations);
11+
private readonly ConcurrentDictionary<string, SemaphoreSlim> _stateGates = new(StringComparer.Ordinal);
12+
13+
public async ValueTask<Lease> EnterAsync(string? stateId, CancellationToken cancellationToken)
14+
{
15+
await _globalGate.WaitAsync(cancellationToken).ConfigureAwait(false);
16+
17+
var stateGate = default(SemaphoreSlim);
18+
19+
try
20+
{
21+
if (!string.IsNullOrWhiteSpace(stateId))
22+
{
23+
stateGate = _stateGates.GetOrAdd(stateId, static _ => new SemaphoreSlim(1, 1));
24+
await stateGate.WaitAsync(cancellationToken).ConfigureAwait(false);
25+
}
26+
27+
return new Lease(_globalGate, stateGate);
28+
}
29+
catch
30+
{
31+
_globalGate.Release();
32+
throw;
33+
}
34+
}
35+
36+
/// <summary>
37+
/// Represents an acquired execution slot.
38+
/// </summary>
39+
internal readonly struct Lease : IAsyncDisposable
40+
{
41+
private readonly SemaphoreSlim _globalGate;
42+
private readonly SemaphoreSlim? _stateGate;
43+
44+
public Lease(SemaphoreSlim globalGate, SemaphoreSlim? stateGate)
45+
{
46+
_globalGate = globalGate;
47+
_stateGate = stateGate;
48+
}
49+
50+
public ValueTask DisposeAsync()
51+
{
52+
_stateGate?.Release();
53+
_globalGate.Release();
54+
return ValueTask.CompletedTask;
55+
}
56+
}
57+
}

src/Runtime/MutationEngine.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ internal sealed class MutationEngine(
3232
private readonly IMutationHistoryStore _historyStore = historyStore ?? throw new ArgumentNullException(nameof(historyStore));
3333
private readonly IMetricsCollector _metricsCollector = metricsCollector ?? throw new ArgumentNullException(nameof(metricsCollector));
3434
private readonly MutationEngineOptions _options = options ?? throw new ArgumentNullException(nameof(options));
35+
private readonly MutationExecutionConcurrencyGate _concurrencyGate = CreateConcurrencyGate(options);
3536

3637
public async Task<MutationResult<TState>> ExecuteAsync<TState>(
3738
IMutation<TState> mutation,
@@ -42,9 +43,13 @@ public async Task<MutationResult<TState>> ExecuteAsync<TState>(
4243
var stopwatch = Stopwatch.StartNew();
4344
IMetricsScope? metricsScope = null;
4445

46+
await using var executionLease = await _concurrencyGate
47+
.EnterAsync(mutation.Context.StateId, cancellationToken)
48+
.ConfigureAwait(false);
49+
4550
if (_options.EnableDetailedMetrics)
4651
metricsScope = _metricsCollector.BeginScope(executionId);
47-
52+
4853
try
4954
{
5055
return await ExecutePipelineAsync(
@@ -374,4 +379,17 @@ private async Task StoreInHistoryAsync<TState>(
374379

375380
await _historyStore.StoreAsync(entry, cancellationToken);
376381
}
382+
383+
private static MutationExecutionConcurrencyGate CreateConcurrencyGate(MutationEngineOptions options)
384+
{
385+
ArgumentNullException.ThrowIfNull(options);
386+
387+
if (options.MaxConcurrentMutations < 1)
388+
throw new ArgumentOutOfRangeException(
389+
nameof(options),
390+
options.MaxConcurrentMutations,
391+
"MaxConcurrentMutations must be greater than zero.");
392+
393+
return new MutationExecutionConcurrencyGate(options.MaxConcurrentMutations);
394+
}
377395
}

0 commit comments

Comments
 (0)