Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Re
currentRegion = callable.getHRegionInfo();
initScanMetricsRegionInfo();
}
callable.populateScanMetrics(scanMetrics);
return rrs;
}

Expand Down Expand Up @@ -332,7 +333,15 @@ protected Result nextWithSyncCache() throws IOException {
return null;
}

loadCache();
long cacheLoadStartTimeMs = EnvironmentEdgeManager.currentTime();
try {
loadCache();
} finally {
if (scanMetrics != null) {
scanMetrics.addToCounter(ScanMetrics.CACHE_LOAD_WAIT_TIME_MS_METRIC_NAME,
EnvironmentEdgeManager.currentTime() - cacheLoadStartTimeMs);
}
}

// try again to load from cache
result = cache.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,24 @@ class QueueingFuture<T> implements RunnableFuture<T> {
private final RpcRetryingCaller<T> retryingCaller;
private boolean resultObtained = false;
private final int replicaId; // replica id
private long submitTimeMs;
private long queueWaitTimeMs = 0;
private long executionTimeMs = 0;

public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
int id) {
this.future = future;
this.operationTimeout = operationTimeout;
this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
this.replicaId = id;
this.submitTimeMs = EnvironmentEdgeManager.currentTime();
}

@SuppressWarnings("unchecked")
@Override
public void run() {
long executionStartTimeMs = EnvironmentEdgeManager.currentTime();
queueWaitTimeMs += (executionStartTimeMs - submitTimeMs);
try {
if (!cancelled) {
result = this.retryingCaller.callWithRetries(future, operationTimeout);
Expand All @@ -77,6 +83,7 @@ public void run() {
} catch (Throwable t) {
exeEx = new ExecutionException(t);
} finally {
executionTimeMs += (EnvironmentEdgeManager.currentTime() - executionStartTimeMs);
synchronized (tasks) {
// If this wasn't canceled then store the result.
if (!cancelled) {
Expand Down Expand Up @@ -147,6 +154,14 @@ public int getReplicaId() {
public ExecutionException getExeEx() {
return exeEx;
}

public long getQueueWaitTimeMs() {
return queueWaitTimeMs;
}

public long getExecutionTimeMs() {
return executionTimeMs;
}
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
Expand Down Expand Up @@ -76,6 +79,10 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
protected final int id;
private long scanExecutionTimeMs = 0;
private long rpcCallTimeMs = 0;
private long threadPoolWaitTimeMs = 0;
private long threadPoolExecutionTimeMs = 0;

enum MoreResults {
YES,
Expand Down Expand Up @@ -192,7 +199,7 @@ private ScanResponse next() throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null, renew, scan.getLimit());
try {
ScanResponse response = getStub().scan(getRpcController(), request);
ScanResponse response = doScan(getRpcController(), request);
nextCallSeq++;
return response;
} catch (Exception e) {
Expand Down Expand Up @@ -247,60 +254,87 @@ private void setAlreadyClosed() {
this.closed = true;
}

private long getRpcCallTimeMs(HBaseRpcController hrc) {
long requestSendTimestampInMs = hrc.getRequestSendTimestampInMs();
long responseReceiveTimestampInMs = hrc.getResponseReceiveTimestampInMs();
if (requestSendTimestampInMs == 0 || responseReceiveTimestampInMs == 0) {
return 0;
}
return responseReceiveTimestampInMs - requestSendTimestampInMs;
}

private ScanResponse doScan(RpcController controller, ScanRequest request)
throws ServiceException {
try {
ScanResponse response = getStub().scan(controller, request);
return response;
} finally {
if (controller instanceof HBaseRpcController) {
HBaseRpcController hrc = (HBaseRpcController) controller;
rpcCallTimeMs += getRpcCallTimeMs(hrc);
}
}
}

@Override
protected Result[] rpcCall() throws Exception {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
if (closed) {
close();
return null;
}
ScanResponse response;
if (this.scannerId == -1L) {
response = openScanner();
} else {
response = next();
}
long timestamp = EnvironmentEdgeManager.currentTime();
boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
setHeartbeatMessage(isHeartBeat);
if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
cursor = ProtobufUtil.toCursor(response.getCursor());
}
Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
if (logScannerActivity) {
long now = EnvironmentEdgeManager.currentTime();
if (now - timestamp > logCutOffLatency) {
int rows = rrs == null ? 0 : rrs.length;
LOG.info(
"Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
long scanExecutionStartTimeMs = EnvironmentEdgeManager.currentTime();
try {
if (closed) {
close();
return null;
}
}
updateServerSideMetrics(scanMetrics, response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults()) {
if (response.getMoreResults()) {
setMoreResultsForScan(MoreResults.YES);
ScanResponse response;
if (this.scannerId == -1L) {
response = openScanner();
} else {
setMoreResultsForScan(MoreResults.NO);
setAlreadyClosed();
response = next();
}
} else {
setMoreResultsForScan(MoreResults.UNKNOWN);
}
if (response.hasMoreResultsInRegion()) {
if (response.getMoreResultsInRegion()) {
setMoreResultsInRegion(MoreResults.YES);
long timestamp = EnvironmentEdgeManager.currentTime();
boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
setHeartbeatMessage(isHeartBeat);
if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
cursor = ProtobufUtil.toCursor(response.getCursor());
}
Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
if (logScannerActivity) {
long now = EnvironmentEdgeManager.currentTime();
if (now - timestamp > logCutOffLatency) {
int rows = rrs == null ? 0 : rrs.length;
LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner="
+ scannerId);
}
}
updateServerSideMetrics(scanMetrics, response);
// moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults()) {
if (response.getMoreResults()) {
setMoreResultsForScan(MoreResults.YES);
} else {
setMoreResultsForScan(MoreResults.NO);
setAlreadyClosed();
}
} else {
setMoreResultsInRegion(MoreResults.NO);
setAlreadyClosed();
setMoreResultsForScan(MoreResults.UNKNOWN);
}
} else {
setMoreResultsInRegion(MoreResults.UNKNOWN);
if (response.hasMoreResultsInRegion()) {
if (response.getMoreResultsInRegion()) {
setMoreResultsInRegion(MoreResults.YES);
} else {
setMoreResultsInRegion(MoreResults.NO);
setAlreadyClosed();
}
} else {
setMoreResultsInRegion(MoreResults.UNKNOWN);
}
updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
return rrs;
} finally {
scanExecutionTimeMs += (EnvironmentEdgeManager.currentTime() - scanExecutionStartTimeMs);
}
updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
return rrs;
}

/**
Expand Down Expand Up @@ -344,7 +378,7 @@ private void close() {
controller.setPriority(HConstants.HIGH_QOS);

try {
getStub().scan(controller, request);
doScan(controller, request);
} catch (Exception e) {
throw ProtobufUtil.handleRemoteException(e);
}
Expand All @@ -362,7 +396,7 @@ private ScanResponse openScanner() throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(
getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
try {
ScanResponse response = getStub().scan(getRpcController(), request);
ScanResponse response = doScan(getRpcController(), request);
long id = response.getScannerId();
if (logScannerActivity) {
LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region "
Expand Down Expand Up @@ -451,4 +485,33 @@ MoreResults moreResultsForScan() {
void setMoreResultsForScan(MoreResults moreResults) {
this.moreResultsForScan = moreResults;
}

void updateThreadPoolWaitTimeMs(long waitTimeMs) {
this.threadPoolWaitTimeMs += waitTimeMs;
}

void updateThreadPoolExecutionTimeMs(long executionTimeMs) {
this.threadPoolExecutionTimeMs += executionTimeMs;
}

void populateScanMetrics(ScanMetrics scanMetrics) {
if (scanMetrics == null) {
return;
}
scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_WAIT_TIME_MS_METRIC_NAME,
threadPoolWaitTimeMs);
scanMetrics.addToCounter(ScanMetrics.THREAD_POOL_EXECUTION_TIME_MS_METRIC_NAME,
threadPoolExecutionTimeMs);
scanMetrics.addToCounter(ScanMetrics.SCAN_EXECUTION_TIME_MS_METRIC_NAME, scanExecutionTimeMs);
scanMetrics.addToCounter(ScanMetrics.RPC_ROUND_TRIP_TIME_MS_METRIC_NAME, rpcCallTimeMs);
threadPoolWaitTimeMs = 0;
threadPoolExecutionTimeMs = 0;
scanExecutionTimeMs = 0;
rpcCallTimeMs = 0;
}

// Need in ScannerCallableWithReplias during closeScanner call.
long getScanExecutionTimeMs() {
return scanExecutionTimeMs;
}
}
Loading
Loading