diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 4852b9d116e25..6d8192aa85e9d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -417,6 +417,15 @@ public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) { return this; } + @Override + public CommonConfig setAutoResizingBufferMemoryProportion( + double autoResizingBufferMemoryProportion) { + setProperty( + "auto_resizing_buffer_memory_proportion", + String.valueOf(autoResizingBufferMemoryProportion)); + return this; + } + @Override public CommonConfig setQuotaEnable(boolean quotaEnable) { setProperty("quota_enable", String.valueOf(quotaEnable)); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 582c9a049e492..2cf435d49ea72 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -423,6 +423,14 @@ public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) { return this; } + @Override + public CommonConfig setAutoResizingBufferMemoryProportion( + double autoResizingBufferMemoryProportion) { + dnConfig.setAutoResizingBufferMemoryProportion(autoResizingBufferMemoryProportion); + cnConfig.setAutoResizingBufferMemoryProportion(autoResizingBufferMemoryProportion); + return this; + } + @Override public CommonConfig setQuotaEnable(boolean quotaEnable) { dnConfig.setQuotaEnable(quotaEnable); diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 48c157e957be8..f554c463b9fc4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -299,6 +299,12 @@ public CommonConfig setWriteMemoryProportion(String writeMemoryProportion) { return this; } + @Override + public CommonConfig setAutoResizingBufferMemoryProportion( + double autoResizingBufferMemoryProportion) { + return this; + } + @Override public CommonConfig setQuotaEnable(boolean quotaEnable) { return this; diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index dc21234e2bad2..85cc0afc1a4fc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -134,6 +134,8 @@ CommonConfig setEnableAutoLeaderBalanceForIoTConsensus( CommonConfig setWriteMemoryProportion(String writeMemoryProportion); + CommonConfig setAutoResizingBufferMemoryProportion(double autoResizingBufferMemoryProportion); + CommonConfig setClusterTimeseriesLimitThreshold(long clusterTimeseriesLimitThreshold); CommonConfig setClusterDeviceLimitThreshold(long clusterDeviceLimitThreshold); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java new file mode 100644 index 0000000000000..bd1b430b5df65 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.service.rpc.thrift.IClientRPCService; +import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; +import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; + +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TTransport; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.nio.file.Files; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBAutoResizingBufferMemoryIT { + + private static final double AUTO_RESIZING_BUFFER_MEMORY_PROPORTION = 0.0003; + private static final String CONFIG_FILE_ENTRY = "auto_resizing_buffer_memory_proportion=3.0E-4"; + private static final int DATANODE_MAX_HEAP_SIZE_IN_MB = 256; + private static final int AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION = 2; + private static final int CONNECTION_COUNT_OVERFLOW_MARGIN = 1; + private static final int INITIAL_GROWING_REQUEST_PAYLOAD_SIZE = 16 * 1024; + private static final int MAX_GROWING_REQUEST_PAYLOAD_SIZE = + calculateNextPowerOfTwo(calculateAutoResizingBufferMemorySizeInBytes()); + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setAutoResizingBufferMemoryProportion(AUTO_RESIZING_BUFFER_MEMORY_PROPORTION); + EnvFactory.getEnv() + .getConfig() + .getDataNodeJVMConfig() + .setMaxHeapSize(DATANODE_MAX_HEAP_SIZE_IN_MB); + EnvFactory.getEnv().initClusterEnvironment(); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testAutoResizingBufferMemoryProportionConfigTakesEffect() throws Exception { + Assert.assertTrue( + EnvFactory.getEnv().getNodeWrapperList().stream() + .allMatch(nodeWrapper -> checkConfigFileContains(nodeWrapper, CONFIG_FILE_ENTRY))); + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE root.auto_resizing_buffer_memory"); + statement.execute( + "CREATE TIMESERIES root.auto_resizing_buffer_memory.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN"); + statement.execute( + "INSERT INTO root.auto_resizing_buffer_memory.d1(time, s1) VALUES (1, 100)"); + + try (ResultSet resultSet = + statement.executeQuery("SELECT s1 FROM root.auto_resizing_buffer_memory.d1")) { + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(100, resultSet.getInt("root.auto_resizing_buffer_memory.d1.s1")); + Assert.assertFalse(resultSet.next()); + } + } + } + + @Test + public void testNewConnectionsWithWritesAreRejectedWhenBufferMemoryIsExhausted() + throws Exception { + List heldConnections = new ArrayList<>(); + boolean rejected = false; + + try { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE root.auto_resizing_buffer_reject"); + statement.execute( + "CREATE TIMESERIES root.auto_resizing_buffer_reject.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN"); + } + + int connectionCountToExhaustBufferMemory = + calculateConnectionCountToExhaustAutoResizingBufferMemory(); + for (int i = 0; i < connectionCountToExhaustBufferMemory; i++) { + try { + Connection connection = EnvFactory.getEnv().getConnection(); + heldConnections.add(connection); + try (Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "INSERT INTO root.auto_resizing_buffer_reject.d1(time, s1) VALUES (%d, %d)", + i + 1, i)); + } + } catch (Exception e) { + rejected = true; + break; + } + } + } finally { + for (Connection connection : heldConnections) { + closeQuietly(connection); + } + } + + Assert.assertTrue( + "Expected new connections with writes to be rejected after AutoResizingBuffer memory is exhausted", + rejected); + } + + @Test + public void testGrowingRequestsAreRejectedWhenBufferMemoryIsExhausted() throws Exception { + boolean rejected = false; + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE root.auto_resizing_buffer_growing_request"); + statement.execute( + "CREATE TIMESERIES root.auto_resizing_buffer_growing_request.d1.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN"); + } + + // do not use connection because its inevitable retry will slow down the test + try (ThriftClientContext clientContext = ThriftClientContext.open()) { + int payloadSize = INITIAL_GROWING_REQUEST_PAYLOAD_SIZE; + while (payloadSize <= MAX_GROWING_REQUEST_PAYLOAD_SIZE) { + try { + clientContext.executeStatement( + String.format( + "INSERT INTO root.auto_resizing_buffer_growing_request.d1(time, s1) VALUES (%d, '%s')", + payloadSize, repeat('a', payloadSize))); + } catch (Exception e) { + rejected = true; + clientContext.markBroken(); + break; + } + payloadSize = payloadSize << 1; + } + } + + Assert.assertTrue( + "Expected a growing request to be rejected after AutoResizingBuffer memory is exhausted", + rejected); + } + + private static void closeQuietly(AutoCloseable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Exception ignored) { + // ignored + } + } + + private static int calculateConnectionCountToExhaustAutoResizingBufferMemory() { + int autoResizingBufferInitialSizePerConnection = + AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION * RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY; + return (int) + (calculateAutoResizingBufferMemorySizeInBytes() / autoResizingBufferInitialSizePerConnection + + CONNECTION_COUNT_OVERFLOW_MARGIN); + } + + private static long calculateAutoResizingBufferMemorySizeInBytes() { + return (long) + (DATANODE_MAX_HEAP_SIZE_IN_MB * 1024L * 1024L * AUTO_RESIZING_BUFFER_MEMORY_PROPORTION); + } + + private static int calculateNextPowerOfTwo(long value) { + int result = 1; + while (result < value) { + result <<= 1; + } + return result; + } + + private static String repeat(char character, int count) { + char[] chars = new char[count]; + Arrays.fill(chars, character); + return new String(chars); + } + + private static TSOpenSessionReq createOpenSessionReq() { + TSOpenSessionReq req = new TSOpenSessionReq(); + req.setUsername(SessionConfig.DEFAULT_USER); + req.setPassword(SessionConfig.DEFAULT_PASSWORD); + req.setZoneId(ZoneId.systemDefault().toString()); + req.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString()); + req.putToConfiguration("sql_dialect", "tree"); + return req; + } + + private static class ThriftClientContext implements AutoCloseable { + + private final TTransport transport; + private final IClientRPCService.Client client; + private final long sessionId; + private final long statementId; + private boolean broken; + + private ThriftClientContext( + TTransport transport, IClientRPCService.Client client, long sessionId, long statementId) { + this.transport = transport; + this.client = client; + this.sessionId = sessionId; + this.statementId = statementId; + } + + private static ThriftClientContext open() throws Exception { + DataNodeWrapper dataNode = EnvFactory.getEnv().getDataNodeWrapper(0); + TTransport transport = + DeepCopyRpcTransportFactory.INSTANCE.getTransport( + dataNode.getIp(), dataNode.getPort(), 0); + transport.open(); + IClientRPCService.Client client = + new IClientRPCService.Client(new TBinaryProtocol(transport)); + TSOpenSessionResp openSessionResp = client.openSession(createOpenSessionReq()); + RpcUtils.verifySuccess(openSessionResp.getStatus()); + long sessionId = openSessionResp.getSessionId(); + return new ThriftClientContext( + transport, client, sessionId, client.requestStatementId(sessionId)); + } + + private void executeStatement(String sql) throws Exception { + TSExecuteStatementResp resp = + client.executeStatementV2(new TSExecuteStatementReq(sessionId, sql, statementId)); + RpcUtils.verifySuccess(resp.getStatus()); + } + + private void markBroken() { + broken = true; + } + + @Override + public void close() throws Exception { + try { + if (!broken) { + client.closeOperation(new TSCloseOperationReq(sessionId).setStatementId(statementId)); + client.closeSession(new TSCloseSessionReq(sessionId)); + } + } finally { + transport.close(); + } + } + } + + private static boolean checkConfigFileContains(AbstractNodeWrapper nodeWrapper, String content) { + try { + String systemPropertiesPath = + nodeWrapper.getNodePath() + + File.separator + + "conf" + + File.separator + + CommonConfig.SYSTEM_CONFIG_NAME; + return new String(Files.readAllBytes(new File(systemPropertiesPath).toPath())) + .contains(content); + } catch (Exception ignore) { + return false; + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 19a4214bed2f1..1855dcd6bac2d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -86,6 +86,7 @@ public void setUp() throws Exception { EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL); EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false); EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false); + EnvFactory.getEnv().getConfig().getCommonConfig().setAutoResizingBufferMemoryProportion(0); EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1"); EnvFactory.getEnv() .getConfig() diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java index 3b3fae80902f1..5201b4d3b1e7f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java @@ -46,6 +46,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) @@ -56,6 +57,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java index f233596fc60cd..43221feca171e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java @@ -65,6 +65,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setDefaultSchemaRegionGroupNumPerDatabase(1) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) @@ -77,6 +78,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java index 3fce11bf51f6d..87559cbe1da06 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java @@ -71,6 +71,7 @@ private void innerSetUp( .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(dataRegionConsensus) @@ -83,6 +84,7 @@ private void innerSetUp( .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(configNodeConsensus) .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus) .setDataRegionConsensusProtocolClass(dataRegionConsensus) @@ -167,6 +169,7 @@ public void testPipeOnBothSenderAndReceiver() throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -179,6 +182,7 @@ public void testPipeOnBothSenderAndReceiver() throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -361,6 +365,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) @@ -377,6 +382,7 @@ private void doTestUseNodeUrls(String connectorName) throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java index 022d71d548775..3674d75bdcbcd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java @@ -65,6 +65,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) // Disable sender compaction for tsfile determination in loose range test @@ -80,6 +81,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDnConnectionTimeoutMs(600000) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java index 57b5846fc4bf3..b6117f5b132e6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java @@ -63,6 +63,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) // Disable sender compaction to test mods @@ -77,6 +78,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDnConnectionTimeoutMs(600000) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java index de0f0c460eed8..723b1cb2451ea 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java @@ -59,6 +59,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -69,6 +70,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java index ff906d8a5afcb..cd042f6f132af 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java @@ -70,6 +70,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) @@ -79,6 +80,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java index 27297c064719a..11fe02f7eaa82 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java @@ -83,6 +83,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -98,6 +99,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setDataReplicationFactor(2) .setSchemaReplicationFactor(3) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java index 547db349e2fa7..374fd9898fb71 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java @@ -71,6 +71,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDnConnectionTimeoutMs(600000) @@ -82,6 +83,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java index a7fae02f6d162..f0f6340a034a9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java @@ -54,6 +54,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) @@ -65,6 +66,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java index 97e183468367f..5aa14e0a39117 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java @@ -61,6 +61,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) @@ -71,6 +72,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java index dfa7957a5746d..b95f25eb1d195 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java @@ -58,6 +58,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) @@ -67,6 +68,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java index 8a7ac4f194792..4d45f2e8b29ef 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java @@ -71,6 +71,7 @@ private void innerSetUp( .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(dataRegionConsensus) @@ -82,6 +83,7 @@ private void innerSetUp( .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(configNodeConsensus) .setSchemaRegionConsensusProtocolClass(schemaRegionConsensus) .setDataRegionConsensusProtocolClass(dataRegionConsensus) @@ -169,6 +171,7 @@ public void testPipeOnBothSenderAndReceiver() throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -178,6 +181,7 @@ public void testPipeOnBothSenderAndReceiver() throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -355,6 +359,7 @@ private void doTestUseNodeUrls(String sinkName) throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) @@ -368,6 +373,7 @@ private void doTestUseNodeUrls(String sinkName) throws Exception { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java index 72646f81b8693..718511d7ca842 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java @@ -66,6 +66,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setDataReplicationFactor(1) .setSchemaReplicationFactor(1); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java index d93cb5e42d461..588c21de4242a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java @@ -67,6 +67,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) // Disable sender compaction for tsfile determination in loose range test @@ -81,6 +82,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java index f1a623143aad9..9075b87cca996 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java @@ -59,6 +59,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -68,6 +69,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java index 22b6d07f44a83..55c2a4299e8fe 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java @@ -84,6 +84,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -99,6 +100,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setDataReplicationFactor(2) .setSchemaReplicationFactor(3) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java index 6596c77913267..030b8ea8487a6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java @@ -62,6 +62,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) // Limit the schemaRegion number to 1 to guarantee the after sql executed on the same region // of the tested idempotent sql. .setDefaultSchemaRegionGroupNumPerDatabase(1) @@ -75,6 +76,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java index 3900506a658b5..f100bf3c9739e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java @@ -69,6 +69,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) @@ -79,6 +80,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeAirGapReceiverEnabled(true) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java index 26790479f22fb..945b735b19811 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java @@ -56,6 +56,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) // Disable sender compaction to test mods @@ -68,6 +69,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java index 11f70d944b639..82bbe395fc8be 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java @@ -54,6 +54,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) @@ -65,6 +66,7 @@ protected void setupConfig() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java index 5f936cda51615..e41d6a4f3810a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java @@ -57,6 +57,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setDefaultSchemaRegionGroupNumPerDatabase(1) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) @@ -69,6 +70,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java index 119f7ed59c2fb..90ccd972e8918 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java @@ -53,6 +53,7 @@ public void setUp() { senderEnv .getConfig() .getCommonConfig() + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) @@ -60,7 +61,11 @@ public void setUp() { // 10 min, assert that the operations will not time out senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); - receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoResizingBufferMemoryProportion(0) + .setDnConnectionTimeoutMs(600000); senderEnv.initClusterEnvironment(3, 3, 180); receiverEnv.initClusterEnvironment(); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index d43bb6564b11b..c6ea863dd55c1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -63,6 +63,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setDefaultSchemaRegionGroupNumPerDatabase(1) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) @@ -76,6 +77,7 @@ public void setUp() { .getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setTimestampPrecision("ms") .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java index 3ade13c7209de..001860971d30d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java @@ -36,6 +36,7 @@ public void setUp() { env.getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) .setPipeAutoSplitFullEnabled(false); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java index 8647e776b508e..0ea62b608f132 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java @@ -54,6 +54,7 @@ public class IoTDBLegacyPipeReceiverSecurityIT { @BeforeClass public static void setUp() { + EnvFactory.getEnv().getConfig().getCommonConfig().setAutoResizingBufferMemoryProportion(0); EnvFactory.getEnv().initClusterEnvironment(); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index b1d0a4dda7371..42813a9efd009 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -74,6 +74,7 @@ public void setUp() { env.getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeMemoryManagementEnabled(false) .setDataReplicationFactor(1) .setSchemaReplicationFactor(1) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java index c7fa9d12c4e09..1e54a43caa049 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java @@ -51,6 +51,7 @@ public void setUp() { env.getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(true) + .setAutoResizingBufferMemoryProportion(0) .setPipeMemoryManagementEnabled(false) .setDataReplicationFactor(1) .setSchemaReplicationFactor(1) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java index f4e63e1d2f85f..cb6d0374666e1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java @@ -48,6 +48,7 @@ protected void setupConfig() { env1.getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) @@ -58,6 +59,7 @@ protected void setupConfig() { env2.getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) @@ -67,6 +69,7 @@ protected void setupConfig() { env3.getConfig() .getCommonConfig() .setAutoCreateSchemaEnabled(false) + .setAutoResizingBufferMemoryProportion(0) .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java index b351ace5a4da3..1195da8033188 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java @@ -71,6 +71,7 @@ public void setUp() throws Exception { tmpDir = new File(Files.createTempDirectory("load").toUri()); EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL); EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false); + EnvFactory.getEnv().getConfig().getCommonConfig().setAutoResizingBufferMemoryProportion(0); EnvFactory.getEnv() .getConfig() .getDataNodeConfig() diff --git a/iotdb-client/client-go b/iotdb-client/client-go index dc64b1a7648d3..2ea2655e090dc 160000 --- a/iotdb-client/client-go +++ b/iotdb-client/client-go @@ -1 +1 @@ -Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def +Subproject commit 2ea2655e090dcefd12bf1a789a51c8df9a28fa24 diff --git a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java index 0e545164e8d40..5c89472be5b90 100644 --- a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java +++ b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java @@ -50,6 +50,12 @@ public final class RpcMessages { public static final String COULD_NOT_LOAD_KEYSTORE = "Could not load keystore or truststore file"; + // AutoResizingBuffer + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED = + "AutoResizingBuffer was interrupted while allocating %d bytes"; + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED = + "AutoResizingBuffer failed to allocate %d bytes after %d retries"; + // IoTDBRpcDataSet / IoTDBJDBCDataSet public static final String CLOSE_OPERATION_SERVER_ERROR = "Error occurs for close operation in server side because "; diff --git a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java index 03faa1dbb8374..d12d4593a397c 100644 --- a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java +++ b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java @@ -44,6 +44,12 @@ public final class RpcMessages { // BaseRpcTransportFactory public static final String COULD_NOT_LOAD_KEYSTORE = "无法加载密钥库或信任库文件"; + // AutoResizingBuffer + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED = + "AutoResizingBuffer 分配 %d 字节内存时被中断"; + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED = + "AutoResizingBuffer 在 %d 次重试后仍无法分配 %d 字节内存"; + // IoTDBRpcDataSet / IoTDBJDBCDataSet public static final String CLOSE_OPERATION_SERVER_ERROR = "服务端关闭操作失败,原因:"; public static final String CLOSE_OPERATION_CONNECTION_ERROR = "连接服务端执行关闭操作时出错 "; diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java index 180d05b3d9324..397a5843171a0 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.rpc; +import java.io.IOException; import java.util.Arrays; /** @@ -31,33 +32,38 @@ * required size < current capacity * 0.6, and such small requests last for more than 5 times, * shrink to the middle of the required size and current capacity. */ -class AutoResizingBuffer { +class AutoResizingBuffer implements AutoCloseable { private byte[] array; private int bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME; private final int initialCapacity; private long lastShrinkTime; + private int accountedCapacity; + private boolean closed; - public AutoResizingBuffer(int initialCapacity) { + public AutoResizingBuffer(int initialCapacity) throws IOException { + AutoResizingBufferMemoryManager.allocate(initialCapacity); this.array = new byte[initialCapacity]; this.initialCapacity = initialCapacity; + this.accountedCapacity = initialCapacity; } - public void resizeIfNecessary(int size) { + public void resizeIfNecessary(int size) throws IOException { + reserveCurrentCapacityIfReleased(); final int currentCapacity = this.array.length; final double loadFactor = 0.6; if (currentCapacity < size) { // Increase by a factor of 1.5x int growCapacity = currentCapacity + (currentCapacity >> 1); int newCapacity = Math.max(growCapacity, size); - this.array = Arrays.copyOf(array, newCapacity); + resize(newCapacity); bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME; } else if (size > initialCapacity && currentCapacity * loadFactor > size && bufTooLargeCounter-- <= 0 && System.currentTimeMillis() - lastShrinkTime > RpcUtils.MIN_SHRINK_INTERVAL) { // do not resize if it is reading the request size and do not shrink too often - array = Arrays.copyOf(array, size + (currentCapacity - size) / 2); + resize(size + (currentCapacity - size) / 2); bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME; lastShrinkTime = System.currentTimeMillis(); } @@ -66,4 +72,41 @@ public void resizeIfNecessary(int size) { public byte[] array() { return this.array; } + + @Override + public void close() { + if (!closed) { + AutoResizingBufferMemoryManager.release(accountedCapacity); + accountedCapacity = 0; + closed = true; + } + } + + private void resize(int newCapacity) throws IOException { + final int currentCapacity = array.length; + if (newCapacity > currentCapacity) { + final int delta = newCapacity - currentCapacity; + AutoResizingBufferMemoryManager.allocate(delta); + try { + array = Arrays.copyOf(array, newCapacity); + accountedCapacity += delta; + } catch (RuntimeException | Error e) { + AutoResizingBufferMemoryManager.release(delta); + throw e; + } + } else if (newCapacity < currentCapacity) { + array = Arrays.copyOf(array, newCapacity); + final int delta = currentCapacity - newCapacity; + AutoResizingBufferMemoryManager.release(delta); + accountedCapacity -= delta; + } + } + + private void reserveCurrentCapacityIfReleased() throws IOException { + if (closed) { + AutoResizingBufferMemoryManager.allocate(array.length); + accountedCapacity = array.length; + closed = false; + } + } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java new file mode 100644 index 0000000000000..49128727a0b96 --- /dev/null +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc; + +/** Memory accounting hook for {@link AutoResizingBuffer}. */ +public interface AutoResizingBufferMemoryControl { + + boolean allocate(long sizeInBytes); + + void release(long sizeInBytes); +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java new file mode 100644 index 0000000000000..8c9fb19785b0d --- /dev/null +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc; + +import org.apache.iotdb.rpc.i18n.RpcMessages; + +import java.io.IOException; +import java.util.Objects; + +public final class AutoResizingBufferMemoryManager { + private static final int MEMORY_ALLOCATE_MAX_RETRIES = 5; + private static final long DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = 2_000L; + + private static final AutoResizingBufferMemoryControl NO_OP_MEMORY_CONTROL = + new AutoResizingBufferMemoryControl() { + @Override + public boolean allocate(long sizeInBytes) { + return true; + } + + @Override + public void release(long sizeInBytes) { + // Do nothing. + } + }; + + private static volatile AutoResizingBufferMemoryControl memoryControl = NO_OP_MEMORY_CONTROL; + private static volatile long memoryAllocateRetryIntervalInMs = + DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS; + + private AutoResizingBufferMemoryManager() { + // Utility class. + } + + public static void setMemoryControl(AutoResizingBufferMemoryControl memoryControl) { + AutoResizingBufferMemoryManager.memoryControl = Objects.requireNonNull(memoryControl); + } + + static void resetMemoryControl() { + memoryControl = NO_OP_MEMORY_CONTROL; + memoryAllocateRetryIntervalInMs = DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS; + } + + static void setMemoryAllocateRetryIntervalInMs(long memoryAllocateRetryIntervalInMs) { + AutoResizingBufferMemoryManager.memoryAllocateRetryIntervalInMs = + memoryAllocateRetryIntervalInMs; + } + + static void allocate(long sizeInBytes) throws IOException { + if (sizeInBytes <= 0) { + return; + } + for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) { + if (memoryControl.allocate(sizeInBytes)) { + return; + } + try { + Thread.sleep(memoryAllocateRetryIntervalInMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format(RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED, sizeInBytes), e); + } + } + throw new IOException( + String.format( + RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_FAILED, + sizeInBytes, + MEMORY_ALLOCATE_MAX_RETRIES)); + } + + static void release(long sizeInBytes) { + if (sizeInBytes <= 0) { + return; + } + memoryControl.release(sizeInBytes); + } +} diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java index 35833321ba37d..035f8753de1ab 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java @@ -23,18 +23,24 @@ import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import java.io.IOException; + public class AutoScalingBufferReadTransport extends NonOpenTransport { private final AutoResizingBuffer buf; private int pos = 0; private int limit = 0; - public AutoScalingBufferReadTransport(int initialCapacity) { + public AutoScalingBufferReadTransport(int initialCapacity) throws IOException { this.buf = new AutoResizingBuffer(initialCapacity); } public void fill(TTransport inTrans, int length) throws TTransportException { - buf.resizeIfNecessary(length); + try { + buf.resizeIfNecessary(length); + } catch (IOException e) { + throw new TTransportException(e); + } inTrans.readAll(buf.array(), 0, length); pos = 0; limit = length; @@ -89,10 +95,16 @@ public final int getBytesRemainingInBuffer() { return limit - pos; } - public void resizeIfNecessary(int size) { + public void resizeIfNecessary(int size) throws IOException { buf.resizeIfNecessary(size); } + @Override + public void close() { + super.close(); + buf.close(); + } + public void limit(int newLimit) { this.limit = newLimit; } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java index ae833fad23962..24278067fcdff 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java @@ -22,6 +22,8 @@ import org.apache.thrift.TConfiguration; import org.apache.thrift.transport.TTransportException; +import java.io.IOException; + /** * Note that this class is mainly copied from class {@link * org.apache.thrift.transport.AutoExpandingBufferWriteTransport}. since that class does not support @@ -32,7 +34,7 @@ public class AutoScalingBufferWriteTransport extends NonOpenTransport { private final AutoResizingBuffer buf; private int pos; - public AutoScalingBufferWriteTransport(int initialCapacity) { + public AutoScalingBufferWriteTransport(int initialCapacity) throws IOException { this.buf = new AutoResizingBuffer(initialCapacity); this.pos = 0; } @@ -43,8 +45,12 @@ public int read(byte[] buf, int off, int len) { } @Override - public void write(byte[] toWrite, int off, int len) { - buf.resizeIfNecessary(pos + len); + public void write(byte[] toWrite, int off, int len) throws TTransportException { + try { + buf.resizeIfNecessary(pos + len); + } catch (IOException e) { + throw new TTransportException(e); + } System.arraycopy(toWrite, off, buf.array(), pos, len); pos += len; } @@ -57,7 +63,13 @@ public void reset() { pos = 0; } - public void resizeIfNecessary(int size) { + @Override + public void close() { + super.close(); + buf.close(); + } + + public void resizeIfNecessary(int size) throws IOException { buf.resizeIfNecessary(size); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index 62abc28e47023..88c9683db970b 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -34,10 +34,27 @@ protected TCompressedElasticFramedTransport( TTransport underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, - boolean copyBinary) { + boolean copyBinary) + throws TTransportException { super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); - writeCompressBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize); - readCompressBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize); + try { + writeCompressBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize); + readCompressBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize); + } catch (IOException e) { + closeAllocatedBuffers(); + throw new TTransportException(e); + } + } + + @Override + protected void closeAllocatedBuffers() { + super.closeAllocatedBuffers(); + if (writeCompressBuffer != null) { + writeCompressBuffer.close(); + } + if (readCompressBuffer != null) { + readCompressBuffer.close(); + } } @Override @@ -80,7 +97,11 @@ public void flush() throws TTransportException { writeBuffer.reset(); if (thriftDefaultBufferSize < length) { - writeBuffer.resizeIfNecessary(thriftDefaultBufferSize); + try { + writeBuffer.resizeIfNecessary(thriftDefaultBufferSize); + } catch (IOException e) { + throw new TTransportException(e); + } } underlying.flush(); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 6a174f86b6ad8..db3d2cc7d954a 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -32,6 +32,7 @@ import javax.net.ssl.SSLHandshakeException; import java.io.EOFException; +import java.io.IOException; import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; @@ -74,7 +75,7 @@ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copy } @Override - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return new TElasticFramedTransport( trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); } @@ -84,13 +85,19 @@ public TElasticFramedTransport( TTransport underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, - boolean copyBinary) { + boolean copyBinary) + throws TTransportException { this.underlying = underlying; this.thriftDefaultBufferSize = thriftDefaultBufferSize; this.thriftMaxFrameSize = thriftMaxFrameSize; this.copyBinary = copyBinary; - readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize); - writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize); + try { + readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize); + writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize); + } catch (IOException e) { + closeAllocatedBuffers(); + throw new TTransportException(e); + } } protected final int thriftDefaultBufferSize; @@ -115,7 +122,20 @@ public void open() throws TTransportException { @Override public void close() { - underlying.close(); + try { + underlying.close(); + } finally { + closeAllocatedBuffers(); + } + } + + protected void closeAllocatedBuffers() { + if (readBuffer != null) { + readBuffer.close(); + } + if (writeBuffer != null) { + writeBuffer.close(); + } } @Override @@ -263,7 +283,11 @@ public void flush() throws TTransportException { underlying.write(writeBuffer.getBuffer(), 0, length); writeBuffer.reset(); if (length > thriftDefaultBufferSize) { - writeBuffer.resizeIfNecessary(thriftDefaultBufferSize); + try { + writeBuffer.resizeIfNecessary(thriftDefaultBufferSize); + } catch (IOException e) { + throw new TTransportException(e); + } } underlying.flush(); } @@ -292,7 +316,7 @@ public void checkReadBytesAvailable(long numBytes) throws TTransportException { } @Override - public void write(byte[] buf, int off, int len) { + public void write(byte[] buf, int off, int len) throws TTransportException { writeBuffer.write(buf, off, len); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java index 79ebc7983e32d..fd9d4ffbba359 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java @@ -20,6 +20,7 @@ package org.apache.iotdb.rpc; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.xerial.snappy.Snappy; import java.io.IOException; @@ -41,13 +42,13 @@ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copy } @Override - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return new TSnappyElasticFramedTransport( trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); } } - public TSnappyElasticFramedTransport(TTransport underlying) { + public TSnappyElasticFramedTransport(TTransport underlying) throws TTransportException { this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE, true); } @@ -55,7 +56,8 @@ public TSnappyElasticFramedTransport( TTransport underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, - boolean copyBinary) { + boolean copyBinary) + throws TTransportException { super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java index 77c2bbf6b9757..505e1a99405a8 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java @@ -21,6 +21,7 @@ import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; import java.net.SocketException; @@ -31,7 +32,8 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo private final TSocket underlyingSocket; public TimeoutChangeableTFastFramedTransport( - TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copyBinary) { + TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copyBinary) + throws TTransportException { super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); this.underlyingSocket = underlying; } @@ -65,7 +67,7 @@ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copy } @Override - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { if (trans instanceof TSocket) { return new TimeoutChangeableTFastFramedTransport( (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java index 168f52662aae7..ecd807e38d65a 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java @@ -21,6 +21,7 @@ import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; import java.net.SocketException; @@ -31,7 +32,8 @@ public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFrame private final TSocket underlyingSocket; public TimeoutChangeableTSnappyFramedTransport( - TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copyBinary) { + TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copyBinary) + throws TTransportException { super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); this.underlyingSocket = underlying; } @@ -65,7 +67,7 @@ public Factory(int thriftDefaultBufferSize, int thriftMaxFrameSize, boolean copy } @Override - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { if (trans instanceof TSocket) { return new TimeoutChangeableTSnappyFramedTransport( (TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary); diff --git a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java new file mode 100644 index 0000000000000..802b966939069 --- /dev/null +++ b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc; + +import org.apache.thrift.transport.TMemoryBuffer; +import org.apache.thrift.transport.TTransportException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; + +public class AutoResizingBufferTest { + + private final TestMemoryControl memoryControl = new TestMemoryControl(); + + @After + public void tearDown() { + AutoResizingBufferMemoryManager.resetMemoryControl(); + } + + @Test + public void testAllocateAndReleaseMemoryWhenResizing() throws Exception { + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + + AutoResizingBuffer buffer = new AutoResizingBuffer(100); + Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes()); + + buffer.resizeIfNecessary(200); + Assert.assertEquals(200, buffer.array().length); + Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes()); + + setLastShrinkTime(buffer, System.currentTimeMillis() - RpcUtils.MIN_SHRINK_INTERVAL - 1); + for (int i = 0; i <= RpcUtils.MAX_BUFFER_OVERSIZE_TIME; i++) { + buffer.resizeIfNecessary(101); + } + Assert.assertEquals(150, buffer.array().length); + Assert.assertEquals(150, memoryControl.getUsedMemoryInBytes()); + + buffer.close(); + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + + @Test + public void testThrowIOExceptionWhenMemoryIsInsufficient() throws Exception { + memoryControl.setLimit(120); + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); + + AutoResizingBuffer buffer = new AutoResizingBuffer(100); + try { + buffer.resizeIfNecessary(200); + Assert.fail("Expected IOException"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("100")); + Assert.assertTrue(e.getMessage().contains("5")); + } finally { + Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes()); + buffer.close(); + } + } + + @Test + public void testElasticFramedTransportReleasesMemoryWhenClosed() throws Exception { + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + + TElasticFramedTransport transport = + new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes()); + + transport.close(); + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + + @Test + public void testElasticFramedTransportReleasesMemoryWhenConstructorFails() { + memoryControl.setLimit(150); + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); + + try { + new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.fail("Expected TTransportException"); + } catch (TTransportException e) { + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + } + + @Test + public void testSnappyElasticFramedTransportReleasesMemory() throws Exception { + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + + TSnappyElasticFramedTransport transport = + new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.assertEquals(400, memoryControl.getUsedMemoryInBytes()); + + transport.close(); + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + + @Test + public void testSnappyElasticFramedTransportReleasesMemoryWhenConstructorFails() { + memoryControl.setLimit(350); + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); + + try { + new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.fail("Expected TTransportException"); + } catch (TTransportException e) { + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + } + + private void setLastShrinkTime(AutoResizingBuffer buffer, long lastShrinkTime) throws Exception { + Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime"); + field.setAccessible(true); + field.set(buffer, lastShrinkTime); + } + + private static class TestMemoryControl implements AutoResizingBufferMemoryControl { + + private final AtomicLong usedMemoryInBytes = new AtomicLong(); + private long limit = Long.MAX_VALUE; + + @Override + public boolean allocate(long sizeInBytes) { + while (true) { + long current = usedMemoryInBytes.get(); + long next = current + sizeInBytes; + if (next > limit) { + return false; + } + if (usedMemoryInBytes.compareAndSet(current, next)) { + return true; + } + } + } + + @Override + public void release(long sizeInBytes) { + usedMemoryInBytes.addAndGet(-sizeInBytes); + } + + private long getUsedMemoryInBytes() { + return usedMemoryInBytes.get(); + } + + private void setLimit(long limit) { + this.limit = limit; + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 037f138286a18..1d3c5128599a3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -34,6 +34,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.memory.MemoryConfig; import org.apache.iotdb.commons.service.JMXService; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.service.ServiceType; @@ -142,6 +143,7 @@ protected void start() throws IoTDBException { LOGGER.info(ConfigNodeMessages.STARTING_IOTDB, IoTDBConstant.VERSION_WITH_BUILD); ConfigNodeStartupCheck checks = new ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE); checks.startUpCheck(); + MemoryConfig.getInstance(); } catch (StartupException | ConfigurationException | IOException e) { LOGGER.error(ConfigNodeMessages.MEET_ERROR_WHEN_DOING_START_CHECKING, e); throw new IoTDBException(ConfigNodeMessages.ERROR_STARTING, -1); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java index 2c24ff12b6d36..7a832b53067ab 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java @@ -60,12 +60,14 @@ import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.file.AccessDeniedException; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -190,10 +192,23 @@ public static ConsensusGroupId fromRaftGroupIdToConsensusGroupId(RaftGroupId raf public static ByteBuffer serializeTSStatus(TSStatus status) throws TException { AutoScalingBufferWriteTransport byteBuffer = - new AutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE); - TCompactProtocol protocol = new TCompactProtocol(byteBuffer); - status.write(protocol); - return ByteBuffer.wrap(byteBuffer.getBuffer()); + createAutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE); + try { + TCompactProtocol protocol = new TCompactProtocol(byteBuffer); + status.write(protocol); + return ByteBuffer.wrap(Arrays.copyOf(byteBuffer.getBuffer(), byteBuffer.getPos())); + } finally { + byteBuffer.close(); + } + } + + private static AutoScalingBufferWriteTransport createAutoScalingBufferWriteTransport( + int initialCapacity) throws TException { + try { + return new AutoScalingBufferWriteTransport(initialCapacity); + } catch (IOException e) { + throw new TException(e); + } } public static TSStatus deserializeFrom(ByteBuffer buffer) throws TException { diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 531cbac91a800..cdff08d59f861 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -786,6 +786,12 @@ partition_table_recover_max_read_megabytes_per_second=10 # effectiveMode: restart datanode_memory_proportion=3:3:1:1:1:1 +# The proportion of GlobalMemoryManager used by AutoResizingBuffer. +# AutoResizingBuffer memory control is disabled when this is less than or equal to 0. +# effectiveMode: restart +# Datatype: double +auto_resizing_buffer_memory_proportion=0.05 + # Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, and PartitionCache. # The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 6:2:1 # effectiveMode: restart diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 6a8956e423b48..66bd4bb0adab9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -416,6 +416,8 @@ public class CommonConfig { private long seriesLimitThreshold = -1; private long deviceLimitThreshold = -1; + private double autoResizingBufferMemoryProportion = 0.05; + private boolean enableBinaryAllocator = true; private int arenaNum = 4; @@ -2651,6 +2653,21 @@ public void setDeviceLimitThreshold(long deviceLimitThreshold) { this.deviceLimitThreshold = deviceLimitThreshold; } + public double getAutoResizingBufferMemoryProportion() { + return autoResizingBufferMemoryProportion; + } + + public void setAutoResizingBufferMemoryProportion(double autoResizingBufferMemoryProportion) { + if (Double.isNaN(autoResizingBufferMemoryProportion) + || autoResizingBufferMemoryProportion > 1) { + logger.warn( + "autoResizingBufferMemoryProportion should be a valid number less than or equal to 1, but was {}", + autoResizingBufferMemoryProportion); + return; + } + this.autoResizingBufferMemoryProportion = autoResizingBufferMemoryProportion; + } + public long getStartUpNanosecond() { return startUpNanosecond; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 5cd954a09f7b8..e893a24e6f24c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -300,6 +300,11 @@ public void loadCommonProps(TrimProperties properties) throws IOException { properties.getProperty( "cluster_device_limit_threshold", String.valueOf(config.getDeviceLimitThreshold())))); + config.setAutoResizingBufferMemoryProportion( + Double.parseDouble( + properties.getProperty( + "auto_resizing_buffer_memory_proportion", + String.valueOf(config.getAutoResizingBufferMemoryProportion())))); config.setPathLogMaxSize( Integer.parseInt( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java index dc1ca4e54734f..52dd0d3ba1306 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java @@ -19,12 +19,19 @@ package org.apache.iotdb.commons.memory; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.rpc.AutoResizingBufferMemoryControl; +import org.apache.iotdb.rpc.AutoResizingBufferMemoryManager; + public class MemoryConfig { + private static final String AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME = "AutoResizingBuffer"; + private static final String AUTO_RESIZING_BUFFER_MEMORY_BLOCK_NAME = "AutoResizingBufferBlock"; + private final MemoryManager globalMemoryManager = new MemoryManager("GlobalMemoryManager", null, Runtime.getRuntime().totalMemory()); private MemoryConfig() { - // singleton + initAutoResizingBufferMemoryControl(); } public static MemoryManager global() { @@ -40,4 +47,58 @@ private static class MemoryConfigHolder { private MemoryConfigHolder() {} } + + private void initAutoResizingBufferMemoryControl() { + AutoResizingBufferMemoryManager.setMemoryControl( + new AutoResizingBufferMemoryControl() { + private IMemoryBlock autoResizingBufferMemoryBlock; + + @Override + public synchronized boolean allocate(long sizeInBytes) { + if (isAutoResizingBufferMemoryControlDisabled()) { + return true; + } + return getAutoResizingBufferMemoryBlock().allocate(sizeInBytes); + } + + @Override + public synchronized void release(long sizeInBytes) { + if (isAutoResizingBufferMemoryControlDisabled()) { + return; + } + if (autoResizingBufferMemoryBlock != null + && !autoResizingBufferMemoryBlock.isReleased()) { + autoResizingBufferMemoryBlock.release(sizeInBytes); + } + } + + private boolean isAutoResizingBufferMemoryControlDisabled() { + return CommonDescriptor.getInstance() + .getConfig() + .getAutoResizingBufferMemoryProportion() + <= 0; + } + + private IMemoryBlock getAutoResizingBufferMemoryBlock() { + if (autoResizingBufferMemoryBlock == null + || autoResizingBufferMemoryBlock.isReleased()) { + long autoResizingBufferMemorySize = + (long) + (globalMemoryManager.getTotalMemorySizeInBytes() + * CommonDescriptor.getInstance() + .getConfig() + .getAutoResizingBufferMemoryProportion()); + MemoryManager autoResizingBufferMemoryManager = + globalMemoryManager.getOrCreateMemoryManager( + AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME, autoResizingBufferMemorySize, true); + autoResizingBufferMemoryBlock = + autoResizingBufferMemoryManager.exactAllocate( + AUTO_RESIZING_BUFFER_MEMORY_BLOCK_NAME, + autoResizingBufferMemorySize, + MemoryBlockType.DYNAMIC); + } + return autoResizingBufferMemoryBlock; + } + }); + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/conf/CommonConfigTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/conf/CommonConfigTest.java new file mode 100644 index 0000000000000..e9c4576712805 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/conf/CommonConfigTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.conf; + +import org.junit.Assert; +import org.junit.Test; + +public class CommonConfigTest { + + @Test + public void testAutoResizingBufferMemoryProportionCanDisableMemoryControl() { + CommonConfig config = new CommonConfig(); + + config.setAutoResizingBufferMemoryProportion(0); + Assert.assertEquals(0, config.getAutoResizingBufferMemoryProportion(), 0); + + config.setAutoResizingBufferMemoryProportion(-1); + Assert.assertEquals(-1, config.getAutoResizingBufferMemoryProportion(), 0); + } + + @Test + public void testInvalidAutoResizingBufferMemoryProportionIsIgnored() { + CommonConfig config = new CommonConfig(); + double originalValue = config.getAutoResizingBufferMemoryProportion(); + + config.setAutoResizingBufferMemoryProportion(1.1); + Assert.assertEquals(originalValue, config.getAutoResizingBufferMemoryProportion(), 0); + + config.setAutoResizingBufferMemoryProportion(Double.NaN); + Assert.assertEquals(originalValue, config.getAutoResizingBufferMemoryProportion(), 0); + } +} diff --git a/pom.xml b/pom.xml index b06647be5382a..73c3fd93c387f 100644 --- a/pom.xml +++ b/pom.xml @@ -764,6 +764,7 @@ **/.gitmodules **/.git-blame-ignore-revs **/git.properties + AGENTS.md **/target/**