diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 2cf55a1abdc0..b615b49fc3d9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.replication.ReplicationLoadSource; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; + /** * This class is used for exporting current state of load on a RegionServer. */ @@ -112,4 +114,6 @@ default String getVersion() { * rounded to MB */ Map getRegionCachedInfo(); + + List getClientConnectionsInfo(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index c7aea21e845a..19a0c4b66c56 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -88,7 +88,8 @@ public static ServerMetrics toServerMetrics(ServerName serverName, int versionNu .setRegionCachedInfo(serverLoadPB.getRegionCachedInfoMap()) .setReportTimestamp(serverLoadPB.getReportEndTime()) .setLastReportTimestamp(serverLoadPB.getReportStartTime()).setVersionNumber(versionNumber) - .setVersion(version).build(); + .setClientConnectionInfos(serverLoadPB.getClientConnectionInfosList()).setVersion(version) + .build(); } public static List toCoprocessor(Collection names) { @@ -145,6 +146,8 @@ public static ServerMetricsBuilder newBuilder(ServerName sn) { private long lastReportTimestamp = 0; private final List tasks = new ArrayList<>(); private Map regionCachedInfo = new HashMap<>(); + private final List clientConnectionInfos = + new ArrayList<>(); private ServerMetricsBuilder(ServerName serverName) { 
this.serverName = serverName; @@ -240,11 +243,17 @@ public ServerMetricsBuilder setRegionCachedInfo(Map value) { return this; } + public ServerMetricsBuilder + setClientConnectionInfos(List value) { + clientConnectionInfos.addAll(value); + return this; + } + public ServerMetrics build() { return new ServerMetricsImpl(serverName, versionNumber, version, requestCountPerSecond, requestCount, readRequestCount, writeRequestCount, usedHeapSize, maxHeapSize, infoServerPort, sources, sink, regionStatus, coprocessorNames, reportTimestamp, lastReportTimestamp, - userMetrics, tasks, regionCachedInfo); + userMetrics, tasks, regionCachedInfo, clientConnectionInfos); } private static class ServerMetricsImpl implements ServerMetrics { @@ -268,6 +277,7 @@ private static class ServerMetricsImpl implements ServerMetrics { private final Map userMetrics; private final List tasks; private final Map regionCachedInfo; + private final List clientConnectionInfos; ServerMetricsImpl(ServerName serverName, int versionNumber, String version, long requestCountPerSecond, long requestCount, long readRequestsCount, @@ -275,7 +285,8 @@ private static class ServerMetricsImpl implements ServerMetrics { List sources, ReplicationLoadSink sink, Map regionStatus, Set coprocessorNames, long reportTimestamp, long lastReportTimestamp, Map userMetrics, List tasks, - Map regionCachedInfo) { + Map regionCachedInfo, + List clientConnectionInfos) { this.serverName = Preconditions.checkNotNull(serverName); this.versionNumber = versionNumber; this.version = version; @@ -295,6 +306,7 @@ private static class ServerMetricsImpl implements ServerMetrics { this.lastReportTimestamp = lastReportTimestamp; this.tasks = tasks; this.regionCachedInfo = regionCachedInfo; + this.clientConnectionInfos = clientConnectionInfos; } @Override @@ -402,6 +414,11 @@ public Map getRegionCachedInfo() { return Collections.unmodifiableMap(regionCachedInfo); } + @Override + public List getClientConnectionsInfo() { + return 
Collections.unmodifiableList(clientConnectionInfos); + } + @Override public String toString() { int storeCount = 0; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto index 58fd3c8d2a5b..fb0edc1e8b73 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto @@ -250,6 +250,14 @@ message ServerTask { } } +message ClientConnectionInfo { + optional string host_address = 1; + optional string user_name = 2; + optional string client_version = 3; + optional string service_name = 4; + optional string port = 5; +} + message ServerLoad { /** Number of requests since last report. */ optional uint64 number_of_requests = 1; @@ -326,6 +334,8 @@ message ServerLoad { * The metrics for region cached on this region server */ map regionCachedInfo = 16; + + repeated ClientConnectionInfo clientConnectionInfos = 17; } message LiveServerInfo { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientConnectionInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientConnectionInfo.java new file mode 100644 index 000000000000..c6c892991f59 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientConnectionInfo.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Holds information about a client connection including IP address, username, client version, and + * service name. + */ +@InterfaceAudience.Private +public class ClientConnectionInfo { + private final String hostAddress; + private final int port; + private final String userName; + private final String clientVersion; + private final String serviceName; + + private ClientConnectionInfo(Builder builder) { + this.hostAddress = builder.hostAddress; + this.port = builder.port; + this.userName = builder.userName; + this.clientVersion = builder.clientVersion; + this.serviceName = builder.serviceName; + } + + public String getHostAddress() { + return hostAddress; + } + + public String getUserName() { + return userName; + } + + public String getClientVersion() { + return clientVersion; + } + + public String getServiceName() { + return serviceName; + } + + public String getClientId() { + return hostAddress + ":" + port; + } + + public String getClientPort() { + return String.valueOf(port); + } + + @Override + public String toString() { + return "ClientConnectionInfo{" + "hostAddress='" + hostAddress + '\'' + ", port='" + port + '\'' + + ", userName='" + userName + '\'' + ", clientVersion='" + clientVersion + '\'' + + ", serviceName='" + serviceName + '\'' + '}'; + } + + public static class Builder { + private String hostAddress; + private int port; + private String userName; + private String clientVersion; + private String serviceName; + + public Builder 
hostAddress(String hostAddress) { + this.hostAddress = hostAddress; + return this; + } + + public Builder port(int port) { + this.port = port; + return this; + } + + public Builder userName(String userName) { + this.userName = userName; + return this; + } + + public Builder clientVersion(String clientVersion) { + this.clientVersion = clientVersion; + return this; + } + + public Builder serviceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + public ClientConnectionInfo build() { + return new ClientConnectionInfo(this); + } + } + + public static Builder newBuilder() { + return new Builder(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientConnectionRegistry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientConnectionRegistry.java new file mode 100644 index 000000000000..ee2b0d06145a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ClientConnectionRegistry.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Registry for tracking active client connections to the RPC server. Maintains a thread-safe map of + * client connection information. + */ +@InterfaceAudience.Private +public class ClientConnectionRegistry { + + private final ConcurrentHashMap clientConnections; + + public ClientConnectionRegistry() { + this.clientConnections = new ConcurrentHashMap<>(); + } + + public void registerClientConnection(ClientConnectionInfo connectionInfo) { + clientConnections.put(connectionInfo.getClientId(), connectionInfo); + } + + public void unregisterClientConnection(String clientId) { + clientConnections.remove(clientId); + } + + public Collection getClientConnections() { + return clientConnections.values(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index b21b6e19c78e..d1f551f861c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -132,6 +132,8 @@ public class NettyRpcServer extends RpcServer { private final AtomicReference keyStoreWatcher = new AtomicReference<>(); private final AtomicReference trustStoreWatcher = new AtomicReference<>(); + private final ClientConnectionRegistry clientConnectionRegistry; + private volatile int writeBufferFatalThreshold; private volatile WriteBufferWaterMark writeBufferWaterMark; @@ -141,6 +143,7 @@ public NettyRpcServer(Server server, String name, List getTotalAndMaxNettyOutboundBytes() { } return Pair.newPair(total, max); } + + @Override + public ClientConnectionRegistry getClientConnectionRegistry() { + return clientConnectionRegistry; + } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java index f63b8d2730f7..fe7708802b8a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java @@ -34,6 +34,7 @@ import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; /** @@ -44,6 +45,7 @@ class NettyServerRpcConnection extends ServerRpcConnection { final Channel channel; + private ClientConnectionInfo clientConnectionInfo = null; NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) { super(rpcServer); @@ -55,6 +57,7 @@ class NettyServerRpcConnection extends ServerRpcConnection { NettyFutureUtils.addListener(channel.closeFuture(), f -> { disposeSasl(); callCleanupIfNeeded(); + deregisterClientConnectionInfo(); // Unregister when connection closes NettyRpcServer.LOG.trace("Disconnection {}; # active connections={}", channel.remoteAddress(), rpcServer.allChannels.size() - 1); rpcServer.allChannels.remove(channel); @@ -86,7 +89,13 @@ void process(ByteBuf buf) throws IOException, InterruptedException { this.callCleanup = () -> buf.release(); ByteBuff byteBuff = new SingleByteBuff(buf.nioBuffer()); try { + boolean wasHeaderRead = connectionHeaderRead; processOneRpc(byteBuff); + + // Register only once, right after the connection header is processed + if (!wasHeaderRead && connectionHeaderRead) { + registerClientConnectionInfo(); + } } catch (Exception e) { callCleanupIfNeeded(); throw e; @@ -118,4 +127,62 @@ public NettyServerCall createCall(int id, final BlockingService service, protected void doRespond(RpcResponse resp) { 
NettyFutureUtils.safeWriteAndFlush(channel, resp); } + + /** + * Builds this connection's {@link ClientConnectionInfo} and registers it with the server's + * {@link ClientConnectionRegistry}, if any. Called once, right after the connection header is read. + */ + protected void registerClientConnectionInfo() { + this.clientConnectionInfo = createClientConnectionInfo(); + + // Null-check BEFORE logging: createClientConnectionInfo() returns null if no header was read. + if (clientConnectionInfo != null) { + NettyRpcServer.LOG.debug("Registering client connection information in Registry: {}", + clientConnectionInfo); + ClientConnectionRegistry registry = rpcServer.getClientConnectionRegistry(); + if (registry != null) { + registry.registerClientConnection(clientConnectionInfo); + } + } + } + + /** Removes this connection's entry from the {@link ClientConnectionRegistry} when the channel closes. */ + protected void deregisterClientConnectionInfo() { + if (clientConnectionInfo != null) { + NettyRpcServer.LOG.debug("Deregistering client connection information in Registry: {}", + clientConnectionInfo.getClientId()); + ClientConnectionRegistry registry = rpcServer.getClientConnectionRegistry(); + if (registry != null) { + registry.unregisterClientConnection(clientConnectionInfo.getClientId()); + } + } + } + + /** + * Extracts client details (user, version, service) from the connection header and the client's + * address/port from the remote end of the channel. Returns null if the header has not been read. + */ + protected ClientConnectionInfo createClientConnectionInfo() { + if (!connectionHeaderRead) { + return null; + } + + String userName = "UNKNOWN"; + if (connectionHeader.hasUserInfo()) { + RPCProtos.UserInformation userInfoProto = connectionHeader.getUserInfo(); + if (userInfoProto.hasEffectiveUser()) { + userName = userInfoProto.getEffectiveUser(); + } + } + + String clientVersion = "UNKNOWN"; + if (connectionHeader.hasVersionInfo()) { + clientVersion = connectionHeader.getVersionInfo().getVersion(); + } + + String serviceName = "UNKNOWN"; + if (connectionHeader.hasServiceName()) { + serviceName = connectionHeader.getServiceName(); + } + + // The client's IP and port must both come from the remote end of the channel; + // channel.localAddress() is the server-side socket and must not be reported as the client host. + InetSocketAddress remoteAddress = (InetSocketAddress) (channel.remoteAddress()); + int port = remoteAddress.getPort(); + String hostAddress = remoteAddress.getAddress().getHostAddress(); + + return ClientConnectionInfo.newBuilder().hostAddress(hostAddress).port(port).userName(userName) + 
.clientVersion(clientVersion).serviceName(serviceName).build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index a2df50118ad0..eaba206494a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -87,4 +87,8 @@ Pair call(RpcCall call, MonitoredRPCHandler status /** Return RPC's instance of {@link RpcCoprocessorHost} */ RpcCoprocessorHost getRpcCoprocessorHost(); + + default ClientConnectionRegistry getClientConnectionRegistry() { + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index eea82ca511eb..61a0d85a2617 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -114,10 +114,13 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.ClientConnectionInfo; +import org.apache.hadoop.hbase.ipc.ClientConnectionRegistry; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.DecommissionedHostRejectedException; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.log.HBaseMarkers; @@ -1290,6 +1293,41 @@ private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, lon 
.setState(ClusterStatusProtos.ServerTask.State.valueOf(task.getState().name())) .setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTimestamp()).build())); + // Add client connection information from the RpcServer's registry + try { + RpcServerInterface rpcServer = rpcServices.getRpcServer(); + ClientConnectionRegistry registry = rpcServer.getClientConnectionRegistry(); + if (registry != null) { + Collection clientConnections = registry.getClientConnections(); + for (ClientConnectionInfo connInfo : clientConnections) { + ClusterStatusProtos.ClientConnectionInfo.Builder clientConnBuilder = + ClusterStatusProtos.ClientConnectionInfo.newBuilder(); + + if (connInfo.getHostAddress() != null) { + clientConnBuilder.setHostAddress(connInfo.getHostAddress()); + } + if (connInfo.getUserName() != null) { + clientConnBuilder.setUserName(connInfo.getUserName()); + } + if (connInfo.getClientVersion() != null) { + clientConnBuilder.setClientVersion(connInfo.getClientVersion()); + } + if (connInfo.getServiceName() != null) { + clientConnBuilder.setServiceName(connInfo.getServiceName()); + } + if (connInfo.getClientPort() != null) { + clientConnBuilder.setPort(connInfo.getClientPort()); + } + + ClusterStatusProtos.ClientConnectionInfo clientConn = clientConnBuilder.build(); + serverLoad.addClientConnectionInfos(clientConn); + LOG.debug("Client connection info added to server load report : " + clientConn); + } + } + } catch (Exception e) { + LOG.warn("Failed to get client connection information", e); + } + return serverLoad.build(); } diff --git a/hbase-server/src/main/resources/hbase-webapps/master/regionServerList.jsp b/hbase-server/src/main/resources/hbase-webapps/master/regionServerList.jsp index 9fc0adda00cc..17525e11979d 100644 --- a/hbase-server/src/main/resources/hbase-webapps/master/regionServerList.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/master/regionServerList.jsp @@ -63,6 +63,9 @@ +
<% request.setAttribute("serverNames", serverNames); %> @@ -84,6 +87,9 @@
+
+ +
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/regionServerListClientConnectionsStats.jsp b/hbase-server/src/main/resources/hbase-webapps/master/regionServerListClientConnectionsStats.jsp new file mode 100644 index 000000000000..1b704409b6fd --- /dev/null +++ b/hbase-server/src/main/resources/hbase-webapps/master/regionServerListClientConnectionsStats.jsp @@ -0,0 +1,47 @@ +<%@ page contentType="text/html;charset=UTF-8" + import="org.apache.hadoop.hbase.ServerName" + import="org.apache.hadoop.hbase.master.HMaster" + import="org.apache.hadoop.hbase.ServerMetrics" + import="org.apache.hadoop.hbase.master.ServerManager" + import="org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos" %> + +<% + HMaster master = (HMaster) getServletContext().getAttribute(HMaster.MASTER); + ServerName[] serverNames = (ServerName[]) request.getAttribute("serverNames"); + ServerManager serverManager = master.getServerManager(); +%> + + + + + + + + + + + + + <% + for (ServerName serverName: serverNames) { + ServerMetrics sl = serverManager.getLoad(serverName); + if (sl != null) { + java.util.List clientConnections = sl.getClientConnectionsInfo(); + if (clientConnections != null) { + for (ClusterStatusProtos.ClientConnectionInfo clientConnection : clientConnections) { + %> + + + + + + + + <% + } + } + } + } + %> + +
ClientIPUserNameClientVersionServiceNameServerInfo
<%= clientConnection.getHostAddress() %><%= clientConnection.getUserName() %><%= clientConnection.getClientVersion() %><%= clientConnection.getServiceName() %><%= serverName.getServerName() %>
diff --git a/hbase-server/src/main/resources/hbase-webapps/static/js/masterStatusInit.js b/hbase-server/src/main/resources/hbase-webapps/static/js/masterStatusInit.js index 0b09b6ee06b9..957889a9ada6 100644 --- a/hbase-server/src/main/resources/hbase-webapps/static/js/masterStatusInit.js +++ b/hbase-server/src/main/resources/hbase-webapps/static/js/masterStatusInit.js @@ -92,6 +92,8 @@ $(document).ready(function() } }); + $("#clientConnectionsStatsTable").tablesorter(); + $("#userTables").tablesorter(); function showRitPages() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyServerRpcConnectionClientConnectionInfo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyServerRpcConnectionClientConnectionInfo.java new file mode 100644 index 000000000000..10fb3730c013 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyServerRpcConnectionClientConnectionInfo.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; + +/** + * Test to verify that ClientConnectionInfo is properly registered and unregistered in + * NettyServerRpcConnection lifecycle to prevent memory leaks. 
+ */ +@Category({ RPCTests.class, MediumTests.class }) +public class TestNettyServerRpcConnectionClientConnectionInfo { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestNettyServerRpcConnectionClientConnectionInfo.class); + + private static Configuration CONF = HBaseConfiguration.create(); + private NioEventLoopGroup group; + private NettyRpcServer server; + private NettyRpcClient client; + private TestProtobufRpcProto.BlockingInterface stub; + + @Before + public void setUp() throws IOException { + group = new NioEventLoopGroup(); + server = new NettyRpcServer(null, getClass().getSimpleName(), + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1), true); + NettyRpcClientConfigHelper.setEventLoopConfig(CONF, group, NioSocketChannel.class); + client = new NettyRpcClient(CONF); + server.start(); + stub = TestProtobufRpcServiceImpl.newBlockingStub(client, server.getListenerAddress()); + } + + @After + public void tearDown() throws Exception { + Closeables.close(client, true); + server.stop(); + group.shutdownGracefully().sync(); + } + + @Test + public void testClientConnectionInfoRegisteredAndUnregisteredFromRegistry() throws Exception { + ClientConnectionRegistry registry = server.getClientConnectionRegistry(); + // Verify that the registry is not null + assertNotNull(registry); + + // Get the initial number of client connections registered + Collection connections = registry.getClientConnections(); + int initialSize = connections.size(); + + // Make an RPC call which will trigger connection header read and registration + assertEquals("test", + stub.echo(null, EchoRequestProto.newBuilder().setMessage("test").build()).getMessage()); + + // Wait for the ClientConnectionInfo to be registered + Waiter.waitFor(CONF, 5000, () -> registry.getClientConnections().size() > initialSize); + + // Verify that one connection is 
registered + connections = registry.getClientConnections(); + assertTrue(connections.size() >= initialSize + 1); + + // Get the channel to verify it closes + Channel channel = server.allChannels.stream().filter(c -> c instanceof NioSocketChannel) + .findFirst().orElse(null); + assertNotNull(channel); + + // Close the client connection + client.close(); + + // Wait for the channel to close + channel.closeFuture().await(5000, TimeUnit.MILLISECONDS); + + // Wait for the ClientConnectionInfo to be unregistered + Waiter.waitFor(CONF, 5000, () -> registry.getClientConnections().size() <= initialSize); + + // Verify that the connection is unregistered + connections = registry.getClientConnections(); + assertTrue(connections.size() <= initialSize); + } + + @Test + public void testMultipleConnectionsRegisteredAndUnregistered() throws Exception { + ClientConnectionRegistry registry = server.getClientConnectionRegistry(); + assertNotNull(registry); + + Collection connections = registry.getClientConnections(); + int initialSize = connections.size(); + + // Create multiple clients - using fresh clients not from setUp + NettyRpcClient client1 = new NettyRpcClient(CONF); + NettyRpcClient client2 = new NettyRpcClient(CONF); + NettyRpcClient client3 = new NettyRpcClient(CONF); + + try { + TestProtobufRpcProto.BlockingInterface stub1 = + TestProtobufRpcServiceImpl.newBlockingStub(client1, server.getListenerAddress()); + TestProtobufRpcProto.BlockingInterface stub2 = + TestProtobufRpcServiceImpl.newBlockingStub(client2, server.getListenerAddress()); + TestProtobufRpcProto.BlockingInterface stub3 = + TestProtobufRpcServiceImpl.newBlockingStub(client3, server.getListenerAddress()); + + // Make RPC calls from all clients + assertEquals("test1", + stub1.echo(null, EchoRequestProto.newBuilder().setMessage("test1").build()).getMessage()); + assertEquals("test2", + stub2.echo(null, EchoRequestProto.newBuilder().setMessage("test2").build()).getMessage()); + assertEquals("test3", + 
stub3.echo(null, EchoRequestProto.newBuilder().setMessage("test3").build()).getMessage()); + + // Wait for all ClientConnectionInfos to be registered + Waiter.waitFor(CONF, 10000, () -> registry.getClientConnections().size() >= initialSize + 3); + + // Verify that three connections are registered + connections = registry.getClientConnections(); + assertTrue(connections.size() >= initialSize + 3); + + int sizeBeforeClosing = connections.size(); + + // Close client2 + client2.close(); + + // Wait for client2 connection to be unregistered + Waiter.waitFor(CONF, 10000, () -> registry.getClientConnections().size() < sizeBeforeClosing); + + // Close client3 + int sizeAfterClient2 = registry.getClientConnections().size(); + client3.close(); + + // Wait for client3 connection to be unregistered + Waiter.waitFor(CONF, 10000, () -> registry.getClientConnections().size() < sizeAfterClient2); + + // Close client1 + int sizeAfterClient3 = registry.getClientConnections().size(); + client1.close(); + + // Wait for client1 connection to be unregistered + Waiter.waitFor(CONF, 10000, () -> registry.getClientConnections().size() < sizeAfterClient3); + + // Verify that we're back to the initial size or less + connections = registry.getClientConnections(); + assertTrue(connections.size() <= initialSize); + } finally { + Closeables.close(client1, true); + Closeables.close(client2, true); + Closeables.close(client3, true); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java index 06b66d118bf3..b7cc0d4735e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java @@ -52,6 +52,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; + /** * Test for RegionsRecoveryChore */ @@ -399,6 +401,11 @@ public List getTasks() { public Map getRegionCachedInfo() { return new HashMap<>(); } + + @Override + public List getClientConnectionsInfo() { + return null; + } }; return serverMetrics; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBuildServerLoadWithClientConnectionInfo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBuildServerLoadWithClientConnectionInfo.java new file mode 100644 index 000000000000..5e0eb15376d7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBuildServerLoadWithClientConnectionInfo.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.ipc.NettyRpcServer; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test to verify ClientConnectionInfo is properly collected and included in buildServerLoad + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestBuildServerLoadWithClientConnectionInfo { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBuildServerLoadWithClientConnectionInfo.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + private static final byte[] FAMILY = 
Bytes.toBytes("f"); + private static final TableName TABLE_NAME = TableName.valueOf("test_client_connection_info"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = HBaseConfiguration.create(); + // Use NettyRpcServer which supports ClientConnectionRegistry + conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class.getName()); + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws Exception { + // Create table for test + try (Admin admin = UTIL.getAdmin()) { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); + builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); + admin.createTable(builder.build()); + } + } + + @After + public void tearDown() throws Exception { + try (Admin admin = UTIL.getAdmin()) { + if (admin.tableExists(TABLE_NAME)) { + admin.disableTable(TABLE_NAME); + admin.deleteTable(TABLE_NAME); + } + } + } + + @Test + public void testClientConnectionInfoFromServerLoad() throws Exception { + // Create a connection + try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Table table = connection.getTable(TABLE_NAME)) { + + // Perform operations to register client connection + byte[] row = Bytes.toBytes("row3"); + Put put = new Put(row); + put.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("value3")); + table.put(put); + + Get get = new Get(row); + table.get(get); + + // Wait for registration + Thread.sleep(1000); + + // Access the region server + HRegionServer rs = UTIL.getMiniHBaseCluster().getRegionServer(0); + + // Trigger region server report to generate ServerLoad + long now = EnvironmentEdgeManager.currentTime(); + long last = now - 1000; + rs.tryRegionServerReport(last, now); + + // Wait for the report to be processed + Thread.sleep(500); + + // Get ServerMetrics 
from ServerManager + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ServerManager serverManager = master.getServerManager(); + ServerName rsServerName = rs.getServerName(); + ServerMetrics metrics = serverManager.getLoad(rsServerName); + + Assert.assertNotNull(metrics); + + // Get client connection info from ServerMetrics + Assert.assertNotNull(metrics.getClientConnectionsInfo()); + } + } +}