Skip to content

Commit 40c6093

Browse files
authored
grpc: Add keepalive Support (#528)
1 parent cda97dc commit 40c6093

File tree

16 files changed

+468
-147
lines changed

16 files changed

+468
-147
lines changed

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/GrpcClient.kt

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import kotlinx.coroutines.flow.Flow
88
import kotlinx.rpc.RpcCall
99
import kotlinx.rpc.RpcClient
1010
import kotlinx.rpc.grpc.GrpcMetadata
11-
import kotlinx.rpc.grpc.client.GrpcCallOptions
1211
import kotlinx.rpc.grpc.client.internal.ManagedChannel
1312
import kotlinx.rpc.grpc.client.internal.ManagedChannelBuilder
13+
import kotlinx.rpc.grpc.client.internal.applyConfig
1414
import kotlinx.rpc.grpc.client.internal.bidirectionalStreamingRpc
1515
import kotlinx.rpc.grpc.client.internal.buildChannel
1616
import kotlinx.rpc.grpc.client.internal.clientStreamingRpc
@@ -27,6 +27,7 @@ import kotlinx.rpc.grpc.descriptor.MethodType
2727
import kotlinx.rpc.grpc.descriptor.methodType
2828
import kotlinx.rpc.internal.utils.map.RpcInternalConcurrentHashMap
2929
import kotlin.time.Duration
30+
import kotlin.time.Duration.Companion.seconds
3031

3132
private typealias RequestClient = Any
3233

@@ -180,9 +181,7 @@ private fun GrpcClient(
180181
builder: ManagedChannelBuilder<*>,
181182
config: GrpcClientConfiguration,
182183
): GrpcClient {
183-
val channel = builder.apply {
184-
config.overrideAuthority?.let { overrideAuthority(it) }
185-
}.buildChannel()
184+
val channel = builder.applyConfig(config).buildChannel()
186185
return GrpcClient(channel, config.messageCodecResolver, config.interceptors)
187186
}
188187

@@ -198,6 +197,7 @@ private fun GrpcClient(
198197
*/
199198
public class GrpcClientConfiguration internal constructor() {
200199
internal val interceptors: MutableList<ClientInterceptor> = mutableListOf()
200+
internal var keepAlive: KeepAlive? = null
201201

202202
/**
203203
* Configurable resolver used to determine the appropriate codec for a given Kotlin type
@@ -294,4 +294,65 @@ public class GrpcClientConfiguration internal constructor() {
294294
public fun tls(configure: TlsClientCredentialsBuilder.() -> Unit): ClientCredentials =
295295
TlsClientCredentials(configure)
296296

297-
}
297+
/**
298+
* Configures keep-alive settings for the gRPC client.
299+
*
300+
* Keep-alive allows you to fine-tune the behavior of the client to ensure the connection
301+
* between the client and server remains active according to specific parameters.
302+
*
303+
* By default, keep-alive is disabled.
304+
*
305+
* ```
306+
* GrpcClient("localhost", 50051) {
307+
* keepAlive {
308+
* time = 10.seconds
309+
* timeout = 20.seconds
310+
* withoutCalls = false
311+
* }
312+
* }
313+
* ```
314+
*
315+
* @param configure A lambda to apply custom configurations to the [KeepAlive] instance.
316+
* The [KeepAlive] settings include:
317+
* - `time`: The maximum amount of time that the channel can be idle before a keep-alive
318+
* ping is sent.
319+
* - `timeout`: The time allowed for a keep-alive ping to complete.
320+
* - `withoutCalls`: Whether to send keep-alive pings even when there are no outstanding
321+
* RPCs on the connection.
322+
*
323+
* @see KeepAlive
324+
*/
325+
public fun keepAlive(configure: KeepAlive.() -> Unit) {
326+
keepAlive = KeepAlive().apply(configure)
327+
}
328+
329+
/**
330+
* Represents keep-alive settings for a gRPC client connection.
331+
*
332+
* Keep-alive ensures that the connection between the client and the server remains active.
333+
* It helps detect connection issues proactively before a request is made and facilitates
334+
* maintaining long-lived idle connections.
335+
*
336+
* Client authors must coordinate with service owners for whether a particular client-side
337+
* setting is acceptable.
338+
*
339+
* @property time Specifies the maximum amount of time the channel can remain idle before a
340+
* keep-alive ping is sent to the server to check the connection state.
341+
* The default value is `Duration.INFINITE`, which disables keep-alive pings when idle.
342+
*
343+
* @property timeout Sets the amount of time to wait for a keep-alive ping response.
344+
* If the server does not respond within this timeout, the connection will be considered broken.
345+
* The default value is 20 seconds.
346+
*
347+
* @property withoutCalls Defines whether keep-alive pings will be sent even when there
348+
* are no active RPCs on the connection. If set to `true`, pings will be sent regardless
349+
* of ongoing calls; otherwise, pings are only sent during active RPCs.
350+
* The default value is `false`.
351+
*/
352+
public class KeepAlive internal constructor() {
353+
public var time: Duration = Duration.INFINITE
354+
public var timeout: Duration = 20.seconds
355+
public var withoutCalls: Boolean = false
356+
}
357+
}
358+

grpc/grpc-client/src/commonMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package kotlinx.rpc.grpc.client.internal
88

99
import kotlinx.rpc.grpc.client.ClientCredentials
10+
import kotlinx.rpc.grpc.client.GrpcClientConfiguration
1011
import kotlinx.rpc.internal.utils.InternalRpcApi
1112
import kotlin.time.Duration
1213

@@ -71,9 +72,7 @@ public interface ManagedChannel {
7172
* Builder class for [ManagedChannel].
7273
*/
7374
@InternalRpcApi
74-
public expect abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>> {
75-
public abstract fun overrideAuthority(authority: String): T
76-
}
75+
public expect abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>>
7776

7877
@InternalRpcApi
7978
public expect fun ManagedChannelBuilder(
@@ -88,5 +87,7 @@ public expect fun ManagedChannelBuilder(
8887
credentials: ClientCredentials? = null,
8988
): ManagedChannelBuilder<*>
9089

90+
internal expect fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*>
91+
9192
@InternalRpcApi
9293
public expect fun ManagedChannelBuilder<*>.buildChannel(): ManagedChannel

grpc/grpc-client/src/jvmMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.jvm.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import io.grpc.Grpc
1010
import kotlinx.coroutines.Dispatchers
1111
import kotlinx.coroutines.withContext
1212
import kotlinx.rpc.grpc.client.ClientCredentials
13+
import kotlinx.rpc.grpc.client.GrpcClientConfiguration
1314
import kotlinx.rpc.internal.utils.InternalRpcApi
1415
import java.util.concurrent.TimeUnit
1516
import kotlin.time.Duration
@@ -80,3 +81,14 @@ private class JvmManagedChannel(private val channel: io.grpc.ManagedChannel) : M
8081
override val platformApi: ManagedChannelPlatform
8182
get() = channel
8283
}
84+
85+
internal actual fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> {
86+
config.keepAlive?.let {
87+
keepAliveTime(it.time.inWholeMilliseconds, TimeUnit.MILLISECONDS)
88+
keepAliveTimeout(it.timeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
89+
keepAliveWithoutCalls(it.withoutCalls)
90+
}
91+
92+
config.overrideAuthority?.let { overrideAuthority(it) }
93+
return this
94+
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/ManagedChannel.native.kt

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package kotlinx.rpc.grpc.client.internal
88

99
import kotlinx.rpc.grpc.client.ClientCredentials
10+
import kotlinx.rpc.grpc.client.GrpcClientConfiguration
1011
import kotlinx.rpc.grpc.client.TlsClientCredentials
1112
import kotlinx.rpc.grpc.internal.internalError
1213
import kotlinx.rpc.internal.utils.InternalRpcApi
@@ -22,25 +23,23 @@ public actual abstract class ManagedChannelPlatform : GrpcChannel()
2223
*/
2324
@InternalRpcApi
2425
public actual abstract class ManagedChannelBuilder<T : ManagedChannelBuilder<T>> {
25-
public actual abstract fun overrideAuthority(authority: String): T
26+
internal var config: GrpcClientConfiguration? = null
2627
}
2728

2829
internal class NativeManagedChannelBuilder(
2930
private val target: String,
3031
private var credentials: Lazy<ClientCredentials>,
3132
) : ManagedChannelBuilder<NativeManagedChannelBuilder>() {
32-
33-
private var authority: String? = null
34-
35-
override fun overrideAuthority(authority: String): NativeManagedChannelBuilder {
36-
this.authority = authority
37-
return this
38-
}
39-
4033
fun buildChannel(): NativeManagedChannel {
34+
val keepAlive = config?.keepAlive
35+
keepAlive?.run {
36+
require(time.isPositive()) { "keepalive time must be positive" }
37+
require(timeout.isPositive()) { "keepalive timeout must be positive" }
38+
}
4139
return NativeManagedChannel(
4240
target,
43-
authority = authority,
41+
authority = config?.overrideAuthority,
42+
keepAlive = config?.keepAlive,
4443
credentials = credentials.value,
4544
)
4645
}
@@ -69,4 +68,7 @@ public actual fun ManagedChannelBuilder(target: String, credentials: ClientCrede
6968
return NativeManagedChannelBuilder(target, credentials)
7069
}
7170

72-
71+
internal actual fun ManagedChannelBuilder<*>.applyConfig(config: GrpcClientConfiguration): ManagedChannelBuilder<*> {
72+
this.config = config
73+
return this
74+
}

grpc/grpc-client/src/nativeMain/kotlin/kotlinx/rpc/grpc/client/internal/NativeManagedChannel.kt

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import cnames.structs.grpc_channel
1010
import kotlinx.atomicfu.atomic
1111
import kotlinx.cinterop.CPointer
1212
import kotlinx.cinterop.ExperimentalForeignApi
13+
import kotlinx.cinterop.MemScope
1314
import kotlinx.cinterop.alloc
15+
import kotlinx.cinterop.allocArray
16+
import kotlinx.cinterop.convert
1417
import kotlinx.cinterop.cstr
1518
import kotlinx.cinterop.memScoped
1619
import kotlinx.cinterop.ptr
@@ -21,6 +24,7 @@ import kotlinx.coroutines.cancelChildren
2124
import kotlinx.coroutines.withTimeoutOrNull
2225
import kotlinx.rpc.grpc.client.ClientCredentials
2326
import kotlinx.rpc.grpc.client.GrpcCallOptions
27+
import kotlinx.rpc.grpc.client.GrpcClientConfiguration
2428
import kotlinx.rpc.grpc.client.rawDeadline
2529
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
2630
import kotlinx.rpc.grpc.internal.CompletionQueue
@@ -50,6 +54,7 @@ import kotlin.time.Duration
5054
internal class NativeManagedChannel(
5155
target: String,
5256
val authority: String?,
57+
val keepAlive: GrpcClientConfiguration.KeepAlive?,
5358
// we must store them, otherwise the credentials are getting released
5459
credentials: ClientCredentials,
5560
) : ManagedChannel, ManagedChannelPlatform() {
@@ -66,22 +71,36 @@ internal class NativeManagedChannel(
6671
private val cq = CompletionQueue()
6772

6873
internal val raw: CPointer<grpc_channel> = memScoped {
69-
val args = authority?.let {
74+
val args = mutableListOf<GrpcArg>()
75+
76+
authority?.let {
7077
// the C Core API doesn't have a way to override the authority (used for TLS SNI) as it
7178
// is available in the Java gRPC implementation.
7279
// instead, it can be done by setting the "grpc.ssl_target_name_override" argument.
73-
val authorityOverride = alloc<grpc_arg> {
74-
type = grpc_arg_type.GRPC_ARG_STRING
75-
key = "grpc.ssl_target_name_override".cstr.ptr
76-
value.string = authority.cstr.ptr
77-
}
80+
args.add(GrpcArg.Str(
81+
key = "grpc.ssl_target_name_override",
82+
value = it
83+
))
84+
}
7885

79-
alloc<grpc_channel_args> {
80-
num_args = 1u
81-
args = authorityOverride.ptr
82-
}
86+
keepAlive?.let {
87+
args.add(GrpcArg.Integer(
88+
key = "grpc.keepalive_time_ms",
89+
value = it.time.inWholeMilliseconds.convert()
90+
))
91+
args.add(GrpcArg.Integer(
92+
key = "grpc.keepalive_timeout_ms",
93+
value = it.timeout.inWholeMilliseconds.convert()
94+
))
95+
args.add(GrpcArg.Integer(
96+
key = "grpc.keepalive_permit_without_calls",
97+
value = if (it.withoutCalls) 1 else 0
98+
))
8399
}
84-
grpc_channel_create(target, credentials.raw, args?.ptr)
100+
101+
var rawArgs = if (args.isNotEmpty()) args.toRaw(this) else null
102+
103+
grpc_channel_create(target, credentials.raw, rawArgs?.ptr)
85104
?: error("Failed to create channel")
86105
}
87106

@@ -170,3 +189,33 @@ internal class NativeManagedChannel(
170189
}
171190

172191
}
192+
193+
internal sealed class GrpcArg(val key: String) {
194+
internal class Str(key: String, val value: String) : GrpcArg(key)
195+
internal class Integer(key: String, val value: Int) : GrpcArg(key)
196+
197+
internal val rawType: grpc_arg_type
198+
get() = when (this) {
199+
is Str -> grpc_arg_type.GRPC_ARG_STRING
200+
is Integer -> grpc_arg_type.GRPC_ARG_INTEGER
201+
}
202+
}
203+
204+
private fun List<GrpcArg>.toRaw(memScope: MemScope): grpc_channel_args {
205+
with(memScope) {
206+
val arr = allocArray<grpc_arg>(size) {
207+
val arg = get(it)
208+
type = arg.rawType
209+
key = arg.key.cstr.ptr
210+
when (arg) {
211+
is GrpcArg.Str -> value.string = arg.value.cstr.ptr
212+
is GrpcArg.Integer -> value.integer = arg.value.convert()
213+
}
214+
}
215+
216+
return alloc<grpc_channel_args> {
217+
num_args = size.convert()
218+
args = arr
219+
}
220+
}
221+
}

grpc/grpc-core/src/commonTest/kotlin/kotlinx/rpc/grpc/test/proto/GrpcCompressionTest.kt

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import kotlinx.coroutines.test.runTest
88
import kotlinx.rpc.RpcServer
99
import kotlinx.rpc.grpc.GrpcCompression
1010
import kotlinx.rpc.grpc.GrpcMetadata
11-
import kotlinx.rpc.grpc.Status
1211
import kotlinx.rpc.grpc.StatusCode
1312
import kotlinx.rpc.grpc.get
1413
import kotlinx.rpc.grpc.keys
@@ -18,11 +17,9 @@ import kotlinx.rpc.grpc.test.EchoServiceImpl
1817
import kotlinx.rpc.grpc.test.Runtime
1918
import kotlinx.rpc.grpc.test.assertContainsAll
2019
import kotlinx.rpc.grpc.test.assertGrpcFailure
21-
import kotlinx.rpc.grpc.test.captureStdErr
22-
import kotlinx.rpc.grpc.test.clearNativeEnv
20+
import kotlinx.rpc.grpc.test.captureGrpcLogs
2321
import kotlinx.rpc.grpc.test.invoke
2422
import kotlinx.rpc.grpc.test.runtime
25-
import kotlinx.rpc.grpc.test.setNativeEnv
2623
import kotlinx.rpc.registerService
2724
import kotlinx.rpc.withService
2825
import kotlin.collections.emptyList
@@ -101,7 +98,10 @@ class GrpcCompressionTest : GrpcProtoTest() {
10198
) {
10299
var reqHeaders = emptyMap<String, String>()
103100
var respHeaders = emptyMap<String, String>()
104-
val logs = captureNativeGrpcLogs {
101+
val logs = captureGrpcLogs(
102+
nativeTracers = listOf("compression", "http"),
103+
jvmLoggers = emptyList(),
104+
) {
105105
runGrpcTest(
106106
clientInterceptors = clientInterceptor {
107107
clientCompression?.let { compression ->
@@ -149,17 +149,6 @@ class GrpcCompressionTest : GrpcProtoTest() {
149149
assertContainsAll(listOf("gzip"), respHeaders.grpcAcceptEncoding())
150150
}
151151

152-
private suspend fun captureNativeGrpcLogs(block: suspend () -> Unit): String {
153-
try {
154-
return captureStdErr {
155-
setNativeEnv("GRPC_TRACE", "compression,http")
156-
block()
157-
}
158-
} finally {
159-
clearNativeEnv("GRPC_GRACE")
160-
}
161-
}
162-
163152
private fun GrpcMetadata.toMap(): Map<String, String> {
164153
return keys().mapNotNull { key ->
165154
if (!key.endsWith("-bin")) {

0 commit comments

Comments
 (0)