Skip to content

Commit c6cba74

Browse files
authored
Merge branch 'release_v2.4.0' into update-dependency-ydb-proto-api
2 parents 2a5636d + 7b74739 commit c6cba74

39 files changed

+1266
-353
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<dependency>
3030
<groupId>io.grpc</groupId>
3131
<artifactId>grpc-netty</artifactId>
32-
<version>1.59.1</version>
32+
<version>1.68.3</version>
3333
<optional>true</optional>
3434
</dependency>
3535

core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ public enum InitMode {
7979
private Executor callExecutor = MoreExecutors.directExecutor();
8080
private AuthRpcProvider<? super GrpcAuthRpc> authProvider = NopAuthProvider.INSTANCE;
8181
private long readTimeoutMillis = 0;
82-
private long connectTimeoutMillis = 30_000;
8382
private long discoveryTimeoutMillis = 60_000;
8483
private boolean useDefaultGrpcResolver = false;
8584
private GrpcCompression compression = GrpcCompression.NO_COMPRESSION;
@@ -151,8 +150,9 @@ public long getReadTimeoutMillis() {
151150
return readTimeoutMillis;
152151
}
153152

153+
@Deprecated
154154
public long getConnectTimeoutMillis() {
155-
return connectTimeoutMillis;
155+
return 10_000;
156156
}
157157

158158
public long getDiscoveryTimeoutMillis() {
@@ -315,15 +315,13 @@ public GrpcTransportBuilder withReadTimeout(long timeout, TimeUnit unit) {
315315
return this;
316316
}
317317

318+
@Deprecated
318319
public GrpcTransportBuilder withConnectTimeout(Duration timeout) {
319-
this.connectTimeoutMillis = timeout.toMillis();
320-
Preconditions.checkArgument(connectTimeoutMillis > 0, "connectTimeoutMillis must be greater than 0");
321320
return this;
322321
}
323322

323+
@Deprecated
324324
public GrpcTransportBuilder withConnectTimeout(long timeout, TimeUnit unit) {
325-
this.connectTimeoutMillis = unit.toMillis(timeout);
326-
Preconditions.checkArgument(connectTimeoutMillis > 0, "connectTimeoutMillis must be greater than 0");
327325
return this;
328326
}
329327

core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,8 @@ public YdbTransportImpl(GrpcTransportBuilder builder) {
5353

5454
this.channelFactory = builder.getManagedChannelFactory();
5555
this.scheduler = builder.getSchedulerFactory().get();
56-
this.callOptions = new AuthCallOptions(
57-
scheduler,
58-
Collections.singletonList(discoveryEndpoint),
59-
channelFactory,
60-
builder
61-
);
56+
this.callOptions = new AuthCallOptions(scheduler, Collections.singletonList(discoveryEndpoint),
57+
channelFactory, builder);
6258
this.channelPool = new GrpcChannelPool(channelFactory, scheduler);
6359
this.endpointPool = new EndpointPool(balancingSettings);
6460
this.discovery = new YdbDiscovery(new DiscoveryHandler(), scheduler, database, discoveryTimeout);
@@ -178,8 +174,14 @@ protected GrpcChannel getChannel(GrpcRequestSettings settings) {
178174
}
179175

180176
@Override
181-
protected void pessimizeEndpoint(EndpointRecord endpoint, String reason) {
182-
endpointPool.pessimizeEndpoint(endpoint, reason);
177+
protected void updateChannelStatus(GrpcChannel channel, io.grpc.Status status) {
178+
// Usally CANCELLED is received when ClientCall is canceled on client side
179+
if (!status.isOk() && status.getCode() != io.grpc.Status.Code.CANCELLED &&
180+
status.getCode() != io.grpc.Status.Code.DEADLINE_EXCEEDED &&
181+
status.getCode() != io.grpc.Status.Code.RESOURCE_EXHAUSTED
182+
) {
183+
endpointPool.pessimizeEndpoint(channel.getEndpoint());
184+
}
183185
}
184186

185187
private class DiscoveryHandler implements YdbDiscovery.Handler {
Lines changed: 13 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package tech.ydb.core.impl.pool;
22

3-
import java.util.concurrent.CompletableFuture;
4-
import java.util.concurrent.ExecutionException;
53
import java.util.concurrent.TimeUnit;
6-
import java.util.concurrent.TimeoutException;
74

85
import io.grpc.Channel;
96
import io.grpc.ConnectivityState;
@@ -14,25 +11,21 @@
1411
/**
1512
* @author Nikolay Perfilov
1613
*/
17-
public class GrpcChannel {
14+
public final class GrpcChannel implements Runnable {
15+
1816
/* Channel shutdown waits for finish of active grpc calls, so there must be enough time to complete them all */
1917
private static final long WAIT_FOR_CLOSING_MS = 5000;
2018
private static final Logger logger = LoggerFactory.getLogger(GrpcChannel.class);
2119

2220
private final EndpointRecord endpoint;
2321
private final ManagedChannel channel;
24-
private final long connectTimeoutMs;
25-
private final ReadyWatcher readyWatcher;
2622

2723
public GrpcChannel(EndpointRecord endpoint, ManagedChannelFactory factory) {
2824
try {
2925
logger.debug("Creating grpc channel with {}", endpoint);
3026
this.endpoint = endpoint;
31-
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort(),
32-
endpoint.getAuthority());
33-
this.connectTimeoutMs = factory.getConnectTimeoutMs();
34-
this.readyWatcher = new ReadyWatcher();
35-
this.readyWatcher.checkState();
27+
this.channel = factory.newManagedChannel(endpoint.getHost(), endpoint.getPort(), endpoint.getAuthority());
28+
checkState();
3629
} catch (Throwable th) {
3730
throw new RuntimeException("cannot create channel", th);
3831
}
@@ -43,7 +36,7 @@ public EndpointRecord getEndpoint() {
4336
}
4437

4538
public Channel getReadyChannel() {
46-
return readyWatcher.getReadyChannel();
39+
return channel;
4740
}
4841

4942
public boolean isShutdown() {
@@ -72,50 +65,14 @@ public boolean shutdown() {
7265
}
7366
}
7467

75-
private class ReadyWatcher implements Runnable {
76-
private final CompletableFuture<ManagedChannel> future = new CompletableFuture<>();
77-
78-
public Channel getReadyChannel() {
79-
try {
80-
return future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
81-
} catch (InterruptedException ex) {
82-
logger.warn("Grpc channel {} ready waiting is interrupted: ", endpoint, ex);
83-
Thread.currentThread().interrupt();
84-
} catch (ExecutionException ex) {
85-
logger.warn("Grpc channel {} connecting problem: ", endpoint, ex);
86-
throw new RuntimeException("Channel " + endpoint + " connecting problem", ex);
87-
} catch (TimeoutException ex) {
88-
logger.warn("Grpc channel {} connect timeout exceeded", endpoint);
89-
throw new RuntimeException("Channel " + endpoint + " connecting timeout");
90-
}
91-
return null;
92-
}
93-
94-
public void checkState() {
95-
ConnectivityState state = channel.getState(true);
96-
logger.debug("Grpc channel {} new state: {}", endpoint, state);
97-
switch (state) {
98-
case READY:
99-
future.complete(channel);
100-
// keep tracking channel state
101-
channel.notifyWhenStateChanged(state, this);
102-
break;
103-
case SHUTDOWN:
104-
future.completeExceptionally(new IllegalStateException("Grpc channel already closed"));
105-
break;
106-
case TRANSIENT_FAILURE:
107-
case CONNECTING:
108-
case IDLE:
109-
default:
110-
// keep tracking channel state
111-
channel.notifyWhenStateChanged(state, this);
112-
break;
113-
}
114-
}
68+
private void checkState() {
69+
ConnectivityState state = channel.getState(true);
70+
logger.debug("Grpc channel {} new state: {}", endpoint, state);
71+
channel.notifyWhenStateChanged(state, this);
72+
}
11573

116-
@Override
117-
public void run() {
118-
checkState();
119-
}
74+
@Override
75+
public void run() {
76+
checkState();
12077
}
12178
}

core/src/main/java/tech/ydb/core/impl/pool/ManagedChannelFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,4 @@ interface Builder {
1414
}
1515

1616
ManagedChannel newManagedChannel(String host, int port, String authority);
17-
18-
long getConnectTimeoutMs();
1917
}

core/src/main/java/tech/ydb/core/impl/pool/NettyChannelFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class NettyChannelFactory implements ManagedChannelFactory {
3838
private final boolean useTLS;
3939
private final byte[] cert;
4040
private final boolean retryEnabled;
41-
private final long connectTimeoutMs;
4241
private final boolean useDefaultGrpcResolver;
4342
private final Long grpcKeepAliveTimeMillis;
4443
private final List<Consumer<? super ManagedChannelBuilder<?>>> initializers;
@@ -49,17 +48,11 @@ private NettyChannelFactory(GrpcTransportBuilder builder) {
4948
this.useTLS = builder.getUseTls();
5049
this.cert = builder.getCert();
5150
this.retryEnabled = builder.isEnableRetry();
52-
this.connectTimeoutMs = builder.getConnectTimeoutMillis();
5351
this.useDefaultGrpcResolver = builder.useDefaultGrpcResolver();
5452
this.grpcKeepAliveTimeMillis = builder.getGrpcKeepAliveTimeMillis();
5553
this.initializers = builder.getChannelInitializers();
5654
}
5755

58-
@Override
59-
public long getConnectTimeoutMs() {
60-
return this.connectTimeoutMs;
61-
}
62-
6356
@SuppressWarnings("deprecation")
6457
@Override
6558
public ManagedChannel newManagedChannel(String host, int port, String sslHostOverride) {

core/src/main/java/tech/ydb/core/impl/pool/ShadedNettyChannelFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class ShadedNettyChannelFactory implements ManagedChannelFactory {
3838
private final boolean useTLS;
3939
private final byte[] cert;
4040
private final boolean retryEnabled;
41-
private final long connectTimeoutMs;
4241
private final boolean useDefaultGrpcResolver;
4342
private final Long grpcKeepAliveTimeMillis;
4443
private final List<Consumer<? super ManagedChannelBuilder<?>>> initializers;
@@ -49,17 +48,11 @@ public ShadedNettyChannelFactory(GrpcTransportBuilder builder) {
4948
this.useTLS = builder.getUseTls();
5049
this.cert = builder.getCert();
5150
this.retryEnabled = builder.isEnableRetry();
52-
this.connectTimeoutMs = builder.getConnectTimeoutMillis();
5351
this.useDefaultGrpcResolver = builder.useDefaultGrpcResolver();
5452
this.grpcKeepAliveTimeMillis = builder.getGrpcKeepAliveTimeMillis();
5553
this.initializers = builder.getChannelInitializers();
5654
}
5755

58-
@Override
59-
public long getConnectTimeoutMs() {
60-
return this.connectTimeoutMs;
61-
}
62-
6356
@SuppressWarnings("deprecation")
6457
@Override
6558
public ManagedChannel newManagedChannel(String host, int port, String sslHostOverride) {

core/src/test/java/tech/ydb/core/impl/pool/DefaultChannelFactoryTest.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import java.io.IOException;
66
import java.nio.file.Files;
77
import java.security.cert.CertificateException;
8-
import java.time.Duration;
9-
import java.util.concurrent.TimeUnit;
108

119
import com.google.common.io.ByteStreams;
1210
import io.grpc.ClientInterceptor;
@@ -79,7 +77,6 @@ public void defaultParams() {
7977
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
8078
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));
8179

82-
Assert.assertEquals(30_000l, factory.getConnectTimeoutMs());
8380
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));
8481

8582
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));
@@ -100,13 +97,11 @@ public void defaultParams() {
10097
public void defaultSslFactory() {
10198
GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root")
10299
.withSecureConnection()
103-
.withGrpcRetry(true)
104-
.withConnectTimeout(Duration.ofMinutes(1));
100+
.withGrpcRetry(true);
105101

106102
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
107103
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));
108104

109-
Assert.assertEquals(60000l, factory.getConnectTimeoutMs());
110105
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));
111106

112107
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(1));
@@ -129,7 +124,7 @@ public void customChannelInitializer() {
129124
.withUseDefaultGrpcResolver(true);
130125

131126
ManagedChannelFactory factory = ShadedNettyChannelFactory
132-
.withInterceptor(ForwardingChannelBuilder2::enableFullStreamDecompression)
127+
.withInterceptor(ForwardingChannelBuilder2::useTransportSecurity)
133128
.buildFactory(builder);
134129

135130
channelStaticMock.verify(FOR_ADDRESS, Mockito.times(0));
@@ -146,7 +141,7 @@ public void customChannelInitializer() {
146141
Mockito.verify(channelBuilderMock, Mockito.times(1))
147142
.withOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
148143
Mockito.verify(channelBuilderMock, Mockito.times(1)).withOption(ChannelOption.TCP_NODELAY, Boolean.TRUE);
149-
Mockito.verify(channelBuilderMock, Mockito.times(1)).enableFullStreamDecompression();
144+
Mockito.verify(channelBuilderMock, Mockito.times(1)).useTransportSecurity();
150145
}
151146

152147
@Test
@@ -186,12 +181,10 @@ public void customSslFactory() throws CertificateException, IOException {
186181

187182
GrpcTransportBuilder builder = GrpcTransport.forHost(MOCKED_HOST, MOCKED_PORT, "/Root")
188183
.withSecureConnection(baos.toByteArray())
189-
.withGrpcRetry(false)
190-
.withConnectTimeout(4, TimeUnit.SECONDS);
184+
.withGrpcRetry(false);
191185

192186
ManagedChannelFactory factory = ChannelFactoryLoader.load().buildFactory(builder);
193187

194-
Assert.assertEquals(4000l, factory.getConnectTimeoutMs());
195188
Assert.assertSame(channelMock, factory.newManagedChannel(MOCKED_HOST, MOCKED_PORT, null));
196189

197190
} finally {

core/src/test/java/tech/ydb/core/impl/pool/EndpointPoolTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ public class EndpointPoolTest {
3939

4040
@Before
4141
public void setUp() throws IOException {
42+
Mockito.when(socketFactory.createSocket()).thenReturn(socket);
43+
Mockito.doNothing().when(socket).connect(Mockito.any(SocketAddress.class));
4244
mocks = MockitoAnnotations.openMocks(this);
4345
threadLocalStaticMock.when(ThreadLocalRandom::current).thenReturn(random);
4446
socketFactoryStaticMock.when(SocketFactory::getDefault).thenReturn(socketFactory);
45-
Mockito.when(socketFactory.createSocket()).thenReturn(socket);
46-
Mockito.doNothing().when(socket).connect(Mockito.any(SocketAddress.class));
4747
}
4848

4949
@After

core/src/test/java/tech/ydb/core/impl/pool/GrpcChannelPoolTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ public class GrpcChannelPoolTest {
2222

2323
@Before
2424
public void setUp() {
25-
Mockito.when(factoryMock.getConnectTimeoutMs()).thenReturn(500l); // timeout for ready watcher
2625
Mockito.when(factoryMock.newManagedChannel(Mockito.any(), Mockito.anyInt(), Mockito.isNull()))
2726
.then((args) -> ManagedChannelMock.good());
2827
}

0 commit comments

Comments
 (0)