diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index a453945e729a..c6bb4b80f83e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -293,6 +293,7 @@ private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller implements RunnableFuture { private final RpcRetryingCaller retryingCaller; private boolean resultObtained = false; private final int replicaId; // replica id + private long submitTimeMs; + private long queueWaitTimeMs = 0; + private long executionTimeMs = 0; public QueueingFuture(RetryingCallable future, int rpcTimeout, int operationTimeout, int id) { @@ -64,11 +67,14 @@ public QueueingFuture(RetryingCallable future, int rpcTimeout, int operationT this.operationTimeout = operationTimeout; this.retryingCaller = retryingCallerFactory. newCaller(rpcTimeout); this.replicaId = id; + this.submitTimeMs = EnvironmentEdgeManager.currentTime(); } @SuppressWarnings("unchecked") @Override public void run() { + long executionStartTimeMs = EnvironmentEdgeManager.currentTime(); + queueWaitTimeMs += (executionStartTimeMs - submitTimeMs); try { if (!cancelled) { result = this.retryingCaller.callWithRetries(future, operationTimeout); @@ -77,6 +83,7 @@ public void run() { } catch (Throwable t) { exeEx = new ExecutionException(t); } finally { + executionTimeMs += (EnvironmentEdgeManager.currentTime() - executionStartTimeMs); synchronized (tasks) { // If this wasn't canceled then store the result. 
if (!cancelled) { @@ -147,6 +154,14 @@ public int getReplicaId() { public ExecutionException getExeEx() { return exeEx; } + + public long getQueueWaitTimeMs() { + return queueWaitTimeMs; + } + + public long getExecutionTimeMs() { + return executionTimeMs; + } } @SuppressWarnings("unchecked") diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 8ba8df0c6a8c..140e25a3b865 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -48,6 +48,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -76,6 +79,10 @@ public class ScannerCallable extends ClientServiceCallable { private boolean logScannerActivity = false; private int logCutOffLatency = 1000; protected final int id; + private long scanExecutionTimeMs = 0; + private long rpcCallTimeMs = 0; + private long threadPoolWaitTimeMs = 0; + private long threadPoolExecutionTimeMs = 0; enum MoreResults { YES, @@ -192,7 +199,7 @@ private ScanResponse next() throws IOException { ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew, scan.getLimit()); try { - ScanResponse response = getStub().scan(getRpcController(), request); + ScanResponse response = doScan(getRpcController(), request); nextCallSeq++; return response; } catch (Exception e) { @@ -247,60 +254,87 @@ private void setAlreadyClosed() { this.closed = true; } + private long 
getRpcCallTimeMs(HBaseRpcController hrc) { + long requestSendTimestampInMs = hrc.getRequestSendTimestampInMs(); + long responseReceiveTimestampInMs = hrc.getResponseReceiveTimestampInMs(); + if (requestSendTimestampInMs == 0 || responseReceiveTimestampInMs == 0) { + return 0; + } + return responseReceiveTimestampInMs - requestSendTimestampInMs; + } + + private ScanResponse doScan(RpcController controller, ScanRequest request) + throws ServiceException { + try { + ScanResponse response = getStub().scan(controller, request); + return response; + } finally { + if (controller instanceof HBaseRpcController) { + HBaseRpcController hrc = (HBaseRpcController) controller; + rpcCallTimeMs += getRpcCallTimeMs(hrc); + } + } + } + @Override protected Result[] rpcCall() throws Exception { if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (closed) { - close(); - return null; - } - ScanResponse response; - if (this.scannerId == -1L) { - response = openScanner(); - } else { - response = next(); - } - long timestamp = EnvironmentEdgeManager.currentTime(); - boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); - setHeartbeatMessage(isHeartBeat); - if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) { - cursor = ProtobufUtil.toCursor(response.getCursor()); - } - Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); - if (logScannerActivity) { - long now = EnvironmentEdgeManager.currentTime(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 
0 : rrs.length; - LOG.info( - "Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); + long scanExecutionStartTimeMs = EnvironmentEdgeManager.currentTime(); + try { + if (closed) { + close(); + return null; } - } - updateServerSideMetrics(scanMetrics, response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults()) { - if (response.getMoreResults()) { - setMoreResultsForScan(MoreResults.YES); + ScanResponse response; + if (this.scannerId == -1L) { + response = openScanner(); } else { - setMoreResultsForScan(MoreResults.NO); - setAlreadyClosed(); + response = next(); } - } else { - setMoreResultsForScan(MoreResults.UNKNOWN); - } - if (response.hasMoreResultsInRegion()) { - if (response.getMoreResultsInRegion()) { - setMoreResultsInRegion(MoreResults.YES); + long timestamp = EnvironmentEdgeManager.currentTime(); + boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage(); + setHeartbeatMessage(isHeartBeat); + if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) { + cursor = ProtobufUtil.toCursor(response.getCursor()); + } + Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); + if (logScannerActivity) { + long now = EnvironmentEdgeManager.currentTime(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 
0 : rrs.length; + LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + + scannerId); + } + } + updateServerSideMetrics(scanMetrics, response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults()) { + if (response.getMoreResults()) { + setMoreResultsForScan(MoreResults.YES); + } else { + setMoreResultsForScan(MoreResults.NO); + setAlreadyClosed(); + } } else { - setMoreResultsInRegion(MoreResults.NO); - setAlreadyClosed(); + setMoreResultsForScan(MoreResults.UNKNOWN); } - } else { - setMoreResultsInRegion(MoreResults.UNKNOWN); + if (response.hasMoreResultsInRegion()) { + if (response.getMoreResultsInRegion()) { + setMoreResultsInRegion(MoreResults.YES); + } else { + setMoreResultsInRegion(MoreResults.NO); + setAlreadyClosed(); + } + } else { + setMoreResultsInRegion(MoreResults.UNKNOWN); + } + updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote); + return rrs; + } finally { + scanExecutionTimeMs += (EnvironmentEdgeManager.currentTime() - scanExecutionStartTimeMs); } - updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote); - return rrs; } /** @@ -344,7 +378,7 @@ private void close() { controller.setPriority(HConstants.HIGH_QOS); try { - getStub().scan(controller, request); + doScan(controller, request); } catch (Exception e) { throw ProtobufUtil.handleRemoteException(e); } @@ -362,7 +396,7 @@ private ScanResponse openScanner() throws IOException { ScanRequest request = RequestConverter.buildScanRequest( getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false); try { - ScanResponse response = getStub().scan(getRpcController(), request); + ScanResponse response = doScan(getRpcController(), request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region " @@ -451,4 +485,33 @@ MoreResults moreResultsForScan() { void setMoreResultsForScan(MoreResults 
moreResults) { this.moreResultsForScan = moreResults; } + + void updateThreadPoolWaitTimeMs(long waitTimeMs) { + this.threadPoolWaitTimeMs += waitTimeMs; + } + + void updateThreadPoolExecutionTimeMs(long executionTimeMs) { + this.threadPoolExecutionTimeMs += executionTimeMs; + } + + void populateScanMetrics(ScanMetrics scanMetrics) { + if (scanMetrics == null) { + return; + } + scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME, + threadPoolWaitTimeMs); + scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME, + threadPoolExecutionTimeMs); + scanMetrics.addToCounter(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME, scanExecutionTimeMs); + scanMetrics.addToCounter(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, rpcCallTimeMs); + threadPoolWaitTimeMs = 0; + threadPoolExecutionTimeMs = 0; + scanExecutionTimeMs = 0; + rpcCallTimeMs = 0; + } + + // Needed in ScannerCallableWithReplicas during closeScanner call. + long getScanExecutionTimeMs() { + return scanExecutionTimeMs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 242b9031a4ed..f386dbff16f8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -25,7 +25,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -35,6 +34,8 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; +import 
org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -71,6 +72,8 @@ class ScannerCallableWithReplicas implements RetryingCallable { private Set outstandingCallables = new HashSet<>(); private boolean someRPCcancelled = false; // required for testing purposes only private int regionReplication = 0; + private long metaLookupTimeMs = 0; + private long scannerCloseTimeMs = 0; public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, @@ -126,6 +129,17 @@ public MoreResults moreResultsForScan() { return currentScannerCallable.moreResultsForScan(); } + private void updateThreadPoolMetricsInScannerCallable( + ResultBoundedCompletionService>.QueueingFuture< + Pair> future, + ScannerCallable scannerCallable) { + if (future == null || scannerCallable == null) { + return; + } + scannerCallable.updateThreadPoolWaitTimeMs(future.getQueueWaitTimeMs()); + scannerCallable.updateThreadPoolExecutionTimeMs(future.getExecutionTimeMs()); + } + @Override public Result[] call(int timeout) throws IOException { // If the active replica callable was closed somewhere, invoke the RPC to @@ -139,6 +153,7 @@ public Result[] call(int timeout) throws IOException { LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId); } Result[] r = currentScannerCallable.call(timeout); + scannerCloseTimeMs += currentScannerCallable.getScanExecutionTimeMs(); currentScannerCallable = null; return r; } else if (currentScannerCallable == null) { @@ -158,6 +173,7 @@ public Result[] call(int timeout) throws IOException { if (regionReplication <= 0) { RegionLocations rl = null; + long metaLookupStartTimeMs = EnvironmentEdgeManager.currentTime(); try { rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, 
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, @@ -175,6 +191,8 @@ public Result[] call(int timeout) throws IOException { // For completeness throw e; } + } finally { + metaLookupTimeMs += (EnvironmentEdgeManager.currentTime() - metaLookupStartTimeMs); } regionReplication = rl.size(); } @@ -198,15 +216,16 @@ public Result[] call(int timeout) throws IOException { addCallsForCurrentReplica(cs, rpcTimeoutForCall); int startIndex = 0; + ResultBoundedCompletionService>.QueueingFuture< + Pair> f = null; try { // wait for the timeout to see whether the primary responds back - Future> f = - cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds if (f != null) { // After poll, if f is not null, there must be a completed task Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool, f); } return r == null ? null : r.getFirst(); // great we got a response } @@ -244,8 +263,8 @@ public Result[] call(int timeout) throws IOException { } try { - Future> f = cs.pollForFirstSuccessfullyCompletedTask(timeout, - TimeUnit.MILLISECONDS, startIndex, endIndex); + f = cs.pollForFirstSuccessfullyCompletedTask(timeout, TimeUnit.MILLISECONDS, startIndex, + endIndex); if (f == null) { throw new IOException("Failed to get result within timeout, timeout=" + timeout + "ms"); @@ -253,7 +272,7 @@ public Result[] call(int timeout) throws IOException { Pair r = f.get(); if (r != null && r.getSecond() != null) { - updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool); + updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool, f); } return r == null ? 
null : r.getFirst(); // great we got an answer @@ -274,10 +293,13 @@ public Result[] call(int timeout) throws IOException { @SuppressWarnings("FutureReturnValueIgnored") private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, - AtomicBoolean done, ExecutorService pool) { + AtomicBoolean done, ExecutorService pool, + ResultBoundedCompletionService>.QueueingFuture< + Pair> future) { if (done.compareAndSet(false, true)) { if (currentScannerCallable != scanner) replicaSwitched.set(true); currentScannerCallable = scanner; + updateThreadPoolMetricsInScannerCallable(future, currentScannerCallable); // store where to start the replica scanner from if we need to. if (result != null && result.length != 0) this.lastResult = result[result.length - 1]; if (LOG.isTraceEnabled()) { @@ -503,4 +525,17 @@ public long sleep(long pause, int tries) { public HRegionLocation getLocation() { return currentScannerCallable.getLocation(); } + + public void populateScanMetrics(ScanMetrics scanMetrics) { + if (scanMetrics == null) { + return; + } + scanMetrics.addToCounter(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME, metaLookupTimeMs); + metaLookupTimeMs = 0; + scanMetrics.addToCounter(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME, scannerCloseTimeMs); + scannerCloseTimeMs = 0; + if (currentScannerCallable != null) { + currentScannerCallable.populateScanMetrics(scanMetrics); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java index 8617a851509b..28dccbf0df0d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java @@ -47,7 +47,14 @@ public class ScanMetrics extends ServerSideScanMetrics { public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; public static final String 
RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; public static final String REMOTE_RPC_RETRIES_METRIC_NAME = "REMOTE_RPC_RETRIES"; - + public static final String THREAD_POOL_WAIT_TIME_MS_METRIC_NAME = "THREAD_POOL_WAIT_TIME_MS"; + public static final String THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME = + "THREAD_POOL_EXECUTION_TIME_MS"; + public static final String META_LOOKUP_TIME_MS_METRIC_NAME = "META_LOOKUP_TIME_MS"; + public static final String SCANNER_CLOSE_TIME_MS_METRIC_NAME = "SCANNER_CLOSE_TIME_MS"; + public static final String SCAN_EXECUTION_TIME_MS_METRIC_NAME = "SCAN_EXECUTION_TIME_MS"; + public static final String RPC_ROUND_TRIP_TIME_MS_METRIC_NAME = "RPC_ROUND_TRIP_TIME_MS"; + public static final String CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME = "CACHE_LOAD_WAIT_TIME_MS"; /** * number of RPC calls */ @@ -95,6 +102,43 @@ public class ScanMetrics extends ServerSideScanMetrics { */ public final AtomicLong countOfRemoteRPCRetries = createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); + /** + * time spent waiting in the thread pool queue before execution started + */ + public final AtomicLong threadPoolWaitTimeMs = + createCounter(THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + + /** + * time spent executing in the thread pool, including retry overhead + */ + public final AtomicLong threadPoolExecutionTimeMs = + createCounter(THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + + /** + * time spent looking up region locations from the meta table + */ + public final AtomicLong metaLookupTimeMs = createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); + + /** + * time spent closing the server-side scanner via a close RPC + */ + public final AtomicLong scannerCloseTimeMs = createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); + + /** + * time spent inside ScannerCallable.call() executing the scan RPC logic + */ + public final AtomicLong scanExecutionTimeMs = createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); + + /** + * time for the raw RPC round trip on the wire + */ + public final AtomicLong 
rpcRoundTripTimeMs = createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + /** + * total wall-clock time for a loadCache() call in ClientScanner + */ + public final AtomicLong cacheLoadWaitTimeMs = createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + /** * Constructor */ @@ -113,5 +157,12 @@ public void moveToNextRegion() { currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME); currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME); currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME); + currentRegionScanMetricsData.createCounter(THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(META_LOOKUP_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(SCANNER_CLOSE_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(SCAN_EXECUTION_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + currentRegionScanMetricsData.createCounter(CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 60f4b7932bfc..c155322c348e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -338,12 +338,15 @@ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcCont Message param, Message returnType, final User ticket, final Address isa) throws ServiceException { BlockingRpcCallback done = new BlockingRpcCallback<>(); - callMethod(md, hrc, param, returnType, ticket, isa, done); + Call call = callMethod(md, hrc, param, returnType, ticket, isa, done); Message val; try { val = done.get(); } catch (IOException e) { throw new ServiceException(e); + } 
finally { + hrc.setRequestSendTimestampInMs(call.getRequestSendTimestampInMs()); + hrc.setResponseReceiveTimestampInMs(call.getResponseReceiveTimestampInMs()); } if (hrc.failed()) { throw new ServiceException(hrc.getFailed()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 980e708d235c..d5a0d873c126 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -62,6 +62,8 @@ class Call { private final RpcCallback callback; final Span span; Timeout timeoutTask; + long requestSendTimestampInMs = 0; + long responseReceiveTimestampInMs = 0; Call(int id, final Descriptors.MethodDescriptor md, Message param, final CellScanner cells, final Message responseDefaultType, int timeout, int priority, Map attributes, @@ -159,4 +161,20 @@ public long getStartTime() { public boolean isConnectionRegistryCall() { return md.getService().equals(ConnectionRegistryService.getDescriptor()); } + + public void setRequestSendTimestampInMs(long timestamp) { + this.requestSendTimestampInMs = timestamp; + } + + public void setResponseReceiveTimestampInMs(long timestamp) { + this.responseReceiveTimestampInMs = timestamp; + } + + public long getRequestSendTimestampInMs() { + return requestSendTimestampInMs; + } + + public long getResponseReceiveTimestampInMs() { + return responseReceiveTimestampInMs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java index 2b8839bf8462..c86192ea403c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -153,4 +153,24 @@ public void setTableName(TableName tableName) { public TableName 
getTableName() { return delegate.getTableName(); } + + @Override + public void setRequestSendTimestampInMs(long timestamp) { + delegate.setRequestSendTimestampInMs(timestamp); + } + + @Override + public long getRequestSendTimestampInMs() { + return delegate.getRequestSendTimestampInMs(); + } + + @Override + public void setResponseReceiveTimestampInMs(long timestamp) { + delegate.setResponseReceiveTimestampInMs(timestamp); + } + + @Override + public long getResponseReceiveTimestampInMs() { + return delegate.getResponseReceiveTimestampInMs(); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java index 4d3e038bb5ec..4209baf39225 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -140,4 +140,31 @@ default void setTableName(TableName tableName) { default TableName getTableName() { return null; } + + /** + * Sets the timestamp when the RPC request was sent over the wire (includes request serialization) + */ + default void setRequestSendTimestampInMs(long timestamp) { + } + + /** + * Sets the timestamp when the RPC response was received from the server (includes response + * deserialization) + */ + default void setResponseReceiveTimestampInMs(long timestamp) { + } + + /** + * Returns the timestamp when the RPC request was sent over the wire, or 0 if not set + */ + default long getRequestSendTimestampInMs() { + return 0; + } + + /** + * Returns the timestamp when the RPC response was received from the server, or 0 if not set + */ + default long getResponseReceiveTimestampInMs() { + return 0; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java index 1245fc0f20d2..3ea2211711fc 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -53,6 +53,10 @@ public class HBaseRpcControllerImpl implements HBaseRpcController { private TableName tableName; + private long requestSendTimestampInMs = 0; + + private long responseReceiveTimestampInMs = 0; + /** * Rpc target Region's RegionInfo we are going against. May be null. * @see #hasRegionInfo() @@ -294,4 +298,24 @@ public void setTableName(TableName tableName) { public TableName getTableName() { return tableName; } + + @Override + public void setRequestSendTimestampInMs(long timestamp) { + this.requestSendTimestampInMs = timestamp; + } + + @Override + public long getRequestSendTimestampInMs() { + return requestSendTimestampInMs; + } + + @Override + public void setResponseReceiveTimestampInMs(long timestamp) { + this.responseReceiveTimestampInMs = timestamp; + } + + @Override + public long getResponseReceiveTimestampInMs() { + return responseReceiveTimestampInMs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index ef3752bbf47c..de730a22fa3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -113,6 +114,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof Call) { Call call = (Call) msg; try (Scope scope = 
call.span.makeCurrent()) { + long currentTime = EnvironmentEdgeManager.currentTime(); + call.setRequestSendTimestampInMs(currentTime); writeRequest(ctx, call, promise); } } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 2ec5a7ee1a39..a89328a1efe0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -366,30 +366,36 @@ protected final Call createSecurityPreambleCall(RpcCallback callback) { private void finishCall(ResponseHeader responseHeader, T in, Call call) throws IOException { Message value; - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - if (!builder.mergeDelimitedFrom(in)) { - // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF - // before reading any bytes out, so here we need to manually finish create the EOFException - // and finish the call - call.setException(new EOFException("EOF while reading response with type: " - + call.responseDefaultType.getClass().getName())); - return; - } - value = builder.build(); - } else { - value = null; - } CellScanner cellBlockScanner; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - // Maybe we could read directly from the ByteBuf. - // The problem here is that we do not know when to release it. 
- byte[] cellBlock = new byte[size]; - in.readFully(cellBlock); - cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); - } else { - cellBlockScanner = null; + try { + if (call.responseDefaultType != null) { + Message.Builder builder = call.responseDefaultType.newBuilderForType(); + if (!builder.mergeDelimitedFrom(in)) { + // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF + // before reading any bytes out, so here we need to manually finish create the + // EOFException + // and finish the call + call.setException(new EOFException("EOF while reading response with type: " + + call.responseDefaultType.getClass().getName())); + return; + } + value = builder.build(); + } else { + value = null; + } + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + // Maybe we could read directly from the ByteBuf. + // The problem here is that we do not know when to release it. + byte[] cellBlock = new byte[size]; + in.readFully(cellBlock); + cellBlockScanner = + cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); + } else { + cellBlockScanner = null; + } + } finally { + setResponseReceiveTimestampInMs(call); } call.setResponse(value, cellBlockScanner); } @@ -452,6 +458,7 @@ void readResponse(T in, Map i } call.callStats.setResponseSizeBytes(totalSize); if (remoteExc != null) { + setResponseReceiveTimestampInMs(call); call.setException(remoteExc); return; } @@ -467,4 +474,8 @@ void readResponse(T in, Map i throw e; } } + + private void setResponseReceiveTimestampInMs(Call call) { + call.setResponseReceiveTimestampInMs(EnvironmentEdgeManager.currentTime()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java new file mode 100644 index 000000000000..df680cb6297a --- /dev/null +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideTableScanMetrics.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; 
+import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestClientSideTableScanMetrics extends FromClientSideBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClientSideTableScanMetrics.class); + + static AtomicInteger sleepTimeMs = new AtomicInteger(0); + + private static final TableName TABLE_NAME = + TableName.valueOf(TestClientSideTableScanMetrics.class.getSimpleName()); + + public static class SleepOnScanCoprocessor implements RegionCoprocessor, RegionObserver { + static final int SLEEP_TIME_MS = 5; + static final AtomicBoolean throwOnNextOnce = new AtomicBoolean(false); + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preScannerOpen(ObserverContext c, Scan scan) + throws IOException { + try { + Thread.sleep(SLEEP_TIME_MS); + sleepTimeMs.addAndGet(SLEEP_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public boolean preScannerNext(ObserverContext c, + InternalScanner s, List results, int limit, boolean hasMore) throws IOException { + if ( + TABLE_NAME.equals(c.getEnvironment().getRegionInfo().getTable()) + && throwOnNextOnce.compareAndSet(true, false) + ) { + throw new IOException("Injected fault for testing remoteExc path"); + } + try { + Thread.sleep(SLEEP_TIME_MS); + sleepTimeMs.addAndGet(SLEEP_TIME_MS); + } catch 
(InterruptedException e) { + Thread.currentThread().interrupt(); + } + return true; + } + + @Override + public void preScannerClose(ObserverContext c, InternalScanner s) + throws IOException { + try { + Thread.sleep(SLEEP_TIME_MS); + sleepTimeMs.addAndGet(SLEEP_TIME_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final byte[] VALUE = Bytes.toBytes("value"); + + private static int NUM_REGIONS; + + private static Connection CONN; + + @Parameters(name = "{index}: scanner={0}") + public static List params() { + return Arrays.asList(new Object[] { "ForwardScanner", new Scan() }, + new Object[] { "ReverseScanner", new Scan().setReversed(true) }); + } + + @Parameter(0) + public String scannerName; + + @Parameter(1) + public Scan originalScan; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); + TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + SleepOnScanCoprocessor.class.getName()); + TEST_UTIL.startMiniCluster(2); + try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) { + table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE), + new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE))); + } + CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void 
testCaseSetup() { + sleepTimeMs.set(0); + SleepOnScanCoprocessor.throwOnNextOnce.set(false); + } + + protected Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException { + Scan scan = new Scan(originalScan); + if (originalScan.isReversed()) { + scan.withStartRow(largerRow, true); + scan.withStopRow(smallerRow, true); + } else { + scan.withStartRow(smallerRow, true); + scan.withStopRow(largerRow, true); + } + return scan; + } + + protected ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount) + throws IOException { + int countOfRows = 0; + ScanMetrics scanMetrics; + try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + Assert.assertFalse(result.isEmpty()); + countOfRows++; + } + scanMetrics = scanner.getScanMetrics(); + } + Assert.assertEquals(expectedCount, countOfRows); + System.out.println("ScanMetrics: " + scanMetrics + ", sleepTimeMs: " + sleepTimeMs.get()); + return scanMetrics; + } + + @Test + public void testScanExecutionAndRpcTimeMetrics() throws Exception { + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 5); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS, regionsScanned); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + // Per region two times the slowness is introduced, once for preScannerOpen and once for + // preScannerNext. 
+ Assert.assertTrue(cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue(scanExec >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue(rpcRoundTrip >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + + Assert.assertTrue( + "rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", + rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= cacheLoad (" + cacheLoad + ")", + scanExec <= cacheLoad); + } + + @Test + public void testMetaLookupTimeMetric() throws Exception { + CONN.clearRegionLocationCache(); + + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 5); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS, regionsScanned); + + // Meta lookup time for forward scan happens in ScannerCallableWithReplicas.call() but for + // reverse scan it happens in ReverseScannerCallable.prepare() except the first time. Thus, + // asserting on minimum value of meta lookup time across both directions of scan. 
+ long metaLookupTime = metricsMap.get(ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME); + Assert.assertTrue(metaLookupTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + Assert.assertTrue(cacheLoad >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + "metaLookupTime (" + metaLookupTime + ") should be <= cacheLoad (" + cacheLoad + ")", + metaLookupTime <= cacheLoad); + } + + @Test + public void testThreadPoolMetrics() throws Exception { + int minWaitTimeMs = 50; + ExecutorService singleThreadPool = Executors.newFixedThreadPool(1); + try { + CountDownLatch taskStarted = new CountDownLatch(1); + CountDownLatch allowFinish = new CountDownLatch(1); + singleThreadPool.submit(() -> { + taskStarted.countDown(); + try { + allowFinish.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + taskStarted.await(); + + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + + new Thread(() -> { + try { + ThreadPoolExecutor tpe = (ThreadPoolExecutor) singleThreadPool; + while (tpe.getQueue().size() < 1) { + Thread.sleep(10); + } + Thread.sleep(minWaitTimeMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + allowFinish.countDown(); + }).start(); + + int countOfRows = 0; + ScanMetrics scanMetrics; + try (Table table = CONN.getTable(TABLE_NAME, singleThreadPool); + ResultScanner scanner = table.getScanner(scan)) { + for (Result result : scanner) { + Assert.assertFalse(result.isEmpty()); + countOfRows++; + } + scanMetrics = scanner.getScanMetrics(); + } + Assert.assertEquals(5, countOfRows); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS, regionsScanned); + + long 
threadPoolWaitTimeMs = metricsMap.get(ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME); + Assert.assertTrue(threadPoolWaitTimeMs >= minWaitTimeMs); + long threadPoolExecutionTimeMs = + metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + Assert.assertTrue( + threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + long scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + Assert + .assertTrue(scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + + Assert + .assertTrue( + "scanExecutionTimeMs (" + scanExecutionTimeMs + + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", + scanExecutionTimeMs <= threadPoolExecutionTimeMs); + Assert.assertTrue( + "threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + threadPoolExecutionTimeMs <= cacheLoadWaitTimeMs); + Assert.assertTrue("threadPoolWaitTimeMs (" + threadPoolWaitTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + threadPoolWaitTimeMs <= cacheLoadWaitTimeMs); + } finally { + singleThreadPool.shutdownNow(); + } + } + + @Test + public void testScannerCloseTimeMetric() throws Exception { + Scan scan = generateScan(Bytes.toBytes("zzz"), HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + scan.setCaching(1); + + ScanMetrics scanMetrics; + try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) { + Result result = scanner.next(); + Assert.assertNotNull(result); + Assert.assertFalse(result.isEmpty()); + scanMetrics = scanner.getScanMetrics(); + } + + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + System.out.println("ScanMetrics: " + scanMetrics + ", sleepTimeMs: " + sleepTimeMs.get()); + + long 
regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(1, regionsScanned); + + long scannerCloseTime = metricsMap.get(ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME); + Assert.assertTrue(scannerCloseTime >= SleepOnScanCoprocessor.SLEEP_TIME_MS); + long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + Assert.assertTrue(cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * 2); + Assert.assertTrue("scannerCloseTime (" + scannerCloseTime + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + scannerCloseTime <= cacheLoadWaitTimeMs); + } + + @Test + public void testScanMetricsWithRemoteException() throws Exception { + SleepOnScanCoprocessor.throwOnNextOnce.set(true); + + Scan scan = generateScan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + scan.setScanMetricsEnabled(true); + ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 5); + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals(NUM_REGIONS + 1, regionsScanned); + long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); + Assert.assertEquals("rpcCalls should be 1 + NUM_REGIONS due to 1 extra retry", 1 + NUM_REGIONS, + rpcCalls); + + long cacheLoadWaitTimeMs = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long threadPoolExecutionTimeMs = + metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long scanExecutionTimeMs = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTripTimeMs = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + Assert + .assertTrue(cacheLoadWaitTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue( + threadPoolExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert + 
.assertTrue(scanExecutionTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + Assert.assertTrue(rpcRoundTripTimeMs >= SleepOnScanCoprocessor.SLEEP_TIME_MS * NUM_REGIONS * 2); + + Assert.assertTrue("rpcRoundTripTimeMs (" + rpcRoundTripTimeMs + + ") should be <= scanExecutionTimeMs (" + scanExecutionTimeMs + ")", + rpcRoundTripTimeMs <= scanExecutionTimeMs); + Assert.assertTrue("scanExecutionTimeMs (" + scanExecutionTimeMs + + ") should be <= threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + ")", + scanExecutionTimeMs <= threadPoolExecutionTimeMs); + Assert.assertTrue( + "threadPoolExecutionTimeMs (" + threadPoolExecutionTimeMs + + ") should be <= cacheLoadWaitTimeMs (" + cacheLoadWaitTimeMs + ")", + threadPoolExecutionTimeMs <= cacheLoadWaitTimeMs); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java index 5d372ac7b70b..69b0f380962e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java @@ -25,6 +25,7 @@ import com.codahale.metrics.Counter; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -109,6 +111,7 @@ public static class SlowMeCopro implements RegionCoprocessor, RegionObserver { static final AtomicInteger primaryCountOfScan = new AtomicInteger(0); static final AtomicInteger secondaryCountOfScan = 
new AtomicInteger(0); static final AtomicLong sleepTime = new AtomicLong(0); + static final AtomicLong secondarySleepTime = new AtomicLong(0); static final AtomicBoolean slowDownNext = new AtomicBoolean(false); static final AtomicInteger countOfNext = new AtomicInteger(0); private static final AtomicReference primaryCdl = @@ -186,6 +189,11 @@ private void slowdownCode(final ObserverContext e) LOG.info("We're not the primary replicas."); CountDownLatch latch = getSecondaryCdl().get(); try { + if (secondarySleepTime.get() > 0) { + LOG.info( + "Sleeping for " + secondarySleepTime.get() + " ms while fetching secondary replica"); + Thread.sleep(secondarySleepTime.get()); + } if (latch.getCount() > 0) { LOG.info("Waiting for the secondary counterCountDownLatch"); latch.await(2, TimeUnit.MINUTES); // To help the tests to finish. @@ -262,6 +270,7 @@ public void before() throws IOException { } SlowMeCopro.slowDownNext.set(false); SlowMeCopro.sleepTime.set(0); + SlowMeCopro.secondarySleepTime.set(0); SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0)); SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0)); table = HTU.getConnection().getTable(TABLE_NAME); @@ -831,6 +840,169 @@ public void testScanMetricsByRegion() throws Exception { } } + @Test + public void testClientScanTimingMetricsFromSecondaryReplica() throws Exception { + byte[] b1 = Bytes.toBytes("testScanTimingMetrics"); + long secondaryReplicaSleepTime = 20; + openRegion(hriSecondary); + + try { + Put p = new Put(b1); + p.addColumn(f, b1, b1); + table.put(p); + flushRegion(hriPrimary); + Thread.sleep(2 * REFRESH_PERIOD); + + SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1)); + SlowMeCopro.secondarySleepTime.set(secondaryReplicaSleepTime); + + Scan scan = new Scan(); + scan.setScanMetricsEnabled(true); + scan.withStartRow(b1, true); + scan.withStopRow(b1, true); + scan.setConsistency(Consistency.TIMELINE); + + ScanMetrics scanMetrics; + try (ResultScanner rs = table.getScanner(scan)) { + for (Result r : 
rs) { + Assert.assertTrue(r.isStale()); + Assert.assertFalse(r.isEmpty()); + } + scanMetrics = rs.getScanMetrics(); + } + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals("regionsScanned should be 1", 1, regionsScanned); + + long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); + // 1 for primary replica and 1 for secondary replica + Assert.assertEquals("rpcCalls should be 2", 2, rpcCalls); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long threadPoolExec = metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + Assert.assertTrue( + "cacheLoadWaitTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + cacheLoad, + cacheLoad >= secondaryReplicaSleepTime); + Assert.assertTrue("threadPoolExecutionTimeMs should be >= " + secondaryReplicaSleepTime + + ", got " + threadPoolExec, threadPoolExec >= secondaryReplicaSleepTime); + Assert.assertTrue( + "scanExecutionTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + scanExec, + scanExec >= secondaryReplicaSleepTime); + Assert.assertTrue( + "rpcRoundTripTimeMs should be >= " + secondaryReplicaSleepTime + ", got " + rpcRoundTrip, + rpcRoundTrip >= secondaryReplicaSleepTime); + + Assert.assertTrue( + "rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", + rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= threadPoolExecution (" + + threadPoolExec + ")", scanExec <= threadPoolExec); + Assert.assertTrue( + "threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" + cacheLoad + ")", + threadPoolExec <= cacheLoad); + } finally { + 
SlowMeCopro.getPrimaryCdl().get().countDown(); + SlowMeCopro.secondarySleepTime.set(0); + Delete d = new Delete(b1); + table.delete(d); + closeRegion(hriSecondary); + } + } + + @Test + public void testClientScanTimingMetricsWithReplicaSwitchMidScan() throws Exception { + byte[] b1 = Bytes.toBytes("testScanTimingPriSec"); + byte[] b2 = Bytes.toBytes("testScanTimingPriSed"); + byte[] b3 = Bytes.toBytes("testScanTimingPriSee"); + long replicaSleepTime = 50; + openRegion(hriSecondary); + + try { + table.put(Arrays.asList(new Put(b1).addColumn(f, b1, b1), new Put(b2).addColumn(f, b2, b2), + new Put(b3).addColumn(f, b3, b3))); + flushRegion(hriPrimary); + Thread.sleep(2 * REFRESH_PERIOD); + + SlowMeCopro.sleepTime.set(replicaSleepTime); + SlowMeCopro.slowDownNext.set(true); + SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1)); + + Scan scan = new Scan(); + scan.setScanMetricsEnabled(true); + scan.withStartRow(b1, true); + scan.withStopRow(b3, true); + scan.setCaching(1); + scan.setConsistency(Consistency.TIMELINE); + + ScanMetrics scanMetrics; + try (ResultScanner rs = table.getScanner(scan)) { + Result r1 = rs.next(); + Assert.assertNotNull(r1); + Assert.assertFalse(r1.isStale()); + + SlowMeCopro.sleepTime.set(0); + SlowMeCopro.secondarySleepTime.set(replicaSleepTime); + SlowMeCopro.getSecondaryCdl().get().countDown(); + + Result r2 = rs.next(); + Assert.assertNotNull(r2); + Assert.assertTrue(r2.isStale()); + + scanMetrics = rs.getScanMetrics(); + } + Assert.assertNotNull(scanMetrics); + Map metricsMap = scanMetrics.getMetricsMap(false); + + long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME); + Assert.assertEquals("regionsScanned should be 1", 1, regionsScanned); + + long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME); + + Assert.assertEquals("rpcCalls should be 5", 5, rpcCalls); + + long cacheLoad = metricsMap.get(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME); + long threadPoolExec = 
metricsMap.get(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + long scanExec = metricsMap.get(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME); + long rpcRoundTrip = metricsMap.get(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME); + + long minExpected = 2 * replicaSleepTime; + Assert.assertTrue("cacheLoadWaitTimeMs should be >= " + minExpected + ", got " + cacheLoad, + cacheLoad >= minExpected); + Assert.assertTrue( + "threadPoolExecutionTimeMs should be >= " + minExpected + ", got " + threadPoolExec, + threadPoolExec >= minExpected); + Assert.assertTrue("scanExecutionTimeMs should be >= " + minExpected + ", got " + scanExec, + scanExec >= minExpected); + Assert.assertTrue("rpcRoundTripTimeMs should be >= " + minExpected + ", got " + rpcRoundTrip, + rpcRoundTrip >= minExpected); + + Assert.assertTrue( + "rpcRoundTrip (" + rpcRoundTrip + ") should be <= scanExecution (" + scanExec + ")", + rpcRoundTrip <= scanExec); + Assert.assertTrue("scanExecution (" + scanExec + ") should be <= threadPoolExecution (" + + threadPoolExec + ")", scanExec <= threadPoolExec); + Assert.assertTrue( + "threadPoolExecution (" + threadPoolExec + ") should be <= cacheLoad (" + cacheLoad + ")", + threadPoolExec <= cacheLoad); + } finally { + SlowMeCopro.getPrimaryCdl().get().countDown(); + SlowMeCopro.getSecondaryCdl().get().countDown(); + SlowMeCopro.sleepTime.set(0); + SlowMeCopro.secondarySleepTime.set(0); + SlowMeCopro.slowDownNext.set(false); + SlowMeCopro.countOfNext.set(0); + table.delete(new Delete(b1)); + table.delete(new Delete(b2)); + table.delete(new Delete(b3)); + closeRegion(hriSecondary); + } + } + private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale) throws IOException { try (ResultScanner rs = table.getScanner(scan);) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java index ac3b98a6bf78..6a7d443f1cea 100644 
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java @@ -18,10 +18,17 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.META_LOOKUP_TIME_MS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.SCANNER_CLOSE_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME; @@ -86,6 +93,13 @@ public class TestTableScanMetrics extends FromClientSideBase { private static Connection CONN; + private static final List NON_DETERMINISTIC_METRICS = + Arrays.asList(MILLIS_BETWEEN_NEXTS_METRIC_NAME, 
RPC_SCAN_PROCESSING_TIME_METRIC_NAME, + RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME, RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, + SCAN_EXECUTION_TIME_MS_METRIC_NAME, CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME, + META_LOOKUP_TIME_MS_METRIC_NAME, SCANNER_CLOSE_TIME_MS_METRIC_NAME, + THREAD_POOL_WAIT_TIME_MS_METRIC_NAME, THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME); + @Parameters(name = "{index}: scanner={0}") public static List params() { return Arrays.asList(new Object[] { "ForwardScanner", new Scan() }, @@ -338,10 +352,10 @@ public void run() { .entrySet()) { ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); Map metricsMap = entry.getValue(); - // Remove millis between nexts metric as it is not deterministic - metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); - metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); - metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); + // Remove time taken metrics as they are not deterministic + for (String metric : NON_DETERMINISTIC_METRICS) { + Assert.assertNotNull(metricsMap.remove(metric)); + } Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); @@ -650,10 +664,10 @@ private void mergeScanMetricsByRegion(Map> entry : srcMap.entrySet()) { ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); Map metricsMap = entry.getValue(); - // Remove millis between nexts metric as it is not deterministic - metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); - metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); - metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); + // Remove time taken metrics as they are not deterministic + for (String metric : NON_DETERMINISTIC_METRICS) { + Assert.assertNotNull(metricsMap.remove(metric)); + } if (dstMap.containsKey(scanMetricsRegionInfo)) { Map dstMetricsMap = dstMap.get(scanMetricsRegionInfo); for (Map.Entry metricEntry : 
metricsMap.entrySet()) {