Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
Expand Down Expand Up @@ -63,6 +64,7 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;

import com.google.common.collect.ImmutableSet;
Expand All @@ -86,6 +88,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
Expand Down Expand Up @@ -131,6 +134,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
new ConcurrentHashMap<>();

private final Set<CommitterKey> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet();
private final Map<String, ReceiverTemporaryUnavailableBackoff> receiverBackoffMap =
new ConcurrentHashMap<>();

private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
Expand Down Expand Up @@ -756,6 +761,83 @@ public boolean isEnableSendTsFileLimit() {
return enableSendTsFileLimit;
}

public void waitIfReceiverTemporarilyUnavailable(final TEndPoint endPoint) {
final String endPointKey = format(endPoint);
if (Objects.isNull(endPointKey)) {
return;
}

final ReceiverTemporaryUnavailableBackoff backoff = receiverBackoffMap.get(endPointKey);
if (Objects.isNull(backoff)) {
return;
}

while (!isClosed.get()) {
final long waitTimeInMs = backoff.getRemainingWaitTimeInMs();
if (waitTimeInMs <= 0) {
return;
}

try {
Thread.sleep(waitTimeInMs);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}

public void recordReceiverStatus(final TEndPoint endPoint, final TSStatus status) {
final String endPointKey = format(endPoint);
if (Objects.isNull(endPointKey) || Objects.isNull(status)) {
return;
}

if (isReceiverTemporarilyUnavailable(status)) {
final long backoffTimeInMs =
receiverBackoffMap
.computeIfAbsent(endPointKey, key -> new ReceiverTemporaryUnavailableBackoff())
.markTemporarilyUnavailable();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Receiver {} is temporarily unavailable, throttle requests for {} ms. Status: {}",
endPointKey,
backoffTimeInMs,
status);
}
} else if (isSuccess(status)) {
final ReceiverTemporaryUnavailableBackoff backoff = receiverBackoffMap.get(endPointKey);
if (Objects.nonNull(backoff) && backoff.getRemainingWaitTimeInMs() <= 0) {
receiverBackoffMap.remove(endPointKey, backoff);
}
}
}

private static boolean isReceiverTemporarilyUnavailable(final TSStatus status) {
if (Objects.isNull(status)) {
return false;
}

final int statusCode = status.getCode();
if (statusCode == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()
|| statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
return true;
}

return status.isSetSubStatus()
&& status.getSubStatus().stream()
.anyMatch(IoTDBDataRegionAsyncSink::isReceiverTemporarilyUnavailable);
}

private static boolean isSuccess(final TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
}

private static String format(final TEndPoint endPoint) {
return Objects.isNull(endPoint) ? null : endPoint.getIp() + ":" + endPoint.getPort();
}

//////////////////////////// Operations for close ////////////////////////////

@Override
Expand Down Expand Up @@ -830,6 +912,7 @@ public synchronized void close() {
// clear reference count of events in retry queue after closing async client
clearRetryEventsReferenceCount();
droppedPipeTaskKeys.clear();
receiverBackoffMap.clear();

super.close();
}
Expand Down Expand Up @@ -931,4 +1014,37 @@ public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) {
tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
}
}

private static class ReceiverTemporaryUnavailableBackoff {

private final long maxBackoffTimeInMs =
Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs());
private final AtomicLong currentBackoffTimeInMs =
new AtomicLong(
Math.min(
Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()),
maxBackoffTimeInMs));
private final AtomicLong nextAvailableTimeInMs = new AtomicLong(0);

private long markTemporarilyUnavailable() {
final long backoffTimeInMs = currentBackoffTimeInMs.get();
nextAvailableTimeInMs.updateAndGet(
current -> Math.max(current, System.currentTimeMillis() + backoffTimeInMs));
currentBackoffTimeInMs.updateAndGet(this::getNextBackoffTimeInMs);
return backoffTimeInMs;
}

private long getRemainingWaitTimeInMs() {
return nextAvailableTimeInMs.get() - System.currentTimeMillis();
}

