Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,18 @@ class ClusterIdFetcher {

private final RpcControllerFactory rpcControllerFactory;

private final int rpcTimeoutMs;

private final CompletableFuture<String> future;

ClusterIdFetcher(Configuration conf, User user, RpcControllerFactory rpcControllerFactory,
Set<ServerName> bootstrapServers) {
Set<ServerName> bootstrapServers, int rpcTimeoutMs) {
this.user = user;
// use null cluster id here as we do not know the cluster id yet, we will fetch it through this
// rpc client
this.rpcClient = RpcClientFactory.createClient(conf, null);
this.rpcControllerFactory = rpcControllerFactory;
this.rpcTimeoutMs = rpcTimeoutMs;
this.bootstrapServers = new ArrayList<ServerName>(bootstrapServers);
// shuffle the bootstrap servers so we will not always fetch from the same one
Collections.shuffle(this.bootstrapServers);
Expand All @@ -88,10 +91,9 @@ class ClusterIdFetcher {
private void getClusterId(int index) {
ServerName server = bootstrapServers.get(index);
LOG.debug("Going to request {} for getting cluster id", server);
// user and rpcTimeout are both not important here, as we will not actually send any rpc calls
// out, only a preamble connection header, but if we pass null as user, there will be NPE in
// some code paths...
RpcChannel channel = rpcClient.createRpcChannel(server, user, 0);
// The preamble exchange still requires TCP connect and potentially TLS negotiation, so we use
// the configured rpc timeout to bound the connection attempt.
RpcChannel channel = rpcClient.createRpcChannel(server, user, rpcTimeoutMs);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only send a preamble header here, so I wonder whether the rpc timeout actually works. And I can confirm that we do not have TLS negotiation when sending the preamble header.

ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
HBaseRpcController controller = rpcControllerFactory.newController();
stub.getConnectionRegistry(controller, GetConnectionRegistryRequest.getDefaultInstance(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,35 @@ private ImmutableMap<ServerName, ClientMetaService.Interface> createStubs(RpcCli
CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future =
new CompletableFuture<>();
addr2StubFuture = future;
FutureUtils.addListener(
new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, bootstrapNodes).fetchClusterId(),
(clusterId, error) -> {
synchronized (ConnectionRegistryRpcStubHolder.this) {
if (error != null) {
addr2StubFuture.completeExceptionally(error);
} else {
RpcClient c = RpcClientFactory.createClient(conf, clusterId);
ImmutableMap<ServerName, ClientMetaService.Interface> m =
createStubs(c, bootstrapNodes);
rpcClient = c;
addr2Stub = m;
addr2StubFuture.complete(m);
try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the try/catch here? FutureUtils.addListener does not let any exception propagate out.

FutureUtils.addListener(
new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, bootstrapNodes, rpcTimeoutMs)
.fetchClusterId(),
(clusterId, error) -> {
synchronized (ConnectionRegistryRpcStubHolder.this) {
try {
if (error != null) {
addr2StubFuture.completeExceptionally(error);
} else {
RpcClient c = RpcClientFactory.createClient(conf, clusterId);
ImmutableMap<ServerName, ClientMetaService.Interface> m =
createStubs(c, bootstrapNodes);
rpcClient = c;
addr2Stub = m;
addr2StubFuture.complete(m);
}
} catch (Throwable t) {
addr2StubFuture.completeExceptionally(
new IOException("Failed to create RPC client or stubs", t));
} finally {
addr2StubFuture = null;
}
}
addr2StubFuture = null;
}
});
});
} catch (Throwable t) {
future.completeExceptionally(new IOException("Failed to start cluster ID fetch", t));
addr2StubFuture = null;
}
// here we must use the local variable future instead of addr2StubFuture, as the above listener
// could be executed directly in the same thread(if the future completes quick enough), since
// the synchronized lock is reentrant, it could set addr2StubFuture to null in the end, so when
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;

/**
* Test that ConnectionRegistryRpcStubHolder properly completes its future when RPC client creation
* fails. Before the fix, an exception thrown by RpcClientFactory.createClient() inside the
* FutureUtils.addListener callback would be swallowed, leaving the CompletableFuture permanently
* incomplete and hanging all callers.
*/
@Tag(ClientTests.TAG)
@Tag(SmallTests.TAG)
public class TestConnectionRegistryRpcClientFailure {

private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
"hbase.test.refresh.initial.delay.secs";
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.refresh.interval.secs";
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.min.refresh.interval.secs";

private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();

private static Set<ServerName> BOOTSTRAP_NODES;

/**
* RPC client that succeeds for the preamble cluster ID fetch (clusterId is null) but throws on
* the real client creation (clusterId is non-null). This simulates the production failure where
* TLS certificate provisioning fails during RPC client construction.
*/
public static final class FailingRpcClientImpl implements RpcClient {

public FailingRpcClientImpl(Configuration configuration, String clusterId,
SocketAddress localAddress, MetricsConnection metrics, Map<String, byte[]> attributes) {
if (clusterId != null) {
throw new RuntimeException("Simulated RPC client creation failure");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the actual problem is here — we may throw an exception when creating the RpcClient?

Looking at the code, neither BlockingRpcClient nor NettyRpcClient will throw an exception during construction.

  @SuppressWarnings("FutureReturnValueIgnored")
  public static <T> void addListener(CompletableFuture<T> future,
    BiConsumer<? super T, ? super Throwable> action) {
    future.whenComplete((resp, error) -> {
      try {
        // See this post on stack overflow(shorten since the url is too long),
        // https://s.apache.org/completionexception
        // For a chain of CompletableFuture, only the first child CompletableFuture can get the
        // original exception, others will get a CompletionException, which wraps the original
        // exception. So here we unwrap it before passing it to the callback action.
        action.accept(resp, unwrapCompletionException(error));
      } catch (Throwable t) {
        LOG.error("Unexpected error caught when processing CompletableFuture", t);
      }
    });
  }

We will log an error when the callback throws an exception — have you seen a log line like this?

}
}

@Override
public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
  // The registry code path under test only uses async channels, so blocking channels are
  // deliberately unsupported in this stub.
  throw new UnsupportedOperationException();
}

@Override
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
  // Always hand back the channel that can only answer the cluster-id preamble call; the server
  // name and timeout are irrelevant because no real connection is made.
  return new PreambleOnlyRpcChannel();
}

@Override
public void cancelConnections(ServerName sn) {
  // No-op: this stub never opens real connections, so there is nothing to cancel.
}

@Override
public void close() {
  // No-op: no resources are held by this stub client.
}

@Override
public boolean hasCellBlockSupport() {
  // Cell blocks are not needed for the registry preamble exchange this test exercises.
  return false;
}
}

/**
* RPC channel that only handles the preamble GetConnectionRegistry call, returning a valid
* cluster ID. All other RPCs are ignored.
*/
public static final class PreambleOnlyRpcChannel implements RpcChannel {

  @Override
  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
    Message responsePrototype, RpcCallback<Message> done) {
    // Only the connection-registry service is supported; reject everything else through the
    // controller so callers observe a failed call rather than a hang.
    boolean isRegistryCall = ConnectionRegistryService.getDescriptor().equals(method.getService());
    if (!isRegistryCall) {
      controller.setFailed("unexpected call");
      done.run(null);
      return;
    }
    // Answer the preamble with a fixed cluster id so the first (clusterId == null) RPC client
    // creation succeeds and the test reaches the second, failing creation.
    GetConnectionRegistryResponse response =
      GetConnectionRegistryResponse.newBuilder().setClusterId("test-cluster-id").build();
    done.run(response);
  }
}

@BeforeAll
public static void setUpBeforeClass() {
  Configuration conf = UTIL.getConfiguration();
  // Install the deliberately failing RPC client implementation for every connection created
  // from this configuration.
  conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, FailingRpcClientImpl.class,
    RpcClient.class);
  // Push all background refresh schedules far enough into the future that they never fire
  // during a test run.
  conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
  conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
  conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
  // Three dummy bootstrap servers; the ports are never actually connected to.
  BOOTSTRAP_NODES = IntStream.of(0, 1, 2)
    .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
    .collect(Collectors.toSet());
}

// Builds a registry backed by the failing RPC client configured in setUpBeforeClass. The
// anonymous subclass stubs out node discovery so the test needs no running cluster.
private AbstractRpcBasedConnectionRegistry createRegistry() throws IOException {
  Configuration conf = UTIL.getConfiguration();
  conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, 1);
  return new AbstractRpcBasedConnectionRegistry(conf, User.getCurrent(),
    HEDGED_REQS_FANOUT_CONFIG_NAME, INITIAL_DELAY_SECS_CONFIG_NAME,
    REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {

    @Override
    protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
      // Fixed node list from the class setup; no configuration lookup needed.
      return BOOTSTRAP_NODES;
    }

    @Override
    protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
      // Endpoint refresh is effectively disabled by the huge delays, but return the same
      // static set in case it ever runs.
      return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
    }

    @Override
    public String getConnectionString() {
      // Not exercised by these tests.
      return "unimplemented";
    }
  };
}

