Skip to content

Commit afd0695

Browse files
committed
Make MPSCAsyncChannel source methods nonisolated(nonsending)
`nonisolated(nonsending)` is a 6.2 language feature that allows the inheritance of the callers isolation. The new `MPSCAsyncChannel.Source` send methods should adopt this to avoid unnecessary isolation hops.
1 parent 2773d41 commit afd0695

File tree

2 files changed

+91
-1
lines changed

2 files changed

+91
-1
lines changed

.github/workflows/pull_request.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ jobs:
2222
uses: swiftlang/github-workflows/.github/workflows/soundness.yml@main
2323
with:
2424
license_header_check_project_name: "Swift Async Algorithms"
25-
format_check_container_image: "swiftlang/swift:nightly-6.1-noble" # Needed since 6.0.x doesn't support sending keyword
25+
format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081

Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,95 @@ extension MultiProducerSingleConsumerAsyncChannel {
445445
}
446446
}
447447

448+
#if compiler(>=6.2)
449+
/// Send new elements to the channel.
450+
///
451+
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the
452+
/// first element of the provided sequence. If the channel already terminated then this method will throw an error
453+
/// indicating the failure.
454+
///
455+
/// This method returns once more elements should be produced.
456+
///
457+
/// - Parameters:
458+
/// - sequence: The elements to send to the channel.
459+
@inlinable
460+
public mutating nonisolated(nonsending) func send<S>(
461+
contentsOf sequence: consuming sending S
462+
) async throws where Element == S.Element, S: Sequence, Element: Copyable {
463+
let syncSend: (sending S, inout Self) throws -> SendResult = { try $1.send(contentsOf: $0) }
464+
let sendResult = try syncSend(sequence, &self)
465+
466+
switch consume sendResult {
467+
case .produceMore:
468+
return ()
469+
470+
case .enqueueCallback(let callbackToken):
471+
let id = callbackToken._id
472+
let storage = self._storage
473+
try await withTaskCancellationHandler {
474+
try await withUnsafeThrowingContinuation { continuation in
475+
self._storage.enqueueProducer(
476+
callbackToken: id,
477+
continuation: continuation
478+
)
479+
}
480+
} onCancel: {
481+
storage.cancelProducer(callbackToken: id)
482+
}
483+
}
484+
}
485+
486+
/// Send new element to the channel.
487+
///
488+
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the
489+
/// provided element. If the channel already terminated then this method will throw an error
490+
/// indicating the failure.
491+
///
492+
/// This method returns once more elements should be produced.
493+
///
494+
/// - Parameters:
495+
/// - element: The element to send to the channel.
496+
@inlinable
497+
public mutating nonisolated(nonsending) func send(_ element: consuming sending Element) async throws {
498+
let syncSend: (consuming sending Element, inout Self) throws -> SendResult = { try $1.send($0) }
499+
let sendResult = try syncSend(element, &self)
500+
501+
switch consume sendResult {
502+
case .produceMore:
503+
return ()
504+
505+
case .enqueueCallback(let callbackHandle):
506+
let id = callbackHandle._id
507+
let storage = self._storage
508+
try await withTaskCancellationHandler {
509+
try await withUnsafeThrowingContinuation { continuation in
510+
self._storage.enqueueProducer(
511+
callbackToken: id,
512+
continuation: continuation
513+
)
514+
}
515+
} onCancel: {
516+
storage.cancelProducer(callbackToken: id)
517+
}
518+
}
519+
}
520+
521+
/// Send the elements of the asynchronous sequence to the channel.
522+
///
523+
/// This method returns once the provided asynchronous sequence or the channel finished.
524+
///
525+
/// - Important: This method does not finish the source if consuming the upstream sequence terminated.
526+
///
527+
/// - Parameters:
528+
/// - sequence: The elements to send to the channel.
529+
@inlinable
530+
public mutating nonisolated(nonsending) func send<S>(contentsOf sequence: consuming sending S) async throws
531+
where Element == S.Element, S: AsyncSequence, Element: Copyable, S: Sendable, Element: Sendable {
532+
for try await element in sequence {
533+
try await self.send(contentsOf: CollectionOfOne(element))
534+
}
535+
}
536+
#else
448537
/// Send new elements to the channel.
449538
///
450539
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the
@@ -532,6 +621,7 @@ extension MultiProducerSingleConsumerAsyncChannel {
532621
try await self.send(contentsOf: CollectionOfOne(element))
533622
}
534623
}
624+
#endif
535625

536626
/// Indicates that the production terminated.
537627
///

0 commit comments

Comments
 (0)