private long getNextBackoffTimeInMs(final long currentBackoffTimeInMs) {
if (currentBackoffTimeInMs <= 0 || currentBackoffTimeInMs >= maxBackoffTimeInMs) {
return maxBackoffTimeInMs;
}
return currentBackoffTimeInMs >= maxBackoffTimeInMs - currentBackoffTimeInMs
? maxBackoffTimeInMs
: currentBackoffTimeInMs << 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) {

@Override
public void onComplete(final TPipeTransferResp response) {
if (Objects.nonNull(client) && Objects.nonNull(response)) {
sink.recordReceiverStatus(client.getEndPoint(), response.getStatus());
}

if (sink.isClosed()) {
clearEventsReferenceCount();
sink.eliminateHandler(this, true);
Expand Down Expand Up @@ -100,28 +104,40 @@ protected boolean tryTransfer(
}
// track handler before checking if connector is closed
sink.trackHandler(this);
if (sink.isClosed()) {
clearEventsReferenceCount();
sink.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
PipeLogger.log(
ignored ->
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
return true;
}
return false;
});
this.client = null;
if (returnFalseIfSinkIsClosed(client)) {
return false;
}
sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint());
if (returnFalseIfSinkIsClosed(client)) {
return false;
}
doTransfer(client, req);
return true;
}

private boolean returnFalseIfSinkIsClosed(final AsyncPipeDataTransferServiceClient client) {
if (!sink.isClosed()) {
return false;
}

clearEventsReferenceCount();
sink.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
client.returnSelf(
(e) -> {
if (e instanceof IllegalStateException) {
PipeLogger.log(
ignored ->
LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO),
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
return true;
}
return false;
});
this.client = null;
return true;
}

/**
* @return {@code true} if all transmissions corresponding to the handler have been completed,
* {@code false} otherwise
Expand Down Expand Up @@ -190,6 +206,10 @@ public void onComplete(final TPipeTransferResp response) {
return;
}

if (Objects.nonNull(response)) {
sink.recordReceiverStatus(client.getEndPoint(), response.getStatus());
}

if (response == null) {
fallbackToWholeRequest(
client,
Expand Down Expand Up @@ -255,6 +275,10 @@ private void fallbackToWholeRequest(

try {
client.setShouldReturnSelf(shouldReturnSelf);
sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint());
if (returnFalseIfSinkIsClosed(client)) {
return;
}
client.pipeTransfer(originalReq, this);
} catch (final Exception e) {
PipeTransferTrackableHandler.this.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonConfig;
Expand All @@ -38,6 +39,7 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -154,6 +156,36 @@ public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() thro
Assert.assertEquals(0, handler.errorCount);
}

@Test
public void testTransferWaitsForReceiverBackoffAndRecordsStatus() throws Exception {
final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class);
final AsyncPipeDataTransferServiceClient client =
Mockito.mock(AsyncPipeDataTransferServiceClient.class);
final TEndPoint endPoint = new TEndPoint("127.0.0.1", 6667);
final TSStatus status =
new TSStatus()
.setCode(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode());

Mockito.when(client.getEndPoint()).thenReturn(endPoint);
Mockito.doAnswer(
invocation -> {
final AsyncMethodCallback<TPipeTransferResp> callback = invocation.getArgument(1);
callback.onComplete(resp(status));
return null;
})
.when(client)
.pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());

final TestPipeTransferTrackableHandler handler = new TestPipeTransferTrackableHandler(sink);

handler.transfer(client, createReq(1));

final InOrder inOrder = Mockito.inOrder(sink, client);
inOrder.verify(sink).waitIfReceiverTemporarilyUnavailable(endPoint);
inOrder.verify(client).pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
Mockito.verify(sink).recordReceiverStatus(endPoint, status);
}

private static TPipeTransferReq createReq(final int bodySize) {
final byte[] body = new byte[bodySize];
for (int i = 0; i < body.length; ++i) {
Expand All @@ -168,8 +200,12 @@ private static TPipeTransferReq createReq(final int bodySize) {
}

private static TPipeTransferResp successResp() {
return resp(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
}

private static TPipeTransferResp resp(final TSStatus status) {
final TPipeTransferResp resp = new TPipeTransferResp();
resp.setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
resp.setStatus(status);
return resp;
}

Expand Down
Loading