Skip to content

Commit 023e473

Browse files
committed
feat: Implement AsynceEventLoop and MultiThreadedEventLoopGroup using Swift Concurrency and the AsyncEventLoopExecutor. Forms a key foundation for several vapor repositories to use NIO without using NIOPosix. This enables a large amount of wasm compilation for packages that currently consume NIOPosix.
1 parent d7f5ff7 commit 023e473

File tree

2 files changed

+436
-0
lines changed

2 files changed

+436
-0
lines changed
Lines changed: 355 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,355 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// Copyright (c) 2025 PassiveLogic, Inc.
4+
// Licensed under Apache License v2.0
5+
//
6+
// See LICENSE.txt for license information
7+
//
8+
// SPDX-License-Identifier: Apache-2.0
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
import Atomics
13+
import NIOCore
14+
15+
import struct Foundation.UUID
16+
17+
#if canImport(Dispatch)
18+
import Dispatch
19+
#endif
20+
21+
// MARK: - AsyncEventLoop -
22+
23+
/// A single‑threaded `EventLoop` implemented solely with Swift Concurrency.
24+
@available(macOS 13, *)
25+
public final class AsyncEventLoop: EventLoop, @unchecked Sendable {
26+
public enum AsynceEventLoopError: Error {
27+
case cancellationFailure
28+
}
29+
30+
private let _id = UUID() // unique identifier
31+
private let executor: AsyncEventLoopExecutor
32+
private var cachedSucceededVoidFuture: EventLoopFuture<Void>?
33+
private enum ShutdownState: UInt8 {
34+
case running = 0
35+
case closing = 1
36+
case closed = 2
37+
}
38+
private let shutdownState = ManagedAtomic<UInt8>(ShutdownState.running.rawValue)
39+
40+
public init(manualTimeModeForTesting: Bool = false) {
41+
self.executor = AsyncEventLoopExecutor(loopID: _id, manualTimeMode: manualTimeModeForTesting)
42+
}
43+
44+
// MARK: - EventLoop basics -
45+
46+
public var inEventLoop: Bool {
47+
_CurrentEventLoopKey.id == _id
48+
}
49+
50+
private func isAcceptingNewTasks() -> Bool {
51+
shutdownState.load(ordering: .acquiring) == ShutdownState.running.rawValue
52+
}
53+
54+
private func isFullyShutdown() -> Bool {
55+
shutdownState.load(ordering: .acquiring) == ShutdownState.closed.rawValue
56+
}
57+
58+
@_disfavoredOverload
59+
public func execute(_ task: @escaping @Sendable () -> Void) {
60+
guard self.isAcceptingNewTasks() || self._canAcceptExecuteDuringShutdown else { return }
61+
executor.enqueue(task)
62+
}
63+
64+
private var _canAcceptExecuteDuringShutdown: Bool {
65+
self.inEventLoop
66+
|| MultiThreadedEventLoopGroup._GroupContextKey.isFromMultiThreadedEventLoopGroup
67+
}
68+
69+
// MARK: - Promises / Futures -
70+
71+
public func makeSucceededFuture<T: Sendable>(_ value: T) -> EventLoopFuture<T> {
72+
if T.self == Void.self {
73+
return self.makeSucceededVoidFuture() as! EventLoopFuture<T>
74+
}
75+
let p = makePromise(of: T.self)
76+
p.succeed(value)
77+
return p.futureResult
78+
}
79+
80+
public func makeFailedFuture<T>(_ error: Error) -> EventLoopFuture<T> {
81+
let p = makePromise(of: T.self)
82+
p.fail(error)
83+
return p.futureResult
84+
}
85+
86+
public func makeSucceededVoidFuture() -> EventLoopFuture<Void> {
87+
if self.inEventLoop {
88+
if let cached = self.cachedSucceededVoidFuture {
89+
return cached
90+
}
91+
let future = self.makeSucceededVoidFutureUncached()
92+
self.cachedSucceededVoidFuture = future
93+
return future
94+
} else {
95+
return self.makeSucceededVoidFutureUncached()
96+
}
97+
}
98+
99+
private func makeSucceededVoidFutureUncached() -> EventLoopFuture<Void> {
100+
let promise = self.makePromise(of: Void.self)
101+
promise.succeed(())
102+
return promise.futureResult
103+
}
104+
105+
// MARK: - Submitting work -
106+
@preconcurrency
107+
public func submit<T>(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture<T> {
108+
self.submit { () throws -> _UncheckedSendable<T> in
109+
_UncheckedSendable(try task())
110+
}.map { $0.value }
111+
}
112+
113+
public func submit<T: Sendable>(_ task: @escaping @Sendable () throws -> T) -> EventLoopFuture<T>
114+
{
115+
guard self.isAcceptingNewTasks() else {
116+
return self.makeFailedFuture(EventLoopError.shutdown)
117+
}
118+
let promise = makePromise(of: T.self)
119+
executor.enqueue {
120+
do {
121+
let value = try task()
122+
promise.succeed(value)
123+
} catch { promise.fail(error) }
124+
}
125+
return promise.futureResult
126+
}
127+
128+
public func flatSubmit<T: Sendable>(_ task: @escaping @Sendable () -> EventLoopFuture<T>)
129+
-> EventLoopFuture<T>
130+
{
131+
guard self.isAcceptingNewTasks() else {
132+
return self.makeFailedFuture(EventLoopError.shutdown)
133+
}
134+
let promise = makePromise(of: T.self)
135+
executor.enqueue {
136+
let future = task()
137+
future.cascade(to: promise)
138+
}
139+
return promise.futureResult
140+
}
141+
142+
// MARK: - Scheduling -
143+
144+
/// NOTE:
145+
///
146+
/// Timing for execute vs submit vs schedule:
147+
///
148+
/// Tasks scheduled via `execute` or `submit` are appended to the back of the event loop's task queue
149+
/// and are executed serially in FIFO order. Scheduled tasks (e.g., via `schedule(deadline:)`) are
150+
/// placed in a timing wheel and, when their deadline arrives, are enqueued at the back of the main
151+
/// queue after any already-pending work. This means that if the event loop is backed up, a scheduled
152+
/// task may execute slightly after its scheduled time, as it must wait for previously enqueued tasks
153+
/// to finish. Scheduled tasks never preempt or jump ahead of already-queued immediate work.
154+
@preconcurrency
155+
public func scheduleTask<T>(
156+
deadline: NIODeadline,
157+
_ task: @escaping @Sendable () throws -> T
158+
) -> Scheduled<T> {
159+
let scheduled: Scheduled<_UncheckedSendable<T>> = self._scheduleTask(
160+
deadline: deadline,
161+
task: { try _UncheckedSendable(task()) }
162+
)
163+
return self._unsafelyRewrapScheduled(scheduled)
164+
}
165+
166+
public func scheduleTask<T: Sendable>(
167+
deadline: NIODeadline,
168+
_ task: @escaping @Sendable () throws -> T
169+
) -> Scheduled<T> {
170+
self._scheduleTask(deadline: deadline, task: task)
171+
}
172+
173+
@preconcurrency
174+
public func scheduleTask<T>(
175+
in delay: TimeAmount,
176+
_ task: @escaping @Sendable () throws -> T
177+
) -> Scheduled<T> {
178+
let scheduled: Scheduled<_UncheckedSendable<T>> = self._scheduleTask(
179+
in: delay,
180+
task: { try _UncheckedSendable(task()) }
181+
)
182+
return self._unsafelyRewrapScheduled(scheduled)
183+
}
184+
185+
public func scheduleTask<T: Sendable>(
186+
in delay: TimeAmount,
187+
_ task: @escaping @Sendable () throws -> T
188+
) -> Scheduled<T> {
189+
self._scheduleTask(in: delay, task: task)
190+
}
191+
192+
private func _scheduleTask<T: Sendable>(
193+
deadline: NIODeadline,
194+
task: @escaping @Sendable () throws -> T
195+
) -> Scheduled<T> {
196+
let promise = makePromise(of: T.self)
197+
guard self.isAcceptingNewTasks() else {
198+
promise.fail(EventLoopError._shutdown)
199+
return Scheduled(promise: promise) {}
200+
}
201+
202+
let jobID = executor.schedule(
203+
at: deadline,
204+
job: {
205+
do {
206+
promise.succeed(try task())
207+
} catch {
208+
promise.fail(error)
209+
}
210+
},
211+
failFn: { error in
212+
promise.fail(error)
213+
}
214+
)
215+
216+
return Scheduled(promise: promise) { [weak self] in
217+
// NOTE: Documented cancellation procedure indicates
218+
// cancellation is not guaranteed. As such, and to match existing Promise API's,
219+
// using a Task here to avoid pushing async up the software stack.
220+
self?.executor.cancelScheduledJob(withID: jobID)
221+
222+
// NOTE: NIO Core already fails the promise before calling the cancellation closure,
223+
// so we do NOT try to fail the promise. Also cancellation is not guaranteed, so we
224+
// allow cancellation to silently fail rather than re-negotiating to a throwing API.
225+
}
226+
}
227+
228+
private func _scheduleTask<T: Sendable>(
229+
in delay: TimeAmount,
230+
task: @escaping @Sendable () throws -> T
231+
) -> Scheduled<T> {
232+
// NOTE: This is very similar to the `scheduleTask(deadline:)` implementation. However
233+
// due to the nonisolated context here, we keep the implementations separate until they
234+
// reach isolating mechanisms within the executor.
235+
236+
let promise = makePromise(of: T.self)
237+
guard self.isAcceptingNewTasks() else {
238+
promise.fail(EventLoopError._shutdown)
239+
return Scheduled(promise: promise) {}
240+
}
241+
242+
let jobID = executor.schedule(
243+
after: delay,
244+
job: {
245+
do {
246+
promise.succeed(try task())
247+
} catch {
248+
promise.fail(error)
249+
}
250+
},
251+
failFn: { error in
252+
promise.fail(error)
253+
}
254+
)
255+
256+
return Scheduled(promise: promise) { [weak self] in
257+
// NOTE: Documented cancellation procedure indicates
258+
// cancellation is not guaranteed. As such, and to match existing Promise API's,
259+
// using a Task here to avoid pushing async up the software stack.
260+
self?.executor.cancelScheduledJob(withID: jobID)
261+
262+
// NOTE: NIO Core already fails the promise before calling the cancellation closure,
263+
// so we do NOT try to fail the promise. Also cancellation is not guaranteed, so we
264+
// allow cancellation to silently fail rather than re-negotiating to a throwing API.
265+
}
266+
}
267+
268+
func closeGracefully() async {
269+
let previous = shutdownState.exchange(ShutdownState.closing.rawValue, ordering: .acquiring)
270+
guard ShutdownState(rawValue: previous) != .closed else { return }
271+
self.cachedSucceededVoidFuture = nil
272+
await executor.clearQueue()
273+
shutdownState.store(ShutdownState.closed.rawValue, ordering: .releasing)
274+
}
275+
276+
public func next() -> EventLoop {
277+
self
278+
}
279+
public func any() -> EventLoop {
280+
self
281+
}
282+
283+
/// Moves time forward by specified increment, and runs event loop, causing
284+
/// all pending events either from enqueing or scheduling requirements to run.
285+
func advanceTime(by increment: TimeAmount) async throws {
286+
try await executor.advanceTime(by: increment)
287+
}
288+
289+
func advanceTime(to deadline: NIODeadline) async throws {
290+
try await executor.advanceTime(to: deadline)
291+
}
292+
293+
func run() async {
294+
await executor.run()
295+
}
296+
297+
#if canImport(Dispatch)
298+
public func shutdownGracefully(
299+
queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void
300+
) {
301+
if MultiThreadedEventLoopGroup._GroupContextKey.isFromMultiThreadedEventLoopGroup {
302+
Task {
303+
await closeGracefully()
304+
queue.async { callback(nil) }
305+
}
306+
} else {
307+
// Bypassing the group shutdown and calling an event loop
308+
// shutdown directly is considered api-misuse
309+
callback(EventLoopError.unsupportedOperation)
310+
}
311+
}
312+
#endif
313+
314+
public func syncShutdownGracefully() throws {
315+
// The test AsyncEventLoopTests.testIllegalCloseOfEventLoopFails requires
316+
// this implementation to throw an error, because uses should call shutdown on
317+
// MultiThreadedEventLoopGroup instead of calling it directly on the loop.
318+
throw EventLoopError.unsupportedOperation
319+
}
320+
321+
public func shutdownGracefully() async throws {
322+
await self.closeGracefully()
323+
}
324+
325+
#if !canImport(Dispatch)
326+
public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) {
327+
assertionFailure("Synchronous shutdown API's are not currently supported by AsyncEventLoop")
328+
}
329+
#endif
330+
331+
@preconcurrency
332+
private func _unsafelyRewrapScheduled<T>(
333+
_ scheduled: Scheduled<_UncheckedSendable<T>>
334+
) -> Scheduled<T> {
335+
let promise = self.makePromise(of: T.self)
336+
scheduled.futureResult.whenComplete { result in
337+
switch result {
338+
case .success(let boxed):
339+
promise.assumeIsolatedUnsafeUnchecked().succeed(boxed.value)
340+
case .failure(let error):
341+
promise.fail(error)
342+
}
343+
}
344+
return Scheduled(promise: promise) {
345+
scheduled.cancel()
346+
}
347+
}
348+
349+
/// This is a shim used to support older protocol-required API's without compiler warnings, and provide more modern
350+
/// concurrency-ready overloads.
351+
private struct _UncheckedSendable<T>: @unchecked Sendable {
352+
let value: T
353+
init(_ value: T) { self.value = value }
354+
}
355+
}

0 commit comments

Comments
 (0)