From 0c6b0193ab39bf34769914080c6f8d5dd403dc2e Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Thu, 25 Jun 2026 22:44:34 +1000 Subject: [PATCH] test(operators): make throttle supersession tests deterministic - WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest completed the DirectSource from the test thread while the throttled emission was still unwinding on a pooled timer continuation. That overlap tripped WitnessAsync's concurrent-call guard, which silently drops the completion and left the test waiting on a never-signalled TCS until the 5s WaitAsync timed out. Drive completion re-entrantly from inside the emission handler so both notifications stay on one thread. - WhenThrottleUntilTrueFastReplacements_ThenLatestWins latched the first emission into a single-value TCS, so a wall-clock race where the earlier value's timer fired before the later value replaced it poisoned the result. Record all emissions and wait for the later value (whose timer is never superseded), then assert the final observed value is the latest. Test-only; no product behaviour changes. --- .../TimeBasedOperatorTests.cs | 17 +++++++++------ .../ThrottleUntilTrueObservableTests.cs | 21 ++++++++++++++----- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs b/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs index 08da228a..f62086d7 100644 --- a/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs +++ b/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs @@ -671,14 +671,21 @@ public async Task WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest() DirectSource source = new(); List items = []; TaskCompletionSource completed = new(TaskCreationOptions.RunContinuationsAsynchronously); - TaskCompletionSource lastEmitted = new(TaskCreationOptions.RunContinuationsAsynchronously); await using var sub = await source.Throttle(TimeSpan.FromMilliseconds(50), manualProvider).SubscribeAsync( - (x, ct) => + async (x, ct) => { _ = ct; items.Add(x); - _ = x == LastValue && lastEmitted.TrySetResult(); - return default; + + // Drive completion re-entrantly from the throttled emission itself. The emission + // runs on a pooled timer continuation; completing the source from the test thread + // would race that still-unwinding OnNext call and trip the witness's concurrent-call + // guard, silently dropping the completion. Completing from inside the handler keeps + // both notifications on the same thread, where re-entrant calls are permitted. + if (x == LastValue) + { + await source.Complete(Result.Success); + } }, null, _ => @@ -692,8 +699,6 @@ public async Task WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest() await Assert.That(manualProvider.TimerCount).IsEqualTo(LastValue); manualProvider.FireAll(); - await lastEmitted.Task.WaitAsync(TimeSpan.FromSeconds(5)); - await source.Complete(Result.Success); await completed.Task.WaitAsync(TimeSpan.FromSeconds(5)); await Assert.That(items).Contains(LastValue); } diff --git a/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs b/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs index 134c09c8..8119262f 100644 --- a/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs +++ b/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs @@ -2,6 +2,7 @@ // ReactiveUI Association Incorporated licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Collections.Concurrent; using System.Reactive.Subjects; namespace ReactiveUI.Primitives.Extensions.Tests.Operators; @@ -64,13 +65,23 @@ public async Task WhenThrottleUntilTrueFastReplacements_ThenLatestWins() const int Earlier = 1; const int Later = 2; Subject subject = new(); - TaskCompletionSource emitted = new(TaskCreationOptions.RunContinuationsAsynchronously); - using var sub = subject.ThrottleUntilTrue(ThrottleWindow, static _ => false) - .Subscribe(v => emitted.TrySetResult(v)); + ConcurrentQueue emissions = new(); + TaskCompletionSource laterArrived = new(TaskCreationOptions.RunContinuationsAsynchronously); + using var sub = subject.ThrottleUntilTrue(ThrottleWindow, static _ => false).Subscribe(v => + { + emissions.Enqueue(v); + _ = v == Later && laterArrived.TrySetResult(); + }); subject.OnNext(Earlier); subject.OnNext(Later); - var got = await emitted.Task.WaitAsync(TimeSpan.FromSeconds(5)); - await Assert.That(got).IsEqualTo(Later); + + // Later's timer is never superseded, so it always fires; wait on that deterministically + // rather than racing the wall-clock window. Under scheduling pressure Earlier's timer may + // still slip through first, so assert the invariant the operator guarantees: whatever the + // intermediate emissions, the final value observed is the latest one. + await laterArrived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var observed = emissions.ToArray(); + await Assert.That(observed[^1]).IsEqualTo(Later); } /// Verifies that source errors are forwarded.