Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ public static void verifySuccessWithRedirectionForMultiDevices(
}

public static void verifySuccess(List<TSStatus> statuses) throws BatchExecutionException {
StringBuilder errMsgs =
new StringBuilder().append(TSStatusCode.MULTIPLE_ERROR.getStatusCode()).append(": ");
StringBuilder errMsgs = new StringBuilder();
for (TSStatus status : statuses) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
errMsgs.append(status.getMessage()).append("; ");
}
}
if (errMsgs.length() > 0) {
throw new BatchExecutionException(statuses, errMsgs.toString());
throw new BatchExecutionException(
statuses, TSStatusCode.MULTIPLE_ERROR.getStatusCode() + ": " + errMsgs);
}
}

Expand All @@ -181,9 +181,9 @@ public static TSStatus getStatus(List<TSStatus> statusList) {
for (TSStatus subStatus : statusList) {
if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& subStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
if (!msgSet.contains(status)) {
errMsg.append(status).append("; ");
msgSet.add(status);
if (!msgSet.contains(subStatus)) {
errMsg.append(subStatus).append("; ");
msgSet.add(subStatus);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

package org.apache.iotdb.rpc;

import org.apache.iotdb.common.rpc.thrift.TSStatus;

import org.junit.Assert;
import org.junit.Test;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;

public class RpcUtilsTest {

Expand Down Expand Up @@ -74,4 +78,25 @@ public void testIsSetSqlDialect() {
Assert.assertFalse(RpcUtils.isSetSqlDialect("setsql_dialect =table"));
Assert.assertFalse(RpcUtils.isSetSqlDialect("set sql_dia"));
}

@Test
public void testVerifySuccessListAllowsSuccessfulStatuses() throws BatchExecutionException {
  // A list holding only SUCCESS_STATUS and REDIRECTION_RECOMMEND entries must pass verification
  // without an exception; the test fails if verifySuccess throws.
  final TSStatus success = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
  final TSStatus redirection = RpcUtils.getStatus(TSStatusCode.REDIRECTION_RECOMMEND);

  RpcUtils.verifySuccess(Arrays.asList(success, redirection));
}

@Test
public void testVerifySuccessListThrowsOnFailure() {
  final TSStatus failure = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "failed");

  // Capture the exception first so the assertions below run outside the try block.
  BatchExecutionException thrown = null;
  try {
    RpcUtils.verifySuccess(Collections.singletonList(failure));
  } catch (BatchExecutionException e) {
    thrown = e;
  }

  // A single failed status must be rejected, and the exception must carry both the original
  // status list and the failure message.
  Assert.assertNotNull("Expected BatchExecutionException", thrown);
  Assert.assertEquals(Collections.singletonList(failure), thrown.getStatusList());
  Assert.assertTrue(thrown.getMessage().contains("failed"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public void getAINodeHeartBeat(
client = clientManager.borrowClient(endPoint);
client.getAIHeartbeat(req, handler);
dispatched = true;
} catch (Exception ignore) {
// Just ignore
} catch (Exception e) {
handleError(handler, e);
} finally {
// After the async call is dispatched, the client's onComplete/onError callback is
// responsible for returning the client. If the RPC was not dispatched (exception
Expand All @@ -67,6 +67,14 @@ public void getAINodeHeartBeat(
}
}

/**
 * Forwards {@code e} to the handler's {@code onError} callback.
 *
 * <p>Any exception thrown by the callback itself is swallowed, so this method never propagates
 * a handler failure to its caller.
 *
 * @param handler the AINode heartbeat handler to notify
 * @param e the exception to report
 */
private void handleError(final AINodeHeartbeatHandler handler, final Exception e) {
  try {
    handler.onError(e);
  } catch (final Exception ignored) {
    // Heartbeat is best-effort: a failing handler must not break the caller.
  }
}

private static class AsyncAINodeHeartbeatClientPoolHolder {

private static final AsyncAINodeHeartbeatClientPool INSTANCE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public void getConfigNodeHeartBeat(
client = clientManager.borrowClient(endPoint);
client.getConfigNodeHeartBeat(heartbeatReq, handler);
dispatched = true;
} catch (Exception ignore) {
// Just ignore
} catch (Exception e) {
handleError(handler, e);
} finally {
// After the async call is dispatched, the client's onComplete/onError callback is
// responsible for returning the client. If the RPC was not dispatched (exception
Expand All @@ -68,6 +68,14 @@ public void getConfigNodeHeartBeat(
}
}

/**
 * Forwards {@code e} to the handler's {@code onError} callback.
 *
 * <p>Any exception thrown by the callback itself is swallowed, so this method never propagates
 * a handler failure to its caller.
 *
 * @param handler the ConfigNode heartbeat handler to notify
 * @param e the exception to report
 */
private void handleError(final ConfigNodeHeartbeatHandler handler, final Exception e) {
  try {
    handler.onError(e);
  } catch (final Exception ignored) {
    // Heartbeat is best-effort: a failing handler must not break the caller.
  }
}

private static class AsyncConfigNodeHeartbeatClientPoolHolder {

private static final AsyncConfigNodeHeartbeatClientPool INSTANCE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public void getDataNodeHeartBeat(
client = clientManager.borrowClient(endPoint);
client.getDataNodeHeartBeat(req, handler);
dispatched = true;
} catch (Exception ignore) {
// Just ignore
} catch (Exception e) {
handleError(handler, e);
} finally {
returnClientIfNotDispatched(endPoint, client, dispatched);
}
Expand All @@ -72,7 +72,7 @@ public void writeAuditLog(
client.writeAuditLog(req, handler);
dispatched = true;
} catch (Exception e) {
// Just ignore
handleError(handler, e);
} finally {
returnClientIfNotDispatched(endPoint, client, dispatched);
}
Expand All @@ -89,6 +89,22 @@ private void returnClientIfNotDispatched(
}
}

/**
 * Forwards {@code e} to the handler's {@code onError} callback.
 *
 * <p>Any exception thrown by the callback itself is swallowed, so this method never propagates
 * a handler failure to its caller.
 *
 * @param handler the DataNode heartbeat handler to notify
 * @param e the exception to report
 */
private void handleError(final DataNodeHeartbeatHandler handler, final Exception e) {
  try {
    handler.onError(e);
  } catch (final Exception ignored) {
    // Heartbeat is best-effort: a failing handler must not break the caller.
  }
}

/**
 * Forwards {@code e} to the handler's {@code onError} callback.
 *
 * <p>Any exception thrown by the callback itself is swallowed, so this method never propagates
 * a handler failure to its caller.
 *
 * @param handler the audit-log write handler to notify
 * @param e the exception to report
 */
private void handleError(final DataNodeWriteAuditLogHandler handler, final Exception e) {
  try {
    handler.onError(e);
  } catch (final Exception ignored) {
    // Audit-log writing is best-effort: a failing handler must not break the caller.
  }
}

private static class AsyncDataNodeHeartbeatClientPoolHolder {

private static final AsyncDataNodeHeartbeatClientPool INSTANCE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ public void onError(Exception e) {
+ e.getMessage();
LOGGER.error(errorMsg);

countDownLatch.countDown();
TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
responseMap.put(requestId, resp);
countDownLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ public void onError(Exception e) {
+ e.getMessage();
LOGGER.error(errorMsg);

countDownLatch.countDown();
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
responseMap.put(requestId, resp);
countDownLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ public void onError(Exception e) {
+ e.getMessage();
LOGGER.error(errorMsg);

countDownLatch.countDown();
TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
responseMap.put(requestId, resp);
countDownLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public void onError(Exception e) {
+ e.getMessage();
LOGGER.warn(errorMsg);

countDownLatch.countDown();
responseMap.put(
requestId,
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
countDownLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ public void onError(Exception e) {
+ e.getMessage();
LOGGER.error(errorMsg);

countDownLatch.countDown();
TCheckSchemaRegionUsingTemplateResp resp = new TCheckSchemaRegionUsingTemplateResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
responseMap.put(requestId, resp);
countDownLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.ratis.util.AwaitForSignal;
import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -72,6 +73,7 @@
public class TopologyService implements Runnable, IClusterStatusSubscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class);
private static final int SAMPLING_WINDOW_SIZE = 100;
private static final int TOPOLOGY_PROBING_RETRY_NUM = 1;

private final ExecutorService topologyThread =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
Expand Down Expand Up @@ -222,7 +224,7 @@ private synchronized void topologyProbing() {
nodeLocations,
proberLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout);
.sendAsyncRequest(dataNodeAsyncRequestContext, TOPOLOGY_PROBING_RETRY_NUM, timeout, true);
final List<TTestConnectionResult> results = new ArrayList<>();
dataNodeAsyncRequestContext
.getResponseMap()
Expand Down Expand Up @@ -360,15 +362,18 @@ private void pushTopologyToDataNodes(
}

CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestWithTimeoutInMs(context, CONF.getTopologyProbingBaseIntervalInMs());
.sendAsyncRequest(
context, TOPOLOGY_PROBING_RETRY_NUM, CONF.getTopologyProbingBaseIntervalInMs(), true);

context
.getResponseMap()
.forEach(
(nodeId, resp) -> {
Set<Integer> reachableSet =
computedTopology.getOrDefault(nodeId, Collections.emptySet());
lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
if (resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Set<Integer> reachableSet =
computedTopology.getOrDefault(nodeId, Collections.emptySet());
lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class PipeHeartbeatScheduler {
PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
private static final long HEARTBEAT_INTERVAL_SECONDS =
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
private static final int PIPE_HEARTBEAT_RETRY_NUM = 1;

private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -100,12 +101,8 @@ private synchronized void heartbeat() {
new DataNodeAsyncRequestContext<>(
CnToDnAsyncRequestType.PIPE_HEARTBEAT, request, dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance()
.sendAsyncRequestToNodeWithRetryAndTimeoutInMs(
clientHandler,
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
* 1000L
* 2
/ 3);
.sendAsyncRequest(
clientHandler, PIPE_HEARTBEAT_RETRY_NUM, getPipeHeartbeatRequestTimeoutInMs(), true);
clientHandler
.getResponseMap()
.forEach(
Expand Down Expand Up @@ -134,6 +131,10 @@ private synchronized void heartbeat() {
}
}

/**
 * Computes the timeout used for a pipe heartbeat request.
 *
 * @return two thirds of {@code HEARTBEAT_INTERVAL_SECONDS}, expressed in milliseconds
 */
private static long getPipeHeartbeatRequestTimeoutInMs() {
  final long heartbeatIntervalInMs = TimeUnit.SECONDS.toMillis(HEARTBEAT_INTERVAL_SECONDS);
  // Multiply before dividing so integer division does not truncate early.
  return heartbeatIntervalInMs * 2 / 3;
}

public synchronized void stop() {
if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
heartbeatFuture.cancel(false);
Expand Down
Loading
Loading