/**
 * The cluster id future must fail fast when construction of the real RPC client throws. Before
 * the fix, the exception thrown inside the listener was swallowed and the future never
 * completed, hanging every caller.
 */
@Test
public void testRpcClientCreationFailureCompletesExceptionally() throws Exception {
  try (AbstractRpcBasedConnectionRegistry registry = createRegistry()) {
    CompletableFuture<String> clusterIdFuture = registry.getClusterId();
    // The future must complete within a few seconds. Before the fix, this would hang forever.
    IOException failure = assertThrows(IOException.class,
      () -> FutureUtils.get(clusterIdFuture, 5, TimeUnit.SECONDS));
    assertTrue(failure.getCause().getMessage().contains("Simulated RPC client creation failure"),
      "Expected simulated failure in cause chain, got: " + failure);
  }
}

/**
* Verify that after a failed stub initialization, subsequent getClusterId() calls also fail
* promptly rather than returning a zombie future.
*/
@Test
public void testSubsequentCallsAfterFailure() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test fail before the patch? We create a new ClusterIdFetcher every time we fetch in RpcConnectionRegistry, so I do not see how we could return a zombie future...

try (AbstractRpcBasedConnectionRegistry registry = createRegistry()) {
// First call triggers the failure
CompletableFuture<String> first = registry.getClusterId();
assertThrows(IOException.class, () -> FutureUtils.get(first, 5, TimeUnit.SECONDS));

// Second call should also fail promptly, not return a stale zombie future
CompletableFuture<String> second = registry.getClusterId();
assertThrows(IOException.class, () -> FutureUtils.get(second, 5, TimeUnit.SECONDS));
}
}
}
Loading