Skip to content

Commit 39e4659

Browse files
authored
GH-10623: Add GrpcInboundGateway (#10667)
* GH-10623: Add `GrpcInboundGateway` Fixes: #10623 * Add `spring-integration-grpc` module * Manage respective new `io.grpc` dependencies * Add Checkstyle suppression for new Proto-generated classes * Add `GrpcHeaders` for convenient API * Implement `GrpcInboundGateway` with a proxy for gRPC service methods * Add `TestInProcessConfiguration` for a common in-process infrastructure * Document new feature * * Handle error as `StatusRuntimeException` * Add test with error from server * Fix typos in the doc
1 parent a4f470f commit 39e4659

File tree

12 files changed

+923
-0
lines changed

12 files changed

+923
-0
lines changed

build.gradle

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ ext {
6262
graalvmVersion = '25.0.1'
6363
greenmailVersion = '2.1.8'
6464
groovyVersion = '5.0.3'
65+
grpcVersion = '1.77.0'
6566
hamcrestVersion = '3.0'
6667
hazelcastVersion = '5.6.0'
6768
hibernateVersion = '7.1.12.Final'
@@ -173,6 +174,7 @@ allprojects {
173174
dependencyManagement(platform("org.springframework.security:spring-security-bom:$springSecurityVersion"))
174175
dependencyManagement(platform("org.springframework.ws:spring-ws-bom:$springWsVersion"))
175176
dependencyManagement(platform("org.mongodb:mongodb-driver-bom:$mongoDriverVersion"))
177+
dependencyManagement(platform("io.grpc:grpc-bom:$grpcVersion"))
176178
}
177179
}
178180

@@ -627,6 +629,45 @@ project('spring-integration-groovy') {
627629
}
628630
}
629631

