From afd06956ed9cc268399ed9c6641f0a22129327f8 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Thu, 27 Nov 2025 11:12:15 +0100 Subject: [PATCH] Make `MPSCAsyncChannel` source methods `nonisolated(nonsending)` `nonisolated(nonsending)` is a 6.2 language feature that allows the inheritance of the callers isolation. The new `MPSCAsyncChannel.Source` send methods should adopt this to avoid unnecessary isolation hops. --- .github/workflows/pull_request.yml | 2 +- ...tiProducerSingleConsumerAsyncChannel.swift | 90 +++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index e4804029..8325fa8e 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -22,4 +22,4 @@ jobs: uses: swiftlang/github-workflows/.github/workflows/soundness.yml@main with: license_header_check_project_name: "Swift Async Algorithms" - format_check_container_image: "swiftlang/swift:nightly-6.1-noble" # Needed since 6.0.x doesn't support sending keyword + format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081 diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift index 131946fd..acc9a72a 100644 --- a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift @@ -445,6 +445,95 @@ extension MultiProducerSingleConsumerAsyncChannel { } } + #if compiler(>=6.2) + /// Send new elements to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// first element of the provided sequence. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + @inlinable + public mutating nonisolated(nonsending) func send( + contentsOf sequence: consuming sending S + ) async throws where Element == S.Element, S: Sequence, Element: Copyable { + let syncSend: (sending S, inout Self) throws -> SendResult = { try $1.send(contentsOf: $0) } + let sendResult = try syncSend(sequence, &self) + + switch consume sendResult { + case .produceMore: + return () + + case .enqueueCallback(let callbackToken): + let id = callbackToken._id + let storage = self._storage + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { continuation in + self._storage.enqueueProducer( + callbackToken: id, + continuation: continuation + ) + } + } onCancel: { + storage.cancelProducer(callbackToken: id) + } + } + } + + /// Send new element to the channel. + /// + /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the + /// provided element. If the channel already terminated then this method will throw an error + /// indicating the failure. + /// + /// This method returns once more elements should be produced. + /// + /// - Parameters: + /// - element: The element to send to the channel. + @inlinable + public mutating nonisolated(nonsending) func send(_ element: consuming sending Element) async throws { + let syncSend: (consuming sending Element, inout Self) throws -> SendResult = { try $1.send($0) } + let sendResult = try syncSend(element, &self) + + switch consume sendResult { + case .produceMore: + return () + + case .enqueueCallback(let callbackHandle): + let id = callbackHandle._id + let storage = self._storage + try await withTaskCancellationHandler { + try await withUnsafeThrowingContinuation { continuation in + self._storage.enqueueProducer( + callbackToken: id, + continuation: continuation + ) + } + } onCancel: { + storage.cancelProducer(callbackToken: id) + } + } + } + + /// Send the elements of the asynchronous sequence to the channel. + /// + /// This method returns once the provided asynchronous sequence or the channel finished. + /// + /// - Important: This method does not finish the source if consuming the upstream sequence terminated. + /// + /// - Parameters: + /// - sequence: The elements to send to the channel. + @inlinable + public mutating nonisolated(nonsending) func send(contentsOf sequence: consuming sending S) async throws + where Element == S.Element, S: AsyncSequence, Element: Copyable, S: Sendable, Element: Sendable { + for try await element in sequence { + try await self.send(contentsOf: CollectionOfOne(element)) + } + } + #else /// Send new elements to the channel. /// /// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the @@ -532,6 +621,7 @@ extension MultiProducerSingleConsumerAsyncChannel { try await self.send(contentsOf: CollectionOfOne(element)) } } + #endif /// Indicates that the production terminated. ///