From beb2a3f422443a7c5d9e848eb245cb4105b62047 Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Thu, 18 Jul 2024 07:21:03 +0100 Subject: [PATCH 1/3] Enable application-defined Synchronize gate Our AsyncGate does not support cancellation, and for now at least we don't want to add it. (For one thing, it opens the can of worms of whether we want to attempt to support cancellation across the board in AsyncRx.NET. But also, more or less everyone who tries to add cancellation support to this sort of primitive ends up creating subtle bugs.) So this defines an IAsyncGate interface and Synchronize overloads that accept it, enabling them to work with application-supplied implementations. --- .../Linq/Operators/Synchronize.cs | 5 +- .../Threading/AsyncGate.cs | 20 ++--- .../Threading/AsyncGateReleaser.cs | 15 ++++ .../Threading/IAsyncGate.cs | 73 +++++++++++++++++++ 4 files changed, 97 insertions(+), 16 deletions(-) create mode 100644 AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs create mode 100644 AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs index 27d016799c..3cce871273 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. +using System.Reactive.Threading; using System.Threading; namespace System.Reactive.Linq @@ -16,7 +17,7 @@ public static IAsyncObservable Synchronize(this IAsyncObservab return Create(source, static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.Synchronize(observer))); } - public static IAsyncObservable Synchronize(this IAsyncObservable source, AsyncGate gate) + public static IAsyncObservable Synchronize(this IAsyncObservable source, IAsyncGate gate) { if (source == null) throw new ArgumentNullException(nameof(source)); @@ -40,7 +41,7 @@ public static IAsyncObserver Synchronize(IAsyncObserver Synchronize(IAsyncObserver observer, AsyncGate gate) + public static IAsyncObserver Synchronize(IAsyncObserver observer, IAsyncGate gate) { if (observer == null) throw new ArgumentNullException(nameof(observer)); diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs index 507aa676f7..8d63aa8201 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs @@ -3,17 +3,18 @@ // See the LICENSE file in the project root for more information. using System.Diagnostics; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Threading { - public sealed class AsyncGate + public sealed class AsyncGate : IAsyncGate { private readonly object _gate = new(); private readonly SemaphoreSlim _semaphore = new(1, 1); private readonly AsyncLocal _recursionCount = new(); - public ValueTask LockAsync() + public ValueTask LockAsync() { var shouldAcquire = false; @@ -32,13 +33,13 @@ public ValueTask LockAsync() if (shouldAcquire) { - return new ValueTask(_semaphore.WaitAsync().ContinueWith(_ => new Releaser(this))); + return new ValueTask(_semaphore.WaitAsync().ContinueWith(_ => new AsyncGateReleaser(this))); } - return new ValueTask(new Releaser(this)); + return new ValueTask(new AsyncGateReleaser(this)); } - private void Release() + void IAsyncGate.Release() { lock (_gate) { @@ -50,14 +51,5 @@ private void Release() } } } - - public readonly struct Releaser : IDisposable - { - private readonly AsyncGate _parent; - - public Releaser(AsyncGate parent) => _parent = parent; - - public void Dispose() => _parent.Release(); - } } } diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs new file mode 100644 index 0000000000..7ae06528dc --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs @@ -0,0 +1,15 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +namespace System.Reactive.Threading +{ + public readonly struct AsyncGateReleaser : IDisposable + { + private readonly IAsyncGate _parent; + + public AsyncGateReleaser(IAsyncGate parent) => _parent = parent; + + public void Dispose() => _parent.Release(); + } +} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs new file mode 100644 index 0000000000..5d34f8bf83 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs @@ -0,0 +1,73 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Reactive.Threading +{ + /// + /// Synchronization primitive that provides -style + /// exclusive access semantics, but with an asynchronous API. + /// + /// + /// + /// This enables + /// and + /// to be used to synchronize access to an observer with a custom synchronization primitive. + /// + /// + /// These methods model the equivalents for and + /// in System.Reactive. Those offer overloads accepting a 'gate' parameter, and if you pass + /// the same object to multiple calls to these methods, they will all synchronize their operation + /// through that same gate object. The gate parameter in those methods is of type + /// , which works because all .NET objects have an associated monitor. + /// (It's created on demand when you first use lock or something equivalent.) + /// + /// + /// That approach is problematic in an async world, because this built-in monitor blocks the + /// calling thread when contention occurs. The basic idea of AsyncRx.NET is to avoid such + /// blocking. It can't always be avoided, and in cases where we can be certain that lock + /// acquisition times will be short, the conventional .NET monitor is still a good choice. + /// But since these Synchronize operators allow the caller to pass a gate which the + /// application code itself might lock, we have no control over how long the lock might be + /// held. So it would be inappropriate to use a monitor here. + /// + /// + /// Since the .NET runtime does not currently offer any asynchronous direct equivalent to + /// monitor, this interface defines the required API. The class + /// provide a basic implementation. If applications require additional features, (e.g. + /// if they want cancellation support when the application tries to acquire the lock) + /// they can provide their own implementation. + /// + /// + public interface IAsyncGate + { + /// + /// Acquires the lock. + /// + /// + /// A task that completes when the lock has been acquired, returning an + /// which can be disposed to release the lock. + /// + /// + /// + /// Applications release the lock by disposing the returned by this + /// method. Typically this is done with a using statement or declaration. + /// + /// + public ValueTask LockAsync(); + + /// + /// Releases the lock. Applications typically won't call this directly, and will use + /// the returned by instead. + /// + /// + /// This method needs to be publicly accessible so that a single + /// can be shared by all implementations of this interface. + /// + public void Release(); + } +} From c9ac48c902b317e2bd599b50d85875e03345db2c Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Fri, 6 Dec 2024 09:15:36 +0000 Subject: [PATCH 2/3] Enable AsyncGateRelease to work with either another IDisposable or an IAsyncGate. Add XML doc comments. --- .../Threading/AsyncGateReleaser.cs | 47 +++++++++++++++++-- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs index 7ae06528dc..fc5e4e1e8e 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs @@ -4,12 +4,51 @@ namespace System.Reactive.Threading { - public readonly struct AsyncGateReleaser : IDisposable + /// + /// Returned by , enabling the caller to release the lock. + /// + public struct AsyncGateReleaser : IDisposable { - private readonly IAsyncGate _parent; + // Holds either an IAsyncGate or an IDisposable. + // In the case where this is an IAsyncGate, it's important that we try to avoid + // calling Release more than once, because this releaser is associated with just one + // call to LockAsync. IDisposable implementations are expected to be idempotent, + // so we need to remember when we've already made our one call to Release. (This + // can't be perfect because this is a struct, so callers might end up copying + // this value and then disposing each copy. But for normal using usage that won't + // be a problem, and this provides a reasonable best-effort approach. It's why + // this can't be a readonly struct though.) + private object _parentOrDisposable; - public AsyncGateReleaser(IAsyncGate parent) => _parent = parent; + /// + /// Creates an that calls + /// on its parent when disposed. + /// + /// + public AsyncGateReleaser(IAsyncGate parent) => _parentOrDisposable = parent; - public void Dispose() => _parent.Release(); + /// + /// Creates an that calls another disposable when disposed. + /// + /// + /// The implementation to which to defer. + /// + /// + /// This can be convenient for custom implementations in that wrap + /// some underlying lock implementation that returns an as the means + /// by which the lock is released. + /// + public AsyncGateReleaser(IDisposable disposable) => _parentOrDisposable = disposable; + + public void Dispose() + { + switch (_parentOrDisposable) + { + case IDisposable d: d.Dispose(); break; + case IAsyncGate g: g.Release(); break; + } + + _parentOrDisposable = null; + } } } From d00ad42db75cab963d94381f3ea308f138c0e8ea Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Thu, 1 May 2025 08:52:18 +0100 Subject: [PATCH 3/3] Add IAsyncGateReleaser Also removed public AsyncGate ctor, and reworked code using AsyncGate directly to work in terms of IAsyncGate. --- .../Disposables/CompositeAsyncDisposable.cs | 4 +- .../Disposables/RefCountAsyncDisposable.cs | 3 +- .../Disposables/SerialAsyncDisposable.cs | 4 +- .../Internal/ScheduledAsyncObserverBase.cs | 3 +- .../Joins/AsyncJoinObserver.cs | 6 +- .../Joins/IAsyncJoinObserver.cs | 4 +- .../Linq/Operators/Amb.cs | 6 +- .../Linq/Operators/Buffer.cs | 11 ++-- .../Linq/Operators/CombineLatest.Generated.cs | 57 ++++++++++--------- .../Linq/Operators/CombineLatest.Generated.tt | 5 +- .../Linq/Operators/Delay.cs | 3 +- .../Linq/Operators/GroupByUntil.cs | 4 +- .../Linq/Operators/GroupJoin.cs | 4 +- .../Linq/Operators/Join.cs | 4 +- .../Linq/Operators/Merge.cs | 4 +- .../Linq/Operators/ObserveOn.cs | 3 +- .../Linq/Operators/RefCount.cs | 4 +- .../Linq/Operators/Sample.cs | 4 +- .../Linq/Operators/SelectMany.cs | 4 +- .../Linq/Operators/SequenceEqual.cs | 4 +- .../Linq/Operators/Skip.cs | 4 +- .../Linq/Operators/SkipUntil.cs | 6 +- .../Linq/Operators/Switch.cs | 4 +- .../Linq/Operators/Synchronize.cs | 3 +- .../Linq/Operators/Take.cs | 4 +- .../Linq/Operators/TakeUntil.cs | 6 +- .../Linq/Operators/Throttle.cs | 6 +- .../Linq/Operators/Timeout.cs | 4 +- .../Linq/Operators/When.cs | 4 +- .../Linq/Operators/Window.cs | 11 ++-- .../Linq/Operators/WithLatestFrom.cs | 6 +- .../Linq/Operators/Zip.Generated.cs | 57 ++++++++++--------- .../Linq/Operators/Zip.Generated.tt | 5 +- .../Linq/Operators/Zip.cs | 4 +- .../Subjects/BehaviorAsyncSubject.cs | 4 +- .../Subjects/ConnectableAsyncObservable.cs | 4 +- .../Subjects/ReplayAsyncSubject.cs | 4 +- .../Threading/AsyncGate.cs | 44 +++++++++++--- .../Threading/AsyncGateExtensions.cs | 43 ++++++++++++++ .../Threading/AsyncGateReleaser.cs | 54 ------------------ .../Threading/AsyncQueueLock.cs | 3 +- .../Threading/DisposableGateReleaser.cs | 15 +++++ .../Threading/IAsyncGate.cs | 22 ++----- .../Threading/IAsyncGateReleaser.cs | 24 ++++++++ 44 files changed, 270 insertions(+), 212 deletions(-) create mode 100644 AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs delete mode 100644 AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs create mode 100644 AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs create mode 100644 AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs diff --git a/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs b/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs index ab78b7a00c..3742fe52a4 100644 --- a/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs @@ -4,14 +4,14 @@ using System.Collections.Generic; using System.Linq; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Disposables { public sealed class CompositeAsyncDisposable : IAsyncDisposable { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private readonly List _disposables; private bool _disposed; diff --git a/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs b/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs index 93f801af2f..90f2aca8bc 100644 --- a/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -9,7 +10,7 @@ namespace System.Reactive.Disposables { public sealed class RefCountAsyncDisposable : IAsyncDisposable { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private IAsyncDisposable _disposable; private bool _primaryDisposed; private int _count; diff --git a/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs b/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs index 94bb1a6b7d..2020a8f50c 100644 --- a/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs @@ -2,14 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Disposables { public sealed class SerialAsyncDisposable : IAsyncDisposable { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private IAsyncDisposable _disposable; private bool _disposed; diff --git a/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs b/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs index 36fe60b463..371195d644 100644 --- a/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs +++ b/AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -12,7 +13,7 @@ internal abstract class ScheduledAsyncObserverBase : AsyncObserverBase, IS { private readonly IAsyncObserver _observer; - private readonly AsyncGate _lock = new(); + private readonly IAsyncGate _lock = AsyncGate.Create(); private readonly Queue _queue = new(); private bool _hasFaulted = false; diff --git a/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs b/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs index e1e8df311e..5803eba6d5 100644 --- a/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs +++ b/AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Linq; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Joins @@ -18,7 +18,7 @@ internal sealed class AsyncJoinObserver : AsyncObserverBase>, private readonly List _activePlans = new(); private readonly SingleAssignmentAsyncDisposable _subscription = new(); - private AsyncGate _gate; + private IAsyncGate _gate; private bool _isDisposed; public AsyncJoinObserver(IAsyncObservable source, Func onError) @@ -56,7 +56,7 @@ public async ValueTask DisposeAsync() } } - public async Task SubscribeAsync(AsyncGate gate) + public async Task SubscribeAsync(IAsyncGate gate) { _gate = gate; diff --git a/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs b/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs index df2d71c599..eb3c488c99 100644 --- a/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs +++ b/AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs @@ -2,14 +2,14 @@ // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Joins { internal interface IAsyncJoinObserver : IAsyncDisposable { - Task SubscribeAsync(AsyncGate gate); + Task SubscribeAsync(IAsyncGate gate); void Dequeue(); } diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs index 9105d85dc0..037688eacc 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -83,7 +83,7 @@ public static (IAsyncObserver, IAsyncObserver) Amb(IA if (second == null) throw new ArgumentNullException(nameof(second)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var state = AmbState.None; @@ -199,7 +199,7 @@ public static IAsyncObserver[] Amb(IAsyncObserver obs if (subscriptions == null) throw new ArgumentNullException(nameof(subscriptions)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var winner = default(int?); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs index 000c3b50b0..1b59fbb083 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -306,7 +307,7 @@ public static IAsyncObserver Buffer(IAsyncObserver, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var buffer = new List(); @@ -378,7 +379,7 @@ public static IAsyncObserver Buffer(IAsyncObserver, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queue = new Queue>(); @@ -509,7 +510,7 @@ TimeSpan GetNextDue() async Task<(IAsyncObserver, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var timer = new SerialAsyncDisposable(); @@ -586,7 +587,7 @@ public static (IAsyncObserver, IAsyncObserver) Buffer< if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var buffer = new List(); @@ -660,7 +661,7 @@ public static (IAsyncObserver, IAsyncObserver) Buffer< { var closeSubscription = new SerialAsyncDisposable(); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queueLock = new AsyncQueueLock(); var buffer = new List(); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs index c8ae78381a..c5cf24f3a4 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -1827,7 +1828,7 @@ public static (IAsyncObserver, IAsyncObserver) CombineLatest(IAs bool isDone2 = false; T2 latestValue2 = default(T2); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -1946,7 +1947,7 @@ public static (IAsyncObserver, IAsyncObserver) CombineLatest, IAsyncObserver, IAsyncObserver) Combi bool isDone3 = false; T3 latestValue3 = default(T3); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2243,7 +2244,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Combi bool isDone3 = false; T3 latestValue3 = default(T3); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2432,7 +2433,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone4 = false; T4 latestValue4 = default(T4); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2643,7 +2644,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone4 = false; T4 latestValue4 = default(T4); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -2889,7 +2890,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone5 = false; T5 latestValue5 = default(T5); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -3146,7 +3147,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone5 = false; T5 latestValue5 = default(T5); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -3449,7 +3450,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone6 = false; T6 latestValue6 = default(T6); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -3752,7 +3753,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone6 = false; T6 latestValue6 = default(T6); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -4112,7 +4113,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone7 = false; T7 latestValue7 = default(T7); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -4461,7 +4462,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone7 = false; T7 latestValue7 = default(T7); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -4878,7 +4879,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone8 = false; T8 latestValue8 = default(T8); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -5273,7 +5274,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone8 = false; T8 latestValue8 = default(T8); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -5747,7 +5748,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone9 = false; T9 latestValue9 = default(T9); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -6188,7 +6189,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone9 = false; T9 latestValue9 = default(T9); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -6719,7 +6720,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone10 = false; T10 latestValue10 = default(T10); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -7206,7 +7207,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone10 = false; T10 latestValue10 = default(T10); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -7794,7 +7795,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone11 = false; T11 latestValue11 = default(T11); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -8327,7 +8328,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone11 = false; T11 latestValue11 = default(T11); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -8972,7 +8973,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone12 = false; T12 latestValue12 = default(T12); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -9551,7 +9552,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone12 = false; T12 latestValue12 = default(T12); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -10253,7 +10254,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone13 = false; T13 latestValue13 = default(T13); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -10878,7 +10879,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone13 = false; T13 latestValue13 = default(T13); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -11637,7 +11638,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone14 = false; T14 latestValue14 = default(T14); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -12308,7 +12309,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone14 = false; T14 latestValue14 = default(T14); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -13124,7 +13125,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone15 = false; T15 latestValue15 = default(T15); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -13841,7 +13842,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn bool isDone15 = false; T15 latestValue15 = default(T15); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt index 5e42119a4b..e6b44999df 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt @@ -9,6 +9,7 @@ <#@ import namespace="System.Collections.Generic" #> <#@ output extension=".cs" #> using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -161,7 +162,7 @@ for (var j = 1; j <= i; j++) } #> - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( @@ -248,7 +249,7 @@ for (var j = 1; j <= i; j++) } #> - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs index 918871500f..c5510bfe40 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -76,7 +77,7 @@ public partial class AsyncObserver var semaphore = new SemaphoreSlim(0); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queue = new Queue>(); var isDone = false; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs index 5221045a9f..a724d17b9c 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs @@ -6,7 +6,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Subjects; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -609,7 +609,7 @@ public partial class AsyncObserver groups = new ConcurrentDictionary>(Environment.ProcessorCount * 4, capacity, comparer); } - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var nullGate = new object(); var nullGroup = default(IAsyncSubject); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs index 93d855bc95..1718da00b2 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Subjects; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -57,7 +57,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncDisposable) if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var group = new CompositeAsyncDisposable(subscriptions); var refCount = new RefCountAsyncDisposable(group); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs index 5d9971479b..0c78aa7892 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs @@ -4,7 +4,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -59,7 +59,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncDisposable) if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var group = new CompositeAsyncDisposable(subscriptions); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs index f03a496dbb..4f111931a7 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -35,7 +35,7 @@ public static (IAsyncObserver>, IAsyncDisposable) Merg if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var count = 1; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs index d4949b1e1c..3fc84e0f3f 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Reactive.Concurrency; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -44,7 +45,7 @@ public partial class AsyncObserver var semaphore = new SemaphoreSlim(0); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queue = new Queue(); var error = default(Exception); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs index 60da700b31..309c01845d 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs @@ -4,7 +4,7 @@ using System.Reactive.Disposables; using System.Reactive.Subjects; -using System.Threading; +using System.Reactive.Threading; namespace System.Reactive.Linq { @@ -15,7 +15,7 @@ public static IAsyncObservable RefCount(this IConnectableAsync if (source == null) throw new ArgumentNullException(nameof(source)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var count = 0; var connectable = default(IAsyncDisposable); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs index e8752d3d8f..aa326863ed 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -78,7 +78,7 @@ public static (IAsyncObserver, IAsyncObserver) Sample, IAsyncDisposable) SelectMany, IAsyncObserver) SequenceEqual(); var queueRight = new Queue(); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs index 126dbb00d3..1b59adeb19 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -130,7 +130,7 @@ public static IAsyncObserver Skip(IAsyncObserver obse // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want Skip on the observer? // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable). - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var open = false; return diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs index 7f6c478d79..75b9ab35d1 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -87,7 +87,7 @@ public static (IAsyncObserver, IAsyncObserver) SkipUntil, IAsyncObserver) SkipUntil>, IAsyncDisposable) Swit if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var isStopped = false; var hasLatest = false; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs index 3cce871273..32fd3c3da0 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Threading; -using System.Threading; namespace System.Reactive.Linq { @@ -38,7 +37,7 @@ public static IAsyncObserver Synchronize(IAsyncObserver Synchronize(IAsyncObserver observer, IAsyncGate gate) diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs index d0064c23a2..63cce1f73e 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -130,7 +130,7 @@ public static IAsyncObserver Take(IAsyncObserver obse // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer? // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable). - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); return ( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs index b71eb727af..5d70568b39 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs @@ -4,7 +4,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -87,7 +87,7 @@ public static (IAsyncObserver, IAsyncObserver) TakeUntil, IAsyncObserver) TakeUntil, IAsyncDisposable) Throttle(IAsy if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var timer = new SerialAsyncDisposable(); @@ -187,7 +187,7 @@ public static (IAsyncObserver, IAsyncDisposable) Throttle, IAsyncDisposable)> CoreAsync() { - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var switched = false; var id = 0UL; diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs index 562c28ad3d..5155540351 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Reactive.Disposables; using System.Reactive.Joins; -using System.Threading; +using System.Reactive.Threading; namespace System.Reactive.Linq { @@ -19,7 +19,7 @@ public static IAsyncObservable When(IEnumerable(async observer => { var externalSubscriptions = new Dictionary(); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var activePlans = new List(); var outputObserver = AsyncObserver.Create( diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs index aafbe08e33..886d511879 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs @@ -6,6 +6,7 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -296,7 +297,7 @@ public static (IAsyncObserver, IAsyncDisposable) Window(IAsync if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var window = default(IAsyncSubject); var d = new CompositeAsyncDisposable(); @@ -382,7 +383,7 @@ async Task CreateWindowAsync() if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var d = new CompositeAsyncDisposable(); var timer = new SerialAsyncDisposable(); @@ -538,7 +539,7 @@ async Task CreateTimer() if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var n = 0; var window = default(IAsyncSubject); @@ -649,7 +650,7 @@ async Task> CreateWindowAsync() if (subscription == null) throw new ArgumentNullException(nameof(subscription)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var refCount = new RefCountAsyncDisposable(subscription); var window = default(IAsyncSubject); @@ -736,7 +737,7 @@ async Task CreateWindowAsync() var closeSubscription = new SerialAsyncDisposable(); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var queueLock = new AsyncQueueLock(); var refCount = new RefCountAsyncDisposable(subscription); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs index 859aaaca2e..c15d3d2c08 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -103,7 +103,7 @@ public static (IAsyncObserver, IAsyncObserver) WithLatestFrom, IAsyncObserver) WithLatestFrom, IAsyncObserver) Zip(IAsyncObserve if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -1913,7 +1914,7 @@ public static (IAsyncObserver, IAsyncObserver) Zip(IAsy if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2007,7 +2008,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(); var values2 = new Queue(); @@ -2103,7 +2104,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver) Zip(); var values2 = new Queue(); @@ -2199,7 +2200,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2297,7 +2298,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2395,7 +2396,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2495,7 +2496,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2595,7 +2596,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2697,7 +2698,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2799,7 +2800,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -2903,7 +2904,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3007,7 +3008,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3113,7 +3114,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3219,7 +3220,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3327,7 +3328,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3435,7 +3436,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3545,7 +3546,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3655,7 +3656,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3767,7 +3768,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3879,7 +3880,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -3993,7 +3994,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4107,7 +4108,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4223,7 +4224,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4339,7 +4340,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4457,7 +4458,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4575,7 +4576,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); @@ -4695,7 +4696,7 @@ public static (IAsyncObserver, IAsyncObserver, IAsyncObserver, IAsyn if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); var values1 = new Queue(); var values2 = new Queue(); diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt index f21866efc4..fd34ced99c 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt @@ -10,6 +10,7 @@ <#@ output extension=".cs" #> using System.Collections.Generic; using System.Reactive.Disposables; +using System.Reactive.Threading; using System.Threading; using System.Threading.Tasks; @@ -152,7 +153,7 @@ for (var i = 2; i <= 15; i++) if (observer == null) throw new ArgumentNullException(nameof(observer)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); <# for (var j = 1; j <= i; j++) @@ -258,7 +259,7 @@ for (var j = 1; j <= i; j++) if (selector == null) throw new ArgumentNullException(nameof(selector)); - var gate = new AsyncGate(); + var gate = AsyncGate.Create(); <# for (var j = 1; j <= i; j++) diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs index 4deb6427c7..f61d83348b 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs @@ -5,7 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Linq @@ -50,7 +50,7 @@ public static IAsyncObserver[] Zip(IAsyncObserver[count]; var isDone = new bool[count]; diff --git a/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs b/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs index 5f12f775c3..94c871c035 100644 --- a/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs +++ b/AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs @@ -4,14 +4,14 @@ using System.Collections.Generic; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Subjects { public abstract class BehaviorAsyncSubject : IAsyncSubject { - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private readonly List> _observers = new(); private T _value; private bool _done; diff --git a/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs b/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs index 3723ce014f..daa7328826 100644 --- a/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs +++ b/AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs @@ -3,7 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Linq; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Subjects @@ -12,7 +12,7 @@ internal sealed class ConnectableAsyncObservable : IConnectabl { private readonly IAsyncSubject _subject; private readonly IAsyncObservable _source; - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private Connection _connection; diff --git a/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs b/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs index 5928dc9957..49f614ba17 100644 --- a/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs +++ b/AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs @@ -6,7 +6,7 @@ using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Disposables; -using System.Threading; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Reactive.Subjects @@ -120,7 +120,7 @@ public ReplayAsyncSubject(bool concurrent, int bufferSize, TimeSpan window, IAsy private abstract class ReplayBase : IAsyncSubject { private readonly bool _concurrent; - private readonly AsyncGate _lock = new(); + private readonly IAsyncGate _lock = AsyncGate.Create(); private readonly List> _observers = new(); // TODO: immutable array private bool _done; private Exception _error; diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs index 8d63aa8201..7665946534 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs @@ -3,18 +3,43 @@ // See the LICENSE file in the project root for more information. using System.Diagnostics; -using System.Reactive.Threading; +using System.Threading; using System.Threading.Tasks; -namespace System.Threading +namespace System.Reactive.Threading { - public sealed class AsyncGate : IAsyncGate + /// + /// Provides an implementation of , enabling mutually exclusive locking + /// in async code. + /// + public sealed class AsyncGate : IAsyncGate, IAsyncGateReleaser { private readonly object _gate = new(); private readonly SemaphoreSlim _semaphore = new(1, 1); private readonly AsyncLocal _recursionCount = new(); - public ValueTask LockAsync() + /// + /// Creates an . + /// + /// + /// This is private because we hope that one day, the .NET runtime will provide a built-in + /// asynchronous mutual exclusion primitive, and that we might be able to use that instead of + /// our own implementation. Although that might be something we could do by modifying this + /// class, it might prove useful to be able to provide the old implementation for backwards + /// compatibility, so we don't want AsyncRx.NET consumers to depend on a specific concrete type + /// as the implementation. + /// + private AsyncGate() + { + } + + /// + /// Creates a new instance of an implementation. + /// + /// + public static IAsyncGate Create() => new AsyncGate(); + + ValueTask IAsyncGate.AcquireAsync() { var shouldAcquire = false; @@ -33,13 +58,18 @@ public ValueTask LockAsync() if (shouldAcquire) { - return new ValueTask(_semaphore.WaitAsync().ContinueWith(_ => new AsyncGateReleaser(this))); + Task acquireTask = _semaphore.WaitAsync(); + if (acquireTask.IsCompleted) + { + return new ValueTask(this); + } + return new ValueTask(acquireTask.ContinueWith(_ => this)); } - return new ValueTask(new AsyncGateReleaser(this)); + return new ValueTask(this); } - void IAsyncGate.Release() + void IAsyncGateReleaser.Release() { lock (_gate) { diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs new file mode 100644 index 0000000000..f801251e74 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs @@ -0,0 +1,43 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +using System.Threading.Tasks; + +namespace System.Reactive.Threading; + +/// +/// Extension methods for . +/// +public static class AsyncGateExtensions +{ + /// + /// Acquires an in a way enables the gate to be released with a + /// statement or declaration. + /// + /// The gate to lock. + /// + /// A that produces a that will call + /// when disposed. + /// + public static ValueTask LockAsync(this IAsyncGate gate) + { + // Note, we are avoiding async/await here because we MUST NOT create a new child ExecutionContext + // (The AsyncGate.LockAsync method does not use async/await either, and for the same reason.) + // + // IAsyncGate implementations are allowed to require that their LockAsync method is called from the same + // execution context as Release will be called. For example, AsyncGate uses an AsyncLocal to track + // the recursion count, and when you update an AsyncLocal's value, that modified value is visible only + // in the current ExecutionContext and its descendants. An async method effectively introduces a new child + // context, so any AsyncLocal value changes are lost when an async method returns, but we need the + // recursion count to live in our caller's context, which is why we must make sure we don't introduce a + // new child context here. That's why this needs to be old-school manual task management, and not async/await. + ValueTask releaserValueTask = gate.AcquireAsync(); + if (releaserValueTask.IsCompleted) + { + return new ValueTask(new DisposableGateReleaser(releaserValueTask.Result)); + } + + return new ValueTask(releaserValueTask.AsTask().ContinueWith(t => new DisposableGateReleaser(t.Result))); + } +} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs deleted file mode 100644 index fc5e4e1e8e..0000000000 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT License. -// See the LICENSE file in the project root for more information. - -namespace System.Reactive.Threading -{ - /// - /// Returned by , enabling the caller to release the lock. - /// - public struct AsyncGateReleaser : IDisposable - { - // Holds either an IAsyncGate or an IDisposable. - // In the case where this is an IAsyncGate, it's important that we try to avoid - // calling Release more than once, because this releaser is associated with just one - // call to LockAsync. IDisposable implementations are expected to be idempotent, - // so we need to remember when we've already made our one call to Release. (This - // can't be perfect because this is a struct, so callers might end up copying - // this value and then disposing each copy. But for normal using usage that won't - // be a problem, and this provides a reasonable best-effort approach. It's why - // this can't be a readonly struct though.) - private object _parentOrDisposable; - - /// - /// Creates an that calls - /// on its parent when disposed. - /// - /// - public AsyncGateReleaser(IAsyncGate parent) => _parentOrDisposable = parent; - - /// - /// Creates an that calls another disposable when disposed. - /// - /// - /// The implementation to which to defer. - /// - /// - /// This can be convenient for custom implementations in that wrap - /// some underlying lock implementation that returns an as the means - /// by which the lock is released. - /// - public AsyncGateReleaser(IDisposable disposable) => _parentOrDisposable = disposable; - - public void Dispose() - { - switch (_parentOrDisposable) - { - case IDisposable d: d.Dispose(); break; - case IAsyncGate g: g.Release(); break; - } - - _parentOrDisposable = null; - } - } -} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs index 407924a459..5b9d8e6a2b 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Reactive.Threading; using System.Threading.Tasks; namespace System.Threading @@ -10,7 +11,7 @@ namespace System.Threading public sealed class AsyncQueueLock : IAsyncDisposable { private readonly Queue> _queue = new(); - private readonly AsyncGate _gate = new(); + private readonly IAsyncGate _gate = AsyncGate.Create(); private bool _isAcquired; private bool _hasFaulted; diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs new file mode 100644 index 0000000000..5d0bce6912 --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs @@ -0,0 +1,15 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +namespace System.Reactive.Threading; + +/// +/// Enables a statement or declaration to be used to release an +/// . Typically obtained through +/// +/// +public struct DisposableGateReleaser(IAsyncGateReleaser gateReleaser) : IDisposable +{ + public void Dispose() => gateReleaser.Release(); +} diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs index 5d34f8bf83..68892a4489 100644 --- a/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs +++ b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System.Reactive.Linq; -using System.Threading; using System.Threading.Tasks; namespace System.Reactive.Threading @@ -49,25 +48,16 @@ public interface IAsyncGate /// Acquires the lock. /// /// - /// A task that completes when the lock has been acquired, returning an - /// which can be disposed to release the lock. + /// A task that completes when the lock has been acquired, returning an + /// with which to release the lock. /// /// /// - /// Applications release the lock by disposing the returned by this - /// method. Typically this is done with a using statement or declaration. + /// Applications release the lock by calling on the object + /// returned by this method. Typically this is done with a using statement or declaration by + /// using the extension method. /// /// - public ValueTask LockAsync(); - - /// - /// Releases the lock. Applications typically won't call this directly, and will use - /// the returned by instead. - /// - /// - /// This method needs to be publicly accessible so that a single - /// can be shared by all implementations of this interface. - /// - public void Release(); + public ValueTask AcquireAsync(); } } diff --git a/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs new file mode 100644 index 0000000000..4b1d8df33f --- /dev/null +++ b/AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs @@ -0,0 +1,24 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT License. +// See the LICENSE file in the project root for more information. + +namespace System.Reactive.Threading; + +/// +/// Releases a lock acquired from . +/// +/// +/// +/// Note that implementations of may return a reference to themselves +/// as the , so callers should not depend on each lock +/// acquisition returning a distinct . (This enables gate +/// implementations to avoid unnecessary allocation during lock acquisition.) +/// +/// +public interface IAsyncGateReleaser +{ + /// + /// Releases a lock acquired from . + /// + void Release(); +}