From 7598fc41b366ff1256c6dcbc54309e108f9194ad Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 12 Jun 2026 10:49:04 +0800 Subject: [PATCH 1/4] Fix 1.3.7 binaryBuffers --- .../request/PipeTransferTabletBatchReq.java | 49 ++++++++++++++++-- .../sink/PipeDataNodeThriftRequestTest.java | 51 ++++++++++++++++++- 2 files changed, 95 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index ede3370f5b0d..797ea509602f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -45,6 +45,7 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { + private final transient List binaryReqs = new ArrayList<>(); private final transient List insertNodeReqs = new ArrayList<>(); private final transient List tabletReqs = new ArrayList<>(); @@ -60,6 +61,26 @@ public Pair constructStatement final List insertRowStatementList = new ArrayList<>(); final List insertTabletStatementList = new ArrayList<>(); + for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) { + final InsertBaseStatement statement = binaryReq.constructStatement(); + if (statement.isEmpty()) { + continue; + } + if (statement instanceof InsertRowStatement) { + insertRowStatementList.add((InsertRowStatement) statement); + } else if (statement instanceof InsertTabletStatement) { + insertTabletStatementList.add((InsertTabletStatement) statement); + } else if (statement instanceof InsertRowsStatement) { + insertRowStatementList.addAll( + ((InsertRowsStatement) statement).getInsertRowStatementList()); + } else { + throw new UnsupportedOperationException( + String.format( + "Unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.", + statement)); + } + } + for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) { final InsertBaseStatement statement = insertNodeReq.constructStatement(); if (statement.isEmpty()) { @@ -132,9 +153,23 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq(); final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); - // Binary size, for rolling upgrade - ReadWriteIOUtils.readInt(transferReq.body); + // Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets. int size = ReadWriteIOUtils.readInt(transferReq.body); + for (int i = 0; i < size; ++i) { + final int length = ReadWriteIOUtils.readInt(transferReq.body); + if (length < 0 || length > transferReq.body.remaining()) { + throw new IllegalArgumentException( + String.format( + "Invalid binary request body length %s, remaining body length %s.", + length, transferReq.body.remaining())); + } + final byte[] body = new byte[length]; + transferReq.body.get(body); + batchReq.binaryReqs.add( + PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body))); + } + + size = ReadWriteIOUtils.readInt(transferReq.body); for (int i = 0; i < size; ++i) { batchReq.insertNodeReqs.add( PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( @@ -155,6 +190,11 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( /////////////////////////////// TestOnly /////////////////////////////// + @TestOnly + public List getBinaryReqs() { + return binaryReqs; + } + @TestOnly public List getInsertNodeReqs() { return insertNodeReqs; @@ -176,7 +216,8 @@ public boolean equals(final Object obj) { return false; } final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj; - return insertNodeReqs.equals(that.insertNodeReqs) + return binaryReqs.equals(that.binaryReqs) + && insertNodeReqs.equals(that.insertNodeReqs) && tabletReqs.equals(that.tabletReqs) && version == that.version && type == that.type @@ -185,6 +226,6 @@ public boolean equals(final Object obj) { @Override public int hashCode() { - return Objects.hash(insertNodeReqs, tabletReqs, version, type, body); + return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 88f7353f5192..5e7c6f279bf2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -702,6 +702,43 @@ public void testPipeTransferTabletBatchReqFromLegacyV13Body() throws IOException assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); } + @Test + public void testPipeTransferTabletBatchReqFromLegacyV13BodyWithBinaryReqs() throws IOException { + final InsertRowNode node = + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "sg", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 1, + new Object[] {1}, + false); + final ByteBuffer binaryBuffer = ByteBuffer.wrap(new byte[] {'a', 'b'}); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_BATCH, + serializeLegacyTabletBatchBody( + Collections.singletonList(binaryBuffer), + Collections.singletonList(node.serializeToByteBuffer()), + Collections.singletonList(serializeLegacyTabletRawBuffer(false)))); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializedReq.getVersion()); + Assert.assertEquals(req.getType(), deserializedReq.getType()); + Assert.assertEquals(1, deserializedReq.getBinaryReqs().size()); + Assert.assertArrayEquals( + new byte[] {'a', 'b'}, + byteBufferToByteArray(deserializedReq.getBinaryReqs().get(0).getByteBuffer())); + Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size()); + Assert.assertEquals(1, deserializedReq.getTabletReqs().size()); + Assert.assertEquals(node, deserializedReq.getInsertNodeReqs().get(0).getInsertNode()); + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); + } + @Test public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException { final TPipeTransferReq req = new TPipeTransferReq(); @@ -1271,9 +1308,21 @@ private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned private static ByteBuffer serializeLegacyTabletBatchBody( final List insertNodeBuffers, final List tabletBuffers) throws IOException { + return serializeLegacyTabletBatchBody(Collections.emptyList(), insertNodeBuffers, tabletBuffers); + } + + private static ByteBuffer serializeLegacyTabletBatchBody( + final List binaryBuffers, + final List insertNodeBuffers, + final List tabletBuffers) + throws IOException { try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - ReadWriteIOUtils.write(0, outputStream); + ReadWriteIOUtils.write(binaryBuffers.size(), outputStream); + for (final ByteBuffer binaryBuffer : binaryBuffers) { + ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream); + writeByteBuffer(outputStream, binaryBuffer); + } ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream); for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) { From 4045b27f5e734b55397be0d521c9a9868f63d031 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 12 Jun 2026 11:07:24 +0800 Subject: [PATCH 2/4] Update PipeDataNodeThriftRequestTest.java --- .../iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 5e7c6f279bf2..69c0c117d5e1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -1308,7 +1308,8 @@ private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned private static ByteBuffer serializeLegacyTabletBatchBody( final List insertNodeBuffers, final List tabletBuffers) throws IOException { - return serializeLegacyTabletBatchBody(Collections.emptyList(), insertNodeBuffers, tabletBuffers); + return serializeLegacyTabletBatchBody( + Collections.emptyList(), insertNodeBuffers, tabletBuffers); } private static ByteBuffer serializeLegacyTabletBatchBody( From cec0069e9cdb46f789107776af68e896d00dc9a5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 12 Jun 2026 14:19:52 +0800 Subject: [PATCH 3/4] Hotfix --- .../request/PipeTransferTabletBatchReq.java | 55 +++++++++--- .../request/PipeTransferTabletRawReq.java | 13 ++- .../sink/util/TabletStatementConverter.java | 87 +++++++++++++++---- .../sink/PipeDataNodeThriftRequestTest.java | 77 +++++++++++++++- 4 files changed, 203 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java index 797ea509602f..340232c8b623 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -154,10 +154,10 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); // Legacy 1.3.x batch bodies may carry WAL binary requests before insert nodes and tablets. - int size = ReadWriteIOUtils.readInt(transferReq.body); + int size = readNonNegativeSize(transferReq.body, "binary request count"); for (int i = 0; i < size; ++i) { - final int length = ReadWriteIOUtils.readInt(transferReq.body); - if (length < 0 || length > transferReq.body.remaining()) { + final int length = readNonNegativeSize(transferReq.body, "binary request body length"); + if (length > transferReq.body.remaining()) { throw new IllegalArgumentException( String.format( "Invalid binary request body length %s, remaining body length %s.", @@ -169,17 +169,36 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body))); } - size = ReadWriteIOUtils.readInt(transferReq.body); + size = readNonNegativeSize(transferReq.body, "insert node count"); for (int i = 0; i < size; ++i) { - batchReq.insertNodeReqs.add( - PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( - (InsertNode) PlanFragment.deserializeHelper(transferReq.body, null))); + final int startPosition = transferReq.body.position(); + try { + batchReq.insertNodeReqs.add( + PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq( + (InsertNode) PlanFragment.deserializeHelper(transferReq.body, null))); + } catch (final RuntimeException e) { + throw new IllegalArgumentException( + String.format( + "Failed to deserialize insert node %s/%s in tablet batch at body position %s with remaining body length %s.", + i + 1, size, startPosition, transferReq.body.remaining()), + e); + } } - size = ReadWriteIOUtils.readInt(transferReq.body); + size = readNonNegativeSize(transferReq.body, "raw tablet count"); for (int i = 0; i < size; ++i) { - batchReq.tabletReqs.add( - PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body, tabletStringInternPool)); + final int startPosition = transferReq.body.position(); + try { + batchReq.tabletReqs.add( + PipeTransferTabletRawReq.toTPipeTransferRawReq( + transferReq.body, tabletStringInternPool)); + } catch (final RuntimeException e) { + throw new IllegalArgumentException( + String.format( + "Failed to deserialize raw tablet %s/%s in tablet batch at body position %s with remaining body length %s.", + i + 1, size, startPosition, transferReq.body.remaining()), + e); + } } batchReq.version = transferReq.version; @@ -188,6 +207,22 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq( return batchReq; } + private static int readNonNegativeSize(final ByteBuffer buffer, final String fieldName) { + if (buffer.remaining() < Integer.BYTES) { + throw new IllegalArgumentException( + String.format( + "Insufficient bytes to read %s in tablet batch, remaining body length %s.", + fieldName, buffer.remaining())); + } + + final int size = ReadWriteIOUtils.readInt(buffer); + if (size < 0) { + throw new IllegalArgumentException( + String.format("Invalid negative %s %s in tablet batch.", fieldName, size)); + } + return size; + } + /////////////////////////////// TestOnly /////////////////////////////// @TestOnly diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 63db24a37ba7..e80b10c95e2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -188,8 +188,17 @@ private void deserializeTPipeTransferRawReq( buffer.position(startPosition); } - tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); - isAligned = ReadWriteIOUtils.readBool(buffer); + try { + tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); + isAligned = ReadWriteIOUtils.readBool(buffer); + } catch (final RuntimeException e) { + buffer.position(startPosition); + throw new IllegalArgumentException( + String.format( + "Failed to deserialize raw tablet request at body position %s with remaining body length %s.", + startPosition, buffer.remaining()), + e); + } } private static void ensureStatementDeserializedFromCurrentTabletFormat( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 405859dd9828..27032f4cfe30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -32,7 +32,6 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; -import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -116,12 +115,19 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( intern(ReadWriteIOUtils.readString(byteBuffer), tabletStringInternPool); final int rowSize = ReadWriteIOUtils.readInt(byteBuffer); + if (rowSize < 0) { + throw new IllegalArgumentException( + String.format("Invalid row size %s in tablet format deserialization.", rowSize)); + } // deserialize schemas final int schemaSize = - BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)) - ? ReadWriteIOUtils.readInt(byteBuffer) - : 0; + readBooleanByte(byteBuffer, "schema existence") ? ReadWriteIOUtils.readInt(byteBuffer) : 0; + if (schemaSize < 0) { + throw new IllegalArgumentException( + String.format("Invalid schema size %s in tablet format deserialization.", schemaSize)); + } + ensureRemaining(byteBuffer, schemaSize, "measurement schema existence flags"); final String[] measurement = new String[schemaSize]; final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemaSize]; final TSDataType[] dataTypes = new TSDataType[schemaSize]; @@ -148,15 +154,26 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( // Deserialize and calculate memory in the same loop for (int i = 0; i < schemaSize; i++) { - final boolean hasSchema = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean hasSchema = readBooleanByte(byteBuffer, "measurement schema existence"); if (hasSchema) { final Pair pair = readMeasurement(byteBuffer, tabletStringInternPool); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); if (readColumnCategory) { + if (!byteBuffer.hasRemaining()) { + throw new IllegalArgumentException( + "Missing column category in current tablet format deserialization."); + } + final byte columnCategory = byteBuffer.get(); + if (columnCategory < 0 || columnCategory >= ColumnCategory.values().length) { + throw new IllegalArgumentException( + String.format( + "Invalid column category %s in current tablet format deserialization.", + columnCategory)); + } columnCategories[i] = TsTableColumnCategory.fromTsFileColumnCategory( - ColumnCategory.values()[byteBuffer.get()]); + ColumnCategory.values()[columnCategory]); } // Calculate memory for each measurement string @@ -178,6 +195,15 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( memorySize += measurementMemorySize; memorySize += dataTypesMemorySize; + final boolean isTimesNotNull = readBooleanByte(byteBuffer, "timestamp column existence"); + if (rowSize > 0 && !isTimesNotNull) { + throw new IllegalArgumentException( + "Missing timestamps in tablet format deserialization with non-empty rows."); + } + if (isTimesNotNull) { + ensureRemaining(byteBuffer, (long) Long.BYTES * rowSize, "timestamps"); + } + // deserialize times and calculate memory during deserialization final long[] times = new long[rowSize]; // Calculate memory: array header + long size * rowSize @@ -185,7 +211,6 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + (long) Long.BYTES * rowSize); - final boolean isTimesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); if (isTimesNotNull) { for (int i = 0; i < rowSize; i++) { times[i] = ReadWriteIOUtils.readLong(byteBuffer); @@ -199,7 +224,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( final BitMap[] bitMaps; final long bitMapsMemorySize; - final boolean isBitMapsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean isBitMapsNotNull = readBooleanByte(byteBuffer, "bitmap column existence"); if (isBitMapsNotNull) { // Use the method that returns both BitMap array and memory size final Pair bitMapsAndMemory = @@ -218,7 +243,11 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( final Object[] values; final long valuesMemorySize; - final boolean isValuesNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean isValuesNotNull = readBooleanByte(byteBuffer, "value column existence"); + if (rowSize > 0 && schemaSize > 0 && !isValuesNotNull) { + throw new IllegalArgumentException( + "Missing values in tablet format deserialization with non-empty rows."); + } if (isValuesNotNull) { // Use the method that returns both values array and memory size final Pair valuesAndMemory = @@ -236,7 +265,7 @@ private static InsertTabletStatement deserializeStatementFromTabletFormat( // Add values memory to total memorySize += valuesMemorySize; - final boolean isAligned = ReadWriteIOUtils.readBoolean(byteBuffer); + final boolean isAligned = readBooleanByte(byteBuffer, "alignment"); statement.setMeasurements(measurement); statement.setTimes(times); @@ -321,6 +350,31 @@ private static void skipString(final ByteBuffer buffer) { } } + private static boolean readBooleanByte(final ByteBuffer buffer, final String fieldName) { + if (!buffer.hasRemaining()) { + throw new IllegalArgumentException( + String.format("Missing %s flag in tablet format deserialization.", fieldName)); + } + + final byte value = ReadWriteIOUtils.readByte(buffer); + if (value != 0 && value != 1) { + throw new IllegalArgumentException( + String.format( + "Invalid %s flag %s in tablet format deserialization.", fieldName, value)); + } + return value == 1; + } + + private static void ensureRemaining( + final ByteBuffer buffer, final long expectedSize, final String fieldName) { + if (expectedSize > buffer.remaining()) { + throw new IllegalArgumentException( + String.format( + "Insufficient bytes for %s in tablet format deserialization, expected %s, remaining %s.", + fieldName, expectedSize, buffer.remaining())); + } + } + /** * Read measurement name and data type from buffer, skipping other measurement schema fields * (encoding, compression, and tags/attributes) that are not needed for InsertTabletStatement. @@ -364,9 +418,13 @@ private static Pair readBitMapsFromBufferWithMemory( boolean hasMarkedBitMap = false; for (int i = 0; i < columns; i++) { - final boolean hasBitMap = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean hasBitMap = readBooleanByte(byteBuffer, "bitmap existence"); if (hasBitMap) { final int size = ReadWriteIOUtils.readInt(byteBuffer); + if (size < 0) { + throw new IllegalArgumentException( + String.format("Invalid bitmap size %s in tablet format deserialization.", size)); + } final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer); final byte[] byteArray = valueBinary.getValues(); final BitMap bitMap = new BitMap(size, byteArray); @@ -417,7 +475,7 @@ private static Pair readValuesFromBufferWithMemory( for (int i = 0; i < columns; i++) { final boolean isValueColumnsNotNull = - BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + readBooleanByte(byteBuffer, "value column existence"); if (types[i] == null) { continue; } @@ -427,7 +485,7 @@ private static Pair readValuesFromBufferWithMemory( final boolean[] boolValues = new boolean[rowSize]; if (isValueColumnsNotNull) { for (int index = 0; index < rowSize; index++) { - boolValues[index] = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + boolValues[index] = readBooleanByte(byteBuffer, "boolean value"); } } values[i] = boolValues; @@ -503,8 +561,7 @@ private static Pair readValuesFromBufferWithMemory( if (isValueColumnsNotNull) { for (int index = 0; index < rowSize; index++) { - final boolean isNotNull = - BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); + final boolean isNotNull = readBooleanByte(byteBuffer, "binary value existence"); if (isNotNull) { binaryValues[index] = ReadWriteIOUtils.readBinary(byteBuffer); // Calculate memory for each Binary object during deserialization diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 69c0c117d5e1..b95852aa936f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -755,6 +755,52 @@ public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOExcept assertLegacyTabletStatement(deserializedReq.constructStatement()); } + @Test + public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat() + throws IOException { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType(); + req.body = serializeSingleColumnLegacyTabletRawBuffer(false); + + final PipeTransferTabletRawReq deserializedReq = + PipeTransferTabletRawReq.fromTPipeTransferReq(req); + + Assert.assertFalse(deserializedReq.getIsAligned()); + final InsertTabletStatement statement = deserializedReq.constructStatement(); + Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath()); + Assert.assertArrayEquals(new String[] {"s1"}, statement.getMeasurements()); + Assert.assertArrayEquals(new TSDataType[] {TSDataType.INT32}, statement.getDataTypes()); + Assert.assertEquals(2, statement.getRowCount()); + Assert.assertArrayEquals(new long[] {1700000000000L, 1700000000001L}, statement.getTimes()); + Assert.assertArrayEquals(new int[] {2, 1}, (int[]) statement.getColumns()[0]); + } + + @Test + public void testPipeTransferTabletBatchReqRejectsTruncatedRawTablet() throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(0, outputStream); + ReadWriteIOUtils.write(0, outputStream); + ReadWriteIOUtils.write(1, outputStream); + outputStream.write(new byte[] {1, 0, 0, 0, 0, 0}); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_BATCH, + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); + + try { + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + Assert.fail("Expected IllegalArgumentException"); + } catch (final IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains("Failed to deserialize raw tablet")); + Assert.assertTrue( + e.getCause().getMessage().contains("Failed to deserialize raw tablet request")); + } + } + } + @Test public void testPipeTransferTabletBatchReqV2() throws IOException { final List insertNodeBuffers = new ArrayList<>(); @@ -1285,8 +1331,8 @@ private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT); ReadWriteIOUtils.write((byte) 1, outputStream); - ReadWriteIOUtils.write(2L, outputStream); - ReadWriteIOUtils.write(1L, outputStream); + ReadWriteIOUtils.write(1700000000000L, outputStream); + ReadWriteIOUtils.write(1700000000001L, outputStream); ReadWriteIOUtils.write((byte) 0, outputStream); @@ -1305,6 +1351,33 @@ private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned } } + private static ByteBuffer serializeSingleColumnLegacyTabletRawBuffer(final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write("root.sg.d", outputStream); + ReadWriteIOUtils.write(2, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(1, outputStream); + writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(1700000000000L, outputStream); + ReadWriteIOUtils.write(1700000000001L, outputStream); + + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + ReadWriteIOUtils.write(1, outputStream); + + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + private static ByteBuffer serializeLegacyTabletBatchBody( final List insertNodeBuffers, final List tabletBuffers) throws IOException { From e9f3c8523105238cbc0d6a88f961b687a39151b4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 12 Jun 2026 14:23:26 +0800 Subject: [PATCH 4/4] spotless --- .../iotdb/db/pipe/sink/util/TabletStatementConverter.java | 6 ++---- .../iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 27032f4cfe30..5afe02658d92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -359,8 +359,7 @@ private static boolean readBooleanByte(final ByteBuffer buffer, final String fie final byte value = ReadWriteIOUtils.readByte(buffer); if (value != 0 && value != 1) { throw new IllegalArgumentException( - String.format( - "Invalid %s flag %s in tablet format deserialization.", fieldName, value)); + String.format("Invalid %s flag %s in tablet format deserialization.", fieldName, value)); } return value == 1; } @@ -474,8 +473,7 @@ private static Pair readValuesFromBufferWithMemory( NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns); for (int i = 0; i < columns; i++) { - final boolean isValueColumnsNotNull = - readBooleanByte(byteBuffer, "value column existence"); + final boolean isValueColumnsNotNull = readBooleanByte(byteBuffer, "value column existence"); if (types[i] == null) { continue; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index b95852aa936f..7641070800f5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -756,8 +756,7 @@ public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOExcept } @Test - public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat() - throws IOException { + public void testPipeTransferTabletRawReqWithSingleColumnLegacyTabletFormat() throws IOException { final TPipeTransferReq req = new TPipeTransferReq(); req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType();