diff --git a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java index 90447c6de285e..252ca9a743057 100644 --- a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java +++ b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java @@ -68,6 +68,14 @@ public GrpcConfiguration getConfiguration() { return configuration; } + /** + * Returns the actual local port the gRPC server is listening on. Useful when the server was started with port 0 + * (OS-assigned port). + */ + public int getLocalPort() { + return server != null ? server.getPort() : -1; + } + @Override protected void doStart() throws Exception { super.doStart(); @@ -94,7 +102,7 @@ protected void initializeServer() throws Exception { BindableService bindableService = getBindableServiceFactory().createBindableService(this); ServerInterceptor headerInterceptor = new GrpcHeaderInterceptor(); - if (ObjectHelper.isNotEmpty(configuration.getHost()) && configuration.getPort() > 0) { + if (ObjectHelper.isNotEmpty(configuration.getHost()) && configuration.getPort() >= 0) { LOG.debug("Building gRPC server on {}:{}", configuration.getHost(), configuration.getPort()); serverBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(configuration.getHost(), configuration.getPort())); diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java index 76bedb6ca4a55..f936349e5559d 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerAggregationTest.java @@ -25,12 +25,10 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +40,6 @@ public class GrpcConsumerAggregationTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerAggregationTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcSyncRequestTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcAsyncRequestTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; @@ -58,10 +52,12 @@ public class GrpcConsumerAggregationTest extends CamelTestSupport { @BeforeEach public void startGrpcChannels() { - syncRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcSyncRequestTestPort.getPort()).usePlaintext().build(); - asyncRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcAsyncRequestTestPort.getPort()).usePlaintext().build(); + syncRequestChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-sync").getConsumer()).getLocalPort()) + .usePlaintext().build(); + asyncRequestChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-async").getConsumer()).getLocalPort()) + .usePlaintext().build(); blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel); nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel); asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); @@ -186,12 +182,12 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcSyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + .routeId("grpc-sync") .bean(new GrpcMessageBuilder(), "buildPongResponse"); - from("grpc://localhost:" + grpcAsyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + .routeId("grpc-async") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); } }; diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java index 9d9e2a802dacc..03e0e31e03106 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java @@ -29,10 +29,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +41,6 @@ public class GrpcConsumerConcurrentTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerConcurrentTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcAsyncRequestTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcHeadersTestPort = AvailablePortFinder.find(); private static final int CONCURRENT_THREAD_COUNT = 30; private static final int ROUNDS_PER_THREAD_COUNT = 10; private static final String GRPC_TEST_PING_VALUE = "PING"; @@ -64,13 +58,14 @@ public static Integer getId() { @Test public void testAsyncWithConcurrentThreads() { + int asyncPort = ((GrpcConsumer) context.getRoute("grpc-async").getConsumer()).getLocalPort(); RunnableAssert ra = new RunnableAssert("foo") { @Override public void run() { final CountDownLatch latch = new CountDownLatch(1); ManagedChannel asyncRequestChannel - = NettyChannelBuilder.forAddress("localhost", grpcAsyncRequestTestPort.getPort()).usePlaintext() + = NettyChannelBuilder.forAddress("localhost", asyncPort).usePlaintext() .build(); PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); @@ -105,13 +100,14 @@ public void run() { @Test public void testHeadersWithConcurrentThreads() { + int headersPort = ((GrpcConsumer) context.getRoute("grpc-headers").getConsumer()).getLocalPort(); RunnableAssert ra = new RunnableAssert("foo") { @Override public void run() { int instanceId = createId(); final CountDownLatch latch = new CountDownLatch(1); - ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", grpcHeadersTestPort.getPort()) + ManagedChannel asyncRequestChannel = NettyChannelBuilder.forAddress("localhost", headersPort) .userAgent(GRPC_USER_AGENT_PREFIX + instanceId) .usePlaintext().build(); PingPongGrpc.PingPongStub asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); @@ -149,12 +145,12 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcAsyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + .routeId("grpc-async") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); - from("grpc://localhost:" + grpcHeadersTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + .routeId("grpc-headers") .process(new HeaderExchangeProcessor()); } }; diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerExceptionTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerExceptionTest.java index 9733eccb51b99..29fe62504bff0 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerExceptionTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerExceptionTest.java @@ -25,12 +25,10 @@ import io.grpc.stub.StreamObserver; import org.apache.camel.CamelException; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +41,6 @@ public class GrpcConsumerExceptionTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerExceptionTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcSyncRequestTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; @@ -54,8 +50,8 @@ public class GrpcConsumerExceptionTest extends CamelTestSupport { @BeforeEach public void startGrpcChannels() { - syncRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcSyncRequestTestPort.getPort()).usePlaintext().build(); + int port = ((GrpcConsumer) context.getRoute("grpc-exception").getConsumer()).getLocalPort(); + syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel); nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel); } @@ -96,8 +92,8 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcSyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true") + .routeId("grpc-exception") .throwException(CamelException.class, "GRPC Camel exception message"); } diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java index 7792b2509690d..7f416e30f6439 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerPropagationTest.java @@ -24,12 +24,10 @@ import io.grpc.stub.StreamObserver; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +39,6 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerPropagationTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcAsyncNextRequestTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcAsyncCompletedRequestTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; @@ -56,11 +50,12 @@ public class GrpcConsumerPropagationTest extends CamelTestSupport { @BeforeEach public void startGrpcChannels() { - asyncOnNextChannel - = ManagedChannelBuilder.forAddress("localhost", grpcAsyncNextRequestTestPort.getPort()).usePlaintext().build(); - asyncOnCompletedChannel - = ManagedChannelBuilder.forAddress("localhost", grpcAsyncCompletedRequestTestPort.getPort()).usePlaintext() - .build(); + asyncOnNextChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-on-next").getConsumer()).getLocalPort()) + .usePlaintext().build(); + asyncOnCompletedChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-on-completed").getConsumer()).getLocalPort()) + .usePlaintext().build(); asyncOnNextStub = PingPongGrpc.newStub(asyncOnNextChannel); asyncOnCompletedStub = PingPongGrpc.newStub(asyncOnCompletedChannel); } @@ -126,13 +121,13 @@ protected RouteBuilder createRouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcAsyncNextRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION") + .routeId("grpc-on-next") .to("mock:async-on-next-propagation") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); - from("grpc://localhost:" + grpcAsyncCompletedRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&forwardOnCompleted=true") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&forwardOnCompleted=true") + .routeId("grpc-on-completed") .to("mock:async-on-completed-propagation"); } }; diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerSecurityTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerSecurityTest.java index 802ba2069bf8e..80beb566133b2 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerSecurityTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerSecurityTest.java @@ -34,13 +34,11 @@ import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials; import org.apache.camel.component.grpc.auth.jwt.JwtHelper; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,12 +50,6 @@ public class GrpcConsumerSecurityTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerSecurityTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcTlsTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcJwtCorrectTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcJwtIncorrectTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; @@ -85,14 +77,16 @@ public void startGrpcChannels() throws SSLException { Assumptions.assumeTrue(sslContext instanceof OpenSslClientContext || sslContext instanceof JdkSslContext); - tlsChannel = NettyChannelBuilder.forAddress("localhost", grpcTlsTestPort.getPort()) + int tlsPort = ((GrpcConsumer) context.getRoute("grpc-tls").getConsumer()).getLocalPort(); + int jwtCorrectPort = ((GrpcConsumer) context.getRoute("grpc-jwt-correct").getConsumer()).getLocalPort(); + int jwtIncorrectPort = ((GrpcConsumer) context.getRoute("grpc-jwt-incorrect").getConsumer()).getLocalPort(); + + tlsChannel = NettyChannelBuilder.forAddress("localhost", tlsPort) .sslContext(sslContext) .build(); - jwtCorrectChannel - = NettyChannelBuilder.forAddress("localhost", grpcJwtCorrectTestPort.getPort()).usePlaintext().build(); - jwtIncorrectChannel - = NettyChannelBuilder.forAddress("localhost", grpcJwtIncorrectTestPort.getPort()).usePlaintext().build(); + jwtCorrectChannel = NettyChannelBuilder.forAddress("localhost", jwtCorrectPort).usePlaintext().build(); + jwtIncorrectChannel = NettyChannelBuilder.forAddress("localhost", jwtIncorrectPort).usePlaintext().build(); tlsAsyncStub = PingPongGrpc.newStub(tlsChannel); jwtCorrectAsyncStub @@ -193,22 +187,25 @@ protected RouteBuilder createRouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcTlsTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&" + from("grpc://localhost:0" + + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&" + "negotiationType=TLS&keyCertChainResource=file:src/test/resources/certs/server.pem&" + "keyResource=file:src/test/resources/certs/server.key&trustCertCollectionResource=file:src/test/resources/certs/ca.pem") + .routeId("grpc-tls") .to("mock:tls-enable") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); - from("grpc://localhost:" + grpcJwtCorrectTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&" + from("grpc://localhost:0" + + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&" + "authenticationType=JWT&jwtSecret=" + GRPC_JWT_CORRECT_SECRET) + .routeId("grpc-jwt-correct") .to("mock:jwt-correct-secret") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); - from("grpc://localhost:" + grpcJwtIncorrectTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&" + from("grpc://localhost:0" + + "/org.apache.camel.component.grpc.PingPong?consumerStrategy=PROPAGATION&" + "authenticationType=JWT&jwtSecret=" + GRPC_JWT_CORRECT_SECRET) + .routeId("grpc-jwt-incorrect") .to("mock:jwt-incorrect-secret") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); } diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerServerInterceptorTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerServerInterceptorTest.java index 94f7fcd5a2e97..312bda9fb3abc 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerServerInterceptorTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerServerInterceptorTest.java @@ -20,14 +20,11 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Server; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,15 +40,10 @@ public class GrpcConsumerServerInterceptorTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerServerInterceptorTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcRequestInterceptTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcRequestNoInterceptTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; - private static Server grpcServer; private final GrpcMockServerInterceptor mockServerInterceptor = mock(GrpcMockServerInterceptor.class); private final GrpcMockServerInterceptor mockServerInterceptor2 = mock(GrpcMockServerInterceptor.class); @@ -62,11 +54,12 @@ public class GrpcConsumerServerInterceptorTest extends CamelTestSupport { @BeforeEach public void startGrpcChannels() { - interceptRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcRequestInterceptTestPort.getPort()).usePlaintext().build(); - nointerceptRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcRequestNoInterceptTestPort.getPort()).usePlaintext() - .build(); + interceptRequestChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-intercept").getConsumer()).getLocalPort()) + .usePlaintext().build(); + nointerceptRequestChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-no-intercept").getConsumer()).getLocalPort()) + .usePlaintext().build(); interceptBlockingStub = PingPongGrpc.newBlockingStub(interceptRequestChannel); nointerceptBlockingStub = PingPongGrpc.newBlockingStub(nointerceptRequestChannel); } @@ -116,13 +109,12 @@ protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcRequestInterceptTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + .routeId("grpc-intercept") .bean(new GrpcMessageBuilder(), "buildPongResponse"); - from("grpc://localhost:" + grpcRequestNoInterceptTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION" - + "&autoDiscoverServerInterceptors=false") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION&autoDiscoverServerInterceptors=false") + .routeId("grpc-no-intercept") .bean(new GrpcMessageBuilder(), "buildPongResponse"); } }; diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java index af77a8aec4018..97c15b19983ff 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerClientInterceptorTest.java @@ -20,12 +20,10 @@ import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +39,6 @@ public class GrpcProducerClientInterceptorTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProducerClientInterceptorTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; @@ -53,10 +49,8 @@ public class GrpcProducerClientInterceptorTest extends CamelTestSupport { @BeforeAll public static void startGrpcServer() throws Exception { - grpcServer - = ServerBuilder.forPort(grpcTestPort.getPort()).addService(new GrpcProducerClientInterceptorTest.PingPongImpl()) - .build().start(); - LOG.info("gRPC server started on port {}", grpcTestPort.getPort()); + grpcServer = ServerBuilder.forPort(0).addService(new GrpcProducerClientInterceptorTest.PingPongImpl()).build().start(); + LOG.info("gRPC server started on port {}", grpcServer.getPort()); } @AfterAll @@ -83,7 +77,7 @@ public void testClientInterceptors() { @Test public void testNoAutoDiscover() throws Exception { GrpcComponent component = context.getComponent("grpc", GrpcComponent.class); - GrpcEndpoint endpoint = (GrpcEndpoint) component.createEndpoint("grpc://localhost:" + grpcTestPort.getPort() + GrpcEndpoint endpoint = (GrpcEndpoint) component.createEndpoint("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc" + ".PingPong?method=pingSyncSync&autoDiscoverClientInterceptors=false"); @@ -101,7 +95,7 @@ protected RouteBuilder createRouteBuilder() { @Override public void configure() { from("direct:grpc-interceptor") - .to("grpc://localhost:" + grpcTestPort.getPort() + .to("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc" + ".PingPong?method=pingSyncSync"); } diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSecurityTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSecurityTest.java index 7eaebc51d8e2c..517e88f309905 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSecurityTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSecurityTest.java @@ -30,13 +30,11 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm; import org.apache.camel.component.grpc.auth.jwt.JwtServerInterceptor; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,10 +46,6 @@ public class GrpcProducerSecurityTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProducerSecurityTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcTlsTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcJwtTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final int GRPC_TEST_PONG_ID01 = 1; private static final int GRPC_TEST_PONG_ID02 = 2; @@ -74,18 +68,18 @@ public static void startGrpcServer() throws Exception { Assumptions.assumeTrue(sslContext instanceof OpenSslClientContext || sslContext instanceof JdkSslContext); - grpcServerWithTLS = NettyServerBuilder.forPort(grpcTlsTestPort.getPort()) + grpcServerWithTLS = NettyServerBuilder.forPort(0) .sslContext(sslContext) .addService(new PingPongImpl()).build().start(); - grpcServerWithJWT = NettyServerBuilder.forPort(grpcJwtTestPort.getPort()) + grpcServerWithJWT = NettyServerBuilder.forPort(0) .addService(new PingPongImpl()) .intercept(new JwtServerInterceptor(JwtAlgorithm.HMAC256, GRPC_JWT_CORRECT_SECRET, null, null)) .build() .start(); - LOG.info("gRPC server with TLS started on port {}", grpcTlsTestPort.getPort()); - LOG.info("gRPC server with the JWT auth started on port {}", grpcJwtTestPort.getPort()); + LOG.info("gRPC server with TLS started on port {}", grpcServerWithTLS.getPort()); + LOG.info("gRPC server with the JWT auth started on port {}", grpcServerWithJWT.getPort()); } @AfterAll @@ -153,18 +147,18 @@ protected RouteBuilder createRouteBuilder() { @Override public void configure() { from("direct:grpc-tls") - .to("grpc://localhost:" + grpcTlsTestPort.getPort() + .to("grpc://localhost:" + grpcServerWithTLS.getPort() + "/org.apache.camel.component.grpc.PingPong?method=pingSyncSync&synchronous=true&" + "negotiationType=TLS&keyCertChainResource=file:src/test/resources/certs/client.pem&" + "keyResource=file:src/test/resources/certs/client.key&trustCertCollectionResource=file:src/test/resources/certs/ca.pem"); from("direct:grpc-correct-jwt") - .to("grpc://localhost:" + grpcJwtTestPort.getPort() + .to("grpc://localhost:" + grpcServerWithJWT.getPort() + "/org.apache.camel.component.grpc.PingPong?method=pingSyncSync&synchronous=true&" + "authenticationType=JWT&jwtSecret=" + GRPC_JWT_CORRECT_SECRET); from("direct:grpc-incorrect-jwt") - .to("grpc://localhost:" + grpcJwtTestPort.getPort() + .to("grpc://localhost:" + grpcServerWithJWT.getPort() + "/org.apache.camel.component.grpc.PingPong?method=pingSyncSync&synchronous=true&" + "authenticationType=JWT&jwtSecret=" + GRPC_JWT_INCORRECT_SECRET); } diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerStreamingTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerStreamingTest.java index 24884ff1c77f1..bf838447c1d09 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerStreamingTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerStreamingTest.java @@ -26,13 +26,11 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,17 +43,14 @@ public class GrpcProducerStreamingTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProducerStreamingTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcTestPort = AvailablePortFinder.find(); - private static Server grpcServer; private static PingPongImpl pingPongServer; @BeforeEach public void startGrpcServer() throws Exception { pingPongServer = new PingPongImpl(); - grpcServer = ServerBuilder.forPort(grpcTestPort.getPort()).addService(pingPongServer).build().start(); - LOG.info("gRPC server started on port {}", grpcTestPort.getPort()); + grpcServer = ServerBuilder.forPort(0).addService(pingPongServer).build().start(); + LOG.info("gRPC server started on port {}", grpcServer.getPort()); } @AfterEach @@ -125,7 +120,7 @@ protected RouteBuilder createRouteBuilder() { @Override public void configure() { from("direct:grpc-stream-async-async-route") - .to("grpc://localhost:" + grpcTestPort.getPort() + .to("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong?producerStrategy=STREAMING&streamRepliesTo=direct:grpc-replies&method=pingAsyncAsync"); from("direct:grpc-replies") diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java index 1525b119ed74b..c283b906bf2af 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProducerSyncTest.java @@ -22,13 +22,11 @@ import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.apache.camel.util.StopWatch; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +38,6 @@ public class GrpcProducerSyncTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProducerSyncTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final int GRPC_TEST_PONG_ID01 = 1; private static final int GRPC_TEST_PONG_ID02 = 2; @@ -53,8 +49,8 @@ public class GrpcProducerSyncTest extends CamelTestSupport { @BeforeAll public static void startGrpcServer() throws Exception { - grpcServer = ServerBuilder.forPort(grpcTestPort.getPort()).addService(new PingPongImpl()).build().start(); - LOG.info("gRPC server started on port {}", grpcTestPort.getPort()); + grpcServer = ServerBuilder.forPort(0).addService(new PingPongImpl()).build().start(); + LOG.info("gRPC server started on port {}", grpcServer.getPort()); } @AfterAll @@ -120,13 +116,13 @@ protected RouteBuilder createRouteBuilder() { @Override public void configure() { from("direct:grpc-sync-sync") - .to("grpc://localhost:" + grpcTestPort.getPort() + .to("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong?method=pingSyncSync&synchronous=true"); from("direct:grpc-sync-proto-method-name") - .to("grpc://localhost:" + grpcTestPort.getPort() + .to("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong?method=PingSyncSync&synchronous=true"); from("direct:grpc-sync-async") - .to("grpc://localhost:" + grpcTestPort.getPort() + .to("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong?method=pingSyncAsync&synchronous=true"); } }; diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java index 2cf98a824d12f..0d06a75533651 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java @@ -29,10 +29,8 @@ import io.grpc.stub.StreamObserver; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +40,6 @@ public class GrpcProxyAsyncAsyncTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProxyAsyncAsyncTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcStubPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcRoutePort = AvailablePortFinder.find(); - private static Server grpcServer; private ManagedChannel channel; private PingPongGrpc.PingPongStub stub; @@ -54,8 +47,8 @@ public class GrpcProxyAsyncAsyncTest extends CamelTestSupport { @BeforeAll public static void beforeAll() throws Exception { - grpcServer = ServerBuilder.forPort(grpcStubPort.getPort()).addService(new PingPongImpl()).build().start(); - LOG.info("gRPC server started on port {}", grpcStubPort.getPort()); + grpcServer = ServerBuilder.forPort(0).addService(new PingPongImpl()).build().start(); + LOG.info("gRPC server started on port {}", grpcServer.getPort()); } @AfterAll @@ -68,10 +61,14 @@ public static void afterAll() { @BeforeEach public void beforeEach() { - channel = ManagedChannelBuilder.forAddress("localhost", grpcRoutePort.getPort()).usePlaintext().build(); + channel = ManagedChannelBuilder.forAddress("localhost", getRoutePort()).usePlaintext().build(); stub = PingPongGrpc.newStub(channel); } + private int getRoutePort() { + return ((GrpcConsumer) context.getRoute("grpc-consumer").getConsumer()).getLocalPort(); + } + @AfterEach public void afterEach() throws Exception { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); @@ -120,13 +117,13 @@ protected RoutesBuilder createRouteBuilder() throws Exception { @Override public void configure() throws Exception { onException(Exception.class).process(e -> routeHasException.set(true)); - from("grpc://localhost:" + grpcRoutePort.getPort() + + from("grpc://localhost:0" + "/org.apache.camel.component.grpc.PingPong" + "?routeControlledStreamObserver=true" + "&consumerStrategy=DELEGATION" + "&forwardOnError=true" + - "&forwardOnCompleted=true") - .toD("grpc://localhost:" + grpcStubPort.getPort() + + "&forwardOnCompleted=true").routeId("grpc-consumer") + .toD("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong" + "?method=${header.CamelGrpcMethodName}" + "&producerStrategy=STREAMING" + diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java index 067a079e9c9e7..dc57aa59cca82 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java @@ -29,10 +29,8 @@ import io.grpc.stub.StreamObserver; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +40,6 @@ public class GrpcProxyAsyncSyncTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProxyAsyncSyncTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcStubPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcRoutePort = AvailablePortFinder.find(); - private static Server grpcServer; private ManagedChannel channel; private PingPongGrpc.PingPongStub stub; @@ -54,8 +47,8 @@ public class GrpcProxyAsyncSyncTest extends CamelTestSupport { @BeforeAll public static void beforeAll() throws Exception { - grpcServer = ServerBuilder.forPort(grpcStubPort.getPort()).addService(new PingPongImpl()).build().start(); - LOG.info("gRPC server started on port {}", grpcStubPort.getPort()); + grpcServer = ServerBuilder.forPort(0).addService(new PingPongImpl()).build().start(); + LOG.info("gRPC server started on port {}", grpcServer.getPort()); } @AfterAll @@ -68,10 +61,14 @@ public static void afterAll() { @BeforeEach public void beforeEach() { - channel = ManagedChannelBuilder.forAddress("localhost", grpcRoutePort.getPort()).usePlaintext().build(); + channel = ManagedChannelBuilder.forAddress("localhost", getRoutePort()).usePlaintext().build(); stub = PingPongGrpc.newStub(channel); } + private int getRoutePort() { + return ((GrpcConsumer) context.getRoute("grpc-consumer").getConsumer()).getLocalPort(); + } + @AfterEach public void afterEach() throws Exception { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); @@ -118,13 +115,13 @@ protected RoutesBuilder createRouteBuilder() throws Exception { @Override public void configure() throws Exception { onException(Exception.class).process(e -> routeHasException.set(true)); - from("grpc://localhost:" + grpcRoutePort.getPort() + + from("grpc://localhost:0" + "/org.apache.camel.component.grpc.PingPong" + "?routeControlledStreamObserver=true" + "&consumerStrategy=DELEGATION" + "&forwardOnError=true" + - "&forwardOnCompleted=true") - .toD("grpc://localhost:" + grpcStubPort.getPort() + + "&forwardOnCompleted=true").routeId("grpc-consumer") + .toD("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong" + "?method=${header.CamelGrpcMethodName}" + "&producerStrategy=STREAMING" + diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java index 5058a6d5b1048..43294717fdb8b 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java @@ -29,10 +29,8 @@ import io.grpc.stub.StreamObserver; import org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.*; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +40,6 @@ public class GrpcProxySyncAsyncTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcProxySyncAsyncTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcStubPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcRoutePort = AvailablePortFinder.find(); - private static Server grpcServer; private ManagedChannel channel; private PingPongGrpc.PingPongStub stub; @@ -54,8 +47,8 @@ public class GrpcProxySyncAsyncTest extends CamelTestSupport { @BeforeAll public static void beforeAll() throws Exception { - grpcServer = ServerBuilder.forPort(grpcStubPort.getPort()).addService(new PingPongImpl()).build().start(); - LOG.info("gRPC server started on port {}", grpcStubPort.getPort()); + grpcServer = ServerBuilder.forPort(0).addService(new PingPongImpl()).build().start(); + LOG.info("gRPC server started on port {}", grpcServer.getPort()); } @AfterAll @@ -68,10 +61,14 @@ public static void afterAll() { @BeforeEach public void beforeEach() { - channel = ManagedChannelBuilder.forAddress("localhost", grpcRoutePort.getPort()).usePlaintext().build(); + channel = ManagedChannelBuilder.forAddress("localhost", getRoutePort()).usePlaintext().build(); stub = PingPongGrpc.newStub(channel); } + private int getRoutePort() { + return ((GrpcConsumer) context.getRoute("grpc-consumer").getConsumer()).getLocalPort(); + } + @AfterEach public void afterEach() throws Exception { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); @@ -116,10 +113,10 @@ protected RoutesBuilder createRouteBuilder() throws Exception { @Override public void configure() throws Exception { onException(Exception.class).process(e -> routeHasException.set(true)); - from("grpc://localhost:" + grpcRoutePort.getPort() + + from("grpc://localhost:0" + "/org.apache.camel.component.grpc.PingPong" + - "?routeControlledStreamObserver=true") - .toD("grpc://localhost:" + grpcStubPort.getPort() + + "?routeControlledStreamObserver=true").routeId("grpc-consumer") + .toD("grpc://localhost:" + grpcServer.getPort() + "/org.apache.camel.component.grpc.PingPong" + "?method=${header.CamelGrpcMethodName}" + "&streamRepliesTo=direct:next" + diff --git a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java index e8ddea17d883c..68bcb4e5c7ca3 100644 --- a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java +++ b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/RouteControlledStreamObserverTest.java @@ -29,12 +29,10 @@ import org.apache.camel.Message; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,10 +42,6 @@ public class RouteControlledStreamObserverTest extends CamelTestSupport { private static final Logger LOG = LoggerFactory.getLogger(GrpcConsumerAggregationTest.class); - @RegisterExtension - static AvailablePortFinder.Port grpcSyncRequestTestPort = AvailablePortFinder.find(); - @RegisterExtension - static AvailablePortFinder.Port grpcAsyncRequestTestPort = AvailablePortFinder.find(); private static final int GRPC_TEST_PING_ID = 1; private static final String GRPC_TEST_PING_VALUE = "PING"; private static final String GRPC_TEST_PONG_VALUE = "PONG"; @@ -60,10 +54,12 @@ public class RouteControlledStreamObserverTest extends CamelTestSupport { @BeforeEach public void startGrpcChannels() { - syncRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcSyncRequestTestPort.getPort()).usePlaintext().build(); - asyncRequestChannel - = ManagedChannelBuilder.forAddress("localhost", grpcAsyncRequestTestPort.getPort()).usePlaintext().build(); + syncRequestChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-sync").getConsumer()).getLocalPort()) + .usePlaintext().build(); + asyncRequestChannel = ManagedChannelBuilder + .forAddress("localhost", ((GrpcConsumer) context.getRoute("grpc-async").getConsumer()).getLocalPort()) + .usePlaintext().build(); blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel); nonBlockingStub = PingPongGrpc.newStub(syncRequestChannel); asyncNonBlockingStub = PingPongGrpc.newStub(asyncRequestChannel); @@ -189,9 +185,8 @@ public void unsupportedEndpointConfigurationFailureTest() throws Exception { camelContext.addRoutes(new RouteBuilder() { @Override public void configure() { - from("grpc://localhost:" + grpcSyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION" + - "&routeControlledStreamObserver=true").to("log:foo"); + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION&routeControlledStreamObserver=true") + .to("log:foo"); } }); assertThrows(IllegalArgumentException.class, camelContext::start); @@ -216,12 +211,12 @@ private void process(Exchange exchange) { @Override public void configure() { - from("grpc://localhost:" + grpcSyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=PROPAGATION&routeControlledStreamObserver=true") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=PROPAGATION&routeControlledStreamObserver=true") + .routeId("grpc-sync") .process(this::process); - from("grpc://localhost:" + grpcAsyncRequestTestPort.getPort() - + "/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + from("grpc://localhost:0/org.apache.camel.component.grpc.PingPong?synchronous=true&consumerStrategy=AGGREGATION") + .routeId("grpc-async") .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse"); } }; diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java index 638d8537f6d46..9d88c6a210710 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java @@ -236,6 +236,14 @@ public void startAcceptThread(ServerSocket serverSocket) { acceptThread.start(); } + /** + * Returns the local port the server socket is bound to, or -1 if not yet bound. Useful when the consumer is + * configured with port 0 to let the OS assign a port atomically. + */ + public int getLocalPort() { + return acceptThread != null ? acceptThread.getLocalPort() : -1; + } + public void startConsumer(Socket clientSocket, MllpSocketBuffer mllpBuffer) { TcpSocketConsumerRunnable client = new TcpSocketConsumerRunnable( this, clientSocket, mllpBuffer, hl7Util, logPhi); diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java index a61dfbb2a73d1..d001632436335 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerAcceptThread.java @@ -45,6 +45,10 @@ public TcpServerAcceptThread(MllpTcpServerConsumer consumer, ServerSocket server this.serverSocket = serverSocket; } + public int getLocalPort() { + return serverSocket != null && serverSocket.isBound() ? serverSocket.getLocalPort() : -1; + } + /** * Derive a thread name from the class name, the component URI and the connection information. *

diff --git a/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java b/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java index c6da50db22ea2..e2f02970d861a 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/MllpTcpServerConsumerLenientBindTest.java @@ -22,7 +22,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpClientResource; import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceTimeoutException; import org.apache.camel.test.junit6.CamelTestSupport; @@ -40,9 +39,6 @@ public class MllpTcpServerConsumerLenientBindTest extends CamelTestSupport { static final int RECEIVE_TIMEOUT = 1000; static final int READ_TIMEOUT = 500; - @RegisterExtension - AvailablePortFinder.Port mllpClientPort = AvailablePortFinder.find(); - @RegisterExtension public MllpClientResource mllpClient = new MllpClientResource(); @@ -54,9 +50,10 @@ public class MllpTcpServerConsumerLenientBindTest extends CamelTestSupport { @Override protected void doPreSetup() throws Exception { mllpClient.setMllpHost("localhost"); - mllpClient.setMllpPort(mllpClientPort.getPort()); - portBlocker = new ServerSocket(mllpClient.getMllpPort()); + // Bind to port 0 to get an OS-assigned port atomically, then close and reuse that port + portBlocker = new ServerSocket(0); + mllpClient.setMllpPort(portBlocker.getLocalPort()); assertTrue(portBlocker.isBound()); } diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/LogPhiTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/LogPhiTest.java index 2c198bd5a22b8..49e64c431b526 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/LogPhiTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/LogPhiTest.java @@ -25,7 +25,6 @@ import org.apache.camel.RoutesBuilder; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpServerResource; import org.apache.camel.test.junit6.CamelTestSupport; import org.apache.camel.test.mllp.Hl7TestMessageGenerator; @@ -41,9 +40,6 @@ public class LogPhiTest extends CamelTestSupport { static final int SERVER_ACKNOWLEDGEMENT_DELAY = 10000; - @RegisterExtension - AvailablePortFinder.Port mllpServerPort = AvailablePortFinder.find(); - @RegisterExtension public MllpServerResource mllpServer = new MllpServerResource(); @@ -59,7 +55,7 @@ public class LogPhiTest extends CamelTestSupport { @Override protected void doPreSetup() throws Exception { mllpServer.setListenHost("localhost"); - mllpServer.setListenPort(mllpServerPort.getPort()); + mllpServer.setListenPort(0); mllpServer.setDelayDuringAcknowledgement(SERVER_ACKNOWLEDGEMENT_DELAY); mllpServer.startup(); assertTrue(mllpServer.isActive()); diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpIdleTimeoutStrategyTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpIdleTimeoutStrategyTest.java index 2a7018c7129e0..7c0885c3be5be 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpIdleTimeoutStrategyTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpIdleTimeoutStrategyTest.java @@ -29,7 +29,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mllp.internal.MllpSocketBuffer; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpServerResource; import org.apache.camel.test.junit6.CamelTestSupport; import org.apache.camel.test.mllp.Hl7TestMessageGenerator; @@ -50,10 +49,7 @@ public class MllpIdleTimeoutStrategyTest extends CamelTestSupport { static final int IDLE_TIMEOUT = 500; @RegisterExtension - AvailablePortFinder.Port mllpServerPort = AvailablePortFinder.find(); - - @RegisterExtension - public MllpServerResource mllpServer = new MllpServerResource("localhost", mllpServerPort.getPort()); + public MllpServerResource mllpServer = new MllpServerResource("localhost", 0); @EndpointInject("direct://sourcedefault") ProducerTemplate defaultStrategySource; diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java index 42d03daae7485..ef2cf6a23776c 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerConnectionErrorTest.java @@ -27,7 +27,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpServerResource; import org.apache.camel.test.junit6.CamelTestSupport; import org.apache.camel.test.mllp.Hl7TestMessageGenerator; @@ -42,10 +41,7 @@ public class MllpTcpClientProducerConnectionErrorTest extends CamelTestSupport { @RegisterExtension - AvailablePortFinder.Port mllpServerPort = AvailablePortFinder.find(); - - @RegisterExtension - public MllpServerResource mllpServer = new MllpServerResource("localhost", mllpServerPort.getPort()); + public MllpServerResource mllpServer = new MllpServerResource("localhost", 0); @EndpointInject("direct://source") ProducerTemplate source; diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java index a1b4309873127..99976bb5b4b90 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/MllpTcpClientProducerIdleConnectionTimeoutTest.java @@ -26,7 +26,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpJUnitResourceException; import org.apache.camel.test.junit.rule.mllp.MllpServerResource; import org.apache.camel.test.junit6.CamelTestSupport; @@ -50,10 +49,7 @@ public class MllpTcpClientProducerIdleConnectionTimeoutTest extends CamelTestSup Logger log = LoggerFactory.getLogger(MllpTcpClientProducerIdleConnectionTimeoutTest.class); @RegisterExtension - AvailablePortFinder.Port mllpServerPort = AvailablePortFinder.find(); - - @RegisterExtension - MllpServerResource mllpServer = new MllpServerResource("localhost", mllpServerPort.getPort()); + MllpServerResource mllpServer = new MllpServerResource("localhost", 0); @EndpointInject("direct://source") ProducerTemplate source; diff --git a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java index 846a5361d0c22..a958aeb580822 100644 --- a/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java +++ b/components/camel-mllp/src/test/java/org/apache/camel/component/mllp/TcpServerConsumerAcknowledgementTestSupport.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.CamelContext; +import org.apache.camel.Consumer; import org.apache.camel.EndpointInject; import org.apache.camel.ExchangePattern; import org.apache.camel.LoggingLevel; @@ -26,7 +27,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.test.junit.rule.mllp.MllpClientResource; import org.apache.camel.test.junit6.CamelTestSupport; import org.junit.jupiter.api.extension.RegisterExtension; @@ -41,9 +41,6 @@ public abstract class TcpServerConsumerAcknowledgementTestSupport extends CamelT static final String EXPECTED_ACKNOWLEDGEMENT = "MSH|^~\\&|^org^sys||APP_A|FAC_A|||ACK^A04^ACK|||2.6" + '\r' + "MSA|AA|" + '\r'; - @RegisterExtension - AvailablePortFinder.Port mllpClientPort = AvailablePortFinder.find(); - @RegisterExtension public MllpClientResource mllpClient = new MllpClientResource(); @@ -64,6 +61,9 @@ public abstract class TcpServerConsumerAcknowledgementTestSupport extends CamelT @Override protected void doPostSetup() throws Exception { + Consumer consumer = context.getRoute("mllp-test-receiver-route").getConsumer(); + mllpClient.setMllpPort(((MllpTcpServerConsumer) consumer).getLocalPort()); + result.expectedMessageCount(0); complete.expectedMessageCount(0); failure.expectedMessageCount(0); @@ -85,7 +85,6 @@ protected CamelContext createCamelContext() throws Exception { @Override protected RouteBuilder createRouteBuilder() { mllpClient.setMllpHost("localhost"); - mllpClient.setMllpPort(mllpClientPort.getPort()); return new RouteBuilder() { int connectTimeout = 500; @@ -114,7 +113,7 @@ public void configure() { .to("mock://on-failure-only"); fromF("mllp://%s:%d?bridgeErrorHandler=%b&autoAck=%b&exchangePattern=%s&connectTimeout=%d&receiveTimeout=%d", - mllpClient.getMllpHost(), mllpClient.getMllpPort(), isBridgeErrorHandler(), isAutoAck(), + mllpClient.getMllpHost(), 0, isBridgeErrorHandler(), isAutoAck(), exchangePattern(), connectTimeout, responseTimeout) .routeId(routeId) .to(result); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java index e267e31c34abb..035dad02bf021 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.AggregationStrategy; @@ -237,7 +238,7 @@ private void doTestAggregateProcessorCompletionTimeout(boolean eager) throws Exc ap.process(e2); ap.process(e3); - await().atMost(5, java.util.concurrent.TimeUnit.SECONDS) + await().atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(1, mock.getReceivedCounter())); ap.process(e4); @@ -285,7 +286,7 @@ public void testAggregateCompletionInterval() throws Exception { ap.process(e2); ap.process(e3); - await().atMost(5, java.util.concurrent.TimeUnit.SECONDS) + await().atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(1, mock.getReceivedCounter())); ap.process(e4);