From 5153aef1dd34dd1a6e43ef6ef1ae9806323e4d01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Quenaudon?= Date: Wed, 15 Apr 2026 07:36:59 +0100 Subject: [PATCH] Execute GrpcCalls in the provided scope --- .../kotlin/com/squareup/wire/GrpcCalls.kt | 5 ++-- .../java/com/squareup/wire/GrpcCallsTest.kt | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/GrpcCalls.kt b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/GrpcCalls.kt index 0de44d2f00..7ba9bdccba 100644 --- a/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/GrpcCalls.kt +++ b/wire-grpc-client/src/jvmMain/kotlin/com/squareup/wire/GrpcCalls.kt @@ -21,7 +21,6 @@ import com.squareup.wire.internal.asGrpcClientStreamingCall import com.squareup.wire.internal.asGrpcServerStreamingCall import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel @@ -117,7 +116,7 @@ fun GrpcCall(function: (S) -> R): GrpcCall { * Returns a new instance of [GrpcStreamingCall] that can be used for a single call to * [executeIn][GrpcStreamingCall.executeIn] or [executeBlocking][GrpcStreamingCall.executeBlocking]. * - * The returned instance launches [function] on [Dispatchers.IO]. The function must close the + * The returned instance launches [function] in the provided scope. The function must close the * [SendChannel] when it has no more messages to transmit. If [function] throws, both channels will * be closed using the thrown exception as a cause. * @@ -189,7 +188,7 @@ fun GrpcStreamingCall( override fun executeIn(scope: CoroutineScope): Pair, ReceiveChannel> { check(executed.compareAndSet(false, true)) { "already executed" } - val job = scope.launch(Dispatchers.IO) { + val job = scope.launch { try { function(requestChannel, responseChannel) } catch (e: Exception) { diff --git a/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcCallsTest.kt b/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcCallsTest.kt index 0d7463f05e..b9b79b2e52 100644 --- a/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcCallsTest.kt +++ b/wire-grpc-tests/src/test/java/com/squareup/wire/GrpcCallsTest.kt @@ -19,10 +19,13 @@ import assertk.assertThat import assertk.assertions.hasMessage import assertk.assertions.isEmpty import assertk.assertions.isEqualTo +import assertk.assertions.isTrue import java.util.Locale.US import java.util.concurrent.LinkedBlockingQueue +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ObsoleteCoroutinesApi +import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.runBlocking import okio.IOException import org.junit.Assert.fail @@ -242,6 +245,28 @@ class GrpcCallsTest { assertThat(log).isEmpty() } + @Test + fun executeInRespectsScope() { + val dispatcher = newSingleThreadContext("grpc-test-thread") + val scope = CoroutineScope(dispatcher) + var threadName: String? = null + + val grpcCall = GrpcStreamingCall { requests, responses -> + threadName = Thread.currentThread().name + requests.receive() + responses.close() + } + + val (send, receive) = grpcCall.executeIn(scope) + runBlocking { + send.send("hello") + receive.receiveCatching() + } + + assertThat(threadName!!.startsWith("grpc-test-thread")).isTrue() + dispatcher.close() + } + @Test fun cloneIsIndependent() { val grpcCall = GrpcCall { it.uppercase(US) }