632+
project('spring-integration-grpc') {
633+
description = 'Spring Integration gRPC Support'
634+
635+
apply plugin: 'com.google.protobuf'
636+
637+
configurations {
638+
[compileProtoPath, testCompileProtoPath].each {
639+
it.extendsFrom(dependencyManagement)
640+
}
641+
}
642+
643+
dependencies {
644+
api 'io.grpc:grpc-stub'
645+
646+
testImplementation 'io.grpc:grpc-protobuf'
647+
testImplementation 'io.grpc:grpc-inprocess'
648+
testImplementation "com.google.protobuf:protobuf-java:$protobufVersion"
649+
}
650+
651+
protobuf {
652+
protoc {
653+
artifact = "com.google.protobuf:protoc:$protobufVersion"
654+
}
655+
plugins {
656+
grpc {
657+
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
658+
}
659+
}
660+
generateProtoTasks {
661+
// Only generate for test source set, not main
662+
ofSourceSet('test')*.plugins {
663+
grpc {
664+
option '@generated=omit'
665+
}
666+
}
667+
}
668+
}
669+
}
670+
630671
project('spring-integration-hazelcast') {
631672
description = 'Spring Integration Hazelcast Support'
632673
dependencies {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.grpc;
18+
19+
/**
20+
* Constants for gRPC-specific message headers.
21+
*
22+
* @author Artem Bilan
23+
*
24+
* @since 7.1
25+
*/
26+
public final class GrpcHeaders {
27+
28+
/**
29+
* The prefix for all gRPC-specific headers.
30+
*/
31+
public static final String PREFIX = "grpc_";
32+
33+
/**
34+
* The header containing the called gRPC service name.
35+
*/
36+
public static final String SERVICE = PREFIX + "service";
37+
38+
/**
39+
* The header containing the gRPC service method name.
40+
*/
41+
public static final String SERVICE_METHOD = PREFIX + "serviceMethod";
42+
43+
/**
44+
* The header containing the gRPC service method type.
45+
* One of the {@link io.grpc.MethodDescriptor.MethodType}
46+
*/
47+
public static final String METHOD_TYPE = PREFIX + "methodType";
48+
49+
/**
50+
* The header containing the gRPC service method schema descriptor.
51+
* A value from the {@link io.grpc.MethodDescriptor#getSchemaDescriptor()}
52+
*/
53+
public static final String SCHEMA_DESCRIPTOR = PREFIX + "schemaDescriptor";
54+
55+
private GrpcHeaders() {
56+
}
57+
58+
}
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.grpc.inbound;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.Arrays;
21+
22+
import io.grpc.BindableService;
23+
import io.grpc.MethodDescriptor;
24+
import io.grpc.ServerServiceDefinition;
25+
import io.grpc.Status;
26+
import io.grpc.StatusRuntimeException;
27+
import io.grpc.stub.StreamObserver;
28+
import org.aopalliance.intercept.MethodInterceptor;
29+
import org.aopalliance.intercept.MethodInvocation;
30+
import org.jspecify.annotations.Nullable;
31+
import reactor.core.publisher.Flux;
32+
import reactor.core.publisher.Mono;
33+
import reactor.core.publisher.Sinks;
34+
35+
import org.springframework.aop.framework.ProxyFactory;
36+
import org.springframework.core.log.LogMessage;
37+
import org.springframework.integration.gateway.DefaultMethodInvokingMethodInterceptor;
38+
import org.springframework.integration.gateway.MessagingGatewaySupport;
39+
import org.springframework.integration.grpc.GrpcHeaders;
40+
import org.springframework.messaging.Message;
41+
import org.springframework.util.Assert;
42+
import org.springframework.util.ClassUtils;
43+
import org.springframework.util.ReflectionUtils;
44+
import org.springframework.util.StringUtils;
45+
46+
/**
47+
* The {@link MessagingGatewaySupport} implementation for gRPC {@link BindableService}.
48+
* An instance of this class requires a {@link BindableService} class from the gRPC service definition.
49+
* Only standard 'grpc' services are supported which implements a generated {@code AsyncService} interface.
50+
* This gateway is a {@link BindableService} by itself to be registered with the gRPC server.
51+
* An internal proxy is created to intercept gRPC method calls and convert them to Spring Integration messages.
52+
* A reply from the downstream flow is produced back to the gRPC response payload.
53+
* The request payload is a Proto message from gRPC request.
54+
* The reply payload must be a Proto message for gRPC response.
55+
* <p>
56+
* This gateway supports all the gRPC {@link MethodDescriptor.MethodType} types.
57+
* All the requests are produced to downstream flow in a reactive manner via {@link #sendAndReceiveMessageReactive(Object)}.
58+
* The {@link MethodDescriptor.MethodType#UNARY} and {@link MethodDescriptor.MethodType#BIDI_STREAMING}
59+
* are same from the downstream handling logic perspective.
60+
* The {@link MethodDescriptor.MethodType#CLIENT_STREAMING} produces a {@link Flux} of gRPC request payloads.
61+
* The {@link MethodDescriptor.MethodType#SERVER_STREAMING} reply can be a single entity or a {@link Flux} of them.
62+
* <p>
63+
* For convenience, the {@link GrpcHeaders} are populated into a request message.
64+
* Such information can be used, for example, in downstream flow for routing.
65+
*
66+
* @author Artem Bilan
67+
*
68+
* @since 7.1
69+
*/
70+
public class GrpcInboundGateway extends MessagingGatewaySupport implements BindableService {
71+
72+
private final Class<? extends BindableService> grpcServiceClass;
73+
74+
@SuppressWarnings("NullAway.Init")
75+
private Object asyncService;
76+
77+
@SuppressWarnings("NullAway.Init")
78+
private ServerServiceDefinition serverServiceDefinition;
79+
80+
public GrpcInboundGateway(Class<? extends BindableService> grpcServiceClass) {
81+
this.grpcServiceClass = grpcServiceClass;
82+
}
83+
84+
@Override
85+
protected void onInit() {
86+
super.onInit();
87+
Class<?>[] serviceInterfaces =
88+
ClassUtils.getAllInterfacesForClass(this.grpcServiceClass, getApplicationContext().getClassLoader());
89+
90+
for (Class<?> serviceInterface : serviceInterfaces) {
91+
if ("AsyncService".equals(serviceInterface.getSimpleName())) {
92+
createServiceProxyAndServerDefinition(serviceInterface);
93+
break;
94+
}
95+
}
96+
97+
Assert.state(this.asyncService != null,
98+
"Only standard 'grpc' service are supported providing an 'AsyncService' contract.");
99+
}
100+
101+
@SuppressWarnings("NullAway")
102+
private void createServiceProxyAndServerDefinition(Class<?> serviceInterface) {
103+
ProxyFactory proxyFactory = new ProxyFactory(serviceInterface, (MethodInterceptor) this::interceptGrpc);
104+
proxyFactory.addAdvice(new DefaultMethodInvokingMethodInterceptor());
105+
this.asyncService = proxyFactory.getProxy(getApplicationContext().getClassLoader());
106+
Method bindServiceMethod =
107+
ClassUtils.getStaticMethod(this.grpcServiceClass.getEnclosingClass(), "bindService", serviceInterface);
108+
109+
this.serverServiceDefinition =
110+
(ServerServiceDefinition) ReflectionUtils.invokeMethod(bindServiceMethod, null, this.asyncService);
111+
}
112+
113+
@Override
114+
public ServerServiceDefinition bindService() {
115+
return this.serverServiceDefinition;
116+
}
117+
118+
@SuppressWarnings({"unchecked", "NullAway"})
119+
private @Nullable Object interceptGrpc(MethodInvocation invocation) {
120+
Object[] arguments = invocation.getArguments();
121+
122+
String fullMethodName =
123+
this.serverServiceDefinition.getServiceDescriptor().getName() +
124+
'/' +
125+
StringUtils.capitalize(invocation.getMethod().getName());
126+
127+
MethodDescriptor<?, ?> serviceMethod =
128+
this.serverServiceDefinition.getMethod(fullMethodName)
129+
.getMethodDescriptor();
130+
131+
logger.debug(LogMessage.format("gRPC request for [%s] with arguments %s",
132+
fullMethodName, Arrays.toString(arguments)));
133+
134+
switch (serviceMethod.getType()) {
135+
case UNARY -> {
136+
unary(serviceMethod, arguments[0], (StreamObserver<Object>) arguments[1]);
137+
return null;
138+
}
139+
case SERVER_STREAMING -> {
140+
serverStreaming(serviceMethod, arguments[0], (StreamObserver<Object>) arguments[1]);
141+
return null;
142+
}
143+
case CLIENT_STREAMING -> {
144+
return clientStreaming(serviceMethod, (StreamObserver<Object>) arguments[0]);
145+
}
146+
case BIDI_STREAMING -> {
147+
return bidiStreaming(serviceMethod, (StreamObserver<Object>) arguments[0]);
148+
}
149+
default -> throw new IllegalStateException("Unknown gRPC method type: " + serviceMethod.getType());
150+
}
151+
}
152+
153+
private void unary(MethodDescriptor<?, ?> methodDescriptor, Object requestPayload,
154+
StreamObserver<Object> responseObserver) {
155+
156+
sendRequestAndProduceReply(methodDescriptor, requestPayload)
157+
.subscribe(responseObserver::onNext,
158+
t -> responseObserver.onError(toGrpcStatusException(t)),
159+
responseObserver::onCompleted);
160+
}
161+
162+
private void serverStreaming(MethodDescriptor<?, ?> methodDescriptor, Object requestPayload,
163+
StreamObserver<Object> responseObserver) {
164+
165+
sendRequestAndProduceReply(methodDescriptor, requestPayload)
166+
.flatMapMany(payload -> payload instanceof Flux<?> flux ? flux : Flux.just(payload))
167+
.subscribe(responseObserver::onNext,
168+
t -> responseObserver.onError(toGrpcStatusException(t)),
169+
responseObserver::onCompleted);
170+
}
171+
172+
private StreamObserver<?> clientStreaming(MethodDescriptor<?, ?> methodDescriptor,
173+
StreamObserver<Object> responseObserver) {
174+
175+
Sinks.Many<Object> requestPayload = Sinks.many().unicast().onBackpressureBuffer();
176+
177+
return new StreamObserver<>() {
178+
179+
@Override
180+
public void onNext(Object value) {
181+
requestPayload.tryEmitNext(value);
182+
}
183+
184+
@Override
185+
public void onError(Throwable t) {
186+
throw toGrpcStatusException(t, "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed");
187+
}
188+
189+
@Override
190+
public void onCompleted() {
191+
requestPayload.tryEmitComplete();
192+
sendRequestAndProduceReply(methodDescriptor, requestPayload.asFlux())
193+
.subscribe(responseObserver::onNext,
194+
t -> responseObserver.onError(toGrpcStatusException(t)),
195+
responseObserver::onCompleted);
196+
}
197+
198+
};
199+
}
200+
201+
private StreamObserver<?> bidiStreaming(MethodDescriptor<?, ?> methodDescriptor,
202+
StreamObserver<Object> responseObserver) {
203+
204+
return new StreamObserver<>() {
205+
206+
@Override
207+
public void onNext(Object value) {
208+
sendRequestAndProduceReply(methodDescriptor, value)
209+
.subscribe(responseObserver::onNext,
210+
t -> responseObserver.onError(toGrpcStatusException(t)));
211+
}
212+
213+
@Override
214+
public void onError(Throwable t) {
215+
throw toGrpcStatusException(t, "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed");
216+
}
217+
218+
@Override
219+
public void onCompleted() {
220+
responseObserver.onCompleted();
221+
}
222+
223+
};
224+
}
225+
226+
private Mono<?> sendRequestAndProduceReply(MethodDescriptor<?, ?> serviceMethod, Object requestPayload) {
227+
Message<?> requestMessage =
228+
getMessageBuilderFactory()
229+
.withPayload(requestPayload)
230+
.setHeader(GrpcHeaders.SERVICE, serviceMethod.getServiceName())
231+
.setHeader(GrpcHeaders.SERVICE_METHOD, serviceMethod.getBareMethodName())
232+
.setHeader(GrpcHeaders.METHOD_TYPE, serviceMethod.getType())
233+
.setHeader(GrpcHeaders.SCHEMA_DESCRIPTOR, serviceMethod.getSchemaDescriptor())
234+
.build();
235+
236+
return sendAndReceiveMessageReactive(requestMessage)
237+
.map(Message::getPayload);
238+
}
239+
240+
private static StatusRuntimeException toGrpcStatusException(Throwable throwable) {
241+
return toGrpcStatusException(throwable, throwable.getMessage());
242+
}
243+
244+
private static StatusRuntimeException toGrpcStatusException(Throwable throwable, @Nullable String description) {
245+
return Status.fromThrowable(throwable)
246+
.withDescription(description)
247+
.asRuntimeException();
248+
}
249+
250+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Components for server-side gRPC support.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.integration.grpc.inbound;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Base package for gRPC support.
3+
*/
4+
@org.jspecify.annotations.NullMarked
5+
package org.springframework.integration.grpc;

0 commit comments

Comments
 (0)