Skip to content

Commit fdb01da

Browse files
authored
Add ReceiveChannel.consumeTo(destination) (#4520)
Use case: collecting elements up until the point the channel is closed without losing the elements when `toList` when the exception is thrown. This function is similar to `Flow<T>.toList(destination)`, which we already have, so the addition makes sense from the point of view of consistency as well. * Remove the assumed pitfall from `consumeEach` documentation The way to trigger the pitfall that used to be described can be boiled down to this pattern: ```kotlin run { channel.consumeEach { if (channel.isEmpty) { // do something return@run } else { // do something else } } } ``` However, here, `isEmpty` is already introducing a race condition, so `consumeEach` itself does not cause any additional issues. This does not seem like a pitfall in realistic scenarios after all. `consumeEach` can perform an early return and thus erase the elements present in the channel, but this also goes for elements that possibly entered the channel long ago; without explicitly checking if the channel is empty, there is no way to distinguish these two scenarios.
1 parent 6a6e336 commit fdb01da

File tree

4 files changed

+118
-16
lines changed

4 files changed

+118
-16
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,7 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
776776
public static final fun consume (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
777777
public static final fun consumeEach (Lkotlinx/coroutines/channels/BroadcastChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
778778
public static final fun consumeEach (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
779+
public static final fun consumeTo (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
779780
public static final fun consumes (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlin/jvm/functions/Function1;
780781
public static final fun consumesAll ([Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlin/jvm/functions/Function1;
781782
public static final synthetic fun count (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,7 @@ final suspend fun <#A: kotlin/Any, #B: kotlin.collections/MutableCollection<in #
10821082
final suspend fun <#A: kotlin/Any, #B: kotlinx.coroutines.channels/SendChannel<#A>> (kotlinx.coroutines.channels/ReceiveChannel<#A?>).kotlinx.coroutines.channels/filterNotNullTo(#B): #B // kotlinx.coroutines.channels/filterNotNullTo|filterNotNullTo@kotlinx.coroutines.channels.ReceiveChannel<0:0?>(0:1){0§<kotlin.Any>;1§<kotlinx.coroutines.channels.SendChannel<0:0>>}[0]
10831083
final suspend fun <#A: kotlin/Any> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/receiveOrNull(): #A? // kotlinx.coroutines.channels/receiveOrNull|receiveOrNull@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§<kotlin.Any>}[0]
10841084
final suspend fun <#A: kotlin/Any?, #B: #A> (kotlinx.coroutines.flow/Flow<#B>).kotlinx.coroutines.flow/reduce(kotlin.coroutines/SuspendFunction2<#A, #B, #A>): #A // kotlinx.coroutines.flow/reduce|reduce@kotlinx.coroutines.flow.Flow<0:1>(kotlin.coroutines.SuspendFunction2<0:0,0:1,0:0>){0§<kotlin.Any?>;1§<0:0>}[0]
1085+
final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<#A>> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/consumeTo(#B): #B // kotlinx.coroutines.channels/consumeTo|consumeTo@kotlinx.coroutines.channels.ReceiveChannel<0:0>(0:1){0§<kotlin.Any?>;1§<kotlin.collections.MutableCollection<0:0>>}[0]
10851086
final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<in #A>> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/toCollection(#B): #B // kotlinx.coroutines.channels/toCollection|toCollection@kotlinx.coroutines.channels.ReceiveChannel<0:0>(0:1){0§<kotlin.Any?>;1§<kotlin.collections.MutableCollection<in|0:0>>}[0]
10861087
final suspend fun <#A: kotlin/Any?, #B: kotlin.collections/MutableCollection<in #A>> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/toCollection(#B): #B // kotlinx.coroutines.flow/toCollection|toCollection@kotlinx.coroutines.flow.Flow<0:0>(0:1){0§<kotlin.Any?>;1§<kotlin.collections.MutableCollection<in|0:0>>}[0]
10871088
final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin.collections/MutableMap<in #A, in #B>> (kotlinx.coroutines.channels/ReceiveChannel<kotlin/Pair<#A, #B>>).kotlinx.coroutines.channels/toMap(#C): #C // kotlinx.coroutines.channels/toMap|toMap@kotlinx.coroutines.channels.ReceiveChannel<kotlin.Pair<0:0,0:1>>(0:2){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.collections.MutableMap<in|0:0,in|0:1>>}[0]

kotlinx-coroutines-core/common/src/channels/Channels.common.kt

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
package kotlinx.coroutines.channels
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.consumeAsFlow
9+
import kotlinx.coroutines.flow.toCollection
10+
import kotlinx.coroutines.flow.toList
811
import kotlinx.coroutines.selects.*
912
import kotlin.contracts.*
1013
import kotlin.jvm.*
@@ -148,23 +151,23 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
148151
*
149152
* In this example, several coroutines put elements into a single channel, and a single consumer processes the elements.
150153
* Once it finds the elements it's looking for, it stops [consumeEach] by making an early return.
151-
*
152-
* **Pitfall**: even though the name says "each", some elements could be left unprocessed if they are added after
153-
* this function decided to close the channel.
154-
* In this case, the elements will simply be lost.
155-
* If the elements of the channel are resources that must be closed (like file handles, sockets, etc.),
156-
* an `onUndeliveredElement` must be passed to the [Channel] on construction.
157-
* It will be called for each element left in the channel at the point of cancellation.
158154
*/
159155
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
160156
consume {
161157
for (e in this) action(e)
162158
}
163159

164160
/**
165-
* Returns a [List] containing all the elements sent to this channel, preserving their order.
161+
* [Consumes][consume] the elements of this channel into a list, preserving their order.
162+
*
163+
* This is a convenience function equivalent to calling [consumeAsFlow] followed by [kotlinx.coroutines.flow.toList].
164+
* It is useful for testing code that uses channels to observe the elements the channel contains at the end of the test.
166165
*
167-
* This function will attempt to receive elements and put them into the list until the channel is
166+
* There is no way to recover channel elements if the channel gets closed with an exception
167+
* or to apply additional transformations to the elements before building the resulting collection.
168+
* Please use [consumeAsFlow] and [kotlinx.coroutines.flow.toCollection] for such advanced use-cases.
169+
*
170+
* [toList] attempts to receive elements and put them into the list until the channel is
168171
* [closed][SendChannel.close].
169172
* Calling [toList] on channels that are not eventually closed is always incorrect:
170173
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
@@ -173,8 +176,12 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
173176
*
174177
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
175178
*
176-
* The operation is _terminal_.
177-
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
179+
* Since this function is implemented using [consume], it is _terminal_,
180+
* meaning that [toList] will [cancel][ReceiveChannel.cancel] the channel before exiting.
181+
* A [toList] call can finish before the sender closes the channel
182+
* if it gets cancelled while waiting for the next element,
183+
* or if [MutableList.add] fails with an exception.
184+
* In both cases, the exception will be used for cancelling the channel and then rethrown.
178185
*
179186
* Example:
180187
* ```
@@ -189,11 +196,72 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
189196
* ```
190197
*/
191198
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
192-
consumeEach {
193-
add(it)
194-
}
199+
consumeEach(::add)
195200
}
196201

202+
/**
203+
* [Consumes][consume] the elements of this channel into the provided mutable collection.
204+
*
205+
* This is a convenience function equivalent to calling [consumeAsFlow]
206+
* followed by [kotlinx.coroutines.flow.toCollection].
207+
* Please use [consumeAsFlow] directly in scenarios where elements should undergo additional transformations
208+
* before being added to the resulting collection.
209+
*
210+
* [consumeTo] attempts to receive elements and put them into the collection until the channel is
211+
* [closed][SendChannel.close].
212+
*
213+
* If the channel is [closed][SendChannel.close] with a cause, [consumeTo] will rethrow that cause.
214+
* However, the elements already received up to that point will remain in the collection.
215+
*
216+
* Since this function is implemented using [consume], it is _terminal_,
217+
* meaning that [consumeTo] will [cancel][ReceiveChannel.cancel] the channel before exiting.
218+
* A [consumeTo] call can finish before the sender closes the channel
219+
* if it gets cancelled while waiting for the next element,
220+
* or if [MutableCollection.add] fails with an exception.
221+
* In both cases, the exception will be used for cancelling the channel and then rethrown.
222+
*
223+
* The intended use case for this function is collecting the remaining elements of a closed channel
224+
* and processing them in a single batch.
225+
*
226+
* Example:
227+
* ```
228+
* val doContinue = AtomicBoolean(true)
229+
*
230+
* // Start the sender
231+
* val channel = produce {
232+
* var i = 0
233+
* while (doContinue.load()) {
234+
* send(i++)
235+
* delay(10.milliseconds)
236+
* if (i == 42) break
237+
* }
238+
* }
239+
*
240+
* // Start the consumer
241+
* launch {
242+
* // Read elements until we suddenly decide to stop
243+
* // or until the channel is closed.
244+
* while (Random.nextInt(100) != 0) {
245+
* val nextElement = channel.receiveCatching()
246+
* if (nextElement.isClosed) return@launch
247+
* println("Received ${nextElement.getOrNull()}")
248+
* }
249+
* delay(100.milliseconds)
250+
* doContinue.store(false)
251+
* delay(100.milliseconds)
252+
* val remainingElements = mutableListOf<Int>()
253+
* try {
254+
* channel.consumeTo(remainingElements)
255+
* } finally {
256+
* println("Remaining elements: $remainingElements")
257+
* }
258+
* }
259+
* ```
260+
*/
261+
public suspend fun <E, C: MutableCollection<E>> ReceiveChannel<E>.consumeTo(collection: C): C =
262+
consumeEach(collection::add).let { collection }
263+
264+
197265
@PublishedApi
198266
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
199267
cancel(cause?.let {

kotlinx-coroutines-core/common/test/channels/ChannelsTest.kt

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,13 @@ class ChannelsTest: TestBase() {
8585
}
8686

8787
@Test
88-
fun testEmptyList() = runTest {
88+
fun testEmptyToList() = runTest {
8989
assertTrue(emptyList<Nothing>().asReceiveChannel().toList().isEmpty())
9090
}
9191

9292
@Test
9393
fun testToList() = runTest {
9494
assertEquals(testList, testList.asReceiveChannel().toList())
95-
9695
}
9796

9897
@Test
@@ -104,6 +103,39 @@ class ChannelsTest: TestBase() {
104103
}
105104
}
106105

106+
@Test
107+
fun testEmptyConsumeToWithDestination() = runTest {
108+
val initialList = listOf(-1, -2, -3)
109+
val destination = initialList.toMutableList()
110+
emptyList<Nothing>().asReceiveChannel().consumeTo(destination)
111+
assertEquals(initialList, destination)
112+
}
113+
114+
@Test
115+
fun testConsumeToWithDestination() = runTest {
116+
val initialList = listOf(-1, -2, -3)
117+
val destination = initialList.toMutableList()
118+
testList.asReceiveChannel().consumeTo(destination)
119+
assertEquals(initialList + testList, destination)
120+
}
121+
122+
@Test
123+
fun testConsumeToWithDestinationOnFailedChannel() = runTest {
124+
val initialList = listOf(-1, -2, -3)
125+
val destination = initialList.toMutableList()
126+
val channel = Channel<Int>(10)
127+
val elementsToSend = (1..5)
128+
elementsToSend.forEach {
129+
val result = channel.trySend(it)
130+
assertTrue(result.isSuccess)
131+
}
132+
channel.close(TestException())
133+
assertFailsWith<TestException> {
134+
channel.consumeTo(destination)
135+
}
136+
assertEquals(initialList + elementsToSend, destination)
137+
}
138+
107139
private fun <E> Iterable<E>.asReceiveChannel(context: CoroutineContext = Dispatchers.Unconfined): ReceiveChannel<E> =
108140
GlobalScope.produce(context) {
109141
for (element in this@asReceiveChannel)

0 commit comments

Comments
 (0)