Skip to content

Commit 72be793

Browse files
committed
* Handle error as StatusRuntimeException
* Add test with error from server * Fix typos in the doc
1 parent 1648529 commit 72be793

File tree

5 files changed

+53
-13
lines changed

5 files changed

+53
-13
lines changed

spring-integration-grpc/src/main/java/org/springframework/integration/grpc/inbound/GrpcInboundGateway.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import io.grpc.BindableService;
2323
import io.grpc.MethodDescriptor;
2424
import io.grpc.ServerServiceDefinition;
25+
import io.grpc.Status;
26+
import io.grpc.StatusRuntimeException;
2527
import io.grpc.stub.StreamObserver;
2628
import org.aopalliance.intercept.MethodInterceptor;
2729
import org.aopalliance.intercept.MethodInvocation;
@@ -152,15 +154,19 @@ private void unary(MethodDescriptor<?, ?> methodDescriptor, Object requestPayloa
152154
StreamObserver<Object> responseObserver) {
153155

154156
sendRequestAndProduceReply(methodDescriptor, requestPayload)
155-
.subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted);
157+
.subscribe(responseObserver::onNext,
158+
t -> responseObserver.onError(toGrpcStatusException(t)),
159+
responseObserver::onCompleted);
156160
}
157161

158162
private void serverStreaming(MethodDescriptor<?, ?> methodDescriptor, Object requestPayload,
159163
StreamObserver<Object> responseObserver) {
160164

161165
sendRequestAndProduceReply(methodDescriptor, requestPayload)
162166
.flatMapMany(payload -> payload instanceof Flux<?> flux ? flux : Flux.just(payload))
163-
.subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted);
167+
.subscribe(responseObserver::onNext,
168+
t -> responseObserver.onError(toGrpcStatusException(t)),
169+
responseObserver::onCompleted);
164170
}
165171

