From c2d83360a5ddc396a03167de53f23dd48b7ca876 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 23 Feb 2026 23:59:58 +0530 Subject: [PATCH 01/10] Add HBase client metrics --- .../hadoop/hbase/client/ClientScanner.java | 8 +- .../ResultBoundedCompletionService.java | 15 ++ .../hadoop/hbase/client/ScannerCallable.java | 183 ++++++++++++------ .../client/ScannerCallableWithReplicas.java | 33 +++- .../hbase/client/metrics/ScanMetrics.java | 33 +++- 5 files changed, 199 insertions(+), 73 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index a453945e729a..7d86f7247ad1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -293,6 +293,7 @@ private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller implements RunnableFuture { private final RpcRetryingCaller retryingCaller; private boolean resultObtained = false; private final int replicaId; // replica id + private long submitTimeMs; + private long queueWaitTimeMs = 0; + private long executionTimeMs = 0; public QueueingFuture(RetryingCallable future, int rpcTimeout, int operationTimeout, int id) { @@ -64,11 +67,14 @@ public QueueingFuture(RetryingCallable future, int rpcTimeout, int operationT this.operationTimeout = operationTimeout; this.retryingCaller = retryingCallerFactory. 
newCaller(rpcTimeout); this.replicaId = id; + this.submitTimeMs = EnvironmentEdgeManager.currentTime(); } @SuppressWarnings("unchecked") @Override public void run() { + long executionStartTimeMs = EnvironmentEdgeManager.currentTime(); + queueWaitTimeMs += (executionStartTimeMs - submitTimeMs); try { if (!cancelled) { result = this.retryingCaller.callWithRetries(future, operationTimeout); @@ -77,6 +83,7 @@ public void run() { } catch (Throwable t) { exeEx = new ExecutionException(t); } finally { + executionTimeMs += (EnvironmentEdgeManager.currentTime() - executionStartTimeMs); synchronized (tasks) { // If this wasn't canceled then store the result. if (!cancelled) { @@ -147,6 +154,14 @@ public int getReplicaId() { public ExecutionException getExeEx() { return exeEx; } + + public long getQueueWaitTimeMs() { + return queueWaitTimeMs; + } + + public long getExecutionTimeMs() { + return executionTimeMs; + } } @SuppressWarnings("unchecked") diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 8ba8df0c6a8c..358be62c5e7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +78,11 @@ public class ScannerCallable extends ClientServiceCallable { private boolean logScannerActivity = false; private int logCutOffLatency = 1000; protected final int id; + private long 
scanPrepareTimeMs = 0; + private long scanExecutionTimeMs = 0; + private long rpcCallTimeMs = 0; + private long threadPoolWaitTimeMs = 0; + private long threadPoolExecutionTimeMs = 0; enum MoreResults { YES, @@ -155,25 +162,30 @@ public void prepare(boolean reload) throws IOException { throw new InterruptedIOException(); } - if ( - reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) - && getConnection().isTableDisabled(getTableName()) - ) { - throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); - } + long prepareStartTimeMs = EnvironmentEdgeManager.currentTime(); + try { + if ( + reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) + && getConnection().isTableDisabled(getTableName()) + ) { + throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); + } - RegionLocations rl = getRegionLocationsForPrepare(getRow()); - location = getLocationForReplica(rl); - ServerName dest = location.getServerName(); - setStub(super.getConnection().getClient(dest)); - if (!instantiated || reload) { - checkIfRegionServerIsRemote(); - instantiated = true; - } - cursor = null; - // check how often we retry. - if (reload) { - incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + RegionLocations rl = getRegionLocationsForPrepare(getRow()); + location = getLocationForReplica(rl); + ServerName dest = location.getServerName(); + setStub(super.getConnection().getClient(dest)); + if (!instantiated || reload) { + checkIfRegionServerIsRemote(); + instantiated = true; + } + cursor = null; + // check how often we retry. 
+ if (reload) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + } + } finally { + scanPrepareTimeMs += (EnvironmentEdgeManager.currentTime() - prepareStartTimeMs); } } @@ -192,7 +204,7 @@ private ScanResponse next() throws IOException { ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew, scan.getLimit()); try { - ScanResponse response = getStub().scan(getRpcController(), request); + ScanResponse response = doScan(getRpcController(), request); nextCallSeq++; return response; } catch (Exception e) { @@ -247,60 +259,75 @@ private void setAlreadyClosed() { this.closed = true; } + private ScanResponse doScan(RpcController controller, ScanRequest request) throws ServiceException { + long rpcCallStartTimeMs = EnvironmentEdgeManager.currentTime(); + try { + ScanResponse response = getStub().scan(controller, request); + return response; + } finally { + rpcCallTimeMs += (EnvironmentEdgeManager.currentTime() - rpcCallStartTimeMs); + } + } + @Override protected Result[] rpcCall() throws Exception { if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (closed) { - close(); - return null; - } - ScanResponse response; - if (this.scannerId == -1L) { - response = openScanner(); - } else { - response = next(); - } - long timestamp = EnvironmentEdgeManager.currentTime(); - boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); - setHeartbeatMessage(isHeartBeat); - if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) { - cursor = ProtobufUtil.toCursor(response.getCursor()); - } - Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); - if (logScannerActivity) { - long now = EnvironmentEdgeManager.currentTime(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 
0 : rrs.length; - LOG.info( - "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); + long scanExecutionStartTimeMs = EnvironmentEdgeManager.currentTime(); + try { + if (closed) { + close(); + return null; } - } - updateServerSideMetrics(scanMetrics, response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults()) { - if (response.getMoreResults()) { - setMoreResultsForScan(MoreResults.YES); + ScanResponse response; + if (this.scannerId == -1L) { + response = openScanner(); } else { - setMoreResultsForScan(MoreResults.NO); - setAlreadyClosed(); + response = next(); } - } else { - setMoreResultsForScan(MoreResults.UNKNOWN); - } - if (response.hasMoreResultsInRegion()) { - if (response.getMoreResultsInRegion()) { - setMoreResultsInRegion(MoreResults.YES); + long timestamp = EnvironmentEdgeManager.currentTime(); + boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); + setHeartbeatMessage(isHeartBeat); + if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) { + cursor = ProtobufUtil.toCursor(response.getCursor()); + } + Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); + if (logScannerActivity) { + long now = EnvironmentEdgeManager.currentTime(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 
0 : rrs.length; + LOG.info( + "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); + } + } + updateServerSideMetrics(scanMetrics, response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults()) { + if (response.getMoreResults()) { + setMoreResultsForScan(MoreResults.YES); + } else { + setMoreResultsForScan(MoreResults.NO); + setAlreadyClosed(); + } + } else { + setMoreResultsForScan(MoreResults.UNKNOWN); + } + if (response.hasMoreResultsInRegion()) { + if (response.getMoreResultsInRegion()) { + setMoreResultsInRegion(MoreResults.YES); + } else { + setMoreResultsInRegion(MoreResults.NO); + setAlreadyClosed(); + } } else { - setMoreResultsInRegion(MoreResults.NO); - setAlreadyClosed(); + setMoreResultsInRegion(MoreResults.UNKNOWN); } - } else { - setMoreResultsInRegion(MoreResults.UNKNOWN); + updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote); + return rrs; + } finally { + scanExecutionTimeMs += (EnvironmentEdgeManager.currentTime() - scanExecutionStartTimeMs); } - updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote); - return rrs; } /** @@ -344,7 +371,7 @@ private void close() { controller.setPriority(HConstants.HIGH_QOS); try { - getStub().scan(controller, request); + doScan(controller, request); } catch (Exception e) { throw ProtobufUtil.handleRemoteException(e); } @@ -362,7 +389,7 @@ private ScanResponse openScanner() throws IOException { ScanRequest request = RequestConverter.buildScanRequest( getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false); try { - ScanResponse response = getStub().scan(getRpcController(), request); + ScanResponse response = doScan(getRpcController(), request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region " @@ -451,4 +478,32 @@ MoreResults moreResultsForScan() { void setMoreResultsForScan(MoreResults 
moreResults) { this.moreResultsForScan = moreResults; } + + void updateThreadPoolWaitTimeMs(long waitTimeMs) { + this.threadPoolWaitTimeMs += waitTimeMs; + } + + void updateThreadPoolExecutionTimeMs(long executionTimeMs) { + this.threadPoolExecutionTimeMs += executionTimeMs; + } + + void populateScanMetrics(ScanMetrics scanMetrics) { + if (scanMetrics == null) { + return; + } + scanMetrics.addToCounter(ScanMetrics.CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME, + threadPoolWaitTimeMs); + scanMetrics.addToCounter(ScanMetrics.CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME, + threadPoolExecutionTimeMs); + scanMetrics.addToCounter(ScanMetrics.CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME, + scanPrepareTimeMs); + scanMetrics.addToCounter(ScanMetrics.CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME, + scanExecutionTimeMs); + scanMetrics.addToCounter(ScanMetrics.CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, rpcCallTimeMs); + threadPoolWaitTimeMs = 0; + threadPoolExecutionTimeMs = 0; + scanPrepareTimeMs = 0; + scanExecutionTimeMs = 0; + rpcCallTimeMs = 0; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 242b9031a4ed..fd2810d44a9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ResultBoundedCompletionService.QueueingFuture; import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -126,6 +128,17 @@ public 
MoreResults moreResultsForScan() { return currentScannerCallable.moreResultsForScan(); } + private void updateThreadPoolMetricsInScannerCallable( + ResultBoundedCompletionService>.QueueingFuture< + Pair> future, + ScannerCallable scannerCallable) { + if (future == null || scannerCallable == null) { + return; + } + scannerCallable.updateThreadPoolWaitTimeMs(future.getQueueWaitTimeMs()); + scannerCallable.updateThreadPoolExecutionTimeMs(future.getExecutionTimeMs()); + } + @Override public Result[] call(int timeout) throws IOException { // If the active replica callable was closed somewhere, invoke the RPC to @@ -198,15 +211,15 @@ public Result[] call(int timeout) throws IOException { addCallsForCurrentReplica(cs, rpcTimeoutForCall); int startIndex = 0; + ResultBoundedCompletionService>.QueueingFuture> f = null; try { // wait for the timeout to see whether the primary responds back - Future> f = - cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds if (f != null) { // After poll, if f is not null, there must be a completed task Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool, f); } return r == null ? 
null : r.getFirst(); // great we got a response } @@ -244,8 +257,7 @@ public Result[] call(int timeout) throws IOException { } try { - Future> f = cs.pollForFirstSuccessfullyCompletedTask(timeout, - TimeUnit.MILLISECONDS, startIndex, endIndex); + f = cs.pollForFirstSuccessfullyCompletedTask(timeout, TimeUnit.MILLISECONDS, startIndex, endIndex); if (f == null) { throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms"); @@ -253,7 +265,7 @@ public Result[] call(int timeout) throws IOException { Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool, f); } return r == null ? null : r.getFirst(); // great we got an answer @@ -274,10 +286,11 @@ public Result[] call(int timeout) throws IOException { @SuppressWarnings("FutureReturnValueIgnored") private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, - AtomicBoolean done, ExecutorService pool) { + AtomicBoolean done, ExecutorService pool, ResultBoundedCompletionService>.QueueingFuture> future) { if (done.compareAndSet(false, true)) { if (currentScannerCallable != scanner) replicaSwitched.set(true); currentScannerCallable = scanner; + updateThreadPoolMetricsInScannerCallable(future, currentScannerCallable); // store where to start the replica scanner from if we need to. 
if (result != null && result.length != 0) this.lastResult = result[result.length - 1]; if (LOG.isTraceEnabled()) { @@ -503,4 +516,10 @@ public long sleep(long pause, int tries) { public HRegionLocation getLocation() { return currentScannerCallable.getLocation(); } + + public void populateScanMetrics(ScanMetrics scanMetrics) { + if (currentScannerCallable != null) { + currentScannerCallable.populateScanMetrics(scanMetrics); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java index 8617a851509b..8cde6ced043e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java @@ -47,7 +47,15 @@ public class ScanMetrics extends ServerSideScanMetrics { public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; public static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES"; - + public static final String CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME = "CLIENT_THREAD_POOL_WAIT_TIME_MS"; + public static final String CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME = + "CLIENT_THREAD_POOL_EXECUTION_TIME_MS"; + public static final String CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME = "CLIENT_SCAN_PREPARE_TIME_MS"; + public static final String CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME = "CLIENT_SCAN_EXECUTION_TIME_MS"; + public static final String CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME = + "CLIENT_RPC_ROUND_TRIP_TIME_MS"; + public static final String CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME = + "CLIENT_CACHE_LOAD_WAIT_TIME_MS"; /** * number of RPC calls */ @@ -95,6 +103,23 @@ public class ScanMetrics extends ServerSideScanMetrics { */ public final AtomicLong countOfRemoteRPCRetries = 
createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); + public final AtomicLong clientThreadPoolWaitTimeMs = + createCounter(CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + + public final AtomicLong clientThreadPoolExecutionTimeMs = + createCounter(CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + + public final AtomicLong clientScanPrepareTimeMs = + createCounter(CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME); + + public final AtomicLong clientScanExecutionTimeMs = + createCounter(CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); + + public final AtomicLong clientRpcRoundTripTimeMs = + createCounter(CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + public final AtomicLong clientCacheLoadWaitTimeMs = + createCounter(CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); /** * Constructor */ @@ -113,5 +138,11 @@ public void moveToNextRegion() { currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME); currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME); currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); } } From b790086eb17fd1f5af7a484cca25fd0289ce0a8a Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Sun, 22 Mar 2026 04:49:29 +0530 Subject: [PATCH 02/10] Capture roundtrip time at Netty layer --- .../hadoop/hbase/client/ClientScanner.java | 4 +++- .../hadoop/hbase/client/ScannerCallable.java | 6 +++-- .../hadoop/hbase/ipc/AbstractRpcClient.java | 6 ++++- .../org/apache/hadoop/hbase/ipc/Call.java | 18 
++++++++++++++ .../ipc/DelegatingHBaseRpcController.java | 20 ++++++++++++++++ .../hadoop/hbase/ipc/HBaseRpcController.java | 14 +++++++++++ .../hbase/ipc/HBaseRpcControllerImpl.java | 24 +++++++++++++++++++ .../hbase/ipc/NettyRpcDuplexHandler.java | 3 +++ .../hadoop/hbase/ipc/RpcConnection.java | 4 ++++ 9 files changed, 95 insertions(+), 4 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 7d86f7247ad1..8276873bf4fc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -337,7 +337,9 @@ protected Result nextWithSyncCache() throws IOException { try { loadCache(); } finally { - scanMetrics.addToCounter(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs); + if (scanMetrics != null) { + scanMetrics.addToCounter(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs); + } } // try again to load from cache diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 358be62c5e7b..2fa19aa99b83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -260,12 +260,14 @@ private void setAlreadyClosed() { } private ScanResponse doScan(RpcController controller, ScanRequest request) throws ServiceException { - long rpcCallStartTimeMs = EnvironmentEdgeManager.currentTime(); try { ScanResponse response = getStub().scan(controller, request); return response; } finally { - rpcCallTimeMs += (EnvironmentEdgeManager.currentTime() - rpcCallStartTimeMs); + if (controller 
instanceof HBaseRpcController) { + HBaseRpcController hrc = (HBaseRpcController) controller; + rpcCallTimeMs += hrc.getResponseReceiveTimestampInMs() - hrc.getRequestSendTimestampInMs(); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 60f4b7932bfc..1fe33de2054f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -338,13 +338,17 @@ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcCont Message param, Message returnType, final User ticket, final Address isa) throws ServiceException { BlockingRpcCallback done = new BlockingRpcCallback<>(); - callMethod(md, hrc, param, returnType, ticket, isa, done); + Call call = callMethod(md, hrc, param, returnType, ticket, isa, done); Message val; try { val = done.get(); } catch (IOException e) { throw new ServiceException(e); } + finally { + hrc.setRequestSendTimestampInMs(call.getRequestSendTimestampInMs()); + hrc.setResponseReceiveTimestampInMs(call.getResponseReceiveTimestampInMs()); + } if (hrc.failed()) { throw new ServiceException(hrc.getFailed()); } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 980e708d235c..d5a0d873c126 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -62,6 +62,8 @@ class Call { private final RpcCallback callback; final Span span; Timeout timeoutTask; + long requestSendTimestampInMs = 0; + long responseReceiveTimestampInMs = 0; Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, Map attributes, @@ -159,4 +161,20 
@@ public long getStartTime() { public boolean isConnectionRegistryCall() { return md.getService().equals(ConnectionRegistryService.getDescriptor()); } + + public void setRequestSendTimestampInMs(long timestamp) { + this.requestSendTimestampInMs = timestamp; + } + + public void setResponseReceiveTimestampInMs(long timestamp) { + this.responseReceiveTimestampInMs = timestamp; + } + + public long getRequestSendTimestampInMs() { + return requestSendTimestampInMs; + } + + public long getResponseReceiveTimestampInMs() { + return responseReceiveTimestampInMs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index 2b8839bf8462..f0fea71071a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -153,4 +153,24 @@ public void setTableName(TableName tableName) { public TableName getTableName() { return delegate.getTableName(); } + + @Override + public void setRequestSendTimestampInMs(long timestamp) { + delegate.setRequestSendTimestampInMs(timestamp); + } + + @Override + public long getRequestSendTimestampInMs() { + return delegate.getRequestSendTimestampInMs(); + } + + @Override + public void setResponseReceiveTimestampInMs(long timestamp) { + delegate.setResponseReceiveTimestampInMs(timestamp); + } + + @Override + public long getResponseReceiveTimestampInMs() { + return delegate.getResponseReceiveTimestampInMs(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 4d3e038bb5ec..9ba6c0534bed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -140,4 +140,18 @@ default void setTableName(TableName tableName) { default TableName getTableName() { return null; } + + default void setRequestSendTimestampInMs(long timestamp) { + } + + default void setResponseReceiveTimestampInMs(long timestamp) { + } + + default long getRequestSendTimestampInMs() { + return 0; + } + + default long getResponseReceiveTimestampInMs() { + return 0; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 1245fc0f20d2..3ea2211711fc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -53,6 +53,10 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { private TableName tableName; + private long requestSendTimestampInMs = 0; + + private long responseReceiveTimestampInMs = 0; + /** * Rpc target Region's RegionInfo we are going against. May be null. 
* @see #hasRegionInfo() @@ -294,4 +298,24 @@ public void setTableName(TableName tableName) { public TableName getTableName() { return tableName; } + + @Override + public void setRequestSendTimestampInMs(long timestamp) { + this.requestSendTimestampInMs = timestamp; + } + + @Override + public long getRequestSendTimestampInMs() { + return requestSendTimestampInMs; + } + + @Override + public void setResponseReceiveTimestampInMs(long timestamp) { + this.responseReceiveTimestampInMs = timestamp; + } + + @Override + public long getResponseReceiveTimestampInMs() { + return responseReceiveTimestampInMs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index ef3752bbf47c..ed33610fc62f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * The netty rpc handler. 
@@ -113,6 +114,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof Call) { Call call = (Call) msg; try (Scope scope = call.span.makeCurrent()) { + long currentTime = EnvironmentEdgeManager.currentTime(); + call.setRequestSendTimestampInMs(currentTime); writeRequest(ctx, call, promise); } } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 2ec5a7ee1a39..23f3ed2cc340 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -466,5 +466,9 @@ void readResponse(T in, Map i // problem throw e; } + finally { + long currentTime = EnvironmentEdgeManager.currentTime(); + call.setResponseReceiveTimestampInMs(currentTime); + } } } From 11c21955afafba8a2c6bb4f5e3e34144f0c145d6 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 23 Mar 2026 02:06:58 +0530 Subject: [PATCH 03/10] Add basic test coverage --- .../hbase/client/ReversedScannerCallable.java | 6 +- .../hadoop/hbase/client/ScannerCallable.java | 55 ++-- .../hadoop/hbase/ipc/RpcConnection.java | 59 ++-- .../TestClientSideTableScanMetrics.java | 282 ++++++++++++++++++ 4 files changed, 349 insertions(+), 53 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index ba215450e5af..a3404213e467 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -75,11 +75,7 @@ public void throwable(Throwable t, boolean retrying) { * @param reload force 
reload of server location */ @Override - public void prepare(boolean reload) throws IOException { - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - + protected void doPrepare(boolean reload) throws IOException { if ( reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) && getConnection().isTableDisabled(getTableName()) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 2fa19aa99b83..a7bc47643df7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -153,6 +153,29 @@ protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws getTableName(), row); } + protected void doPrepare(boolean reload) throws IOException { + if ( + reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) + && getConnection().isTableDisabled(getTableName()) + ) { + throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); + } + + RegionLocations rl = getRegionLocationsForPrepare(getRow()); + location = getLocationForReplica(rl); + ServerName dest = location.getServerName(); + setStub(super.getConnection().getClient(dest)); + if (!instantiated || reload) { + checkIfRegionServerIsRemote(); + instantiated = true; + } + cursor = null; + // check how often we retry. 
+ if (reload) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + } + } + /** * @param reload force reload of server location */ @@ -164,26 +187,7 @@ public void prepare(boolean reload) throws IOException { long prepareStartTimeMs = EnvironmentEdgeManager.currentTime(); try { - if ( - reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) - && getConnection().isTableDisabled(getTableName()) - ) { - throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); - } - - RegionLocations rl = getRegionLocationsForPrepare(getRow()); - location = getLocationForReplica(rl); - ServerName dest = location.getServerName(); - setStub(super.getConnection().getClient(dest)); - if (!instantiated || reload) { - checkIfRegionServerIsRemote(); - instantiated = true; - } - cursor = null; - // check how often we retry. - if (reload) { - incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); - } + doPrepare(reload); } finally { scanPrepareTimeMs += (EnvironmentEdgeManager.currentTime() - prepareStartTimeMs); } @@ -259,6 +263,15 @@ private void setAlreadyClosed() { this.closed = true; } + private long getRpcCallTimeMs(HBaseRpcController hrc) { + long requestSendTimestampInMs = hrc.getRequestSendTimestampInMs(); + long responseReceiveTimestampInMs = hrc.getResponseReceiveTimestampInMs(); + if (requestSendTimestampInMs == 0 || responseReceiveTimestampInMs == 0) { + return 0; + } + return responseReceiveTimestampInMs - requestSendTimestampInMs; + } + private ScanResponse doScan(RpcController controller, ScanRequest request) throws ServiceException { try { ScanResponse response = getStub().scan(controller, request); @@ -266,7 +279,7 @@ private ScanResponse doScan(RpcController controller, ScanRequest request) throw } finally { if (controller instanceof HBaseRpcController) { HBaseRpcController hrc = (HBaseRpcController) controller; - rpcCallTimeMs += hrc.getResponseReceiveTimestampInMs() - hrc.getRequestSendTimestampInMs(); 
+ rpcCallTimeMs += getRpcCallTimeMs(hrc); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 23f3ed2cc340..46db97029996 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -366,30 +366,34 @@ protected final Call createSecurityPreambleCall(RpcCallback callback) { private void finishCall(ResponseHeader responseHeader, T in, Call call) throws IOException { Message value; - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - if (!builder.mergeDelimitedFrom(in)) { - // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF - // before reading any bytes out, so here we need to manually finish create the EOFException - // and finish the call - call.setException(new EOFException("EOF while reading response with type: " - + call.responseDefaultType.getClass().getName())); - return; - } - value = builder.build(); - } else { - value = null; - } CellScanner cellBlockScanner; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - // Maybe we could read directly from the ByteBuf. - // The problem here is that we do not know when to release it. 
- byte[] cellBlock = new byte[size]; - in.readFully(cellBlock); - cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); - } else { - cellBlockScanner = null; + try { + if (call.responseDefaultType != null) { + Message.Builder builder = call.responseDefaultType.newBuilderForType(); + if (!builder.mergeDelimitedFrom(in)) { + // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF + // before reading any bytes out, so here we need to manually finish create the EOFException + // and finish the call + call.setException(new EOFException("EOF while reading response with type: " + + call.responseDefaultType.getClass().getName())); + return; + } + value = builder.build(); + } else { + value = null; + } + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + // Maybe we could read directly from the ByteBuf. + // The problem here is that we do not know when to release it. + byte[] cellBlock = new byte[size]; + in.readFully(cellBlock); + cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); + } else { + cellBlockScanner = null; + } + } finally { + setResponseReceiveTimestampInMs(call); } call.setResponse(value, cellBlockScanner); } @@ -452,6 +456,7 @@ void readResponse(T in, Map i } call.callStats.setResponseSizeBytes(totalSize); if (remoteExc != null) { + setResponseReceiveTimestampInMs(call); call.setException(remoteExc); return; } @@ -466,9 +471,9 @@ void readResponse(T in, Map i // problem throw e; } - finally { - long currentTime = EnvironmentEdgeManager.currentTime(); - call.setResponseReceiveTimestampInMs(currentTime); - } + } + + private void setResponseReceiveTimestampInMs(Call call) { + call.setResponseReceiveTimestampInMs(EnvironmentEdgeManager.currentTime()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java new file mode 100644 index 000000000000..987e276d421f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@Category({ ClientTests.class, LargeTests.class }) +public class TestClientSideTableScanMetrics extends FromClientSideBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClientSideTableScanMetrics.class); + + public static class SleepOnScanCoprocessor implements RegionCoprocessor, RegionObserver { + static final int SLEEP_TIME_MS = 5; + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + 
} + + @Override + public void preScannerOpen(ObserverContext c, Scan scan) + throws IOException { + try { + Thread.sleep(SLEEP_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public boolean preScannerNext(ObserverContext c, + InternalScanner s, List results, int limit, boolean hasMore) throws IOException { + try { + Thread.sleep(SLEEP_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return true; + } + } + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = + TableName.valueOf(TestClientSideTableScanMetrics.class.getSimpleName()); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final byte[] VALUE = Bytes.toBytes("value"); + + private static int NUM_REGIONS; + + private static Connection CONN; + + @Parameters(name = "{index}: scanner={0}") + public static List params() { + return Arrays.asList(new Object[] { "ForwardScanner", new Scan() }, + new Object[] { "ReverseScanner", new Scan().setReversed(true) }); + } + + @Parameter(0) + public String scannerName; + + @Parameter(1) + public Scan originalScan; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); + TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + SleepOnScanCoprocessor.class.getName()); + TEST_UTIL.startMiniCluster(2); + try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) { + table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE))); + } + CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + NUM_REGIONS = 
TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + protected Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException { + Scan scan = new Scan(originalScan); + if (originalScan.isReversed()) { + scan.withStartRow(largerRow, true); + scan.withStopRow(smallerRow, true); + } else { + scan.withStartRow(smallerRow, true); + scan.withStopRow(largerRow, true); + } + return scan; + } + + protected ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount) + throws IOException { + int countOfRows = 0; + ScanMetrics scanMetrics; + try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + Assert.assertFalse(result.isEmpty()); + countOfRows++; + } + scanMetrics = scanner.getScanMetrics(); + } + Assert.assertEquals(expectedCount, countOfRows); + return scanMetrics; + } + + @Test + public void testScanExecutionAndRpcTimeMetrics() throws Exception { + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long cacheLoad = metricsMap.get(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip = metricsMap.get(ScanMetrics.CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + Assert.assertTrue("clientCacheLoadWaitTimeMs should be > 0, got " + cacheLoad, + cacheLoad > 0); + Assert.assertTrue("clientScanExecutionTimeMs should be > 0, got " + scanExec, + scanExec > 0); + Assert.assertTrue("clientRpcRoundTripTimeMs should be > 0, got " + rpcRoundTrip, + rpcRoundTrip > 0); + + Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") 
should be <= scanExecution (" + + scanExec + ")", rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= cacheLoad (" + + cacheLoad + ")", scanExec <= cacheLoad); + } + + @Test + public void testScanPrepareTimeMetric() throws Exception { + CONN.clearRegionLocationCache(); + + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long prepareTime = metricsMap.get(ScanMetrics.CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME); + Assert.assertTrue("clientScanPrepareTimeMs should be > 0, got " + prepareTime, + prepareTime > 0); + } + + @Test + public void testThreadPoolWaitTimeMetric() throws Exception { + ExecutorService singleThreadPool = Executors.newFixedThreadPool(1); + try { + CountDownLatch taskStarted = new CountDownLatch(1); + CountDownLatch allowFinish = new CountDownLatch(1); + singleThreadPool.submit(() -> { + taskStarted.countDown(); + try { + allowFinish.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + taskStarted.await(); + + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + + new Thread(() -> { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + allowFinish.countDown(); + }).start(); + + int countOfRows = 0; + ScanMetrics scanMetrics; + try (Table table = CONN.getTable(TABLE_NAME, singleThreadPool); + ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + Assert.assertFalse(result.isEmpty()); + countOfRows++; + } + scanMetrics = scanner.getScanMetrics(); + } + Assert.assertEquals(3, countOfRows); + Assert.assertNotNull(scanMetrics); + + long waitTime = 
scanMetrics.clientThreadPoolWaitTimeMs.get(); + long execTime = scanMetrics.clientThreadPoolExecutionTimeMs.get(); + Assert.assertTrue("clientThreadPoolWaitTimeMs should be > 0, got " + waitTime, + waitTime > 0); + Assert.assertTrue("clientThreadPoolExecutionTimeMs should be > 0, got " + execTime, + execTime > 0); + } finally { + singleThreadPool.shutdownNow(); + } + } + + @Test + public void testTimingHierarchyBetweenClientAndServerMetrics() throws Exception { + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long serverProcessingTime = + metricsMap.get(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME); + long serverQueueWaitTime = + metricsMap.get(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); + long serverTime = serverProcessingTime + serverQueueWaitTime; + long rpcRoundTrip = metricsMap.get(ScanMetrics.CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long cacheLoad = metricsMap.get(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + + Assert.assertTrue("serverTime should be > 0, got " + serverTime, serverTime > 0); + + Assert.assertTrue("serverTime (" + serverTime + ") should be <= rpcRoundTrip (" + + rpcRoundTrip + ")", serverTime <= rpcRoundTrip); + Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + + scanExec + ")", rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= cacheLoad (" + + cacheLoad + ")", scanExec <= cacheLoad); + } +} From 73f2edc16f6348769230e1bb0638f8aff4b2a3ba Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Mon, 30 Mar 2026 21:10:53 +0530 Subject: [PATCH 04/10] Covered testing all metrics --- 
.../hadoop/hbase/client/ClientScanner.java | 2 +- .../hbase/client/ReversedScannerCallable.java | 6 +- .../hadoop/hbase/client/ScannerCallable.java | 44 ++--- .../client/ScannerCallableWithReplicas.java | 17 ++ .../hbase/client/metrics/ScanMetrics.java | 68 ++++---- .../TestClientSideTableScanMetrics.java | 161 ++++++++++++------ 6 files changed, 190 insertions(+), 108 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 8276873bf4fc..a55f922ba8e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -338,7 +338,7 @@ protected Result nextWithSyncCache() throws IOException { loadCache(); } finally { if (scanMetrics != null) { - scanMetrics.addToCounter(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs); + scanMetrics.addToCounter(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index a3404213e467..ba215450e5af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -75,7 +75,11 @@ public void throwable(Throwable t, boolean retrying) { * @param reload force reload of server location */ @Override - protected void doPrepare(boolean reload) throws IOException { + public void prepare(boolean reload) throws IOException { + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + if ( reload && getTableName() != null && 
!getTableName().equals(TableName.META_TABLE_NAME) && getConnection().isTableDisabled(getTableName()) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index a7bc47643df7..83fc575b7f19 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -78,7 +78,6 @@ public class ScannerCallable extends ClientServiceCallable { private boolean logScannerActivity = false; private int logCutOffLatency = 1000; protected final int id; - private long scanPrepareTimeMs = 0; private long scanExecutionTimeMs = 0; private long rpcCallTimeMs = 0; private long threadPoolWaitTimeMs = 0; @@ -153,7 +152,15 @@ protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws getTableName(), row); } - protected void doPrepare(boolean reload) throws IOException { + /** + * @param reload force reload of server location + */ + @Override + public void prepare(boolean reload) throws IOException { + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + if ( reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) && getConnection().isTableDisabled(getTableName()) @@ -176,23 +183,6 @@ && getConnection().isTableDisabled(getTableName()) } } - /** - * @param reload force reload of server location - */ - @Override - public void prepare(boolean reload) throws IOException { - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - - long prepareStartTimeMs = EnvironmentEdgeManager.currentTime(); - try { - doPrepare(reload); - } finally { - scanPrepareTimeMs += (EnvironmentEdgeManager.currentTime() - prepareStartTimeMs); - } - } - /** * compare the local machine hostname with region server's hostname to decide if hbase client * connects to a remote region server @@ -506,19 +496,21 @@ void 
populateScanMetrics(ScanMetrics scanMetrics) { if (scanMetrics == null) { return; } - scanMetrics.addToCounter(ScanMetrics.CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME, + scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME, threadPoolWaitTimeMs); - scanMetrics.addToCounter(ScanMetrics.CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME, + scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME, threadPoolExecutionTimeMs); - scanMetrics.addToCounter(ScanMetrics.CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME, - scanPrepareTimeMs); - scanMetrics.addToCounter(ScanMetrics.CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME, + scanMetrics.addToCounter(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME, scanExecutionTimeMs); - scanMetrics.addToCounter(ScanMetrics.CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, rpcCallTimeMs); + scanMetrics.addToCounter(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, rpcCallTimeMs); threadPoolWaitTimeMs = 0; threadPoolExecutionTimeMs = 0; - scanPrepareTimeMs = 0; scanExecutionTimeMs = 0; rpcCallTimeMs = 0; } + + // Needed in ScannerCallableWithReplicas during the closeScanner call. 
+ long getScanExecutionTimeMs() { + return scanExecutionTimeMs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index fd2810d44a9a..a584ccb03b15 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.ResultBoundedCompletionService.QueueingFuture; import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -73,6 +74,8 @@ class ScannerCallableWithReplicas implements RetryingCallable { private Set outstandingCallables = new HashSet<>(); private boolean someRPCcancelled = false; // required for testing purposes only private int regionReplication = 0; + private long metaLookupTimeMs = 0; + private long scannerCloseTimeMs = 0; public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, @@ -152,6 +155,7 @@ public Result[] call(int timeout) throws IOException { LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId); } Result[] r = currentScannerCallable.call(timeout); + scannerCloseTimeMs += currentScannerCallable.getScanExecutionTimeMs(); currentScannerCallable = null; return r; } else if (currentScannerCallable == null) { @@ -171,6 +175,7 @@ public Result[] call(int timeout) throws IOException { if (regionReplication <= 0) { RegionLocations rl = null; + long metaLookupStartTimeMs = EnvironmentEdgeManager.currentTime(); try { rl = 
RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, @@ -189,6 +194,9 @@ public Result[] call(int timeout) throws IOException { throw e; } } + finally { + metaLookupTimeMs += (EnvironmentEdgeManager.currentTime() - metaLookupStartTimeMs); + } regionReplication = rl.size(); } // allocate a bounded-completion pool of some multiple of number of replicas. @@ -518,6 +526,15 @@ public HRegionLocation getLocation() { } public void populateScanMetrics(ScanMetrics scanMetrics) { + if (scanMetrics == null) { + return; + } + scanMetrics.addToCounter(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME, + metaLookupTimeMs); + metaLookupTimeMs = 0; + scanMetrics.addToCounter(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME, + scannerCloseTimeMs); + scannerCloseTimeMs = 0; if (currentScannerCallable != null) { currentScannerCallable.populateScanMetrics(scanMetrics); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java index 8cde6ced043e..73a8c6fad79e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java @@ -47,15 +47,15 @@ public class ScanMetrics extends ServerSideScanMetrics { public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; public static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES"; - public static final String CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME = "CLIENT_THREAD_POOL_WAIT_TIME_MS"; - public static final String CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME = - "CLIENT_THREAD_POOL_EXECUTION_TIME_MS"; - public static final String CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME = "CLIENT_SCAN_PREPARE_TIME_MS"; - public static final String 
CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME = "CLIENT_SCAN_EXECUTION_TIME_MS"; - public static final String CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME = - "CLIENT_RPC_ROUND_TRIP_TIME_MS"; - public static final String CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME = - "CLIENT_CACHE_LOAD_WAIT_TIME_MS"; + public static final String THREAD_POOL_WAIT_TIME_MS_METRIC_NAME = + "THREAD_POOL_WAIT_TIME_MS"; + public static final String THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME = + "THREAD_POOL_EXECUTION_TIME_MS"; + public static final String META_LOOKUP_TIME_MS_METRIC_NAME = "META_LOOKUP_TIME_MS"; + public static final String SCANNER_CLOSE_TIME_MS_METRIC_NAME = "SCANNER_CLOSE_TIME_MS"; + public static final String SCAN_EXECUTION_TIME_MS_METRIC_NAME = "SCAN_EXECUTION_TIME_MS"; + public static final String RPC_ROUND_TRIP_TIME_MS_METRIC_NAME = "RPC_ROUND_TRIP_TIME_MS"; + public static final String CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME = "CACHE_LOAD_WAIT_TIME_MS"; /** * number of RPC calls */ @@ -103,23 +103,26 @@ public class ScanMetrics extends ServerSideScanMetrics { */ public final AtomicLong countOfRemoteRPCRetries = createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); - public final AtomicLong clientThreadPoolWaitTimeMs = - createCounter(CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); - - public final AtomicLong clientThreadPoolExecutionTimeMs = - createCounter(CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); - - public final AtomicLong clientScanPrepareTimeMs = - createCounter(CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME); - - public final AtomicLong clientScanExecutionTimeMs = - createCounter(CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); - - public final AtomicLong clientRpcRoundTripTimeMs = - createCounter(CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); - - public final AtomicLong clientCacheLoadWaitTimeMs = - createCounter(CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + public final AtomicLong threadPoolWaitTimeMs = + createCounter(THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + + public final 
AtomicLong threadPoolExecutionTimeMs = + createCounter(THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + + public final AtomicLong metaLookupTimeMs = + createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); + + public final AtomicLong scannerCloseTimeMs = + createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); + + public final AtomicLong scanExecutionTimeMs = + createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); + + public final AtomicLong rpcRoundTripTimeMs = + createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + public final AtomicLong cacheLoadWaitTimeMs = + createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); /** * Constructor */ @@ -138,11 +141,12 @@ public void moveToNextRegion() { currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME); currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME); currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); - currentRegionScanMetricsData.createCounter(CLIENT_THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); - currentRegionScanMetricsData.createCounter(CLIENT_THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); - currentRegionScanMetricsData.createCounter(CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME); - currentRegionScanMetricsData.createCounter(CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); - currentRegionScanMetricsData.createCounter(CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); - currentRegionScanMetricsData.createCounter(CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + 
currentRegionScanMetricsData.createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java index 987e276d421f..1edb240710ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java @@ -24,15 +24,16 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -57,6 +59,8 @@ public class TestClientSideTableScanMetrics extends FromClientSideBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestClientSideTableScanMetrics.class); + static AtomicInteger sleepTimeMs = new AtomicInteger(0); + public static class SleepOnScanCoprocessor implements RegionCoprocessor, RegionObserver { static final int SLEEP_TIME_MS = 5; @@ -70,6 +74,7 @@ public void 
preScannerOpen(ObserverContext c, Scan throws IOException { try { Thread.sleep(SLEEP_TIME_MS); + sleepTimeMs.addAndGet(SLEEP_TIME_MS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -80,11 +85,23 @@ public boolean preScannerNext(ObserverContext c, InternalScanner s, List results, int limit, boolean hasMore) throws IOException { try { Thread.sleep(SLEEP_TIME_MS); + sleepTimeMs.addAndGet(SLEEP_TIME_MS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return true; } + + @Override + public void preScannerClose(ObserverContext c, + InternalScanner s) throws IOException { + try { + Thread.sleep(SLEEP_TIME_MS); + sleepTimeMs.addAndGet(SLEEP_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -123,7 +140,9 @@ public static void setUp() throws Exception { try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) { table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), - new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE))); + new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE))); } CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); @@ -134,6 +153,11 @@ public static void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); } + @Before + public void testCaseSetup() { + sleepTimeMs.set(0); + } + protected Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException { Scan scan = new Scan(originalScan); if (originalScan.isReversed()) { @@ -158,6 +182,7 @@ protected ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expected scanMetrics = 
scanner.getScanMetrics(); } Assert.assertEquals(expectedCount, countOfRows); + System.out.println("ScanMetrics: " + scanMetrics + ", sleepTimeMs: " + sleepTimeMs.get()); return scanMetrics; } @@ -165,20 +190,24 @@ protected ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expected public void testScanExecutionAndRpcTimeMetrics() throws Exception { Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); scan.setScanMetricsEnabled(true); - ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 5); Assert.assertNotNull(scanMetrics); Map metricsMap = scanMetrics.getMetricsMap(false); - long cacheLoad = metricsMap.get(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - long scanExec = metricsMap.get(ScanMetrics.CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); - long rpcRoundTrip = metricsMap.get(ScanMetrics.CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS, regionsScanned); - Assert.assertTrue("clientCacheLoadWaitTimeMs should be > 0, got " + cacheLoad, - cacheLoad > 0); - Assert.assertTrue("clientScanExecutionTimeMs should be > 0, got " + scanExec, - scanExec > 0); - Assert.assertTrue("clientRpcRoundTripTimeMs should be > 0, got " + rpcRoundTrip, - rpcRoundTrip > 0); + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + // Per region two times the slowness is introduced, once for preScannerOpen and once for preScannerNext. 
+ Assert.assertTrue( + cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + scanExec >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + rpcRoundTrip >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", rpcRoundTrip <= scanExec); @@ -187,22 +216,33 @@ public void testScanExecutionAndRpcTimeMetrics() throws Exception { } @Test - public void testScanPrepareTimeMetric() throws Exception { + public void testMetaLookupTimeMetric() throws Exception { CONN.clearRegionLocationCache(); Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); scan.setScanMetricsEnabled(true); - ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 5); Assert.assertNotNull(scanMetrics); Map metricsMap = scanMetrics.getMetricsMap(false); - long prepareTime = metricsMap.get(ScanMetrics.CLIENT_SCAN_PREPARE_TIME_MS_METRIC_NAME); - Assert.assertTrue("clientScanPrepareTimeMs should be > 0, got " + prepareTime, - prepareTime > 0); + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS, regionsScanned); + + // Meta lookup time for forward scan happens in ScannerCallableWithReplicas.call() but for reverse scan it happens in ReverseScannerCallable.prepare() except the first time. Thus, asserting on minimum value of meta lookup time across both directions of scan. 
+ long metaLookupTime = metricsMap.get(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME); + Assert.assertTrue( + metaLookupTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + Assert.assertTrue( + cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue("metaLookupTime (" + metaLookupTime + ") should be <= cacheLoad (" + + cacheLoad + ")", metaLookupTime <= cacheLoad); } @Test - public void testThreadPoolWaitTimeMetric() throws Exception { + public void testThreadPoolMetrics() throws Exception { + int minWaitTimeMs = 50; ExecutorService singleThreadPool = Executors.newFixedThreadPool(1); try { CountDownLatch taskStarted = new CountDownLatch(1); @@ -222,7 +262,11 @@ public void testThreadPoolWaitTimeMetric() throws Exception { new Thread(() -> { try { - Thread.sleep(50); + ThreadPoolExecutor tpe = (ThreadPoolExecutor) singleThreadPool; + while (tpe.getQueue().size() < 1) { + Thread.sleep(10); + } + Thread.sleep(minWaitTimeMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -239,44 +283,65 @@ public void testThreadPoolWaitTimeMetric() throws Exception { } scanMetrics = scanner.getScanMetrics(); } - Assert.assertEquals(3, countOfRows); + Assert.assertEquals(5, countOfRows); Assert.assertNotNull(scanMetrics); - - long waitTime = scanMetrics.clientThreadPoolWaitTimeMs.get(); - long execTime = scanMetrics.clientThreadPoolExecutionTimeMs.get(); - Assert.assertTrue("clientThreadPoolWaitTimeMs should be > 0, got " + waitTime, - waitTime > 0); - Assert.assertTrue("clientThreadPoolExecutionTimeMs should be > 0, got " + execTime, - execTime > 0); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS, regionsScanned); + + long threadPoolWaitTimeMs = metricsMap.get(ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + 
Assert.assertTrue(threadPoolWaitTimeMs >= minWaitTimeMs); + long threadPoolExecutionTimeMs = metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + Assert.assertTrue(threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + long scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + Assert.assertTrue( + scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + + Assert.assertTrue("scanExecutionTimeMs (" + scanExecutionTimeMs + + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", + scanExecutionTimeMs <= threadPoolExecutionTimeMs); + Assert.assertTrue("threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + threadPoolExecutionTimeMs <= cacheLoadWaitTimeMs); + Assert.assertTrue("threadPoolWaitTimeMs (" + threadPoolWaitTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + threadPoolWaitTimeMs <= cacheLoadWaitTimeMs); } finally { singleThreadPool.shutdownNow(); } } @Test - public void testTimingHierarchyBetweenClientAndServerMetrics() throws Exception { - Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + public void testScannerCloseTimeMetric() throws Exception { + Scan scan = generateScan(Bytes.toBytes("zzz"), HConstants.EMPTY_END_ROW); scan.setScanMetricsEnabled(true); - ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); - Assert.assertNotNull(scanMetrics); - Map metricsMap = scanMetrics.getMetricsMap(false); + scan.setCaching(1); - long serverProcessingTime = - metricsMap.get(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME); - long serverQueueWaitTime = - metricsMap.get(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); - long serverTime = serverProcessingTime + 
serverQueueWaitTime; - long rpcRoundTrip = metricsMap.get(ScanMetrics.CLIENT_RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); - long scanExec = metricsMap.get(ScanMetrics.CLIENT_SCAN_EXECUTION_TIME_MS_METRIC_NAME); - long cacheLoad = metricsMap.get(ScanMetrics.CLIENT_CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - - Assert.assertTrue("serverTime should be > 0, got " + serverTime, serverTime > 0); + ScanMetrics scanMetrics; + try (Table table = CONN.getTable(TABLE_NAME); + ResultScanner scanner = table.getScanner(scan)) { + Result result = scanner.next(); + Assert.assertNotNull(result); + Assert.assertFalse(result.isEmpty()); + scanMetrics = scanner.getScanMetrics(); + } - Assert.assertTrue("serverTime (" + serverTime + ") should be <= rpcRoundTrip (" - + rpcRoundTrip + ")", serverTime <= rpcRoundTrip); - Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" - + scanExec + ")", rpcRoundTrip <= scanExec); - Assert.assertTrue("scanExecution (" + scanExec + ") should be <= cacheLoad (" - + cacheLoad + ")", scanExec <= cacheLoad); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + System.out.println("ScanMetrics: " + scanMetrics + ", sleepTimeMs: " + sleepTimeMs.get()); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(1, regionsScanned); + + long scannerCloseTime = metricsMap.get(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME); + Assert.assertTrue(scannerCloseTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS); + long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + Assert.assertTrue( + cacheLoadWaitTimeMs > SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + Assert.assertTrue("scannerCloseTime (" + scannerCloseTime + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + scannerCloseTime <= cacheLoadWaitTimeMs); } } From 2e4d389da3e061404be95f23f5d187d9be85df37 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Tue, 
31 Mar 2026 06:49:13 +0530 Subject: [PATCH 05/10] Add test coverage --- .../TestClientSideTableScanMetrics.java | 2 +- .../hbase/client/TestReplicasClient.java | 167 ++++++++++++++++++ 2 files changed, 168 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java index 1edb240710ac..6708f256d3cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java @@ -339,7 +339,7 @@ public void testScannerCloseTimeMetric() throws Exception { Assert.assertTrue(scannerCloseTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS); long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); Assert.assertTrue( - cacheLoadWaitTimeMs > SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); Assert.assertTrue("scannerCloseTime (" + scannerCloseTime + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", scannerCloseTime <= cacheLoadWaitTimeMs); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 5d372ac7b70b..cc422e7a25f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -25,6 +25,7 @@ import com.codahale.metrics.Counter; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -109,6 +111,7 @@ public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { static final AtomicInteger primaryCountOfScan = new AtomicInteger(0); static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0); static final AtomicLong sleepTime = new AtomicLong(0); + static final AtomicLong secondarySleepTime = new AtomicLong(0); static final AtomicBoolean slowDownNext = new AtomicBoolean(false); static final AtomicInteger countOfNext = new AtomicInteger(0); private static final AtomicReference primaryCdl = @@ -186,6 +189,10 @@ private void slowdownCode(final ObserverContext e) LOG.info("We're not the primary replicas."); CountDownLatch latch = getSecondaryCdl().get(); try { + if (secondarySleepTime.get() > 0) { + LOG.info("Sleeping for " + secondarySleepTime.get() + " ms while fetching secondary replica"); + Thread.sleep(secondarySleepTime.get()); + } if (latch.getCount() > 0) { LOG.info("Waiting for the secondary counterCountDownLatch"); latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. 
@@ -262,6 +269,7 @@ public void before() throws IOException { } SlowMeCopro.slowDownNext.set(false); SlowMeCopro.sleepTime.set(0); + SlowMeCopro.secondarySleepTime.set(0); SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0)); SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0)); table = HTU.getConnection().getTable(TABLE_NAME); @@ -831,6 +839,165 @@ public void testScanMetricsByRegion() throws Exception { } } + @Test + public void testClientScanTimingMetricsFromSecondaryReplica() throws Exception { + byte[] b1 = Bytes.toBytes("testScanTimingMetrics"); + long secondaryReplicaSleepTime = 20; + openRegion(hriSecondary); + + try { + Put p = new Put(b1); + p.addColumn(f, b1, b1); + table.put(p); + flushRegion(hriPrimary); + Thread.sleep(2 * REFRESH_PERIOD); + + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); + SlowMeCopro.secondarySleepTime.set(secondaryReplicaSleepTime); + + Scan scan = new Scan(); + scan.setScanMetricsEnabled(true); + scan.withStartRow(b1, true); + scan.withStopRow(b1, true); + scan.setConsistency(Consistency.TIMELINE); + + ScanMetrics scanMetrics; + try (ResultScanner rs = table.getScanner(scan)) { + for (Result r : rs) { + Assert.assertTrue(r.isStale()); + Assert.assertFalse(r.isEmpty()); + } + scanMetrics = rs.getScanMetrics(); + } + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals("regionsScanned should be 1", 1, regionsScanned); + + long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); + // 1 for primary replica and 1 for secondary replica + Assert.assertEquals("rpcCalls should be 2", 2, rpcCalls); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long threadPoolExec = + metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip 
= metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + Assert.assertTrue("cacheLoadWaitTimeMs should be >= " + secondaryReplicaSleepTime + + ", got " + cacheLoad, cacheLoad >= secondaryReplicaSleepTime); + Assert.assertTrue("threadPoolExecutionTimeMs should be >= " + secondaryReplicaSleepTime + + ", got " + threadPoolExec, threadPoolExec >= secondaryReplicaSleepTime); + Assert.assertTrue("scanExecutionTimeMs should be >= " + secondaryReplicaSleepTime + + ", got " + scanExec, scanExec >= secondaryReplicaSleepTime); + Assert.assertTrue("rpcRoundTripTimeMs should be >= " + secondaryReplicaSleepTime + + ", got " + rpcRoundTrip, rpcRoundTrip >= secondaryReplicaSleepTime); + + Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + + scanExec + ")", rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= threadPoolExecution (" + + threadPoolExec + ")", scanExec <= threadPoolExec); + Assert.assertTrue("threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" + + cacheLoad + ")", threadPoolExec <= cacheLoad); + } finally { + SlowMeCopro.getPrimaryCdl().get().countDown(); + SlowMeCopro.secondarySleepTime.set(0); + Delete d = new Delete(b1); + table.delete(d); + closeRegion(hriSecondary); + } + } + + @Test + public void testClientScanTimingMetricsWithReplicaSwitchMidScan() throws Exception { + byte[] b1 = Bytes.toBytes("testScanTimingPriSec"); + byte[] b2 = Bytes.toBytes("testScanTimingPriSed"); + byte[] b3 = Bytes.toBytes("testScanTimingPriSee"); + long replicaSleepTime = 50; + openRegion(hriSecondary); + + try { + table.put(Arrays.asList( + new Put(b1).addColumn(f, b1, b1), + new Put(b2).addColumn(f, b2, b2), + new Put(b3).addColumn(f, b3, b3))); + flushRegion(hriPrimary); + Thread.sleep(2 * REFRESH_PERIOD); + + SlowMeCopro.sleepTime.set(replicaSleepTime); + SlowMeCopro.slowDownNext.set(true); + SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1)); + + Scan scan = new 
Scan(); + scan.setScanMetricsEnabled(true); + scan.withStartRow(b1, true); + scan.withStopRow(b3, true); + scan.setCaching(1); + scan.setConsistency(Consistency.TIMELINE); + + ScanMetrics scanMetrics; + try (ResultScanner rs = table.getScanner(scan)) { + Result r1 = rs.next(); + Assert.assertNotNull(r1); + Assert.assertFalse(r1.isStale()); + + SlowMeCopro.sleepTime.set(0); + SlowMeCopro.secondarySleepTime.set(replicaSleepTime); + SlowMeCopro.getSecondaryCdl().get().countDown(); + + Result r2 = rs.next(); + Assert.assertNotNull(r2); + Assert.assertTrue(r2.isStale()); + + scanMetrics = rs.getScanMetrics(); + } + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals("regionsScanned should be 1", 1, regionsScanned); + + long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); + + Assert.assertEquals("rpcCalls should be 5", 5, rpcCalls); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long threadPoolExec = + metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + long minExpected = 2 * replicaSleepTime; + Assert.assertTrue("cacheLoadWaitTimeMs should be >= " + minExpected + + ", got " + cacheLoad, cacheLoad >= minExpected); + Assert.assertTrue("threadPoolExecutionTimeMs should be >= " + minExpected + + ", got " + threadPoolExec, threadPoolExec >= minExpected); + Assert.assertTrue("scanExecutionTimeMs should be >= " + minExpected + + ", got " + scanExec, scanExec >= minExpected); + Assert.assertTrue("rpcRoundTripTimeMs should be >= " + minExpected + + ", got " + rpcRoundTrip, rpcRoundTrip >= minExpected); + + Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + + 
scanExec + ")", rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= threadPoolExecution (" + + threadPoolExec + ")", scanExec <= threadPoolExec); + Assert.assertTrue("threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" + + cacheLoad + ")", threadPoolExec <= cacheLoad); + } finally { + SlowMeCopro.getPrimaryCdl().get().countDown(); + SlowMeCopro.getSecondaryCdl().get().countDown(); + SlowMeCopro.sleepTime.set(0); + SlowMeCopro.secondarySleepTime.set(0); + SlowMeCopro.slowDownNext.set(false); + SlowMeCopro.countOfNext.set(0); + table.delete(new Delete(b1)); + table.delete(new Delete(b2)); + table.delete(new Delete(b3)); + closeRegion(hriSecondary); + } + } + private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale) throws IOException { try (ResultScanner rs = table.getScanner(scan);) { From e5eba3be44dcc6506d8b5f81bfe2d50aefdce2e8 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Tue, 31 Mar 2026 11:04:15 +0530 Subject: [PATCH 06/10] Complete test coverage --- .../TestClientSideTableScanMetrics.java | 57 ++++++++++++++++++- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java index 6708f256d3cf..3dc3ed8cb424 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java @@ -27,6 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -61,8 +62,12 @@ public class TestClientSideTableScanMetrics extends 
FromClientSideBase { static AtomicInteger sleepTimeMs = new AtomicInteger(0); + private static final TableName TABLE_NAME = + TableName.valueOf(TestClientSideTableScanMetrics.class.getSimpleName()); + public static class SleepOnScanCoprocessor implements RegionCoprocessor, RegionObserver { static final int SLEEP_TIME_MS = 5; + static final AtomicBoolean throwOnNextOnce = new AtomicBoolean(false); @Override public Optional getRegionObserver() { @@ -83,6 +88,12 @@ public void preScannerOpen(ObserverContext c, Scan @Override public boolean preScannerNext(ObserverContext c, InternalScanner s, List results, int limit, boolean hasMore) throws IOException { + if ( + TABLE_NAME.equals(c.getEnvironment().getRegionInfo().getTable()) + && throwOnNextOnce.compareAndSet(true, false) + ) { + throw new IOException("Injected fault for testing remoteExc path"); + } try { Thread.sleep(SLEEP_TIME_MS); sleepTimeMs.addAndGet(SLEEP_TIME_MS); @@ -106,9 +117,6 @@ public void preScannerClose(ObserverContext c, private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final TableName TABLE_NAME = - TableName.valueOf(TestClientSideTableScanMetrics.class.getSimpleName()); - private static final byte[] CF = Bytes.toBytes("cf"); private static final byte[] CQ = Bytes.toBytes("cq"); @@ -156,6 +164,7 @@ public static void tearDown() throws Exception { @Before public void testCaseSetup() { sleepTimeMs.set(0); + SleepOnScanCoprocessor.throwOnNextOnce.set(false); } protected Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException { @@ -344,4 +353,46 @@ public void testScannerCloseTimeMetric() throws Exception { + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", scannerCloseTime <= cacheLoadWaitTimeMs); } + + @Test + public void testScanMetricsWithRemoteException() throws Exception { + SleepOnScanCoprocessor.throwOnNextOnce.set(true); + + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + 
scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 5); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS + 1, regionsScanned); + long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); + Assert.assertEquals("rpcCalls should be 1 + NUM_REGIONS due to 1 extra retry", + 1 + NUM_REGIONS, rpcCalls); + + long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long threadPoolExecutionTimeMs = + metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTripTimeMs = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + Assert.assertTrue( + cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + rpcRoundTripTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + + Assert.assertTrue("rpcRoundTripTimeMs (" + rpcRoundTripTimeMs + + ") should be <= scanExecutionTimeMs (" + scanExecutionTimeMs + ")", + rpcRoundTripTimeMs <= scanExecutionTimeMs); + Assert.assertTrue("scanExecutionTimeMs (" + scanExecutionTimeMs + + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", + scanExecutionTimeMs <= threadPoolExecutionTimeMs); + Assert.assertTrue("threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + threadPoolExecutionTimeMs <= cacheLoadWaitTimeMs); + } } From 999732453f7dd2989dcb40ecbcfd5375981aa3f7 Mon Sep 17 00:00:00 2001 From: 
Sanjeet Malhotra Date: Tue, 31 Mar 2026 11:04:32 +0530 Subject: [PATCH 07/10] spotless apply --- .../hadoop/hbase/client/ClientScanner.java | 3 +- .../hadoop/hbase/client/ScannerCallable.java | 15 +-- .../client/ScannerCallableWithReplicas.java | 21 ++-- .../hbase/client/metrics/ScanMetrics.java | 19 ++-- .../hadoop/hbase/ipc/AbstractRpcClient.java | 3 +- .../ipc/DelegatingHBaseRpcController.java | 2 +- .../hbase/ipc/NettyRpcDuplexHandler.java | 2 +- .../hadoop/hbase/ipc/RpcConnection.java | 8 +- .../TestClientSideTableScanMetrics.java | 100 +++++++++--------- .../hbase/client/TestReplicasClient.java | 71 +++++++------ 10 files changed, 124 insertions(+), 120 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index a55f922ba8e3..c6bb4b80f83e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -338,7 +338,8 @@ protected Result nextWithSyncCache() throws IOException { loadCache(); } finally { if (scanMetrics != null) { - scanMetrics.addToCounter(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs); + scanMetrics.addToCounter(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, + EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 83fc575b7f19..140e25a3b865 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -44,12 +44,13 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import 
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -262,7 +263,8 @@ private long getRpcCallTimeMs(HBaseRpcController hrc) { return responseReceiveTimestampInMs - requestSendTimestampInMs; } - private ScanResponse doScan(RpcController controller, ScanRequest request) throws ServiceException { + private ScanResponse doScan(RpcController controller, ScanRequest request) + throws ServiceException { try { ScanResponse response = getStub().scan(controller, request); return response; @@ -302,8 +304,8 @@ protected Result[] rpcCall() throws Exception { long now = EnvironmentEdgeManager.currentTime(); if (now - timestamp > logCutOffLatency) { int rows = rrs == null ? 
0 : rrs.length; - LOG.info( - "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); + LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + + scannerId); } } updateServerSideMetrics(scanMetrics, response); @@ -500,8 +502,7 @@ void populateScanMetrics(ScanMetrics scanMetrics) { threadPoolWaitTimeMs); scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME, threadPoolExecutionTimeMs); - scanMetrics.addToCounter(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME, - scanExecutionTimeMs); + scanMetrics.addToCounter(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME, scanExecutionTimeMs); scanMetrics.addToCounter(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, rpcCallTimeMs); threadPoolWaitTimeMs = 0; threadPoolExecutionTimeMs = 0; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index a584ccb03b15..f386dbff16f8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -25,7 +25,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ResultBoundedCompletionService.QueueingFuture; import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -193,8 +191,7 @@ public Result[] call(int timeout) throws IOException { // For completeness throw e; } - } - finally { + } finally { metaLookupTimeMs += (EnvironmentEdgeManager.currentTime() - metaLookupStartTimeMs); } regionReplication = rl.size(); @@ -219,7 +216,8 @@ public Result[] call(int timeout) throws IOException { addCallsForCurrentReplica(cs, rpcTimeoutForCall); int startIndex = 0; - ResultBoundedCompletionService>.QueueingFuture> f = null; + ResultBoundedCompletionService>.QueueingFuture< + Pair> f = null; try { // wait for the timeout to see whether the primary responds back f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds @@ -265,7 +263,8 @@ public Result[] call(int timeout) throws IOException { } try { - f = cs.pollForFirstSuccessfullyCompletedTask(timeout, TimeUnit.MILLISECONDS, startIndex, endIndex); + f = cs.pollForFirstSuccessfullyCompletedTask(timeout, TimeUnit.MILLISECONDS, startIndex, + endIndex); if (f == null) { throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms"); @@ -294,7 +293,9 @@ public Result[] call(int timeout) throws IOException { @SuppressWarnings("FutureReturnValueIgnored") private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, - AtomicBoolean done, ExecutorService pool, ResultBoundedCompletionService>.QueueingFuture> future) { + AtomicBoolean done, ExecutorService pool, + ResultBoundedCompletionService>.QueueingFuture< + Pair> future) { if (done.compareAndSet(false, true)) { if (currentScannerCallable != scanner) replicaSwitched.set(true); currentScannerCallable = scanner; @@ -529,11 +530,9 @@ public void populateScanMetrics(ScanMetrics scanMetrics) { if (scanMetrics == null) { return; } - scanMetrics.addToCounter(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME, - metaLookupTimeMs); + scanMetrics.addToCounter(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME, metaLookupTimeMs); metaLookupTimeMs 
= 0; - scanMetrics.addToCounter(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME, - scannerCloseTimeMs); + scanMetrics.addToCounter(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME, scannerCloseTimeMs); scannerCloseTimeMs = 0; if (currentScannerCallable != null) { currentScannerCallable.populateScanMetrics(scanMetrics); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java index 73a8c6fad79e..d739e57d43b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java @@ -47,8 +47,7 @@ public class ScanMetrics extends ServerSideScanMetrics { public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; public static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES"; - public static final String THREAD_POOL_WAIT_TIME_MS_METRIC_NAME = - "THREAD_POOL_WAIT_TIME_MS"; + public static final String THREAD_POOL_WAIT_TIME_MS_METRIC_NAME = "THREAD_POOL_WAIT_TIME_MS"; public static final String THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME = "THREAD_POOL_EXECUTION_TIME_MS"; public static final String META_LOOKUP_TIME_MS_METRIC_NAME = "META_LOOKUP_TIME_MS"; @@ -109,20 +108,16 @@ public class ScanMetrics extends ServerSideScanMetrics { public final AtomicLong threadPoolExecutionTimeMs = createCounter(THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); - public final AtomicLong metaLookupTimeMs = - createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); + public final AtomicLong metaLookupTimeMs = createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); - public final AtomicLong scannerCloseTimeMs = - createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); + public final AtomicLong scannerCloseTimeMs = createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); - public final AtomicLong 
scanExecutionTimeMs = - createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); + public final AtomicLong scanExecutionTimeMs = createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); - public final AtomicLong rpcRoundTripTimeMs = - createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + public final AtomicLong rpcRoundTripTimeMs = createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + public final AtomicLong cacheLoadWaitTimeMs = createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - public final AtomicLong cacheLoadWaitTimeMs = - createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); /** * Constructor */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 1fe33de2054f..c155322c348e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -344,8 +344,7 @@ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcCont val = done.get(); } catch (IOException e) { throw new ServiceException(e); - } - finally { + } finally { hrc.setRequestSendTimestampInMs(call.getRequestSendTimestampInMs()); hrc.setResponseReceiveTimestampInMs(call.getResponseReceiveTimestampInMs()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index f0fea71071a8..c86192ea403c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -163,7 +163,7 @@ public void setRequestSendTimestampInMs(long timestamp) { public long getRequestSendTimestampInMs() { return delegate.getRequestSendTimestampInMs(); } - + @Override public void setResponseReceiveTimestampInMs(long timestamp) { 
delegate.setResponseReceiveTimestampInMs(timestamp); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index ed33610fc62f..de730a22fa3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -40,7 +41,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * The netty rpc handler. 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 46db97029996..a89328a1efe0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -372,7 +372,8 @@ private void finishCall(ResponseHeader respo Message.Builder builder = call.responseDefaultType.newBuilderForType(); if (!builder.mergeDelimitedFrom(in)) { // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF - // before reading any bytes out, so here we need to manually finish create the EOFException + // before reading any bytes out, so here we need to manually create the + // EOFException + // and finish the call call.setException(new EOFException("EOF while reading response with type: " + call.responseDefaultType.getClass().getName())); @@ -388,7 +389,8 @@ private void finishCall(ResponseHeader respo // The problem here is that we do not know when to release it. 
byte[] cellBlock = new byte[size]; in.readFully(cellBlock); - cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); + cellBlockScanner = + cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); } else { cellBlockScanner = null; } @@ -472,7 +474,7 @@ void readResponse(T in, Map i throw e; } } - + private void setResponseReceiveTimestampInMs(Call call) { call.setResponseReceiveTimestampInMs(EnvironmentEdgeManager.currentTime()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java index 3dc3ed8cb424..df680cb6297a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java @@ -24,12 +24,11 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -42,7 +41,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Assert; @@ -54,7 +53,7 @@ import org.junit.runners.Parameterized.Parameter; import 
org.junit.runners.Parameterized.Parameters; -@Category({ ClientTests.class, LargeTests.class }) +@Category({ ClientTests.class, MediumTests.class }) public class TestClientSideTableScanMetrics extends FromClientSideBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -76,7 +75,7 @@ public Optional getRegionObserver() { @Override public void preScannerOpen(ObserverContext c, Scan scan) - throws IOException { + throws IOException { try { Thread.sleep(SLEEP_TIME_MS); sleepTimeMs.addAndGet(SLEEP_TIME_MS); @@ -87,7 +86,7 @@ public void preScannerOpen(ObserverContext c, Scan @Override public boolean preScannerNext(ObserverContext c, - InternalScanner s, List results, int limit, boolean hasMore) throws IOException { + InternalScanner s, List results, int limit, boolean hasMore) throws IOException { if ( TABLE_NAME.equals(c.getEnvironment().getRegionInfo().getTable()) && throwOnNextOnce.compareAndSet(true, false) @@ -104,8 +103,8 @@ public boolean preScannerNext(ObserverContext c, } @Override - public void preScannerClose(ObserverContext c, - InternalScanner s) throws IOException { + public void preScannerClose(ObserverContext c, InternalScanner s) + throws IOException { try { Thread.sleep(SLEEP_TIME_MS); sleepTimeMs.addAndGet(SLEEP_TIME_MS); @@ -210,18 +209,17 @@ public void testScanExecutionAndRpcTimeMetrics() throws Exception { long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); - // Per region two times the slowness is introduced, once for preScannerOpen and once for preScannerNext. 
- Assert.assertTrue( - cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - Assert.assertTrue( - scanExec >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - Assert.assertTrue( - rpcRoundTrip >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + // Per region two times the slowness is introduced, once for preScannerOpen and once for + // preScannerNext. + Assert.assertTrue(cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue(scanExec >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue(rpcRoundTrip >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" - + scanExec + ")", rpcRoundTrip <= scanExec); - Assert.assertTrue("scanExecution (" + scanExec + ") should be <= cacheLoad (" - + cacheLoad + ")", scanExec <= cacheLoad); + Assert.assertTrue( + "rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", + rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= cacheLoad (" + cacheLoad + ")", + scanExec <= cacheLoad); } @Test @@ -237,16 +235,17 @@ public void testMetaLookupTimeMetric() throws Exception { long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); Assert.assertEquals(NUM_REGIONS, regionsScanned); - // Meta lookup time for forward scan happens in ScannerCallableWithReplicas.call() but for reverse scan it happens in ReverseScannerCallable.prepare() except the first time. Thus, asserting on minimum value of meta lookup time across both directions of scan. + // Meta lookup time for forward scan happens in ScannerCallableWithReplicas.call() but for + // reverse scan it happens in ReverseScannerCallable.prepare() except the first time. Thus, + // asserting on minimum value of meta lookup time across both directions of scan. 
long metaLookupTime = metricsMap.get(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME); - Assert.assertTrue( - metaLookupTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + Assert.assertTrue(metaLookupTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + Assert.assertTrue(cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); Assert.assertTrue( - cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - Assert.assertTrue("metaLookupTime (" + metaLookupTime + ") should be <= cacheLoad (" - + cacheLoad + ")", metaLookupTime <= cacheLoad); + "metaLookupTime (" + metaLookupTime + ") should be <= cacheLoad (" + cacheLoad + ")", + metaLookupTime <= cacheLoad); } @Test @@ -285,7 +284,7 @@ public void testThreadPoolMetrics() throws Exception { int countOfRows = 0; ScanMetrics scanMetrics; try (Table table = CONN.getTable(TABLE_NAME, singleThreadPool); - ResultScanner scanner = table.getScanner(scan)) { + ResultScanner scanner = table.getScanner(scan)) { for (Result result : scanner) { Assert.assertFalse(result.isEmpty()); countOfRows++; @@ -301,18 +300,23 @@ public void testThreadPoolMetrics() throws Exception { long threadPoolWaitTimeMs = metricsMap.get(ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); Assert.assertTrue(threadPoolWaitTimeMs >= minWaitTimeMs); - long threadPoolExecutionTimeMs = metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); - Assert.assertTrue(threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - long scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long threadPoolExecutionTimeMs = + metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); Assert.assertTrue( - scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + long 
scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + Assert + .assertTrue(scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - Assert.assertTrue("scanExecutionTimeMs (" + scanExecutionTimeMs - + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", - scanExecutionTimeMs <= threadPoolExecutionTimeMs); - Assert.assertTrue("threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs - + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + Assert + .assertTrue( + "scanExecutionTimeMs (" + scanExecutionTimeMs + + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", + scanExecutionTimeMs <= threadPoolExecutionTimeMs); + Assert.assertTrue( + "threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", threadPoolExecutionTimeMs <= cacheLoadWaitTimeMs); Assert.assertTrue("threadPoolWaitTimeMs (" + threadPoolWaitTimeMs + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", @@ -329,8 +333,7 @@ public void testScannerCloseTimeMetric() throws Exception { scan.setCaching(1); ScanMetrics scanMetrics; - try (Table table = CONN.getTable(TABLE_NAME); - ResultScanner scanner = table.getScanner(scan)) { + try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) { Result result = scanner.next(); Assert.assertNotNull(result); Assert.assertFalse(result.isEmpty()); @@ -347,8 +350,7 @@ public void testScannerCloseTimeMetric() throws Exception { long scannerCloseTime = metricsMap.get(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME); Assert.assertTrue(scannerCloseTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS); long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - Assert.assertTrue( - cacheLoadWaitTimeMs >= 
SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + Assert.assertTrue(cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); Assert.assertTrue("scannerCloseTime (" + scannerCloseTime + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", scannerCloseTime <= cacheLoadWaitTimeMs); @@ -367,8 +369,8 @@ public void testScanMetricsWithRemoteException() throws Exception { long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); Assert.assertEquals(NUM_REGIONS + 1, regionsScanned); long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); - Assert.assertEquals("rpcCalls should be 1 + NUM_REGIONS due to 1 extra retry", - 1 + NUM_REGIONS, rpcCalls); + Assert.assertEquals("rpcCalls should be 1 + NUM_REGIONS due to 1 extra retry", 1 + NUM_REGIONS, + rpcCalls); long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); long threadPoolExecutionTimeMs = @@ -376,14 +378,13 @@ public void testScanMetricsWithRemoteException() throws Exception { long scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); long rpcRoundTripTimeMs = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); - Assert.assertTrue( - cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert + .assertTrue(cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); Assert.assertTrue( threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - Assert.assertTrue( - scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); - Assert.assertTrue( - rpcRoundTripTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert + .assertTrue(scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue(rpcRoundTripTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); Assert.assertTrue("rpcRoundTripTimeMs (" + rpcRoundTripTimeMs + ") should be 
<= scanExecutionTimeMs (" + scanExecutionTimeMs + ")", @@ -391,8 +392,9 @@ public void testScanMetricsWithRemoteException() throws Exception { Assert.assertTrue("scanExecutionTimeMs (" + scanExecutionTimeMs + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", scanExecutionTimeMs <= threadPoolExecutionTimeMs); - Assert.assertTrue("threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs - + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + Assert.assertTrue( + "threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", threadPoolExecutionTimeMs <= cacheLoadWaitTimeMs); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index cc422e7a25f4..69b0f380962e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -190,7 +190,8 @@ private void slowdownCode(final ObserverContext e) CountDownLatch latch = getSecondaryCdl().get(); try { if (secondarySleepTime.get() > 0) { - LOG.info("Sleeping for " + secondarySleepTime.get() + " ms while fetching secondary replica"); + LOG.info( + "Sleeping for " + secondarySleepTime.get() + " ms while fetching secondary replica"); Thread.sleep(secondarySleepTime.get()); } if (latch.getCount() > 0) { @@ -880,26 +881,30 @@ public void testClientScanTimingMetricsFromSecondaryReplica() throws Exception { Assert.assertEquals("rpcCalls should be 2", 2, rpcCalls); long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - long threadPoolExec = - metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long threadPoolExec = metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); long scanExec = 
metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); - Assert.assertTrue("cacheLoadWaitTimeMs should be >= " + secondaryReplicaSleepTime - + ", got " + cacheLoad, cacheLoad >= secondaryReplicaSleepTime); + Assert.assertTrue( + "cacheLoadWaitTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + cacheLoad, + cacheLoad >= secondaryReplicaSleepTime); Assert.assertTrue("threadPoolExecutionTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + threadPoolExec, threadPoolExec >= secondaryReplicaSleepTime); - Assert.assertTrue("scanExecutionTimeMs should be >= " + secondaryReplicaSleepTime - + ", got " + scanExec, scanExec >= secondaryReplicaSleepTime); - Assert.assertTrue("rpcRoundTripTimeMs should be >= " + secondaryReplicaSleepTime - + ", got " + rpcRoundTrip, rpcRoundTrip >= secondaryReplicaSleepTime); - - Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" - + scanExec + ")", rpcRoundTrip <= scanExec); + Assert.assertTrue( + "scanExecutionTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + scanExec, + scanExec >= secondaryReplicaSleepTime); + Assert.assertTrue( + "rpcRoundTripTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + rpcRoundTrip, + rpcRoundTrip >= secondaryReplicaSleepTime); + + Assert.assertTrue( + "rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", + rpcRoundTrip <= scanExec); Assert.assertTrue("scanExecution (" + scanExec + ") should be <= threadPoolExecution (" + threadPoolExec + ")", scanExec <= threadPoolExec); - Assert.assertTrue("threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" - + cacheLoad + ")", threadPoolExec <= cacheLoad); + Assert.assertTrue( + "threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" + cacheLoad + ")", + threadPoolExec <= cacheLoad); } finally { SlowMeCopro.getPrimaryCdl().get().countDown(); 
SlowMeCopro.secondarySleepTime.set(0); @@ -918,9 +923,7 @@ public void testClientScanTimingMetricsWithReplicaSwitchMidScan() throws Excepti openRegion(hriSecondary); try { - table.put(Arrays.asList( - new Put(b1).addColumn(f, b1, b1), - new Put(b2).addColumn(f, b2, b2), + table.put(Arrays.asList(new Put(b1).addColumn(f, b1, b1), new Put(b2).addColumn(f, b2, b2), new Put(b3).addColumn(f, b3, b3))); flushRegion(hriPrimary); Thread.sleep(2 * REFRESH_PERIOD); @@ -959,31 +962,33 @@ public void testClientScanTimingMetricsWithReplicaSwitchMidScan() throws Excepti Assert.assertEquals("regionsScanned should be 1", 1, regionsScanned); long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); - + Assert.assertEquals("rpcCalls should be 5", 5, rpcCalls); long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); - long threadPoolExec = - metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long threadPoolExec = metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); long minExpected = 2 * replicaSleepTime; - Assert.assertTrue("cacheLoadWaitTimeMs should be >= " + minExpected - + ", got " + cacheLoad, cacheLoad >= minExpected); - Assert.assertTrue("threadPoolExecutionTimeMs should be >= " + minExpected - + ", got " + threadPoolExec, threadPoolExec >= minExpected); - Assert.assertTrue("scanExecutionTimeMs should be >= " + minExpected - + ", got " + scanExec, scanExec >= minExpected); - Assert.assertTrue("rpcRoundTripTimeMs should be >= " + minExpected - + ", got " + rpcRoundTrip, rpcRoundTrip >= minExpected); - - Assert.assertTrue("rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" - + scanExec + ")", rpcRoundTrip <= scanExec); + Assert.assertTrue("cacheLoadWaitTimeMs should be >= " + minExpected + ", got " + cacheLoad, + cacheLoad >= 
minExpected); + Assert.assertTrue( + "threadPoolExecutionTimeMs should be >= " + minExpected + ", got " + threadPoolExec, + threadPoolExec >= minExpected); + Assert.assertTrue("scanExecutionTimeMs should be >= " + minExpected + ", got " + scanExec, + scanExec >= minExpected); + Assert.assertTrue("rpcRoundTripTimeMs should be >= " + minExpected + ", got " + rpcRoundTrip, + rpcRoundTrip >= minExpected); + + Assert.assertTrue( + "rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", + rpcRoundTrip <= scanExec); Assert.assertTrue("scanExecution (" + scanExec + ") should be <= threadPoolExecution (" + threadPoolExec + ")", scanExec <= threadPoolExec); - Assert.assertTrue("threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" - + cacheLoad + ")", threadPoolExec <= cacheLoad); + Assert.assertTrue( + "threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" + cacheLoad + ")", + threadPoolExec <= cacheLoad); } finally { SlowMeCopro.getPrimaryCdl().get().countDown(); SlowMeCopro.getSecondaryCdl().get().countDown(); From 368f95ff279dbd1c4009c8b601ded1399ecc6466 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Tue, 31 Mar 2026 11:19:56 +0530 Subject: [PATCH 08/10] Add comments --- .../hbase/client/metrics/ScanMetrics.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java index d739e57d43b2..28dccbf0df0d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java @@ -102,20 +102,41 @@ public class ScanMetrics extends ServerSideScanMetrics { */ public final AtomicLong countOfRemoteRPCRetries = createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); + /** + * time spent waiting in the thread pool queue before 
execution started + */ public final AtomicLong threadPoolWaitTimeMs = createCounter(THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + /** + * time spent executing in the thread pool, including retry overhead + */ public final AtomicLong threadPoolExecutionTimeMs = createCounter(THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + /** + * time spent looking up region locations from the meta table + */ public final AtomicLong metaLookupTimeMs = createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); + /** + * time spent closing the server-side scanner via a close RPC + */ public final AtomicLong scannerCloseTimeMs = createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); + /** + * time spent inside ScannerCallable.call() executing the scan RPC logic + */ public final AtomicLong scanExecutionTimeMs = createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); + /** + * time for the raw RPC round trip on the wire + */ public final AtomicLong rpcRoundTripTimeMs = createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + /** + * total wall-clock time for a loadCache() call in ClientScanner + */ public final AtomicLong cacheLoadWaitTimeMs = createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); /** From afa894f72cfe0bb6a16659a138c251bb8f7eb9c2 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Tue, 31 Mar 2026 11:33:50 +0530 Subject: [PATCH 09/10] Add comments --- .../apache/hadoop/hbase/ipc/HBaseRpcController.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 9ba6c0534bed..4209baf39225 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -141,16 +141,29 @@ default TableName getTableName() { return null; } + /** + * Sets the timestamp when the RPC request was sent over the wire (includes request 
serialization) + */ default void setRequestSendTimestampInMs(long timestamp) { } + /** + * Sets the timestamp when the RPC response was received from the server (includes response + * deserialization) + */ default void setResponseReceiveTimestampInMs(long timestamp) { } + /** + * Returns the timestamp when the RPC request was sent over the wire, or 0 if not set + */ default long getRequestSendTimestampInMs() { return 0; } + /** + * Returns the timestamp when the RPC response was received from the server, or 0 if not set + */ default long getResponseReceiveTimestampInMs() { return 0; } From 80fa4380a1d714bbbe52610d43a0a8810468a044 Mon Sep 17 00:00:00 2001 From: Sanjeet Malhotra Date: Wed, 1 Apr 2026 10:18:39 +0530 Subject: [PATCH 10/10] Fix existing tests --- .../hbase/client/TestTableScanMetrics.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java index ac3b98a6bf78..6a7d443f1cea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java @@ -18,10 +18,17 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME; @@ -86,6 +93,13 @@ public class TestTableScanMetrics extends FromClientSideBase { private static Connection CONN; + private static final List NON_DETERMINISTIC_METRICS = + Arrays.asList(MILLIS_BETWEEN_NEXTS_METRIC_NAME, RPC_SCAN_PROCESSING_TIME_METRIC_NAME, + RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME, RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, + SCAN_EXECUTION_TIME_MS_METRIC_NAME, CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, + META_LOOKUP_TIME_MS_METRIC_NAME, SCANNER_CLOSE_TIME_MS_METRIC_NAME, + THREAD_POOL_WAIT_TIME_MS_METRIC_NAME, THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + @Parameters(name = "{index}: scanner={0}") public static List params() { return Arrays.asList(new Object[] { "ForwardScanner", new Scan() }, @@ -338,10 +352,10 @@ public void run() { .entrySet()) { ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); Map metricsMap = entry.getValue(); - // Remove millis between nexts metric as it is not deterministic - metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); - metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); - 
metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); + // Remove time taken metrics as they are not deterministic + for (String metric : NON_DETERMINISTIC_METRICS) { + Assert.assertNotNull(metricsMap.remove(metric)); + } Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); @@ -650,10 +664,10 @@ private void mergeScanMetricsByRegion(Map> entry : srcMap.entrySet()) { ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); Map metricsMap = entry.getValue(); - // Remove millis between nexts metric as it is not deterministic - metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); - metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); - metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); + // Remove time taken metrics as they are not deterministic + for (String metric : NON_DETERMINISTIC_METRICS) { + Assert.assertNotNull(metricsMap.remove(metric)); + } if (dstMap.containsKey(scanMetricsRegionInfo)) { Map dstMetricsMap = dstMap.get(scanMetricsRegionInfo); for (Map.Entry metricEntry : metricsMap.entrySet()) {