Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -671,14 +671,21 @@ public async Task WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest()
DirectSource<int> source = new();
List<int> items = [];
TaskCompletionSource completed = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource lastEmitted = new(TaskCreationOptions.RunContinuationsAsynchronously);
await using var sub = await source.Throttle(TimeSpan.FromMilliseconds(50), manualProvider).SubscribeAsync(
(x, ct) =>
async (x, ct) =>
{
_ = ct;
items.Add(x);
_ = x == LastValue && lastEmitted.TrySetResult();
return default;

// Drive completion re-entrantly from the throttled emission itself. The emission
// runs on a pooled timer continuation; completing the source from the test thread
// would race that still-unwinding OnNext call and trip the witness's concurrent-call
// guard, silently dropping the completion. Completing from inside the handler keeps
// both notifications on the same thread, where re-entrant calls are permitted.
if (x == LastValue)
{
await source.Complete(Result.Success);
}
},
null,
_ =>
Expand All @@ -692,8 +699,6 @@ public async Task WhenThrottleReceivesRapidValues_ThenOnlyEmitsLatest()

await Assert.That(manualProvider.TimerCount).IsEqualTo(LastValue);
manualProvider.FireAll();
await lastEmitted.Task.WaitAsync(TimeSpan.FromSeconds(5));
await source.Complete(Result.Success);
await completed.Task.WaitAsync(TimeSpan.FromSeconds(5));
await Assert.That(items).Contains(LastValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Collections.Concurrent;
using System.Reactive.Subjects;

namespace ReactiveUI.Primitives.Extensions.Tests.Operators;
Expand Down Expand Up @@ -64,13 +65,23 @@ public async Task WhenThrottleUntilTrueFastReplacements_ThenLatestWins()
const int Earlier = 1;
const int Later = 2;
Subject<int> subject = new();
TaskCompletionSource<int> emitted = new(TaskCreationOptions.RunContinuationsAsynchronously);
using var sub = subject.ThrottleUntilTrue(ThrottleWindow, static _ => false)
.Subscribe(v => emitted.TrySetResult(v));
ConcurrentQueue<int> emissions = new();
TaskCompletionSource laterArrived = new(TaskCreationOptions.RunContinuationsAsynchronously);
using var sub = subject.ThrottleUntilTrue(ThrottleWindow, static _ => false).Subscribe(v =>
{
emissions.Enqueue(v);
_ = v == Later && laterArrived.TrySetResult();
});
subject.OnNext(Earlier);
subject.OnNext(Later);
var got = await emitted.Task.WaitAsync(TimeSpan.FromSeconds(5));
await Assert.That(got).IsEqualTo(Later);

// Later's timer is never superseded, so it always fires; wait on that deterministically
// rather than racing the wall-clock window. Under scheduling pressure Earlier's timer may
// still slip through first, so assert the invariant the operator guarantees: whatever the
// intermediate emissions, the final value observed is the latest one.
await laterArrived.Task.WaitAsync(TimeSpan.FromSeconds(5));
var observed = emissions.ToArray();
await Assert.That(observed[^1]).IsEqualTo(Later);
}

/// <summary>Verifies that source errors are forwarded.</summary>
Expand Down
Loading