diff --git a/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs b/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs index 08da228..f62086d 100644 --- a/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs +++ b/src/tests/ReactiveUI.Primitives.Async.Tests/TimeBasedOperatorTests.cs @@ -671,14 +671,21 @@ public async Task WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest() DirectSource source = new(); List items = []; TaskCompletionSource completed = new(TaskCreationOptions.RunContinuationsAsynchronously); - TaskCompletionSource lastEmitted = new(TaskCreationOptions.RunContinuationsAsynchronously); await using var sub = await source.Throttle(TimeSpan.FromMilliseconds(50), manualProvider).SubscribeAsync( - (x, ct) => + async (x, ct) => { _ = ct; items.Add(x); - _ = x == LastValue && lastEmitted.TrySetResult(); - return default; + + // Drive completion re-entrantly from the throttled emission itself. The emission + // runs on a pooled timer continuation; completing the source from the test thread + // would race that still-unwinding OnNext call and trip the witness's concurrent-call + // guard, silently dropping the completion. Completing from inside the handler keeps + // both notifications on the same thread, where re-entrant calls are permitted. + if (x == LastValue) + { + await source.Complete(Result.Success); + } }, null, _ => @@ -692,8 +699,6 @@ public async Task WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest() await Assert.That(manualProvider.TimerCount).IsEqualTo(LastValue); manualProvider.FireAll(); - await lastEmitted.Task.WaitAsync(TimeSpan.FromSeconds(5)); - await source.Complete(Result.Success); await completed.Task.WaitAsync(TimeSpan.FromSeconds(5)); await Assert.That(items).Contains(LastValue); } diff --git a/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs b/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs index 134c09c..8119262 100644 --- a/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs +++ b/src/tests/ReactiveUI.Primitives.Extensions.Tests/Operators/ThrottleUntilTrueObservableTests.cs @@ -2,6 +2,7 @@ // ReactiveUI Association Incorporated licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using System.Collections.Concurrent; using System.Reactive.Subjects; namespace ReactiveUI.Primitives.Extensions.Tests.Operators; @@ -64,13 +65,23 @@ public async Task WhenThrottleUntilTrueFastReplacements_ThenLatestWins() const int Earlier = 1; const int Later = 2; Subject subject = new(); - TaskCompletionSource emitted = new(TaskCreationOptions.RunContinuationsAsynchronously); - using var sub = subject.ThrottleUntilTrue(ThrottleWindow, static _ => false) - .Subscribe(v => emitted.TrySetResult(v)); + ConcurrentQueue emissions = new(); + TaskCompletionSource laterArrived = new(TaskCreationOptions.RunContinuationsAsynchronously); + using var sub = subject.ThrottleUntilTrue(ThrottleWindow, static _ => false).Subscribe(v => + { + emissions.Enqueue(v); + _ = v == Later && laterArrived.TrySetResult(); + }); subject.OnNext(Earlier); subject.OnNext(Later); - var got = await emitted.Task.WaitAsync(TimeSpan.FromSeconds(5)); - await Assert.That(got).IsEqualTo(Later); + + // Later's timer is never superseded, so it always fires; wait on that deterministically + // rather than racing the wall-clock window. Under scheduling pressure Earlier's timer may + // still slip through first, so assert the invariant the operator guarantees: whatever the + // intermediate emissions, the final value observed is the latest one. + await laterArrived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + var observed = emissions.ToArray(); + await Assert.That(observed[^1]).IsEqualTo(Later); } /// Verifies that source errors are forwarded.