From 100dd0986f8b92c7a125e5054c52e3b20d6216d6 Mon Sep 17 00:00:00 2001 From: Scott Marchant Date: Fri, 14 Nov 2025 14:45:57 -0700 Subject: [PATCH] feat: Implement NIOThreadPool using swift concurrency --- Sources/NIOAsyncRuntime/NIOThreadPool.swift | 286 ++++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100644 Sources/NIOAsyncRuntime/NIOThreadPool.swift diff --git a/Sources/NIOAsyncRuntime/NIOThreadPool.swift b/Sources/NIOAsyncRuntime/NIOThreadPool.swift new file mode 100644 index 0000000..2a164a5 --- /dev/null +++ b/Sources/NIOAsyncRuntime/NIOThreadPool.swift @@ -0,0 +1,286 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DequeModule +import NIOConcurrencyHelpers + +import class Atomics.ManagedAtomic +import protocol NIOCore.EventLoop +import class NIOCore.EventLoopFuture +import enum NIOCore.System + +/// Errors that may be thrown when executing work on a `NIOThreadPool`. +public enum NIOThreadPoolError: Sendable { + public struct ThreadPoolInactive: Error { + public init() {} + } + + public struct UnsupportedOperation: Error { + public init() {} + } +} + +/// Drop‑in stand‑in for `NIOThreadPool`, powered by Swift Concurrency. +@available(macOS 10.15, *) +public final class NIOThreadPool: @unchecked Sendable { + /// The state of the `WorkItem`. + public enum WorkItemState: Sendable { + /// The work item is currently being executed. + case active + /// The work item has been cancelled and will not run. + case cancelled + } + + /// The work that should be done by the thread pool. + public typealias WorkItem = @Sendable (WorkItemState) -> Void + + @usableFromInline + struct IdentifiableWorkItem: Sendable { + @usableFromInline var workItem: WorkItem + @usableFromInline var id: Int? + } + + private let shutdownFlag = ManagedAtomic(false) + private let started = ManagedAtomic(false) + private let numberOfThreads: Int + private let workQueue = WorkQueue() + private let workerTasksLock = NIOLock() + private var workerTasks: [Task] = [] + + public init(numberOfThreads: Int? = nil) { + let threads = numberOfThreads ?? System.coreCount + self.numberOfThreads = max(1, threads) + } + + public func start() { + startWorkersIfNeeded() + } + + private var isActive: Bool { + self.started.load(ordering: .acquiring) && !self.shutdownFlag.load(ordering: .acquiring) + } + + // MARK: - Public API - + + public func submit(_ body: @escaping WorkItem) { + guard self.isActive else { + body(.cancelled) + return + } + + startWorkersIfNeeded() + + Task { + await self.workQueue.enqueue(IdentifiableWorkItem(workItem: body, id: nil)) + } + } + + @preconcurrency + public func submit(on eventLoop: EventLoop, _ fn: @escaping @Sendable () throws -> T) + -> EventLoopFuture + { + self.submit(on: eventLoop) { () throws -> _UncheckedSendable in + _UncheckedSendable(try fn()) + }.map { $0.value } + } + + public func submit( + on eventLoop: EventLoop, + _ fn: @escaping @Sendable () throws -> T + ) -> EventLoopFuture { + self.makeFutureByRunningOnPool(eventLoop: eventLoop, fn) + } + + /// Async helper mirroring `runIfActive` without an EventLoop context. + public func runIfActive(_ body: @escaping @Sendable () throws -> T) async throws -> T + { + try Task.checkCancellation() + guard self.isActive else { throw CancellationError() } + + return try await Task { + try Task.checkCancellation() + guard self.isActive else { throw CancellationError() } + return try body() + }.value + } + + /// Event‑loop variant returning only the future. + @preconcurrency + public func runIfActive(eventLoop: EventLoop, _ body: @escaping @Sendable () throws -> T) + -> EventLoopFuture + { + self.runIfActive(eventLoop: eventLoop) { () throws -> _UncheckedSendable in + _UncheckedSendable(try body()) + }.map { $0.value } + } + + public func runIfActive( + eventLoop: EventLoop, + _ body: @escaping @Sendable () throws -> T + ) -> EventLoopFuture { + self.makeFutureByRunningOnPool(eventLoop: eventLoop, body) + } + + private func makeFutureByRunningOnPool( + eventLoop: EventLoop, + _ body: @escaping @Sendable () throws -> T + ) -> EventLoopFuture { + guard self.isActive else { + return eventLoop.makeFailedFuture(NIOThreadPoolError.ThreadPoolInactive()) + } + + let promise = eventLoop.makePromise(of: T.self) + self.submit { state in + switch state { + case .active: + do { + let value = try body() + promise.succeed(value) + } catch { + promise.fail(error) + } + case .cancelled: + promise.fail(NIOThreadPoolError.ThreadPoolInactive()) + } + } + return promise.futureResult + } + + // Lifecycle -------------------------------------------------------------- + + public static let singleton: NIOThreadPool = { + let pool = NIOThreadPool() + pool.start() + return pool + }() + + @preconcurrency + public func shutdownGracefully(_ callback: @escaping @Sendable (Error?) -> Void = { _ in }) { + _shutdownGracefully { + callback(nil) + } + } + + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) + public func shutdownGracefully() async throws { + try await withCheckedThrowingContinuation { continuation in + _shutdownGracefully { + continuation.resume(returning: ()) + } + } + } + + private func _shutdownGracefully(completion: (@Sendable () -> Void)? = nil) { + if shutdownFlag.exchange(true, ordering: .acquiring) { + completion?() + return + } + + Task { + let remaining = await workQueue.shutdown() + for item in remaining { + item.workItem(.cancelled) + } + + workerTasksLock.withLock { + for worker in workerTasks { + worker.cancel() + } + workerTasks.removeAll() + } + + started.store(false, ordering: .releasing) + completion?() + } + } + + // MARK: - Worker infrastructure + + private func startWorkersIfNeeded() { + if self.shutdownFlag.load(ordering: .acquiring) { + return + } + + if self.started.compareExchange(expected: false, desired: true, ordering: .acquiring).exchanged + { + spawnWorkers() + } + } + + private func spawnWorkers() { + workerTasksLock.withLock { + guard workerTasks.isEmpty else { return } + for index in 0..() + private var waiters: [CheckedContinuation] = [] + private var isShuttingDown = false + + func enqueue(_ item: IdentifiableWorkItem) { + if let continuation = waiters.popLast() { + continuation.resume(returning: item) + } else { + queue.append(item) + } + } + + func nextWorkItem(shutdownFlag: ManagedAtomic) async -> IdentifiableWorkItem? { + if !queue.isEmpty { + return queue.removeFirst() + } + + if isShuttingDown || shutdownFlag.load(ordering: .acquiring) { + return nil + } + + return await withCheckedContinuation { continuation in + waiters.append(continuation) + } + } + + func shutdown() -> [IdentifiableWorkItem] { + isShuttingDown = true + let remaining = Array(queue) + queue.removeAll() + while let waiter = waiters.popLast() { + waiter.resume(returning: nil) + } + return remaining + } + } + + private struct _UncheckedSendable: @unchecked Sendable { + let value: T + init(_ value: T) { self.value = value } + } +}