Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -31,6 +32,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,14 +93,13 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5;
private final Pipeline pipeline;
private final ConfigurationSource config;
private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs;
private final XceiverClientMetrics metrics;
private final Map<DatanodeID, ManagedChannel> channels;
private final Semaphore semaphore;
private long timeout;
private final SecurityConfig secConfig;
private final boolean topologyAwareRead;
private final ClientTrustManager trustManager;
private final ConcurrentMap<DatanodeID, ChannelInfo> dnChannelInfoMap;
// Cache the DN which returned the GetBlock command so that the ReadChunk
// command can be sent to the same DN.
private final Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
Expand Down Expand Up @@ -126,8 +127,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
this.semaphore =
new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
this.metrics = XceiverClientManager.getXceiverClientMetrics();
this.channels = new ConcurrentHashMap<>();
this.asyncStubs = new ConcurrentHashMap<>();
this.dnChannelInfoMap = new ConcurrentHashMap<>();
this.topologyAwareRead = config.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
Expand Down Expand Up @@ -161,49 +161,42 @@ public void connect() throws Exception {
connectToDatanode(dn);
}

private void connectToDatanode(DatanodeDetails dn)
throws IOException {
private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}

if (isConnected(dn)) {
return;
}
// read port from the data node, on failure use default configured port
int port = dn.getStandalonePort().getValue();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}
final int finalPort = port;

LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes());

channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
if (channel.isTerminated() || channel.isShutdown()) {
asyncStubs.remove(dnId);
return null; // removes from channels map
}

return channel;
});

ManagedChannel channel;
try {
channel = channels.computeIfAbsent(dn.getID(), dnId -> {
try {
return createChannel(dn, finalPort).build();
} catch (IOException e) {
throw new RuntimeException(e);
dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
// channel is absent or stale
if (channelInfo == null || channelInfo.isChannelInactive()) {
LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn, pipeline.getNodes());
try {
return generateNewChannel(dn);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

// channel is present and active
return channelInfo;
});
} catch (RuntimeException e) {
} catch (UncheckedIOException e) {
LOG.error("Failed to create channel to datanode {}", dn, e);
throw new IOException(e.getCause());
throw e.getCause();
}
}

/**
 * Builds a fresh gRPC channel and protocol stub for the given datanode.
 * The two are returned together as a {@link ChannelInfo} so they are
 * always published as a consistent pair.
 *
 * @param dn target datanode
 * @return channel/stub pair for the datanode
 * @throws IOException if the channel cannot be created
 */
private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException {
  // read port from the data node, on failure use default configured port
  int port = dn.getStandalonePort().getValue();
  if (port == 0) {
    port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT, OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
  }

  ManagedChannel channel = createChannel(dn, port).build();
  XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel);
  return new ChannelInfo(channel, stub);
}

protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
Expand Down Expand Up @@ -248,11 +241,12 @@ private boolean datanodeUseHostName() {
*/
@VisibleForTesting
public boolean isConnected(DatanodeDetails details) {
return isConnected(channels.get(details.getID()));
}
if (details == null) {
return false;
}

private boolean isConnected(ManagedChannel channel) {
return channel != null && !channel.isTerminated() && !channel.isShutdown();
ChannelInfo channelInfo = dnChannelInfoMap.get(details.getID());
return channelInfo != null && !channelInfo.isChannelInactive();
}

/**
Expand All @@ -267,13 +261,17 @@ public void close() {
return;
}

for (ManagedChannel channel : channels.values()) {
channel.shutdown();
for (ChannelInfo channelInfo : dnChannelInfoMap.values()) {
channelInfo.getChannel().shutdown();
}

final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
long deadline = System.nanoTime() + maxWaitNanos;
List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
List<ManagedChannel> nonTerminatedChannels = dnChannelInfoMap.values()
.stream()
.map(ChannelInfo::getChannel)
.filter(Objects::nonNull)
.collect(Collectors.toList());

while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) {
nonTerminatedChannels.removeIf(ManagedChannel::isTerminated);
Expand All @@ -286,16 +284,17 @@ public void close() {
}
}

List<DatanodeID> failedChannels = channels.entrySet().stream()
.filter(e -> !e.getValue().isTerminated())
List<DatanodeID> failedChannels = dnChannelInfoMap.entrySet()
.stream()
.filter(e -> !e.getValue().getChannel().isTerminated())
.map(Map.Entry::getKey)
.collect(Collectors.toList());

if (!failedChannels.isEmpty()) {
LOG.warn("Channels {} did not terminate within timeout.", failedChannels);
}

channels.clear();
asyncStubs.clear();
dnChannelInfoMap.clear();
}

@Override
Expand Down Expand Up @@ -581,7 +580,7 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t
try {
checkOpen(dn);
semaphore.acquire();
XceiverClientProtocolServiceStub stub = asyncStubs.get(dn.getID());
XceiverClientProtocolServiceStub stub = dnChannelInfoMap.get(dn.getID()).getStub();
if (stub == null) {
throw new IOException("Failed to get gRPC stub for DataNode: " + dn);
}
Expand Down Expand Up @@ -698,7 +697,7 @@ public XceiverClientReply sendCommandAsync(

// create a new grpc message stream pair for each call.
final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
@Override
public void onNext(ContainerCommandResponseProto value) {
Expand Down Expand Up @@ -739,30 +738,13 @@ private void decreasePendingMetricsAndReleaseSemaphore() {

private void checkOpen(DatanodeDetails dn)
throws IOException {
if (isClosed.get()) {
throw new IOException("This channel is not connected.");
}

ManagedChannel channel = channels.get(dn.getID());
// If the channel doesn't exist for this specific datanode or the channel
// is closed, just reconnect
if (!isConnected(channel)) {
reconnect(dn);
}

}

private void reconnect(DatanodeDetails dn)
throws IOException {
ManagedChannel channel;
try {
connectToDatanode(dn);
channel = channels.get(dn.getID());
} catch (Exception e) {
throw new IOException("Error while connecting", e);
}
Comment on lines 741 to 745
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With simplified checkOpen, there are two test cases that need to be updated, sorry for missing that.

Expecting actual:
  "Error while connecting"
to contain:
  "This channel is not connected" 
	at org.apache.hadoop.hdds.scm.TestXceiverClientManager.testFreeByReference(TestXceiverClientManager.java:162)

...
	at org.apache.hadoop.hdds.scm.TestXceiverClientManager.testFreeByEviction(TestXceiverClientManager.java:211)

Let's remove this try-catch, we don't need to wrap the exception from connectToDatanode.

With that, the exception will have the correct message ("Client is closed."). Please update the tests to reflect that.

assertThat(t.getMessage()).contains("This channel is not connected");

assertThat(t.getMessage()).contains("This channel is not connected");

diff --git hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index a9dbcb9456..1f9ac0a122 100644
--- hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -738,11 +738,7 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
 
   private void checkOpen(DatanodeDetails dn)
       throws IOException {
-    try {
-      connectToDatanode(dn);
-    } catch (Exception e) {
-      throw new IOException("Error while connecting", e);
-    }
+    connectToDatanode(dn);
 
     if (!isConnected(dn)) {
       throw new IOException("This channel is not connected.");
diff --git hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java
index 06d19e5575..9468cec94f 100644
--- hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java
+++ hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java
@@ -159,7 +159,7 @@ public void testFreeByReference(@TempDir Path metaDir) throws IOException {
       Throwable t = assertThrows(IOException.class,
           () -> ContainerProtocolCalls.createContainer(client1,
               container1.getContainerInfo().getContainerID(), null));
-      assertThat(t.getMessage()).contains("This channel is not connected");
+      assertThat(t.getMessage()).contains("Client is closed");
 
       clientManager.releaseClient(client2, false);
     }
@@ -208,7 +208,7 @@ public void testFreeByEviction(@TempDir Path metaDir) throws IOException {
       Throwable t = assertThrows(IOException.class,
           () -> ContainerProtocolCalls.createContainer(client1,
               container1.getContainerInfo().getContainerID(), null));
-      assertThat(t.getMessage()).contains("This channel is not connected");
+      assertThat(t.getMessage()).contains("Client is closed");
 
       clientManager.releaseClient(client2, false);
     }


if (!isConnected(channel)) {
if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
}
}
Expand All @@ -784,4 +766,31 @@ public ConfigurationSource getConfig() {
/** Sets the request deadline, in seconds, applied to async commands. */
public void setTimeout(long timeout) {
this.timeout = timeout;
}

/**
 * Immutable pairing of a datanode's gRPC channel with the stub created on
 * top of it, so the two are always read and published together.
 */
private static class ChannelInfo {
  private final ManagedChannel channel;
  private final XceiverClientProtocolServiceStub stub;

  ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) {
    this.channel = channel;
    this.stub = stub;
  }

  public ManagedChannel getChannel() {
    return channel;
  }

  public XceiverClientProtocolServiceStub getStub() {
    return stub;
  }

  /**
   * Returns true when no usable channel is held: the channel is missing,
   * has been shut down, or has fully terminated.
   */
  public boolean isChannelInactive() {
    if (channel == null) {
      return true;
    }
    return channel.isShutdown() || channel.isTerminated();
  }
}
}