Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.squareup.wire.internal.asGrpcClientStreamingCall
import com.squareup.wire.internal.asGrpcServerStreamingCall
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
Expand Down Expand Up @@ -117,7 +116,7 @@ fun <S : Any, R : Any> GrpcCall(function: (S) -> R): GrpcCall<S, R> {
* Returns a new instance of [GrpcStreamingCall] that can be used for a single call to
* [executeIn][GrpcStreamingCall.executeIn] or [executeBlocking][GrpcStreamingCall.executeBlocking].
*
* The returned instance launches [function] on [Dispatchers.IO]. The function must close the
* The returned instance launches [function] in the provided scope. The function must close the
* [SendChannel] when it has no more messages to transmit. If [function] throws, both channels will
* be closed using the thrown exception as a cause.
*
Expand Down Expand Up @@ -189,7 +188,7 @@ fun <S : Any, R : Any> GrpcStreamingCall(
override fun executeIn(scope: CoroutineScope): Pair<SendChannel<S>, ReceiveChannel<R>> {
check(executed.compareAndSet(false, true)) { "already executed" }

val job = scope.launch(Dispatchers.IO) {
val job = scope.launch {
try {
function(requestChannel, responseChannel)
} catch (e: Exception) {
Expand Down
25 changes: 25 additions & 0 deletions wire-grpc-tests/src/test/java/com/squareup/wire/GrpcCallsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import assertk.assertThat
import assertk.assertions.hasMessage
import assertk.assertions.isEmpty
import assertk.assertions.isEqualTo
import assertk.assertions.isTrue
import java.util.Locale.US
import java.util.concurrent.LinkedBlockingQueue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import okio.IOException
import org.junit.Assert.fail
Expand Down Expand Up @@ -242,6 +245,28 @@ class GrpcCallsTest {
assertThat(log).isEmpty()
}

@Test
fun executeInRespectsScope() {
val dispatcher = newSingleThreadContext("grpc-test-thread")
val scope = CoroutineScope(dispatcher)
var threadName: String? = null

val grpcCall = GrpcStreamingCall<String, String> { requests, responses ->
threadName = Thread.currentThread().name
requests.receive()
responses.close()
}

val (send, receive) = grpcCall.executeIn(scope)
runBlocking {
send.send("hello")
receive.receiveCatching()
}

assertThat(threadName!!.startsWith("grpc-test-thread")).isTrue()
dispatcher.close()
}

@Test
fun cloneIsIndependent() {
val grpcCall = GrpcCall<String, String> { it.uppercase(US) }
Expand Down
Loading