Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

public class PipeTransferTabletBatchReq extends TPipeTransferReq {

private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();

Expand All @@ -52,7 +53,7 @@
// Empty constructor
}

public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatements() {

Check failure on line 56 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wMvGoDZ83QuDcNzD&open=AZ66wMvGoDZ83QuDcNzD&pullRequest=17929
final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
final InsertMultiTabletsStatement insertMultiTabletsStatement =
new InsertMultiTabletsStatement();
Expand All @@ -60,6 +61,26 @@
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();

for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
final InsertBaseStatement statement = binaryReq.constructStatement();
if (statement.isEmpty()) {
continue;
}
if (statement instanceof InsertRowStatement) {

Check warning on line 69 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof InsertRowStatement insertrowstatement'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wMvGoDZ83QuDcNzE&open=AZ66wMvGoDZ83QuDcNzE&pullRequest=17929
insertRowStatementList.add((InsertRowStatement) statement);
} else if (statement instanceof InsertTabletStatement) {

Check warning on line 71 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof InsertTabletStatement inserttabletstatement'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wMvGoDZ83QuDcNzF&open=AZ66wMvGoDZ83QuDcNzF&pullRequest=17929
insertTabletStatementList.add((InsertTabletStatement) statement);
} else if (statement instanceof InsertRowsStatement) {

Check warning on line 73 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof InsertRowsStatement insertrowsstatement'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wMvGoDZ83QuDcNzG&open=AZ66wMvGoDZ83QuDcNzG&pullRequest=17929
insertRowStatementList.addAll(
((InsertRowsStatement) statement).getInsertRowStatementList());
} else {
throw new UnsupportedOperationException(
String.format(
"Unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
statement));
}
}

for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
final InsertBaseStatement statement = insertNodeReq.constructStatement();
if (statement.isEmpty()) {
Expand Down Expand Up @@ -132,19 +153,52 @@
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool();

// Binary size, for rolling upgrade
ReadWriteIOUtils.readInt(transferReq.body);
int size = ReadWriteIOUtils.readInt(transferReq.body);
// Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets.
int size = readNonNegativeSize(transferReq.body, "binary request count");
for (int i = 0; i < size; ++i) {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
final int length = readNonNegativeSize(transferReq.body, "binary request body length");
if (length > transferReq.body.remaining()) {
throw new IllegalArgumentException(
String.format(
"Invalid binary request body length %s, remaining body length %s.",
length, transferReq.body.remaining()));
}
final byte[] body = new byte[length];
transferReq.body.get(body);
batchReq.binaryReqs.add(
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
}

size = ReadWriteIOUtils.readInt(transferReq.body);
size = readNonNegativeSize(transferReq.body, "insert node count");
for (int i = 0; i < size; ++i) {
batchReq.tabletReqs.add(
PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool));
final int startPosition = transferReq.body.position();
try {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body, null)));
} catch (final RuntimeException e) {
throw new IllegalArgumentException(
String.format(
"Failed to deserialize insert node %s/%s in tablet batch at body position %s with remaining body length %s.",

Check warning on line 182 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 125).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wMvGoDZ83QuDcNzH&open=AZ66wMvGoDZ83QuDcNzH&pullRequest=17929
i + 1, size, startPosition, transferReq.body.remaining()),
e);
}
}

size = readNonNegativeSize(transferReq.body, "raw tablet count");
for (int i = 0; i < size; ++i) {
final int startPosition = transferReq.body.position();
try {
batchReq.tabletReqs.add(
PipeTransferTabletRawReq.toTPipeTransferRawReq(
transferReq.body, tabletStringInternPool));
} catch (final RuntimeException e) {
throw new IllegalArgumentException(
String.format(
"Failed to deserialize raw tablet %s/%s in tablet batch at body position %s with remaining body length %s.",

Check warning on line 198 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 124).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wMvGoDZ83QuDcNzI&open=AZ66wMvGoDZ83QuDcNzI&pullRequest=17929
i + 1, size, startPosition, transferReq.body.remaining()),
e);
}
}

batchReq.version = transferReq.version;
Expand All @@ -153,8 +207,29 @@
return batchReq;
}

private static int readNonNegativeSize(final ByteBuffer buffer, final String fieldName) {
if (buffer.remaining() < Integer.BYTES) {
throw new IllegalArgumentException(
String.format(
"Insufficient bytes to read %s in tablet batch, remaining body length %s.",
fieldName, buffer.remaining()));
}

final int size = ReadWriteIOUtils.readInt(buffer);
if (size < 0) {
throw new IllegalArgumentException(
String.format("Invalid negative %s %s in tablet batch.", fieldName, size));
}
return size;
}

