Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ jobs:
uses: swiftlang/github-workflows/.github/workflows/soundness.yml@main
with:
license_header_check_project_name: "Swift Async Algorithms"
format_check_container_image: "swiftlang/swift:nightly-6.1-noble" # Needed since 6.0.x doesn't support sending keyword
format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,95 @@ extension MultiProducerSingleConsumerAsyncChannel {
}
}

#if compiler(>=6.2)
/// Send new elements to the channel.
///
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the
/// first element of the provided sequence. If the channel already terminated then this method will throw an error
/// indicating the failure.
///
/// This method returns once more elements should be produced.
///
/// - Parameters:
/// - sequence: The elements to send to the channel.
@inlinable
public mutating nonisolated(nonsending) func send<S>(
contentsOf sequence: consuming sending S
) async throws where Element == S.Element, S: Sequence, Element: Copyable {
let syncSend: (sending S, inout Self) throws -> SendResult = { try $1.send(contentsOf: $0) }
let sendResult = try syncSend(sequence, &self)

switch consume sendResult {
case .produceMore:
return ()

case .enqueueCallback(let callbackToken):
let id = callbackToken._id
let storage = self._storage
try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { continuation in
self._storage.enqueueProducer(
callbackToken: id,
continuation: continuation
)
}
} onCancel: {
storage.cancelProducer(callbackToken: id)
}
}
}

/// Send new element to the channel.
///
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the
/// provided element. If the channel already terminated then this method will throw an error
/// indicating the failure.
///
/// This method returns once more elements should be produced.
///
/// - Parameters:
/// - element: The element to send to the channel.
@inlinable
public mutating nonisolated(nonsending) func send(_ element: consuming sending Element) async throws {
let syncSend: (consuming sending Element, inout Self) throws -> SendResult = { try $1.send($0) }
let sendResult = try syncSend(element, &self)

switch consume sendResult {
case .produceMore:
return ()

case .enqueueCallback(let callbackHandle):
let id = callbackHandle._id
let storage = self._storage
try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { continuation in
self._storage.enqueueProducer(
callbackToken: id,
continuation: continuation
)
}
} onCancel: {
storage.cancelProducer(callbackToken: id)
}
}
}

/// Send the elements of the asynchronous sequence to the channel.
///
/// This method returns once the provided asynchronous sequence or the channel finished.
///
/// - Important: This method does not finish the source if consuming the upstream sequence terminated.
///
/// - Parameters:
/// - sequence: The elements to send to the channel.
@inlinable
public mutating nonisolated(nonsending) func send<S>(contentsOf sequence: consuming sending S) async throws
where Element == S.Element, S: AsyncSequence, Element: Copyable, S: Sendable, Element: Sendable {
for try await element in sequence {
try await self.send(contentsOf: CollectionOfOne(element))
}
}
#else
/// Send new elements to the channel.
///
/// If there is a task consuming the channel and awaiting the next element then the task will get resumed with the
Expand Down Expand Up @@ -532,6 +621,7 @@ extension MultiProducerSingleConsumerAsyncChannel {
try await self.send(contentsOf: CollectionOfOne(element))
}
}
#endif

/// Indicates that the production terminated.
///
Expand Down
Loading