diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt index 0e0426d6c..d08fd0797 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt @@ -8,9 +8,9 @@ import kotlinx.coroutines.flow.Flow import kotlinx.rpc.RpcCall import kotlinx.rpc.RpcClient import kotlinx.rpc.grpc.GrpcMetadata -import kotlinx.rpc.grpc.client.GrpcCallOptions import kotlinx.rpc.grpc.client.internal.ManagedChannel import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder +import kotlinx.rpc.grpc.client.internal.applyConfig import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc import kotlinx.rpc.grpc.client.internal.buildChannel import kotlinx.rpc.grpc.client.internal.clientStreamingRpc @@ -27,6 +27,7 @@ import kotlinx.rpc.grpc.descriptor.MethodType import kotlinx.rpc.grpc.descriptor.methodType import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds private typealias RequestClient = Any @@ -180,9 +181,7 @@ private fun GrpcClient( builder: ManagedChannelBuilder<*>, config: GrpcClientConfiguration, ): GrpcClient { - val channel = builder.apply { - config.overrideAuthority?.let { overrideAuthority(it) } - }.buildChannel() + val channel = builder.applyConfig(config).buildChannel() return GrpcClient(channel, config.messageCodecResolver, config.interceptors) } @@ -198,6 +197,7 @@ private fun GrpcClient( */ public class GrpcClientConfiguration internal constructor() { internal val interceptors: MutableList = mutableListOf() + internal var keepAlive: KeepAlive? = null /** * Configurable resolver used to determine the appropriate codec for a given Kotlin type @@ -294,4 +294,65 @@ public class GrpcClientConfiguration internal constructor() { public fun tls(configure: TlsClientCredentialsBuilder.() -> Unit): ClientCredentials = TlsClientCredentials(configure) -} \ No newline at end of file + /** + * Configures keep-alive settings for the gRPC client. + * + * Keep-alive allows you to fine-tune the behavior of the client to ensure the connection + * between the client and server remains active according to specific parameters. + * + * By default, keep-alive is disabled. + * + * ``` + * GrpcClient("localhost", 50051) { + * keepAlive { + * time = 10.seconds + * timeout = 20.seconds + * withoutCalls = false + * } + * } + * ``` + * + * @param configure A lambda to apply custom configurations to the [KeepAlive] instance. + * The [KeepAlive] settings include: + * - `time`: The maximum amount of time that the channel can be idle before a keep-alive + * ping is sent. + * - `timeout`: The time allowed for a keep-alive ping to complete. + * - `withoutCalls`: Whether to send keep-alive pings even when there are no outstanding + * RPCs on the connection. + * + * @see KeepAlive + */ + public fun keepAlive(configure: KeepAlive.() -> Unit) { + keepAlive = KeepAlive().apply(configure) + } + + /** + * Represents keep-alive settings for a gRPC client connection. + * + * Keep-alive ensures that the connection between the client and the server remains active. + * It helps detect connection issues proactively before a request is made and facilitates + * maintaining long-lived idle connections. + * + * Client authors must coordinate with service owners for whether a particular client-side + * setting is acceptable. + * + * @property time Specifies the maximum amount of time the channel can remain idle before a + * keep-alive ping is sent to the server to check the connection state. + * The default value is `Duration.INFINITE`, which disables keep-alive pings when idle. + * + * @property timeout Sets the amount of time to wait for a keep-alive ping response. + * If the server does not respond within this timeout, the connection will be considered broken. + * The default value is 20 seconds. + * + * @property withoutCalls Defines whether keep-alive pings will be sent even when there + * are no active RPCs on the connection. If set to `true`, pings will be sent regardless + * of ongoing calls; otherwise, pings are only sent during active RPCs. + * The default value is `false`. + */ + public class KeepAlive internal constructor() { + public var time: Duration = Duration.INFINITE + public var timeout: Duration = 20.seconds + public var withoutCalls: Boolean = false + } +} + diff --git a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt index c6239c781..82b782985 100644 --- a/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt +++ b/grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt @@ -7,6 +7,7 @@ package kotlinx.rpc.grpc.client.internal import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.internal.utils.InternalRpcApi import kotlin.time.Duration @@ -71,9 +72,7 @@ public interface ManagedChannel { * Builder class for [ManagedChannel]. */ @InternalRpcApi -public expect abstract class ManagedChannelBuilder> { - public abstract fun overrideAuthority(authority: String): T -} +public expect abstract class ManagedChannelBuilder> @InternalRpcApi public expect fun ManagedChannelBuilder( @@ -88,5 +87,7 @@ public expect fun ManagedChannelBuilder( credentials: ClientCredentials? = null, ): ManagedChannelBuilder<*> +internal expect fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> + @InternalRpcApi public expect fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel diff --git a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt index 71d2d4345..509f467bb 100644 --- a/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt +++ b/grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt @@ -10,6 +10,7 @@ import io.grpc.Grpc import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.internal.utils.InternalRpcApi import java.util.concurrent.TimeUnit import kotlin.time.Duration @@ -80,3 +81,14 @@ private class JvmManagedChannel(private val channel: io.grpc.ManagedChannel) : M override val platformApi: ManagedChannelPlatform get() = channel } + +internal actual fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> { + config.keepAlive?.let { + keepAliveTime(it.time.inWholeMilliseconds, TimeUnit.MILLISECONDS) + keepAliveTimeout(it.timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS) + keepAliveWithoutCalls(it.withoutCalls) + } + + config.overrideAuthority?.let { overrideAuthority(it) } + return this +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt index a1eb2c987..baad4661d 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt @@ -7,6 +7,7 @@ package kotlinx.rpc.grpc.client.internal import kotlinx.rpc.grpc.client.ClientCredentials +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.grpc.client.TlsClientCredentials import kotlinx.rpc.grpc.internal.internalError import kotlinx.rpc.internal.utils.InternalRpcApi @@ -22,25 +23,23 @@ public actual abstract class ManagedChannelPlatform : GrpcChannel() */ @InternalRpcApi public actual abstract class ManagedChannelBuilder> { - public actual abstract fun overrideAuthority(authority: String): T + internal var config: GrpcClientConfiguration? = null } internal class NativeManagedChannelBuilder( private val target: String, private var credentials: Lazy, ) : ManagedChannelBuilder() { - - private var authority: String? = null - - override fun overrideAuthority(authority: String): NativeManagedChannelBuilder { - this.authority = authority - return this - } - fun buildChannel(): NativeManagedChannel { + val keepAlive = config?.keepAlive + keepAlive?.run { + require(time.isPositive()) { "keepalive time must be positive" } + require(timeout.isPositive()) { "keepalive timeout must be positive" } + } return NativeManagedChannel( target, - authority = authority, + authority = config?.overrideAuthority, + keepAlive = config?.keepAlive, credentials = credentials.value, ) } @@ -69,4 +68,7 @@ public actual fun ManagedChannelBuilder(target: String, credentials: ClientCrede return NativeManagedChannelBuilder(target, credentials) } - +internal actual fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> { + this.config = config + return this +} \ No newline at end of file diff --git a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt index 5cf6563b9..fad1b9371 100644 --- a/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt +++ b/grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt @@ -10,7 +10,10 @@ import cnames.structs.grpc_channel import kotlinx.atomicfu.atomic import kotlinx.cinterop.CPointer import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.MemScope import kotlinx.cinterop.alloc +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.convert import kotlinx.cinterop.cstr import kotlinx.cinterop.memScoped import kotlinx.cinterop.ptr @@ -21,6 +24,7 @@ import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.withTimeoutOrNull import kotlinx.rpc.grpc.client.ClientCredentials import kotlinx.rpc.grpc.client.GrpcCallOptions +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.grpc.client.rawDeadline import kotlinx.rpc.grpc.descriptor.MethodDescriptor import kotlinx.rpc.grpc.internal.CompletionQueue @@ -50,6 +54,7 @@ import kotlin.time.Duration internal class NativeManagedChannel( target: String, val authority: String?, + val keepAlive: GrpcClientConfiguration.KeepAlive?, // we must store them, otherwise the credentials are getting released credentials: ClientCredentials, ) : ManagedChannel, ManagedChannelPlatform() { @@ -66,22 +71,36 @@ internal class NativeManagedChannel( private val cq = CompletionQueue() internal val raw: CPointer = memScoped { - val args = authority?.let { + val args = mutableListOf() + + authority?.let { // the C Core API doesn't have a way to override the authority (used for TLS SNI) as it // is available in the Java gRPC implementation. // instead, it can be done by setting the "grpc.ssl_target_name_override" argument. - val authorityOverride = alloc { - type = grpc_arg_type.GRPC_ARG_STRING - key = "grpc.ssl_target_name_override".cstr.ptr - value.string = authority.cstr.ptr - } + args.add(GrpcArg.Str( + key = "grpc.ssl_target_name_override", + value = it + )) + } - alloc { - num_args = 1u - args = authorityOverride.ptr - } + keepAlive?.let { + args.add(GrpcArg.Integer( + key = "grpc.keepalive_time_ms", + value = it.time.inWholeMilliseconds.convert() + )) + args.add(GrpcArg.Integer( + key = "grpc.keepalive_timeout_ms", + value = it.timeout.inWholeMilliseconds.convert() + )) + args.add(GrpcArg.Integer( + key = "grpc.keepalive_permit_without_calls", + value = if (it.withoutCalls) 1 else 0 + )) } - grpc_channel_create(target, credentials.raw, args?.ptr) + + var rawArgs = if (args.isNotEmpty()) args.toRaw(this) else null + + grpc_channel_create(target, credentials.raw, rawArgs?.ptr) ?: error("Failed to create channel") } @@ -170,3 +189,33 @@ internal class NativeManagedChannel( } } + +internal sealed class GrpcArg(val key: String) { + internal class Str(key: String, val value: String) : GrpcArg(key) + internal class Integer(key: String, val value: Int) : GrpcArg(key) + + internal val rawType: grpc_arg_type + get() = when (this) { + is Str -> grpc_arg_type.GRPC_ARG_STRING + is Integer -> grpc_arg_type.GRPC_ARG_INTEGER + } +} + +private fun List.toRaw(memScope: MemScope): grpc_channel_args { + with(memScope) { + val arr = allocArray(size) { + val arg = get(it) + type = arg.rawType + key = arg.key.cstr.ptr + when (arg) { + is GrpcArg.Str -> value.string = arg.value.cstr.ptr + is GrpcArg.Integer -> value.integer = arg.value.convert() + } + } + + return alloc { + num_args = size.convert() + args = arr + } + } +} \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt index e14276919..4b538a357 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt @@ -8,7 +8,6 @@ import kotlinx.coroutines.test.runTest import kotlinx.rpc.RpcServer import kotlinx.rpc.grpc.GrpcCompression import kotlinx.rpc.grpc.GrpcMetadata -import kotlinx.rpc.grpc.Status import kotlinx.rpc.grpc.StatusCode import kotlinx.rpc.grpc.get import kotlinx.rpc.grpc.keys @@ -18,11 +17,9 @@ import kotlinx.rpc.grpc.test.EchoServiceImpl import kotlinx.rpc.grpc.test.Runtime import kotlinx.rpc.grpc.test.assertContainsAll import kotlinx.rpc.grpc.test.assertGrpcFailure -import kotlinx.rpc.grpc.test.captureStdErr -import kotlinx.rpc.grpc.test.clearNativeEnv +import kotlinx.rpc.grpc.test.captureGrpcLogs import kotlinx.rpc.grpc.test.invoke import kotlinx.rpc.grpc.test.runtime -import kotlinx.rpc.grpc.test.setNativeEnv import kotlinx.rpc.registerService import kotlinx.rpc.withService import kotlin.collections.emptyList @@ -101,7 +98,10 @@ class GrpcCompressionTest : GrpcProtoTest() { ) { var reqHeaders = emptyMap() var respHeaders = emptyMap() - val logs = captureNativeGrpcLogs { + val logs = captureGrpcLogs( + nativeTracers = listOf("compression", "http"), + jvmLoggers = emptyList(), + ) { runGrpcTest( clientInterceptors = clientInterceptor { clientCompression?.let { compression -> @@ -149,17 +149,6 @@ class GrpcCompressionTest : GrpcProtoTest() { assertContainsAll(listOf("gzip"), respHeaders.grpcAcceptEncoding()) } - private suspend fun captureNativeGrpcLogs(block: suspend () -> Unit): String { - try { - return captureStdErr { - setNativeEnv("GRPC_TRACE", "compression,http") - block() - } - } finally { - clearNativeEnv("GRPC_GRACE") - } - } - private fun GrpcMetadata.toMap(): Map { return keys().mapNotNull { key -> if (!key.endsWith("-bin")) { diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.kt new file mode 100644 index 000000000..9f7ab2df6 --- /dev/null +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.rpc.RpcServer +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.EchoServiceImpl +import kotlinx.rpc.registerService +import kotlin.test.Test +import kotlin.test.assertContains +import kotlin.test.assertFailsWith +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * Tests that the client can configure the compression of requests. + * + * This test is hard to realize on native, as the gRPC-Core doesn't expose internal headers like + * `grpc-encoding` to the user application. This means we cannot verify that the client or sever + * actually sent those headers on native. Instead, we capture the grpc trace output (written to stderr) + * and verify that the client and server actually used the compression algorithm. + */ +class GrpcKeepAliveTest : GrpcProtoTest() { + override fun RpcServer.registerServices() { + return registerService { EchoServiceImpl() } + } + + @Test + fun `test keepalive set - should propagate settings to core libraries`() = testKeepAlive( + time = 15.seconds, + timeout = 5.seconds, + withoutCalls = true, + ) + + @Test + fun `test keepalive negative time - should fail`() { + val error = assertFailsWith { + runGrpcTest( + configure = { + keepAlive { + this.time = (-1).seconds + } + } + ) { + // not reached + } + } + assertContains(error.message!!, "keepalive time must be positive") + } + + @Test + fun `test keepalive negative timeout - should fail`() { + val error = assertFailsWith { + runGrpcTest( + configure = { + keepAlive { + this.timeout = (-1).seconds + } + } + ) { + // not reached + } + } + assertContains(error.message!!, "keepalive timeout must be positive") + } +} + +expect fun GrpcProtoTest.testKeepAlive( + time: Duration, + timeout: Duration, + withoutCalls: Boolean, +) diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt index 231d4cdaf..a379f2d97 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcProtoTest.kt @@ -5,6 +5,7 @@ package kotlinx.rpc.grpc.test.proto import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.test.runTest @@ -13,6 +14,7 @@ import kotlinx.rpc.grpc.client.ClientCallScope import kotlinx.rpc.grpc.client.ClientCredentials import kotlinx.rpc.grpc.client.ClientInterceptor import kotlinx.rpc.grpc.client.GrpcClient +import kotlinx.rpc.grpc.client.GrpcClientConfiguration import kotlinx.rpc.grpc.server.GrpcServer import kotlinx.rpc.grpc.server.ServerCallScope import kotlinx.rpc.grpc.server.ServerCredentials @@ -23,12 +25,13 @@ abstract class GrpcProtoTest { abstract fun RpcServer.registerServices() - protected fun runGrpcTest( + fun runGrpcTest( serverCreds: ServerCredentials? = null, clientCreds: ClientCredentials? = null, overrideAuthority: String? = null, clientInterceptors: List = emptyList(), serverInterceptors: List = emptyList(), + configure: GrpcClientConfiguration.() -> Unit = {}, test: suspend (GrpcClient) -> Unit, ) = runTest { serverMutex.withLock { @@ -36,6 +39,7 @@ abstract class GrpcProtoTest { credentials = clientCreds ?: plaintext() if (overrideAuthority != null) this.overrideAuthority = overrideAuthority clientInterceptors.forEach { intercept(it) } + configure() } val grpcServer = GrpcServer( diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt index 8b51571ae..759433626 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcTimeoutTest.kt @@ -74,24 +74,6 @@ class GrpcTimeoutTest : GrpcProtoTest() { } } - @Test - fun `test timeout applies to actual call duration not just processing time`() { - val exc = assertFailsWith { - runGrpcTest( - clientInterceptors = clientInterceptor { - // Set timeout before making the call - callOptions.timeout = 500.milliseconds - proceed(it) - } - ) { - // Server delays for 1 second - val request = EchoRequest { message = "Echo"; timeout = 500u } - it.withService().UnaryEcho(request) - } - } - assertEquals(StatusCode.DEADLINE_EXCEEDED, exc.getStatus().statusCode) - } - @Test fun `test timeout set to very short milliseconds triggers immediately`() { val exc = assertFailsWith { @@ -101,27 +83,10 @@ class GrpcTimeoutTest : GrpcProtoTest() { proceed(it) } ) { - // Even with no server delay, 0ms timeout should trigger - val request = EchoRequest { message = "Echo"; timeout = 0u } + val request = EchoRequest { message = "Echo"; timeout = 2u } it.withService().UnaryEcho(request) } } assertEquals(StatusCode.DEADLINE_EXCEEDED, exc.getStatus().statusCode) } - - @Test - fun `test timeout boundary condition - call completes just before timeout`() { - runGrpcTest( - clientInterceptors = clientInterceptor { - callOptions.timeout = 2.seconds - proceed(it) - } - ) { - // Server delays for slightly less than timeout - val request = EchoRequest { message = "Just in time"; timeout = 1800u } - val response = it.withService().UnaryEcho(request) - assertEquals("Just in time", response.message) - } - } - } \ No newline at end of file diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ServerInterceptorTest.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ServerInterceptorTest.kt index 973321dd3..3ac0b3902 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ServerInterceptorTest.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/ServerInterceptorTest.kt @@ -37,21 +37,6 @@ class ServerInterceptorTest : GrpcProtoTest() { registerService { EchoServiceImpl() } } - @Test - fun `throw during onClosing - should fail propagate the exception to the server root`() { - val error = assertFailsWith { - val interceptor = interceptor { - onClose { _, _ -> throw IllegalStateException("Illegal failing in onClose") } - proceed(it) - } - runGrpcTest(serverInterceptors = interceptor, test = ::unaryCall) - } - - assertContains(error.message!!, "Illegal failing in onClose") - // check that the error is indeed causing a server crash - assertContains(error.stackTraceToString(), "suspendServerCall") - } - @Test fun `throw during intercept - should fail with unknown status on client`() { var cause: Throwable? = null @@ -115,38 +100,6 @@ class ServerInterceptorTest : GrpcProtoTest() { assertContains(error.message!!, "Close in response flow") } - @Test - fun `close during onClose - should fail with correct status on client`() { - val error = assertFailsWith { - val interceptor = interceptor { - onClose { _, _ -> close(Status(StatusCode.UNAUTHENTICATED, "Close in onClose"), GrpcMetadata()) } - proceed(it) - } - runGrpcTest(serverInterceptors = interceptor, test = ::unaryCall) - } - - assertEquals(StatusCode.UNAUTHENTICATED, error.getStatus().statusCode) - assertContains(error.message!!, "Close in onClose") - } - - @Test - fun `close in two interceptors - should fail with correct status on client`() { - val error = assertFailsWith { - val interceptor1 = interceptor { - onClose { _, _ -> close(Status(StatusCode.UNAUTHENTICATED, "[1] Close in onClose"), GrpcMetadata()) } - proceed(it) - } - val interceptor2 = interceptor { - onClose { _, _ -> close(Status(StatusCode.UNAUTHENTICATED, "[2] Close in onClose"), GrpcMetadata()) } - proceed(it) - } - runGrpcTest(serverInterceptors = interceptor1 + interceptor2, test = ::unaryCall) - } - - assertEquals(StatusCode.UNAUTHENTICATED, error.getStatus().statusCode) - assertContains(error.message!!, "[1] Close in onClose") - } - @Test fun `dont proceed and return custom message - should succeed on client`() { val interceptor = interceptor { @@ -273,7 +226,6 @@ class ServerInterceptorTest : GrpcProtoTest() { } - private fun interceptor( block: ServerCallScope.(Flow) -> Flow, ): List { diff --git a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt index f4b0ea9cf..af5b35492 100644 --- a/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt +++ b/grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/utils.kt @@ -34,9 +34,6 @@ enum class Runtime { } expect val runtime: Runtime -expect fun setNativeEnv(key: String, value: String) -expect fun clearNativeEnv(key: String) - /** * Captures the standard error output written during the execution of the provided suspending block. * @@ -45,3 +42,10 @@ expect fun clearNativeEnv(key: String) */ expect suspend fun captureStdErr(block: suspend () -> Unit): String +expect suspend fun captureGrpcLogs( + jvmLogLevel: String = "DEBUG", + jvmLoggers: List = listOf("io.grpc"), + nativeVerbosity: String = "DEBUG", + nativeTracers: List = listOf("all"), + block: suspend () -> Unit +): String \ No newline at end of file diff --git a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.jvm.kt b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.jvm.kt new file mode 100644 index 000000000..f2bc226e6 --- /dev/null +++ b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.jvm.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.rpc.grpc.client.internal.ManagedChannel +import kotlinx.rpc.grpc.test.EchoRequest +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.invoke +import kotlinx.rpc.withService +import java.lang.reflect.Field +import kotlin.test.assertEquals +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds + +actual fun GrpcProtoTest.testKeepAlive( + time: Duration, + timeout: Duration, + withoutCalls: Boolean +) { + runGrpcTest( + configure = { + keepAlive { + this.time = time + this.timeout = timeout + this.withoutCalls = withoutCalls + } + } + ) { + it.withService().UnaryEcho(EchoRequest { message = "Hello" }) + val nettyClientTransport = it.getField("channel") + .platformApi + .getField>("delegate", "subchannels") + .first() + .getField>("transports").first() + .getField("delegate", "delegate") + + val keepAliveTime = nettyClientTransport.getField("keepAliveTimeNanos").nanoseconds + val keepAliveTimeout = nettyClientTransport.getField("keepAliveTimeoutNanos").nanoseconds + val keepAliveWithoutCalls = nettyClientTransport.getField("keepAliveWithoutCalls") + + assertEquals(time, keepAliveTime) + assertEquals(timeout, keepAliveTimeout) + assertEquals(withoutCalls, keepAliveWithoutCalls) + } +} + +private inline fun Any.getField(vararg names: String): R { + var curr: Any = this + for (name in names) { + val field = findFieldInHierarchy(curr::class.java, name) + ?: throw NoSuchFieldException("Field '$name' not found in ${curr::class.java}") + field.isAccessible = true + curr = field.get(curr) as Any + } + return curr as R +} + +private fun findFieldInHierarchy(clazz: Class<*>, name: String): Field? { + var c: Class<*>? = clazz + while (c != null) { + try { + return c.getDeclaredField(name) + } catch (_: NoSuchFieldException) { + c = c.superclass + } + } + return null +} \ No newline at end of file diff --git a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt index ef56db05d..cb714dc8f 100644 --- a/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt +++ b/grpc/grpc-core/src/jvmTest/kotlin/kotlinx/rpc/grpc/test/utils.jvm.kt @@ -6,18 +6,14 @@ package kotlinx.rpc.grpc.test import java.io.ByteArrayOutputStream import java.io.PrintStream +import java.util.logging.Handler +import java.util.logging.Level +import java.util.logging.LogRecord +import java.util.logging.Logger actual val runtime: Runtime get() = Runtime.JVM -actual fun setNativeEnv(key: String, value: String) { - // Nothing to do on JVM -} - -actual fun clearNativeEnv(key: String) { - // Nothing to do on JVM -} - actual suspend fun captureStdErr(block: suspend () -> Unit): String { val orig = System.out val baos = ByteArrayOutputStream() @@ -30,3 +26,41 @@ actual suspend fun captureStdErr(block: suspend () -> Unit): String { } } +actual suspend fun captureGrpcLogs( + jvmLogLevel: String, + jvmLoggers: List, + nativeVerbosity: String, + nativeTracers: List, + block: suspend () -> Unit +): String { + val sb = StringBuilder() + val handler = object : Handler() { + override fun publish(record: LogRecord) { + sb.append('[').append(record.loggerName).append("] ") + .append(record.level).append(": ") + .append(record.message).append('\n') + } + override fun flush() {} + override fun close() {} + } + handler.level = Level.ALL + + val saved = mutableListOf>() + try { + for (name in jvmLoggers) { + val logger = Logger.getLogger(name) + saved += logger to logger.level + logger.level = Level.ALL + logger.useParentHandlers = false + logger.addHandler(handler) + } + block() + return sb.toString() + } finally { + saved.forEach { (lg, lvl) -> + lg.level = lvl + lg.handlers.filterIsInstance() + .forEach { if (it === handler) lg.removeHandler(it) } + } + } +} diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.native.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.native.kt new file mode 100644 index 000000000..ce30e151c --- /dev/null +++ b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcKeepAliveTest.native.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.grpc.test.proto + +import kotlinx.coroutines.test.runTest +import kotlinx.rpc.grpc.test.EchoRequest +import kotlinx.rpc.grpc.test.EchoService +import kotlinx.rpc.grpc.test.captureGrpcLogs +import kotlinx.rpc.grpc.test.invoke +import kotlinx.rpc.withService +import kotlin.test.assertEquals +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + + +actual fun GrpcProtoTest.testKeepAlive( + time: Duration, + timeout: Duration, + withoutCalls: Boolean, +) = runTest { + val logs = captureGrpcLogs( + nativeTracers = listOf("pick_first") + ) { + runGrpcTest( + configure = { + keepAlive { + this.time = time + this.timeout = timeout + this.withoutCalls = withoutCalls + } + } + ) { + it.withService().UnaryEcho(EchoRequest { message = "Hello" }) + } + } + + val keepAliveSettings = extractKeepAliveSettings(logs) + assertEquals(time, keepAliveSettings.time) + assertEquals(timeout, keepAliveSettings.timeout) + assertEquals(withoutCalls, keepAliveSettings.permitWithoutCalls) +} + +private data class KeepAliveSettings( + val permitWithoutCalls: Boolean, + val time: Duration, + val timeout: Duration +) + +private fun extractKeepAliveSettings(logs: String): KeepAliveSettings { + val channelArgsPattern = Regex("""channel args: \{([^}]+)\}""") + val channelArgsMatch = channelArgsPattern.find(logs) + ?: error("Could not find channel args in logs") + + val argsText = channelArgsMatch.groupValues[1] + + val permitWithoutCalls = Regex("""grpc\.keepalive_permit_without_calls=(\d+)""") + .find(argsText)?.groupValues?.get(1)?.toInt() == 1 + + val timeMs = Regex("""grpc\.keepalive_time_ms=(\d+)""") + .find(argsText)?.groupValues?.get(1)?.toInt() + ?: error("Could not find grpc.keepalive_time_ms in logs") + + val timeoutMs = Regex("""grpc\.keepalive_timeout_ms=(\d+)""") + .find(argsText)?.groupValues?.get(1)?.toInt() + ?: error("Could not find grpc.keepalive_timeout_ms in logs") + + return KeepAliveSettings( + permitWithoutCalls = permitWithoutCalls, + time = timeMs.milliseconds, + timeout = timeoutMs.milliseconds + ) +} diff --git a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt index 17565693a..60ccd8ea8 100644 --- a/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt +++ b/grpc/grpc-core/src/nativeTest/kotlin/kotlinx/rpc/grpc/test/utils.native.kt @@ -6,6 +6,9 @@ package kotlinx.rpc.grpc.test +import platform.posix.getenv +import platform.posix.setenv + import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.IntVar import kotlinx.cinterop.allocArray @@ -22,20 +25,21 @@ import platform.posix.close import platform.posix.dup import platform.posix.dup2 import platform.posix.fflush -import platform.posix.fprintf import platform.posix.pipe import platform.posix.read import platform.posix.stderr +import libkgrpc.grpc_tracer_set_enabled +import platform.posix.unsetenv actual val runtime: Runtime get() = Runtime.NATIVE -actual fun setNativeEnv(key: String, value: String) { - platform.posix.setenv(key, value, 1) +fun setNativeEnv(key: String, value: String) { + setenv(key, value, 1) } -actual fun clearNativeEnv(key: String) { - platform.posix.unsetenv(key) +fun clearNativeEnv(key: String) { + unsetenv(key) } actual suspend fun captureStdErr(block: suspend () -> Unit): String = coroutineScope { @@ -75,3 +79,26 @@ actual suspend fun captureStdErr(block: suspend () -> Unit): String = coroutineS } } +actual suspend fun captureGrpcLogs( + jvmLogLevel: String, + jvmLoggers: List, + nativeVerbosity: String, + nativeTracers: List, + block: suspend () -> Unit +): String { + try { + return captureStdErr { + setNativeEnv("GRPC_VERBOSITY", nativeVerbosity) + nativeTracers.forEach { tracer -> + grpc_tracer_set_enabled(tracer, 1) + } + block() + } + } finally { + clearNativeEnv("GRPC_VERBOSITY") + nativeTracers.forEach { tracer -> + // set tracer to disabled + grpc_tracer_set_enabled(tracer, 0) + } + } +} \ No newline at end of file diff --git a/grpc/grpc-server/src/commonMain/kotlin/kotlinx/rpc/grpc/server/ServerInterceptor.kt b/grpc/grpc-server/src/commonMain/kotlin/kotlinx/rpc/grpc/server/ServerInterceptor.kt index a2e6eb4ac..9cd809597 100644 --- a/grpc/grpc-server/src/commonMain/kotlin/kotlinx/rpc/grpc/server/ServerInterceptor.kt +++ b/grpc/grpc-server/src/commonMain/kotlin/kotlinx/rpc/grpc/server/ServerInterceptor.kt @@ -58,6 +58,9 @@ public interface ServerCallScope { /** * Register a callback invoked when the call is closed (successfully or exceptionally). * Provides the final [kotlinx.rpc.grpc.Status] and the sent [GrpcMetadata] trailers. + * + * IMPORTANT: The callback must not throw an exception or use [close]. + * Behavior is undefined and may lead to crashes depending on the platform. */ public fun onClose(block: (Status, GrpcMetadata) -> Unit)