166172
private StreamObserver<?> clientStreaming(MethodDescriptor<?, ?> methodDescriptor,
@@ -177,15 +183,15 @@ public void onNext(Object value) {
177183

178184
@Override
179185
public void onError(Throwable t) {
180-
throw new IllegalStateException(
181-
"gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t);
186+
throw toGrpcStatusException(t, "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed");
182187
}
183188

184189
@Override
185190
public void onCompleted() {
186191
requestPayload.tryEmitComplete();
187192
sendRequestAndProduceReply(methodDescriptor, requestPayload.asFlux())
188-
.subscribe(responseObserver::onNext, responseObserver::onError,
193+
.subscribe(responseObserver::onNext,
194+
t -> responseObserver.onError(toGrpcStatusException(t)),
189195
responseObserver::onCompleted);
190196
}
191197

@@ -200,13 +206,13 @@ private StreamObserver<?> bidiStreaming(MethodDescriptor<?, ?> methodDescriptor,
200206
@Override
201207
public void onNext(Object value) {
202208
sendRequestAndProduceReply(methodDescriptor, value)
203-
.subscribe(responseObserver::onNext, responseObserver::onError);
209+
.subscribe(responseObserver::onNext,
210+
t -> responseObserver.onError(toGrpcStatusException(t)));
204211
}
205212

206213
@Override
207214
public void onError(Throwable t) {
208-
throw new IllegalStateException(
209-
"gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed", t);
215+
throw toGrpcStatusException(t, "gRPC request [" + methodDescriptor.getFullMethodName() + "] has failed");
210216
}
211217

212218
@Override
@@ -231,4 +237,14 @@ private Mono<?> sendRequestAndProduceReply(MethodDescriptor<?, ?> serviceMethod,
231237
.map(Message::getPayload);
232238
}
233239

240+
private static StatusRuntimeException toGrpcStatusException(Throwable throwable) {
241+
return toGrpcStatusException(throwable, throwable.getMessage());
242+
}
243+
244+
private static StatusRuntimeException toGrpcStatusException(Throwable throwable, @Nullable String description) {
245+
return Status.fromThrowable(throwable)
246+
.withDescription(description)
247+
.asRuntimeException();
248+
}
249+
234250
}

spring-integration-grpc/src/test/java/org/springframework/integration/grpc/TestInProcessConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ void startServer() throws IOException {
7373

7474
@Override
7575
public void destroy() {
76-
this.server.shutdown();
76+
this.server.shutdownNow();
7777
}
7878

7979
}

spring-integration-grpc/src/test/java/org/springframework/integration/grpc/inbound/GrpcInboundGatewayTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import com.google.common.util.concurrent.ListenableFuture;
2929
import io.grpc.ManagedChannel;
30+
import io.grpc.Status;
31+
import io.grpc.StatusRuntimeException;
3032
import io.grpc.stub.StreamObserver;
3133
import org.junit.jupiter.api.Test;
3234
import reactor.core.publisher.Flux;
@@ -49,6 +51,7 @@
4951
import org.springframework.util.StringUtils;
5052

5153
import static org.assertj.core.api.Assertions.assertThat;
54+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
5255

5356
/**
5457
* @author Artem Bilan
@@ -146,6 +149,18 @@ void bidiStreaming() throws InterruptedException {
146149
.containsAll(Arrays.stream(names).map("Hello "::concat).toList());
147150
}
148151

152+
@Test
153+
void errorFromServer() {
154+
assertThatExceptionOfType(StatusRuntimeException.class)
155+
.isThrownBy(() -> this.testHelloWorldBlockingStub.errorOnHello(newHelloRequest("Error")))
156+
.satisfies(e -> {
157+
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
158+
assertThat(e.getStatus().getDescription())
159+
.contains("Failed to transform Message in bean " +
160+
"'grpcIntegrationFlow.subFlow#4.method-invoking-transformer#1'");
161+
});
162+
}
163+
149164
private static HelloRequest newHelloRequest(String message) {
150165
return HelloRequest.newBuilder().setName(message).build();
151166
}
@@ -218,6 +233,12 @@ IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) {
218233

219234
.subFlowMapping("BidiStreamHello", flow -> flow
220235
.transform(this::requestReply))
236+
237+
.subFlowMapping("ErrorOnHello", flow -> flow
238+
.transform(p -> {
239+
throw Status.UNAVAILABLE.withDescription("intentional")
240+
.asRuntimeException();
241+
}))
221242
)
222243
.get();
223244
}

spring-integration-grpc/src/test/proto/test_hello.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ service TestHelloWorld {
2323
// Streams requests and replies
2424
rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {}
2525

26+
// Fail with error
27+
rpc ErrorOnHello(HelloRequest) returns (HelloReply) {}
28+
2629
}
2730

2831
// The request message containing the user's name.

src/reference/antora/modules/ROOT/pages/grpc.adoc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ The `GrpcInboundGateway` is a `MessagingGatewaySupport` implementation to receiv
6666
For initialization, the instance of this gateway requires only an abstract gRPC service class implementing `BindableService`, usually generated from Protobuf and comes with a `*ImplBase` class name.
6767

6868
WARNING: Only standard gRPC services are supported: a generated `AsyncService` contract is what `GrpcInboundGateway` logic based on.
69-
The Reactor and Kotlin-based service generation don't make sense in Spring Integration logic since those types are not exposed anyhow from the gateway definition.
69+
The Reactor and Kotlin-based service generation don't make sense in Spring Integration logic since those types are not exposed from the gateway definition.
7070

7171
The gateway uses the mentioned `AsyncService` interface to create proxy and intercept gRPC service methods.
7272

@@ -81,17 +81,17 @@ GrpcInboundGateway helloWorldService() {
8181
----
8282

8383
The `GrpcInboundGateway` implements a `BindableService` and exposes a `ServerServiceDefinition` based on the mentioned proxy for an `AsyncService` contract of the gRPC service.
84-
Therefore, exactly instance of this gateway has to be registered into a `ServerBuilder` and no need in any other `*ImplBase` implementations in the application.
84+
Therefore, an instance of this gateway has to be registered into a `ServerBuilder` and no need in any other `*ImplBase` implementations in the application.
8585

8686
IMPORTANT: With https://spring.io/projects/spring-grpc[Spring gRPC] and its auto-discovery for `BindableService` implementations, the `GrpcInboundGateway` has to be declared as a top-level bean.
8787
Therefore, Java DSL API like `IntegrationFlow.from(new GrpcInboundGateway(TestHelloWorldGrpc.TestHelloWorldImplBase.class))` is not recommended because such a `BindableService` implementation won't make it visible for respective Spring gRPC infrastructure.
8888

8989
The `GrpcInboundGateway` uses a `sendAndReceiveMessageReactive()` API to interact with the downstream flow and adapts a `Mono` reply to the gRPC `StreamObserver`.
9090
As mentioned before, the request message payload is exactly a gRPC request message, and it expects a reply in a from of gRPC response message.
91-
The downstream logic could be type-safe and deal with gRPC messages similar way as if `*ImplBase` would be implemented manually.
91+
The downstream logic can be type-safe and deal with gRPC messages similar way as if `*ImplBase` would be implemented manually.
9292

9393
The `MethodDescriptor.MethodType.UNARY` and `MethodDescriptor.MethodType.BIDI_STREAMING` are same from the downstream handling logic perspective.
94-
In other words, the `BIDI_STREAMING` is handled as a loop on request items and the gateway produces a response item for each of them.
94+
In other words, the `BIDI_STREAMING` is handled as a loop on request items and the gateway produces a response item immediately into the response `StreamObserver`.
9595
For different `BIDI_STREAMING` logic, the regular gRPC service implementation is recommended.
9696

9797
The `MethodDescriptor.MethodType.CLIENT_STREAMING` mode produces a message with a `Flux` as a payload of gRPC request items.

0 commit comments

Comments
 (0)