From e9c14f1983f2e42e64014b470d5071c3d197f989 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 26 May 2026 16:40:45 +0800 Subject: [PATCH] Add Tablet.serializedSize() and comprehensive size validation tests. Pre-allocate serialization buffer using exact size estimation, support OBJECT type in tablet serialize/deserialize path, and consolidate serializedSize tests. --- .../apache/tsfile/write/record/Tablet.java | 113 +++++- .../tsfile/write/record/TabletTest.java | 380 ++++++++++++++++++ 2 files changed, 492 insertions(+), 1 deletion(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java index 6093350e2..a83fa1162 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/record/Tablet.java @@ -748,13 +748,26 @@ private Object createValueColumnOfDataType(TSDataType dataType, int capacity) { /** Serialize {@link Tablet} */ public ByteBuffer serialize() throws IOException { - try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final int serializedSize = serializedSize(); + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(serializedSize); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { serialize(outputStream); return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } } + /** Return the exact serialized byte size of this tablet. */ + public int serializedSize() { + int size = 0; + size += ReadWriteIOUtils.sizeToWrite(insertTargetName); + size += Integer.BYTES; + size += serializedSizeOfMeasurementSchemas(); + size += serializedSizeOfTimes(); + size += serializedSizeOfBitMaps(); + size += serializedSizeOfValues(); + return size; + } + public void serialize(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(insertTargetName, stream); ReadWriteIOUtils.write(rowSize, stream); @@ -764,6 +777,104 @@ public void serialize(DataOutputStream stream) throws IOException { writeValues(stream); } + private int serializedSizeOfMeasurementSchemas() { + int size = Byte.BYTES; + if (schemas != null) { + size += Integer.BYTES; + for (int i = 0; i < schemas.size(); i++) { + size += Byte.BYTES; + final IMeasurementSchema schema = schemas.get(i); + if (schema != null) { + size += schema.serializedSize(); + size += Byte.BYTES; + } + } + } + return size; + } + + private int serializedSizeOfTimes() { + int size = Byte.BYTES; + if (timestamps != null) { + size += (long) Long.BYTES * rowSize; + } + return size; + } + + private int serializedSizeOfBitMaps() { + int size = Byte.BYTES; + if (bitMaps != null) { + final int columnCount = schemas == null ? 0 : schemas.size(); + for (int i = 0; i < columnCount; i++) { + if (bitMaps[i] == null || bitMaps[i].isAllUnmarked(rowSize)) { + size += Byte.BYTES; + } else { + size += Byte.BYTES; + size += Integer.BYTES; + size += + ReadWriteIOUtils.sizeToWrite(new Binary(bitMaps[i].getTruncatedByteArray(rowSize))); + } + } + } + return size; + } + + private int serializedSizeOfValues() { + int size = Byte.BYTES; + if (values != null) { + final int columnCount = schemas == null ? 0 : schemas.size(); + for (int i = 0; i < columnCount; i++) { + size += serializedSizeOfColumn(schemas.get(i).getType(), values[i]); + } + } + return size; + } + + private int serializedSizeOfColumn(final TSDataType dataType, final Object column) { + int size = Byte.BYTES; + if (column == null) { + return size; + } + switch (dataType) { + case INT32: + return size + Integer.BYTES * rowSize; + case DATE: + return size + Integer.BYTES * rowSize; + case INT64: + case TIMESTAMP: + return size + Long.BYTES * rowSize; + case FLOAT: + return size + Float.BYTES * rowSize; + case DOUBLE: + return size + Double.BYTES * rowSize; + case BOOLEAN: + return size + rowSize; + case TEXT: + case STRING: + case BLOB: + case OBJECT: + return size + serializedSizeOfBinaryValues((Binary[]) column); + default: + throw new UnSupportedDataTypeException( + Messages.format("error.write.type_not_supported", dataType)); + } + } + + private static int serializedSizeOfBinaryValues(final Binary[] binaryValues, final int rowSize) { + int size = 0; + for (int j = 0; j < rowSize; j++) { + size += Byte.BYTES; + if (binaryValues[j] != null) { + size += ReadWriteIOUtils.sizeToWrite(binaryValues[j]); + } + } + return size; + } + + private int serializedSizeOfBinaryValues(final Binary[] binaryValues) { + return serializedSizeOfBinaryValues(binaryValues, rowSize); + } + /** Serialize {@link MeasurementSchema}s */ private void writeMeasurementSchemas(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(BytesUtils.boolToByte(schemas != null), stream); diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/record/TabletTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/record/TabletTest.java index 65911c18a..0d0eb86d3 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/write/record/TabletTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/write/record/TabletTest.java @@ -25,13 +25,16 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -147,6 +150,7 @@ public void testSerializationAndDeSerializationWithMoreData() { measurementSchemas.add(new MeasurementSchema("s7", TSDataType.BLOB, TSEncoding.PLAIN)); measurementSchemas.add(new MeasurementSchema("s8", TSDataType.TIMESTAMP, TSEncoding.PLAIN)); measurementSchemas.add(new MeasurementSchema("s9", TSDataType.DATE, TSEncoding.PLAIN)); + measurementSchemas.add(new MeasurementSchema("s10", TSDataType.OBJECT, TSEncoding.PLAIN)); final int rowSize = 1000; final Tablet tablet = new Tablet(deviceId, measurementSchemas); @@ -170,6 +174,7 @@ public void testSerializationAndDeSerializationWithMoreData() { measurementSchemas.get(9).getMeasurementName(), i, LocalDate.of(2000 + i, i / 100 + 1, i / 100 + 1)); + tablet.addValue(i, 10, i % 2 == 0, (long) i, new byte[] {(byte) i, (byte) (i + 1)}); tablet.getBitMaps()[i % measurementSchemas.size()].mark(i); } @@ -186,9 +191,11 @@ public void testSerializationAndDeSerializationWithMoreData() { tablet.addValue(measurementSchemas.get(7).getMeasurementName(), rowSize - 1, null); tablet.addValue(measurementSchemas.get(8).getMeasurementName(), rowSize - 1, null); tablet.addValue(measurementSchemas.get(9).getMeasurementName(), rowSize - 1, null); + tablet.addValue(measurementSchemas.get(10).getMeasurementName(), rowSize - 1, null); try { final ByteBuffer byteBuffer = tablet.serialize(); + assertEquals(tablet.serializedSize(), byteBuffer.remaining()); final Tablet newTablet = Tablet.deserialize(byteBuffer); assertEquals(tablet, newTablet); for (int i = 0; i < rowSize; i++) { @@ -357,6 +364,366 @@ public void testSerializeDateColumnWithNullValue() throws IOException { Assert.assertTrue(deserializeTablet.isNull(1, 0)); } + /** Data types supported by {@link Tablet#serialize()}. */ + private static final TSDataType[] SERIALIZABLE_DATA_TYPES = { + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.STRING, + TSDataType.BLOB, + TSDataType.TIMESTAMP, + TSDataType.DATE, + TSDataType.OBJECT + }; + + private static final int[] ROW_COUNTS_FOR_SIZE_TEST = {0, 1, 7, 50}; + + @Test + public void testSerializedSizeMatchesActualSize() throws IOException { + // tree model: single column per type + for (final TSDataType type : SERIALIZABLE_DATA_TYPES) { + for (final int rowCount : ROW_COUNTS_FOR_SIZE_TEST) { + assertSerializedSizeMatches( + createAndFillTreeTablet( + "root.sg.d1", + columnNamesForType(type), + Arrays.asList(type), + rowCount, + 0, + false, + false), + "tree single column " + type + " rows=" + rowCount); + } + } + + // table model: single column per type + for (final TSDataType type : SERIALIZABLE_DATA_TYPES) { + for (final int rowCount : ROW_COUNTS_FOR_SIZE_TEST) { + assertSerializedSizeMatches( + createAndFillTableTablet( + "table1", + columnNamesForType(type), + Arrays.asList(type), + ColumnCategory.nCopy(ColumnCategory.FIELD, 1), + rowCount, + 0, + false, + false), + "table single column " + type + " rows=" + rowCount); + } + } + + // all types combined + final List treeTypes = Arrays.asList(SERIALIZABLE_DATA_TYPES); + final List tableTypes = new ArrayList<>(); + tableTypes.add(TSDataType.STRING); + tableTypes.addAll(treeTypes); + for (final int rowCount : new int[] {1, 25, 100}) { + assertSerializedSizeMatches( + createAndFillTreeTablet( + "root.sg.d1", buildColumnNames(treeTypes), treeTypes, rowCount, 100, false, false), + "tree all types combined rows=" + rowCount); + assertSerializedSizeMatches( + createAndFillTableTablet( + "table1", + buildColumnNames(tableTypes), + tableTypes, + buildTableColumnCategories(tableTypes.size()), + rowCount, + 100, + false, + false), + "table all types combined rows=" + rowCount); + } + + // variable-length binary columns + final List binaryTypes = + Arrays.asList(TSDataType.TEXT, TSDataType.STRING, TSDataType.BLOB, TSDataType.OBJECT); + assertSerializedSizeMatches( + createAndFillTreeTablet( + "root.sg.d1", buildColumnNames(binaryTypes), binaryTypes, 30, 0, false, true), + "tree variable binary lengths"); + assertSerializedSizeMatches( + createAndFillTableTablet( + "table1", + buildColumnNames(binaryTypes), + binaryTypes, + ColumnCategory.nCopy(ColumnCategory.FIELD, binaryTypes.size()), + 30, + 0, + false, + true), + "table variable binary lengths"); + + // sparse null values + assertSerializedSizeMatches( + createAndFillTreeTablet( + "root.sg.d1", buildColumnNames(treeTypes), treeTypes, 40, 0, true, false), + "tree with null values"); + assertSerializedSizeMatches( + createAndFillTableTablet( + "table1", + buildColumnNames(tableTypes), + tableTypes, + buildTableColumnCategories(tableTypes.size()), + 40, + 0, + true, + false), + "table with null values"); + + // table model with TAG columns + final List tagColumnNames = new ArrayList<>(); + final List tagDataTypes = new ArrayList<>(); + final List tagCategories = new ArrayList<>(); + tagColumnNames.add("region"); + tagDataTypes.add(TSDataType.STRING); + tagCategories.add(ColumnCategory.TAG); + for (int i = 0; i < SERIALIZABLE_DATA_TYPES.length; i++) { + tagColumnNames.add("m" + i); + tagDataTypes.add(SERIALIZABLE_DATA_TYPES[i]); + tagCategories.add(ColumnCategory.FIELD); + } + assertSerializedSizeMatches( + createAndFillTableTablet( + "metrics_table", tagColumnNames, tagDataTypes, tagCategories, 20, 0, false, true), + "table model with TAG columns"); + + // mixed fixed-length and variable-length columns + final List mixedTypes = + Arrays.asList( + TSDataType.INT32, + TSDataType.TEXT, + TSDataType.STRING, + TSDataType.BLOB, + TSDataType.DOUBLE); + assertSerializedSizeMatches( + createAndFillTreeTablet( + "root.sg.d1", buildColumnNames(mixedTypes), mixedTypes, 15, 5, false, true), + "tree mixed column payload lengths"); + assertSerializedSizeMatches( + createAndFillTableTablet( + "table1", + buildColumnNames(mixedTypes), + mixedTypes, + ColumnCategory.nCopy(ColumnCategory.FIELD, mixedTypes.size()), + 15, + 5, + false, + true), + "table mixed column payload lengths"); + + // OBJECT column via dedicated write API + final List objectSchemas = + Arrays.asList(new MeasurementSchema("obj", TSDataType.OBJECT, TSEncoding.PLAIN)); + final Tablet objectTablet = new Tablet("root.sg.d1", objectSchemas, 5); + for (int i = 0; i < 5; i++) { + objectTablet.addTimestamp(i, i); + objectTablet.addValue(i, 0, i % 2 == 0, i * 10L, new byte[] {(byte) i, (byte) (i + 1)}); + } + assertSerializedSizeMatches(objectTablet, "tree OBJECT column"); + final Tablet deserializedObject = Tablet.deserialize(objectTablet.serialize()); + assertEquals(objectTablet, deserializedObject); + for (int i = 0; i < 5; i++) { + assertEquals(objectTablet.getValue(i, 0), deserializedObject.getValue(i, 0)); + } + } + + private static List buildTableColumnCategories(int columnCount) { + final List categories = new ArrayList<>(columnCount); + categories.add(ColumnCategory.TAG); + for (int i = 1; i < columnCount; i++) { + categories.add(ColumnCategory.FIELD); + } + return categories; + } + + private static List buildColumnNames(List dataTypes) { + final List names = new ArrayList<>(dataTypes.size()); + for (int i = 0; i < dataTypes.size(); i++) { + if (i == 0 && dataTypes.size() > 1) { + names.add("tag"); + } else { + names.add("m_" + dataTypes.get(i).name() + "_" + i); + } + } + return names; + } + + private static List columnNamesForType(TSDataType type) { + return Arrays.asList("m_" + type.name() + "_0"); + } + + private Tablet createAndFillTreeTablet( + String deviceId, + List columnNames, + List dataTypes, + int rowCount, + int valueOffset, + boolean withNulls, + boolean variableBinaryLength) + throws IOException { + validateTabletSchema(columnNames, dataTypes, null); + final List schemas = new ArrayList<>(dataTypes.size()); + for (int i = 0; i < dataTypes.size(); i++) { + schemas.add(new MeasurementSchema(columnNames.get(i), dataTypes.get(i), TSEncoding.PLAIN)); + } + final Tablet tablet = new Tablet(deviceId, schemas, Math.max(1024, rowCount + 1)); + fillTabletRows(tablet, rowCount, valueOffset, withNulls, variableBinaryLength); + return tablet; + } + + private Tablet createAndFillTableTablet( + String tableName, + List columnNames, + List dataTypes, + List columnCategories, + int rowCount, + int valueOffset, + boolean withNulls, + boolean variableBinaryLength) + throws IOException { + validateTabletSchema(columnNames, dataTypes, columnCategories); + final Tablet tablet = + new Tablet( + tableName, columnNames, dataTypes, columnCategories, Math.max(1024, rowCount + 1)); + fillTabletRows(tablet, rowCount, valueOffset, withNulls, variableBinaryLength); + return tablet; + } + + private static void validateTabletSchema( + List columnNames, List dataTypes, List columnCategories) { + if (columnNames.size() != dataTypes.size()) { + throw new IllegalArgumentException( + "columnNames size " + + columnNames.size() + + " must match dataTypes size " + + dataTypes.size()); + } + if (columnCategories != null && columnCategories.size() != dataTypes.size()) { + throw new IllegalArgumentException( + "columnCategories size " + + columnCategories.size() + + " must match dataTypes size " + + dataTypes.size()); + } + } + + private void fillTabletRows( + Tablet tablet, + int rowCount, + int valueOffset, + boolean withNulls, + boolean variableBinaryLength) { + if (rowCount > 0) { + fillTabletForSerializedSizeTest( + tablet, valueOffset, rowCount, withNulls, variableBinaryLength); + } + } + + private void fillTabletForSerializedSizeTest( + Tablet tablet, + int valueOffset, + int rowCount, + boolean withNulls, + boolean variableBinaryLength) { + for (int row = 0; row < rowCount; row++) { + tablet.addTimestamp(row, valueOffset + row); + for (int col = 0; col < tablet.getSchemas().size(); col++) { + final TSDataType type = tablet.getSchemas().get(col).getType(); + if (isNullCell(withNulls, row, col)) { + tablet.addValue(tablet.getSchemas().get(col).getMeasurementName(), row, null); + } else if (type == TSDataType.OBJECT) { + tablet.addValue( + row, + col, + (row + col) % 2 == 0, + valueOffset + row * 1000L + col, + payloadBytes(binaryPayloadLength(variableBinaryLength, row, col))); + } else { + tablet.addValue( + tablet.getSchemas().get(col).getMeasurementName(), + row, + sampleValue(type, row, col, variableBinaryLength)); + } + } + } + } + + private static boolean isNullCell(boolean withNulls, int row, int col) { + return withNulls && (row + col) % 3 == 0; + } + + private static int binaryPayloadLength(boolean variableBinaryLength, int row, int col) { + if (variableBinaryLength) { + return (col + 1) * 17 + row * 3 + 1; + } + return 8 + row % 11; + } + + private Object sampleValue(TSDataType type, int row, int col, boolean variableBinaryLength) { + switch (type) { + case BOOLEAN: + return (row + col) % 2 == 0; + case INT32: + return row + col * 100; + case INT64: + case TIMESTAMP: + return (long) (valueOffset(row, col) * 1_000_000L); + case FLOAT: + return (row + col) * 1.5f; + case DOUBLE: + return (row + col) * 2.5; + case TEXT: + case STRING: + return stringOfLength(binaryPayloadLength(variableBinaryLength, row, col)); + case BLOB: + return binaryOfLength(binaryPayloadLength(variableBinaryLength, row, col)); + case DATE: + return LocalDate.of(2000 + (row % 20), (col % 12) + 1, (row % 28) + 1); + default: + throw new IllegalArgumentException("Unsupported type in test: " + type); + } + } + + private static int valueOffset(int row, int col) { + return row + col + 1; + } + + private static String stringOfLength(int length) { + final char[] chars = new char[length]; + Arrays.fill(chars, 'x'); + return new String(chars); + } + + private static Binary binaryOfLength(int length) { + final byte[] bytes = new byte[length]; + Arrays.fill(bytes, (byte) 'b'); + return new Binary(bytes); + } + + private static byte[] payloadBytes(int length) { + final byte[] bytes = new byte[length]; + Arrays.fill(bytes, (byte) 'p'); + return bytes; + } + + private void assertSerializedSizeMatches(Tablet tablet, String scenario) throws IOException { + final int expectedSize = tablet.serializedSize(); + final ByteBuffer buffer = tablet.serialize(); + assertEquals(scenario + ": serialize() buffer size", expectedSize, buffer.remaining()); + try (PublicBAOS baos = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(baos)) { + tablet.serialize(outputStream); + assertEquals(scenario + ": serialize(stream) size", expectedSize, baos.size()); + } + buffer.rewind(); + assertEquals(scenario + ": deserialize roundtrip", tablet, Tablet.deserialize(buffer)); + } + @Test public void testAppendInconsistent() { Tablet t1 = @@ -425,6 +792,9 @@ private void fillTablet(Tablet t, int valueOffset, int length) { case BLOB: t.addValue(i, j, String.valueOf(i + valueOffset)); break; + case OBJECT: + t.addValue(i, j, (i + valueOffset) % 2 == 0, i + valueOffset, new byte[] {(byte) i}); + break; case DATE: t.addValue(i, j, LocalDate.of(i + valueOffset, 1, 1)); break; @@ -655,6 +1025,16 @@ private void checkAppendedTablet( new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8)), result.getValue(i, j)); break; + case OBJECT: + { + byte[] content = new byte[] {(byte) i}; + byte[] expected = new byte[content.length + 9]; + expected[0] = (byte) (i % 2); + System.arraycopy(BytesUtils.longToBytes(i), 0, expected, 1, 8); + System.arraycopy(content, 0, expected, 9, content.length); + assertEquals(new Binary(expected), result.getValue(i, j)); + } + break; case DATE: assertEquals(LocalDate.of(i, 1, 1), result.getValue(i, j)); break;