/////////////////////////////// TestOnly ///////////////////////////////

@TestOnly
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
return binaryReqs;
}

@TestOnly
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
return insertNodeReqs;
Expand All @@ -176,7 +251,8 @@
return false;
}
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
return insertNodeReqs.equals(that.insertNodeReqs)
return binaryReqs.equals(that.binaryReqs)
&& insertNodeReqs.equals(that.insertNodeReqs)
&& tabletReqs.equals(that.tabletReqs)
&& version == that.version
&& type == that.type
Expand All @@ -185,6 +261,6 @@

@Override
public int hashCode() {
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,17 @@
buffer.position(startPosition);
}

tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
isAligned = ReadWriteIOUtils.readBool(buffer);
try {
tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool);
isAligned = ReadWriteIOUtils.readBool(buffer);
} catch (final RuntimeException e) {
buffer.position(startPosition);
throw new IllegalArgumentException(
String.format(
"Failed to deserialize raw tablet request at body position %s with remaining body length %s.",

Check warning on line 198 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 108).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wM0QoDZ83QuDcNzJ&open=AZ66wM0QoDZ83QuDcNzJ&pullRequest=17929
startPosition, buffer.remaining()),
e);
}
}

private static void ensureStatementDeserializedFromCurrentTabletFormat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
Expand Down Expand Up @@ -101,7 +100,7 @@
return deserializeStatementFromTabletFormat(byteBuffer, false, tabletStringInternPool, false);
}

private static InsertTabletStatement deserializeStatementFromTabletFormat(

Check warning on line 103 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 157 to 64, Complexity from 26 to 14, Nesting Level from 4 to 2, Number of Variables from 34 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wM0moDZ83QuDcNzK&open=AZ66wM0moDZ83QuDcNzK&pullRequest=17929
final ByteBuffer byteBuffer,
final boolean readDatabaseName,
final TabletStringInternPool tabletStringInternPool,
Expand All @@ -110,18 +109,25 @@
final InsertTabletStatement statement = new InsertTabletStatement();

// Calculate memory size during deserialization, use INSTANCE_SIZE constant
long memorySize = InsertTabletStatement.getInstanceSize();

Check warning on line 112 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Distance between variable 'memorySize' declaration and its first usage is 6, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wM0moDZ83QuDcNzM&open=AZ66wM0moDZ83QuDcNzM&pullRequest=17929

final String insertTargetName =
intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool);

final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
if (rowSize < 0) {
throw new IllegalArgumentException(
String.format("Invalid row size %s in tablet format deserialization.", rowSize));
}

// deserialize schemas
final int schemaSize =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer))
? ReadWriteIOUtils.readInt(byteBuffer)
: 0;
readBooleanByte(byteBuffer, "schema existence") ? ReadWriteIOUtils.readInt(byteBuffer) : 0;
if (schemaSize < 0) {
throw new IllegalArgumentException(
String.format("Invalid schema size %s in tablet format deserialization.", schemaSize));
}
ensureRemaining(byteBuffer, schemaSize, "measurement schema existence flags");
final String[] measurement = new String[schemaSize];
final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize];
final TSDataType[] dataTypes = new TSDataType[schemaSize];
Expand All @@ -148,15 +154,26 @@

// Deserialize and calculate memory in the same loop
for (int i = 0; i < schemaSize; i++) {
final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean hasSchema = readBooleanByte(byteBuffer, "measurement schema existence");
if (hasSchema) {
final Pair<String, TSDataType> pair = readMeasurement(byteBuffer, tabletStringInternPool);
measurement[i] = pair.getLeft();
dataTypes[i] = pair.getRight();
if (readColumnCategory) {
if (!byteBuffer.hasRemaining()) {
throw new IllegalArgumentException(
"Missing column category in current tablet format deserialization.");
}
final byte columnCategory = byteBuffer.get();
if (columnCategory < 0 || columnCategory >= ColumnCategory.values().length) {
throw new IllegalArgumentException(
String.format(
"Invalid column category %s in current tablet format deserialization.",
columnCategory));
}
columnCategories[i] =
TsTableColumnCategory.fromTsFileColumnCategory(
ColumnCategory.values()[byteBuffer.get()]);
ColumnCategory.values()[columnCategory]);
}

// Calculate memory for each measurement string
Expand All @@ -178,14 +195,22 @@
memorySize += measurementMemorySize;
memorySize += dataTypesMemorySize;

final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp column existence");
if (rowSize > 0 && !isTimesNotNull) {
throw new IllegalArgumentException(
"Missing timestamps in tablet format deserialization with non-empty rows.");
}
if (isTimesNotNull) {
ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps");
}

// deserialize times and calculate memory during deserialization
final long[] times = new long[rowSize];
// Calculate memory: array header + long size * rowSize
final long timesMemorySize =
org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize);

final boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isTimesNotNull) {
for (int i = 0; i < rowSize; i++) {
times[i] = ReadWriteIOUtils.readLong(byteBuffer);
Expand All @@ -199,7 +224,7 @@
final BitMap[] bitMaps;
final long bitMapsMemorySize;

final boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap column existence");
if (isBitMapsNotNull) {
// Use the method that returns both BitMap array and memory size
final Pair<BitMap[], Long> bitMapsAndMemory =
Expand All @@ -218,7 +243,11 @@
final Object[] values;
final long valuesMemorySize;

final boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column existence");
if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) {
throw new IllegalArgumentException(
"Missing values in tablet format deserialization with non-empty rows.");
}
if (isValuesNotNull) {
// Use the method that returns both values array and memory size
final Pair<Object[], Long> valuesAndMemory =
Expand All @@ -236,7 +265,7 @@
// Add values memory to total
memorySize += valuesMemorySize;

final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer);
final boolean isAligned = readBooleanByte(byteBuffer, "alignment");

statement.setMeasurements(measurement);
statement.setTimes(times);
Expand Down Expand Up @@ -321,6 +350,30 @@
}
}

private static boolean readBooleanByte(final ByteBuffer buffer, final String fieldName) {
if (!buffer.hasRemaining()) {
throw new IllegalArgumentException(
String.format("Missing %s flag in tablet format deserialization.", fieldName));
}

final byte value = ReadWriteIOUtils.readByte(buffer);
if (value != 0 && value != 1) {
throw new IllegalArgumentException(
String.format("Invalid %s flag %s in tablet format deserialization.", fieldName, value));
}
return value == 1;
}

private static void ensureRemaining(
final ByteBuffer buffer, final long expectedSize, final String fieldName) {
if (expectedSize > buffer.remaining()) {
throw new IllegalArgumentException(
String.format(
"Insufficient bytes for %s in tablet format deserialization, expected %s, remaining %s.",

Check warning on line 372 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 103).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wM0moDZ83QuDcNzN&open=AZ66wM0moDZ83QuDcNzN&pullRequest=17929
fieldName, expectedSize, buffer.remaining()));
}
}

/**
* Read measurement name and data type from buffer, skipping other measurement schema fields
* (encoding, compression, and tags/attributes) that are not needed for InsertTabletStatement.
Expand Down Expand Up @@ -364,9 +417,13 @@
boolean hasMarkedBitMap = false;

for (int i = 0; i < columns; i++) {
final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap existence");
if (hasBitMap) {
final int size = ReadWriteIOUtils.readInt(byteBuffer);
if (size < 0) {
throw new IllegalArgumentException(

Check warning on line 424 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Expected @throws tag for 'IllegalArgumentException'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wM0moDZ83QuDcNzO&open=AZ66wM0moDZ83QuDcNzO&pullRequest=17929
String.format("Invalid bitmap size %s in tablet format deserialization.", size));
}
final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
final byte[] byteArray = valueBinary.getValues();
final BitMap bitMap = new BitMap(size, byteArray);
Expand Down Expand Up @@ -406,7 +463,7 @@
* @return Pair of values array and memory size
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private static Pair<Object[], Long> readValuesFromBufferWithMemory(

Check warning on line 466 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 106 to 64, Complexity from 27 to 14, Nesting Level from 5 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ66wM0moDZ83QuDcNzL&open=AZ66wM0moDZ83QuDcNzL&pullRequest=17929
final ByteBuffer byteBuffer, final TSDataType[] types, final int columns, final int rowSize) {
final Object[] values = new Object[columns];

Expand All @@ -416,8 +473,7 @@
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);

for (int i = 0; i < columns; i++) {
final boolean isValueColumnsNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isValueColumnsNotNull = readBooleanByte(byteBuffer, "value column existence");
if (types[i] == null) {
continue;
}
Expand All @@ -427,7 +483,7 @@
final boolean[] boolValues = new boolean[rowSize];
if (isValueColumnsNotNull) {
for (int index = 0; index < rowSize; index++) {
boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
boolValues[index] = readBooleanByte(byteBuffer, "boolean value");
}
}
values[i] = boolValues;
Expand Down Expand Up @@ -503,8 +559,7 @@

if (isValueColumnsNotNull) {
for (int index = 0; index < rowSize; index++) {
final boolean isNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
final boolean isNotNull = readBooleanByte(byteBuffer, "binary value existence");
if (isNotNull) {
binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer);
// Calculate memory for each Binary object during deserialization
Expand Down
Loading
Loading