From b20b48710310fc28101825a2d925e19058fdad1e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 12 Jun 2026 15:20:18 +0800 Subject: [PATCH] Throttle async pipe sink on receiver reject --- .../async/IoTDBDataRegionAsyncSink.java | 116 ++++++++++++++++++ .../handler/PipeTransferTrackableHandler.java | 56 ++++++--- .../PipeTransferTrackableHandlerTest.java | 38 +++++- 3 files changed, 193 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index b8b169b1f6abe..a830c9994a0e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; @@ -63,6 +64,7 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import com.google.common.collect.ImmutableSet; @@ -86,6 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; @@ -131,6 +134,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { new ConcurrentHashMap<>(); private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); + private final Map receiverBackoffMap = + new ConcurrentHashMap<>(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -756,6 +761,83 @@ public boolean isEnableSendTsFileLimit() { return enableSendTsFileLimit; } + public void waitIfReceiverTemporarilyUnavailable(final TEndPoint endPoint) { + final String endPointKey = format(endPoint); + if (Objects.isNull(endPointKey)) { + return; + } + + final ReceiverTemporaryUnavailableBackoff backoff = receiverBackoffMap.get(endPointKey); + if (Objects.isNull(backoff)) { + return; + } + + while (!isClosed.get()) { + final long waitTimeInMs = backoff.getRemainingWaitTimeInMs(); + if (waitTimeInMs <= 0) { + return; + } + + try { + Thread.sleep(waitTimeInMs); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + public void recordReceiverStatus(final TEndPoint endPoint, final TSStatus status) { + final String endPointKey = format(endPoint); + if (Objects.isNull(endPointKey) || Objects.isNull(status)) { + return; + } + + if (isReceiverTemporarilyUnavailable(status)) { + final long backoffTimeInMs = + receiverBackoffMap + .computeIfAbsent(endPointKey, key -> new ReceiverTemporaryUnavailableBackoff()) + .markTemporarilyUnavailable(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Receiver {} is temporarily unavailable, throttle requests for {} ms. Status: {}", + endPointKey, + backoffTimeInMs, + status); + } + } else if (isSuccess(status)) { + final ReceiverTemporaryUnavailableBackoff backoff = receiverBackoffMap.get(endPointKey); + if (Objects.nonNull(backoff) && backoff.getRemainingWaitTimeInMs() <= 0) { + receiverBackoffMap.remove(endPointKey, backoff); + } + } + } + + private static boolean isReceiverTemporarilyUnavailable(final TSStatus status) { + if (Objects.isNull(status)) { + return false; + } + + final int statusCode = status.getCode(); + if (statusCode == TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode() + || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + return true; + } + + return status.isSetSubStatus() + && status.getSubStatus().stream() + .anyMatch(IoTDBDataRegionAsyncSink::isReceiverTemporarilyUnavailable); + } + + private static boolean isSuccess(final TSStatus status) { + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode(); + } + + private static String format(final TEndPoint endPoint) { + return Objects.isNull(endPoint) ? null : endPoint.getIp() + ":" + endPoint.getPort(); + } + //////////////////////////// Operations for close //////////////////////////// @Override @@ -830,6 +912,7 @@ public synchronized void close() { // clear reference count of events in retry queue after closing async client clearRetryEventsReferenceCount(); droppedPipeTaskKeys.clear(); + receiverBackoffMap.clear(); super.close(); } @@ -931,4 +1014,37 @@ public void setBatchEventSizeHistogram(Histogram eventSizeHistogram) { tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram); } } + + private static class ReceiverTemporaryUnavailableBackoff { + + private final long maxBackoffTimeInMs = + Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs()); + private final AtomicLong currentBackoffTimeInMs = + new AtomicLong( + Math.min( + Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()), + maxBackoffTimeInMs)); + private final AtomicLong nextAvailableTimeInMs = new AtomicLong(0); + + private long markTemporarilyUnavailable() { + final long backoffTimeInMs = currentBackoffTimeInMs.get(); + nextAvailableTimeInMs.updateAndGet( + current -> Math.max(current, System.currentTimeMillis() + backoffTimeInMs)); + currentBackoffTimeInMs.updateAndGet(this::getNextBackoffTimeInMs); + return backoffTimeInMs; + } + + private long getRemainingWaitTimeInMs() { + return nextAvailableTimeInMs.get() - System.currentTimeMillis(); + } + + private long getNextBackoffTimeInMs(final long currentBackoffTimeInMs) { + if (currentBackoffTimeInMs <= 0 || currentBackoffTimeInMs >= maxBackoffTimeInMs) { + return maxBackoffTimeInMs; + } + return currentBackoffTimeInMs >= maxBackoffTimeInMs - currentBackoffTimeInMs + ? maxBackoffTimeInMs + : currentBackoffTimeInMs << 1; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java index d543e736743ff..61d31eaa7ccf1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java @@ -51,6 +51,10 @@ public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink sink) { @Override public void onComplete(final TPipeTransferResp response) { + if (Objects.nonNull(client) && Objects.nonNull(response)) { + sink.recordReceiverStatus(client.getEndPoint(), response.getStatus()); + } + if (sink.isClosed()) { clearEventsReferenceCount(); sink.eliminateHandler(this, true); @@ -100,28 +104,40 @@ protected boolean tryTransfer( } // track handler before checking if connector is closed sink.trackHandler(this); - if (sink.isClosed()) { - clearEventsReferenceCount(); - sink.eliminateHandler(this, true); - client.setShouldReturnSelf(true); - client.returnSelf( - (e) -> { - if (e instanceof IllegalStateException) { - PipeLogger.log( - ignored -> - LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO), - "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); - return true; - } - return false; - }); - this.client = null; + if (returnFalseIfSinkIsClosed(client)) { + return false; + } + sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint()); + if (returnFalseIfSinkIsClosed(client)) { return false; } doTransfer(client, req); return true; } + private boolean returnFalseIfSinkIsClosed(final AsyncPipeDataTransferServiceClient client) { + if (!sink.isClosed()) { + return false; + } + + clearEventsReferenceCount(); + sink.eliminateHandler(this, true); + client.setShouldReturnSelf(true); + client.returnSelf( + (e) -> { + if (e instanceof IllegalStateException) { + PipeLogger.log( + ignored -> + LOGGER.info(DataNodePipeMessages.ILLEGAL_STATE_WHEN_RETURN_THE_CLIENT_TO), + "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore."); + return true; + } + return false; + }); + this.client = null; + return true; + } + /** * @return {@code true} if all transmissions corresponding to the handler have been completed, * {@code false} otherwise @@ -190,6 +206,10 @@ public void onComplete(final TPipeTransferResp response) { return; } + if (Objects.nonNull(response)) { + sink.recordReceiverStatus(client.getEndPoint(), response.getStatus()); + } + if (response == null) { fallbackToWholeRequest( client, @@ -255,6 +275,10 @@ private void fallbackToWholeRequest( try { client.setShouldReturnSelf(shouldReturnSelf); + sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint()); + if (returnFalseIfSinkIsClosed(client)) { + return; + } client.pipeTransfer(originalReq, this); } catch (final Exception e) { PipeTransferTrackableHandler.this.onError(e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java index 60b6923508548..8e0f780299881 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.conf.CommonConfig; @@ -38,6 +39,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mockito; import java.nio.ByteBuffer; @@ -154,6 +156,36 @@ public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails() thro Assert.assertEquals(0, handler.errorCount); } + @Test + public void testTransferWaitsForReceiverBackoffAndRecordsStatus() throws Exception { + final IoTDBDataRegionAsyncSink sink = Mockito.mock(IoTDBDataRegionAsyncSink.class); + final AsyncPipeDataTransferServiceClient client = + Mockito.mock(AsyncPipeDataTransferServiceClient.class); + final TEndPoint endPoint = new TEndPoint("127.0.0.1", 6667); + final TSStatus status = + new TSStatus() + .setCode(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()); + + Mockito.when(client.getEndPoint()).thenReturn(endPoint); + Mockito.doAnswer( + invocation -> { + final AsyncMethodCallback callback = invocation.getArgument(1); + callback.onComplete(resp(status)); + return null; + }) + .when(client) + .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any()); + + final TestPipeTransferTrackableHandler handler = new TestPipeTransferTrackableHandler(sink); + + handler.transfer(client, createReq(1)); + + final InOrder inOrder = Mockito.inOrder(sink, client); + inOrder.verify(sink).waitIfReceiverTemporarilyUnavailable(endPoint); + inOrder.verify(client).pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any()); + Mockito.verify(sink).recordReceiverStatus(endPoint, status); + } + private static TPipeTransferReq createReq(final int bodySize) { final byte[] body = new byte[bodySize]; for (int i = 0; i < body.length; ++i) { @@ -168,8 +200,12 @@ private static TPipeTransferReq createReq(final int bodySize) { } private static TPipeTransferResp successResp() { + return resp(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + } + + private static TPipeTransferResp resp(final TSStatus status) { final TPipeTransferResp resp = new TPipeTransferResp(); - resp.setStatus(new TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + resp.setStatus(status); return resp; }