diff --git a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java index 14e46634a1..d57d3a3621 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/PojoToRowConverter.java @@ -18,6 +18,7 @@ package org.apache.fluss.client.converter; import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.DecimalType; @@ -119,6 +120,8 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType case BINARY: case BYTES: return prop::read; + case VARIANT: + return (obj) -> convertVariantValue(prop, prop.read(obj)); case CHAR: case STRING: return (obj) -> @@ -165,6 +168,28 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType } } + /** + * Converts a byte array value from a POJO property to a Variant. If the value is already a + * Variant, it is returned directly. If it is a byte[], it is interpreted as the combined format + * [4-byte value length (big-endian)][value][metadata] and converted to a Variant. + */ + private static @Nullable Variant convertVariantValue( + PojoType.Property prop, @Nullable Object v) { + if (v == null) { + return null; + } + if (v instanceof Variant) { + return (Variant) v; + } + if (v instanceof byte[]) { + return Variant.bytesToVariant((byte[]) v); + } + throw new IllegalArgumentException( + String.format( + "Field %s is not a byte[] or Variant. 
Cannot convert to Variant.", + prop.name)); + } + private interface FieldToRow { Object readAndConvert(Object pojo) throws Exception; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java index aa7b0d3157..ad9dbd46b5 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/converter/RowToPojoConverter.java @@ -19,6 +19,7 @@ import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeChecks; @@ -140,6 +141,8 @@ private static RowToField createRowReader(DataType fieldType, PojoType.Property case BINARY: case BYTES: return InternalRow::getBytes; + case VARIANT: + return (row, pos) -> Variant.variantToBytes(row.getVariant(pos)); case DECIMAL: DecimalType decimalType = (DecimalType) fieldType; return (row, pos) -> diff --git a/fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java index f18c574f4b..e5886e4989 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java @@ -104,6 +104,22 @@ public void writeRow(int pos, InternalRow value, RowSerializer serializer) { pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes()); } + @Override + public void writeVariant(int pos, Variant variant) { + byte[] value = variant.value(); + byte[] metadata = variant.metadata(); + int totalSize = 4 + value.length + metadata.length; + byte[] combined = new byte[totalSize]; + // Write value length as big-endian 4-byte integer + combined[0] = (byte) ((value.length >> 24) & 
0xFF); + combined[1] = (byte) ((value.length >> 16) & 0xFF); + combined[2] = (byte) ((value.length >> 8) & 0xFF); + combined[3] = (byte) (value.length & 0xFF); + System.arraycopy(value, 0, combined, 4, value.length); + System.arraycopy(metadata, 0, combined, 4 + value.length, metadata.length); + writeBytes(pos, combined); + } + @Override public void writeChar(int pos, BinaryString value, int length) { // TODO: currently, we encoding CHAR(length) as the same with STRING, the length info can diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java index f33300287c..b23870be53 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java @@ -25,6 +25,7 @@ import java.lang.reflect.Array; import static org.apache.fluss.memory.MemoryUtils.UNSAFE; +import static org.apache.fluss.row.Variant.bytesToVariant; import static org.apache.fluss.utils.Preconditions.checkArgument; /** @@ -82,6 +83,7 @@ public static int calculateFixLengthPartSize(DataType type) { case STRING: case BINARY: case BYTES: + case VARIANT: case DECIMAL: case BIGINT: case DOUBLE: @@ -233,6 +235,12 @@ public byte[] getBytes(int pos) { return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize); } + @Override + public Variant getVariant(int pos) { + byte[] bytes = getBytes(pos); + return bytesToVariant(bytes); + } + @Override public TimestampNtz getTimestampNtz(int pos, int precision) { assertIndexIsValid(pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArrayWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArrayWriter.java index cf19c7ae36..da56cf76c5 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryArrayWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryArrayWriter.java @@ -213,6 +213,7 @@ public static NullSetter 
createNullSetter(DataType elementType) { case STRING: case BINARY: case BYTES: + case VARIANT: case DECIMAL: case BIGINT: case TIMESTAMP_WITHOUT_TIME_ZONE: diff --git a/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java index 1064750c1b..c88326968d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java @@ -84,6 +84,8 @@ public interface BinaryWriter { void writeRow(int pos, InternalRow value, RowSerializer serializer); + void writeVariant(int pos, Variant value); + /** Finally, complete write to set real size to binary. */ void complete(); @@ -135,6 +137,8 @@ static BinaryWriter.ValueWriter createNotNullValueWriter( writer.writeBinary(pos, (byte[]) value, binaryLength); case BYTES: return (writer, pos, value) -> writer.writeBytes(pos, (byte[]) value); + case VARIANT: + return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value); case DECIMAL: final int decimalPrecision = getPrecision(elementType); return (writer, pos, value) -> diff --git a/fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java b/fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java index 5af9dcdcd4..ff88d330d8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java @@ -92,6 +92,9 @@ public interface DataGetters { /** Returns the binary value at the given position. */ byte[] getBytes(int pos); + /** Returns the variant value at the given position. */ + Variant getVariant(int pos); + /** Returns the array value at the given position. 
*/ InternalArray getArray(int pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java b/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java index 08c5a90271..16c8ac7766 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java @@ -213,6 +213,11 @@ public byte[] getBytes(int pos) { return (byte[]) getObject(pos); } + @Override + public Variant getVariant(int pos) { + return (Variant) getObject(pos); + } + @Override public InternalArray getArray(int pos) { return (InternalArray) getObject(pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/GenericRow.java b/fluss-common/src/main/java/org/apache/fluss/row/GenericRow.java index d67eba1892..5d2e07133c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/GenericRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/GenericRow.java @@ -166,6 +166,11 @@ public byte[] getBytes(int pos) { return (byte[]) this.fields[pos]; } + @Override + public Variant getVariant(int pos) { + return (Variant) this.fields[pos]; + } + @Override public InternalArray getArray(int pos) { return (InternalArray) this.fields[pos]; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/GenericVariant.java b/fluss-common/src/main/java/org/apache/fluss/row/GenericVariant.java new file mode 100644 index 0000000000..c53be5cb44 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/GenericVariant.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; + +/** + * An internal data structure implementing {@link Variant}. + * + *
<p>
A Variant consists of two byte arrays: + * + *
<ul> + *   <li>value: the binary-encoded variant value. + *   <li>metadata: the variant metadata (string dictionary). + * </ul>
+ * + * @since 0.9 + */ +@PublicEvolving +public final class GenericVariant implements Variant, Serializable { + + private static final long serialVersionUID = 1L; + + private final byte[] value; + private final byte[] metadata; + + public GenericVariant(byte[] value, byte[] metadata) { + this.value = value; + this.metadata = metadata; + } + + @Override + public byte[] value() { + return value; + } + + @Override + public byte[] metadata() { + return metadata; + } + + @Override + public long sizeInBytes() { + return value.length + metadata.length; + } + + @Override + public Variant copy() { + return new GenericVariant( + Arrays.copyOf(value, value.length), Arrays.copyOf(metadata, metadata.length)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericVariant that = (GenericVariant) o; + return Objects.deepEquals(value, that.value) && Objects.deepEquals(metadata, that.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(value), Arrays.hashCode(metadata)); + } + + @Override + public String toString() { + return "GenericVariant{value=" + + Arrays.toString(value) + + ", metadata=" + + Arrays.toString(metadata) + + "}"; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java index 6575de1b39..def1489e08 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java @@ -99,6 +99,9 @@ static ElementGetter createElementGetter(DataType fieldType) { case BYTES: elementGetter = InternalArray::getBytes; break; + case VARIANT: + elementGetter = InternalArray::getVariant; + break; case DECIMAL: final int decimalPrecision = getPrecision(fieldType); final int decimalScale = getScale(fieldType); diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java index e19f42b2e0..678ee51be0 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java @@ -129,6 +129,8 @@ static Class getDataClass(DataType type) { return TimestampNtz.class; case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return TimestampLtz.class; + case VARIANT: + return Variant.class; case ARRAY: return InternalArray.class; case MAP: @@ -181,6 +183,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { case BYTES: fieldGetter = row -> row.getBytes(fieldPos); break; + case VARIANT: + fieldGetter = row -> row.getVariant(fieldPos); + break; case DECIMAL: final int decimalPrecision = getPrecision(fieldType); final int decimalScale = getScale(fieldType); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java b/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java index df202ec7e2..a5651d34a7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/PaddingRow.java @@ -126,6 +126,11 @@ public byte[] getBytes(int pos) { return row.getBytes(pos); } + @Override + public Variant getVariant(int pos) { + return row.getVariant(pos); + } + @Override public InternalArray getArray(int pos) { return row.getArray(pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java index bf557c7923..ffbfb1aaa3 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java @@ -140,6 +140,11 @@ public byte[] getBytes(int pos) { return row.getBytes(indexMapping[pos]); } + @Override + public Variant getVariant(int pos) { + return row.getVariant(indexMapping[pos]); + } + 
@Override public InternalArray getArray(int pos) { return row.getArray(indexMapping[pos]); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/SequentialBinaryWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/SequentialBinaryWriter.java index e2f7ad37a9..e14640b1d2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/SequentialBinaryWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/SequentialBinaryWriter.java @@ -73,6 +73,8 @@ public interface SequentialBinaryWriter extends BinaryWriter { void writeRow(InternalRow value, RowSerializer serializer); + void writeVariant(Variant value); + /** Finally, complete write to set real size to binary. */ void complete(); @@ -168,4 +170,9 @@ default void writeMap(int pos, InternalMap value, MapSerializer serializer) { default void writeRow(int pos, InternalRow value, RowSerializer serializer) { writeRow(value, serializer); } + + @Override + default void writeVariant(int pos, Variant value) { + writeVariant(value); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/Variant.java b/fluss-common/src/main/java/org/apache/fluss/row/Variant.java new file mode 100644 index 0000000000..25dd754294 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/Variant.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * A Variant represents a type that can contain one of: 1) Primitive: A type and corresponding value + * (e.g. INT, STRING); 2) Array: An ordered list of Variant values; 3) Object: An unordered + * collection of string/Variant pairs (i.e. key/value pairs). An object may not contain duplicate + * keys. + * + *
<p>
A Variant is encoded with 2 binary values, the value and the metadata. + * + *
<p>
The Variant Binary Encoding allows representation of semi-structured data (e.g. JSON) in a + * form that can be efficiently queried by path. The design is intended to allow efficient access to + * nested data even in the presence of very wide or deep structures. + * + * @since 0.9 + */ +@PublicEvolving +public interface Variant { + + String METADATA = "metadata"; + + String VALUE = "value"; + + /** Returns the variant metadata (string dictionary). */ + byte[] metadata(); + + /** Returns the variant value (binary-encoded variant value). */ + byte[] value(); + + /** Returns the size of the variant in bytes. */ + long sizeInBytes(); + + /** Returns a copy of the variant. */ + Variant copy(); + + // ------------------------------------------------------------------------------------------ + // Static Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Converts a byte array in the format [4-byte value length (big-endian)][value][metadata] into + * a {@link Variant} object. + */ + static Variant bytesToVariant(byte[] bytes) { + if (bytes.length < 4) { + throw new IllegalArgumentException( + "Invalid variant bytes: length must be at least 4, got " + bytes.length); + } + int valueLength = + ((bytes[0] & 0xFF) << 24) + | ((bytes[1] & 0xFF) << 16) + | ((bytes[2] & 0xFF) << 8) + | (bytes[3] & 0xFF); + byte[] value = new byte[valueLength]; + System.arraycopy(bytes, 4, value, 0, valueLength); + int metadataLength = bytes.length - 4 - valueLength; + byte[] metadata = new byte[metadataLength]; + System.arraycopy(bytes, 4 + valueLength, metadata, 0, metadataLength); + return new GenericVariant(value, metadata); + } + + /** + * Converts a {@link Variant} object into a byte array in the format [4-byte value length + * (big-endian)][value][metadata]. 
+ */ + static byte[] variantToBytes(Variant variant) { + byte[] value = variant.value(); + byte[] metadata = variant.metadata(); + int totalSize = 4 + value.length + metadata.length; + byte[] combined = new byte[totalSize]; + combined[0] = (byte) ((value.length >> 24) & 0xFF); + combined[1] = (byte) ((value.length >> 16) & 0xFF); + combined[2] = (byte) ((value.length >> 8) & 0xFF); + combined[3] = (byte) (value.length & 0xFF); + System.arraycopy(value, 0, combined, 4, value.length); + System.arraycopy(metadata, 0, combined, 4 + value.length, metadata.length); + return combined; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java index 0bc5ac071d..e23b0f8be6 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java @@ -31,6 +31,7 @@ import org.apache.fluss.row.NullAwareGetters; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.array.AlignedArray; import org.apache.fluss.row.map.AlignedMap; import org.apache.fluss.types.DataType; @@ -388,6 +389,11 @@ public byte[] getBytes(int pos) { return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen); } + @Override + public Variant getVariant(int pos) { + return Variant.bytesToVariant(getBytes(pos)); + } + @Override public InternalArray getArray(int pos) { assertIndexIsValid(pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowVariantColumnVector.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowVariantColumnVector.java new file mode 100644 index 0000000000..03a43be172 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/vectors/ArrowVariantColumnVector.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.arrow.vectors; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.columnar.BytesColumnVector; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VarBinaryVector; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector; + +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** + * Arrow column vector for Variant type. + * + *
<p>
Reads variant data from an Arrow STRUCT with two VARBINARY children ("value" and "metadata") + * and combines them into a single {@code byte[]} in the format: [4-byte value length + * (big-endian)][value bytes][metadata bytes]. + */ +@Internal +public class ArrowVariantColumnVector implements BytesColumnVector { + + private final StructVector structVector; + private final VarBinaryVector valueVector; + private final VarBinaryVector metadataVector; + + public ArrowVariantColumnVector(StructVector structVector) { + this.structVector = checkNotNull(structVector); + this.valueVector = (VarBinaryVector) structVector.getChild("value"); + this.metadataVector = (VarBinaryVector) structVector.getChild("metadata"); + } + + @Override + public Bytes getBytes(int i) { + byte[] value = valueVector.get(i); + byte[] metadata = metadataVector.get(i); + + // Combine into: [4-byte value length (big-endian)][value][metadata] + int totalLength = 4 + value.length + metadata.length; + byte[] combined = new byte[totalLength]; + + // Write value length as big-endian 4-byte integer + combined[0] = (byte) ((value.length >> 24) & 0xFF); + combined[1] = (byte) ((value.length >> 16) & 0xFF); + combined[2] = (byte) ((value.length >> 8) & 0xFF); + combined[3] = (byte) (value.length & 0xFF); + + // Copy value bytes + System.arraycopy(value, 0, combined, 4, value.length); + // Copy metadata bytes + System.arraycopy(metadata, 0, combined, 4 + value.length, metadata.length); + + return new BytesColumnVector.Bytes(combined, 0, combined.length); + } + + @Override + public boolean isNullAt(int i) { + return structVector.isNull(i); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowVariantWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowVariantWriter.java new file mode 100644 index 0000000000..a21d0da69d --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowVariantWriter.java @@ -0,0 +1,57 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.arrow.writers; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.DataGetters; +import org.apache.fluss.row.Variant; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VarBinaryVector; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector; + +/** + * {@link ArrowFieldWriter} for Variant type. + * + *
<p>
A VARIANT is stored in Arrow as a STRUCT with two VARBINARY children: "value" (the + * binary-encoded variant value) and "metadata" (the variant string dictionary / metadata). + * + *
<p>
Internally in Fluss, the VARIANT is represented as a {@link Variant} object containing two + * separate {@code byte[]} arrays: value and metadata. This writer extracts them and writes to the + * corresponding Arrow vectors. + */ +@Internal +public class ArrowVariantWriter extends ArrowFieldWriter { + + private final VarBinaryVector valueVector; + private final VarBinaryVector metadataVector; + + public ArrowVariantWriter(StructVector structVector) { + super(structVector); + this.valueVector = (VarBinaryVector) structVector.getChild("value"); + this.metadataVector = (VarBinaryVector) structVector.getChild("metadata"); + } + + @Override + public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean handleSafe) { + StructVector structVector = (StructVector) fieldVector; + Variant variant = row.getVariant(ordinal); + + structVector.setIndexDefined(rowIndex); + valueVector.setSafe(rowIndex, variant.value()); + metadataVector.setSafe(rowIndex, variant.metadata()); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarArray.java b/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarArray.java index 943c203080..dbbee62196 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarArray.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarArray.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import java.io.Serializable; import java.util.Arrays; @@ -133,6 +134,11 @@ public byte[] getBytes(int pos) { } } + @Override + public Variant getVariant(int pos) { + return Variant.bytesToVariant(getBytes(pos)); + } + @Override public InternalArray getArray(int pos) { return ((ArrayColumnVector) data).getArray(offset + pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarRow.java 
b/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarRow.java index 40c935104a..eb76b5724a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/columnar/ColumnarRow.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; /** * Columnar row to support access to vector column data. It is a row view in {@link @@ -133,6 +134,11 @@ public byte[] getBytes(int pos) { return vectorizedColumnBatch.getBytes(rowId, pos); } + @Override + public Variant getVariant(int pos) { + return Variant.bytesToVariant(getBytes(pos)); + } + @Override public InternalArray getArray(int pos) { return vectorizedColumnBatch.getArray(rowId, pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java index b56355622f..9d60828266 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java @@ -29,6 +29,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.indexed.IndexedRow; import org.apache.fluss.types.DataType; import org.apache.fluss.utils.MurmurHashUtils; @@ -251,6 +252,11 @@ public byte[] getBytes(int pos) { return decodedRow().getBytes(pos); } + @Override + public Variant getVariant(int pos) { + return decodedRow().getVariant(pos); + } + @Override public InternalArray getArray(int pos) { return decodedRow().getArray(pos); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java index 
e51f2ad609..30c4ad1e07 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowReader.java @@ -26,6 +26,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.array.CompactedArray; import org.apache.fluss.row.map.CompactedMap; import org.apache.fluss.types.ArrayType; @@ -245,6 +246,11 @@ public byte[] readBytes() { return readBytesInternal(length); } + public Variant readVariant() { + byte[] bytes = readBytes(); + return Variant.bytesToVariant(bytes); + } + public byte[] readBinary(int length) { return readBytesInternal(length); } @@ -291,6 +297,9 @@ static FieldReader createFieldReader(DataType fieldType) { case BYTES: fieldReader = (reader, pos) -> reader.readBytes(); break; + case VARIANT: + fieldReader = (reader, pos) -> reader.readVariant(); + break; case DECIMAL: final int decimalPrecision = getPrecision(fieldType); final int decimalScale = getScale(fieldType); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java index d00076155f..32fe4f3060 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRowWriter.java @@ -33,6 +33,7 @@ import org.apache.fluss.row.SequentialBinaryWriter; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.serializer.ArraySerializer; import org.apache.fluss.row.serializer.MapSerializer; import org.apache.fluss.row.serializer.RowSerializer; @@ -220,6 +221,11 @@ public void writeBytes(byte[] value) { write(value, 0, value.length); } + @Override + public 
void writeVariant(Variant variant) { + writeBytes(Variant.variantToBytes(variant)); + } + @Override public void writeDecimal(Decimal value, int precision) { if (Decimal.isCompact(precision)) { diff --git a/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java index dd746e9866..b06c7b78e9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/iceberg/IcebergKeyDecoder.java @@ -23,6 +23,7 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.decode.KeyDecoder; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -130,6 +131,15 @@ private FieldReader createFieldReader(DataType fieldType) { return bytes; }; + case VARIANT: + return (segment, offset) -> { + // Variant bytes are written directly without length prefix + int length = segment.size() - offset; + byte[] bytes = new byte[length]; + segment.get(offset, bytes, 0, length); + return Variant.bytesToVariant(bytes); + }; + default: throw new IllegalArgumentException( "Unsupported type for Iceberg key decoder: " + fieldType); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java b/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java index f9a319022e..8bfad72983 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/decode/paimon/PaimonKeyDecoder.java @@ -26,6 +26,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import 
org.apache.fluss.row.decode.KeyDecoder; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -133,6 +134,9 @@ private FieldReader createFieldReader(DataType fieldType, int pos) { case BINARY: case BYTES: return (segment, baseOffset) -> readBinary(segment, baseOffset, fieldOffset); + case VARIANT: + return (segment, baseOffset) -> + Variant.bytesToVariant(readBinary(segment, baseOffset, fieldOffset)); case DECIMAL: final int decimalPrecision = getPrecision(fieldType); final int decimalScale = getScale(fieldType); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java index 433f364526..395b373f7c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java @@ -21,6 +21,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataType; import org.apache.fluss.utils.UnsafeUtils; @@ -222,6 +223,10 @@ public static FieldWriter createFieldWriter(DataType fieldType) { case BYTES: return (writer, value) -> writer.writeBytes((byte[]) value, true); + case VARIANT: + return (writer, value) -> + writer.writeBytes(Variant.variantToBytes((Variant) value), true); + case ARRAY: throw new IllegalArgumentException( "Array types cannot be used as bucket keys. 
Bucket keys must be scalar types."); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java index 80bc371211..6e82dd6d8a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/paimon/PaimonBinaryRowWriter.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataType; import java.io.Serializable; @@ -380,6 +381,11 @@ public static FieldWriter createFieldWriter(DataType fieldType) { case BYTES: fieldWriter = (writer, pos, value) -> writer.writeBytes(pos, (byte[]) value); break; + case VARIANT: + fieldWriter = + (writer, pos, value) -> + writer.writeBytes(pos, Variant.variantToBytes((Variant) value)); + break; case DECIMAL: final int decimalPrecision = getPrecision(fieldType); fieldWriter = diff --git a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java index 9a7b18f772..683de7402a 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRow.java @@ -31,6 +31,7 @@ import org.apache.fluss.row.NullAwareGetters; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.array.IndexedArray; import org.apache.fluss.row.map.IndexedMap; import org.apache.fluss.types.ArrayType; @@ -383,6 +384,11 @@ public byte[] getBytes(int pos) { return bytes; } + @Override + public Variant getVariant(int pos) { + return Variant.bytesToVariant(getBytes(pos)); + } + @Override public InternalArray getArray(int 
pos) { assertIndexIsValid(pos); @@ -513,6 +519,7 @@ public static boolean isFixedLength(DataType dataType) { return true; case STRING: case BYTES: + case VARIANT: case ARRAY: case MAP: case ROW: diff --git a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java index ff903cb775..d6f07e07de 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowReader.java @@ -27,6 +27,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.array.IndexedArray; import org.apache.fluss.row.map.IndexedMap; import org.apache.fluss.types.ArrayType; @@ -174,6 +175,11 @@ public byte[] readBytes() { return readBytesInternal(length); } + public Variant readVariant() { + byte[] bytes = readBytes(); + return Variant.bytesToVariant(bytes); + } + private int readVarLengthFromVarLengthList() { if (variableLengthPosition - nullBitsSizeInBytes + 4 > variableColumnLengthListInBytes) { throw new IllegalArgumentException(); @@ -262,6 +268,9 @@ static FieldReader createFieldReader(DataType fieldType) { case BYTES: fieldReader = (reader, pos) -> reader.readBytes(); break; + case VARIANT: + fieldReader = (reader, pos) -> reader.readVariant(); + break; case DECIMAL: final int decimalPrecision = getPrecision(fieldType); final int decimalScale = getScale(fieldType); diff --git a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java index 35e6ce7b6c..27b5f40fbf 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/indexed/IndexedRowWriter.java @@ -33,6 +33,7 @@ import 
org.apache.fluss.row.SequentialBinaryWriter; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.row.serializer.ArraySerializer; import org.apache.fluss.row.serializer.MapSerializer; import org.apache.fluss.row.serializer.RowSerializer; @@ -183,6 +184,11 @@ public void writeBytes(byte[] value) { write(value, 0, value.length); } + @Override + public void writeVariant(Variant variant) { + writeBytes(Variant.variantToBytes(variant)); + } + @Override public void writeDecimal(Decimal value, int precision) { if (Decimal.isCompact(precision)) { diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeDefaultVisitor.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeDefaultVisitor.java index eb58c5a6c4..c2036e3084 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeDefaultVisitor.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeDefaultVisitor.java @@ -122,5 +122,10 @@ public R visit(RowType rowType) { return defaultMethod(rowType); } + @Override + public R visit(VariantType variantType) { + return defaultMethod(variantType); + } + protected abstract R defaultMethod(DataType dataType); } diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeParser.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeParser.java index 5068e02596..62d3b0c56d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeParser.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeParser.java @@ -281,6 +281,7 @@ private enum Keyword { ROW, NOT, NULL, + VARIANT, } private enum UnsupportedKeyword { @@ -290,7 +291,6 @@ private enum UnsupportedKeyword { MULTISET, RAW, LEGACY, - VARIANT, BITMAP } @@ -514,6 +514,8 @@ private DataType parseTypeByKeyword() { return parseMapType(); case ROW: return parseRowType(); + case VARIANT: + return new VariantType(); default: throw 
parsingError("Unsupported type: " + token().value); } diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeRoot.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeRoot.java index f092e94661..951fd11351 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeRoot.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeRoot.java @@ -103,7 +103,9 @@ public enum DataTypeRoot { MAP(DataTypeFamily.CONSTRUCTED, DataTypeFamily.EXTENSION), - ROW(DataTypeFamily.CONSTRUCTED); + ROW(DataTypeFamily.CONSTRUCTED), + + VARIANT(DataTypeFamily.PREDEFINED); private final Set families; diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeVisitor.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeVisitor.java index b3df5a12ba..b13a59e034 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypeVisitor.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypeVisitor.java @@ -66,4 +66,6 @@ public interface DataTypeVisitor { R visit(MapType mapType); R visit(RowType rowType); + + R visit(VariantType variantType); } diff --git a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java index 758b717e19..10b96aee8d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java +++ b/fluss-common/src/main/java/org/apache/fluss/types/DataTypes.java @@ -375,4 +375,14 @@ public static RowType ROW(DataField... fields) { public static RowType ROW(DataType... fieldTypes) { return RowType.builder().fields(fieldTypes).build(); } + + /** + * Data type of a semi-structured variant value. A variant can represent any JSON-like value + * with an efficient binary encoding (value + metadata byte arrays). 
+ * + * @see VariantType + */ + public static VariantType VARIANT() { + return new VariantType(); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/types/VariantType.java b/fluss-common/src/main/java/org/apache/fluss/types/VariantType.java new file mode 100644 index 0000000000..ef29610556 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/types/VariantType.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.types; + +import org.apache.fluss.annotation.PublicStable; + +import java.util.Collections; +import java.util.List; + +/** + * Data type of a semi-structured variant value. A variant can represent any JSON-like value with an + * efficient binary encoding (value + metadata byte arrays). + * + *

Internally, a variant is stored as a {@code byte[]} containing the binary-encoded value and + * metadata. + * + * @since 0.7 + */ +@PublicStable +public class VariantType extends DataType { + private static final long serialVersionUID = 1L; + + private static final String FORMAT = "VARIANT"; + + public VariantType(boolean isNullable) { + super(isNullable, DataTypeRoot.VARIANT); + } + + public VariantType() { + this(true); + } + + @Override + public DataType copy(boolean isNullable) { + return new VariantType(isNullable); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT); + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(DataTypeVisitor visitor) { + return visitor.visit(this); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java index 4219d45ffd..8f815b716c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java @@ -41,6 +41,7 @@ import org.apache.fluss.row.arrow.vectors.ArrowTinyIntColumnVector; import org.apache.fluss.row.arrow.vectors.ArrowVarBinaryColumnVector; import org.apache.fluss.row.arrow.vectors.ArrowVarCharColumnVector; +import org.apache.fluss.row.arrow.vectors.ArrowVariantColumnVector; import org.apache.fluss.row.arrow.writers.ArrowArrayWriter; import org.apache.fluss.row.arrow.writers.ArrowBigIntWriter; import org.apache.fluss.row.arrow.writers.ArrowBinaryWriter; @@ -60,6 +61,7 @@ import org.apache.fluss.row.arrow.writers.ArrowTinyIntWriter; import org.apache.fluss.row.arrow.writers.ArrowVarBinaryWriter; import org.apache.fluss.row.arrow.writers.ArrowVarCharWriter; +import org.apache.fluss.row.arrow.writers.ArrowVariantWriter; import org.apache.fluss.row.columnar.ColumnVector; import org.apache.fluss.row.columnar.VectorizedColumnBatch; import 
org.apache.fluss.shaded.arrow.com.google.flatbuffers.FlatBufferBuilder; @@ -132,6 +134,7 @@ import org.apache.fluss.types.TimeType; import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.types.VariantType; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -352,6 +355,8 @@ public static ArrowFieldWriter createArrowFieldWriter(FieldVector vector, DataTy vector, ArrowUtils.createArrowFieldWriter(keyVector, mapType.getKeyType()), ArrowUtils.createArrowFieldWriter(valueVector, mapType.getValueType())); + } else if (vector instanceof StructVector && dataType instanceof VariantType) { + return new ArrowVariantWriter((StructVector) vector); } else if (vector instanceof StructVector && dataType instanceof RowType) { RowType rowType = (RowType) dataType; StructVector structVector = (StructVector) vector; @@ -416,6 +421,8 @@ public static ColumnVector createArrowColumnVector(ValueVector vector, DataType MapType mapType = (MapType) dataType; return new ArrowMapColumnVector( (FieldVector) vector, mapType.getKeyType(), mapType.getValueType()); + } else if (vector instanceof StructVector && dataType instanceof VariantType) { + return new ArrowVariantColumnVector((StructVector) vector); } else if (vector instanceof StructVector && dataType instanceof RowType) { RowType rowType = (RowType) dataType; StructVector structVector = (StructVector) vector; @@ -462,6 +469,14 @@ private static Field toArrowField(String fieldName, DataType logicalType) { Field structField = new Field(MapVector.DATA_VECTOR_NAME, structFieldType, structChildren); children = Collections.singletonList(structField); + } else if (logicalType instanceof VariantType) { + // VARIANT is stored as a STRUCT with two VARBINARY children: "value" and "metadata" + FieldType varBinaryFieldType = new FieldType(false, ArrowType.Binary.INSTANCE, null); + Field valueField = new Field("value", varBinaryFieldType, null); + Field metadataField = new 
Field("metadata", varBinaryFieldType, null); + children = new ArrayList<>(); + children.add(valueField); + children.add(metadataField); } return new Field(fieldName, fieldType, children); } @@ -592,6 +607,11 @@ public ArrowType visit(RowType rowType) { return ArrowType.Struct.INSTANCE; } + @Override + public ArrowType visit(VariantType variantType) { + return ArrowType.Struct.INSTANCE; + } + @Override protected ArrowType defaultMethod(DataType dataType) { throw new UnsupportedOperationException( diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java index bfa2a299bc..e3733262d8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java @@ -34,6 +34,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeRoot; @@ -184,6 +185,12 @@ public static int compare(Object x, Object y, DataTypeRoot type) { case BYTES: ret = byteArrayCompare((byte[]) x, (byte[]) y); break; + case VARIANT: + ret = + byteArrayCompare( + Variant.variantToBytes((Variant) x), + Variant.variantToBytes((Variant) y)); + break; case STRING: case CHAR: ret = ((BinaryString) x).compareTo((BinaryString) y); diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java index 9b6f4cd6f2..2716d70f78 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java @@ -26,6 +26,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.TimestampLtz; import 
org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataTypeRoot; import java.time.ZonedDateTime; @@ -177,6 +178,11 @@ public static String convertValueOfType(Object value, DataTypeRoot type) { byte[] bytesValue = (byte[]) value; stringPartitionKey = PartitionNameConverters.hexString(bytesValue); break; + case VARIANT: + Variant variantValue = (Variant) value; + stringPartitionKey = + PartitionNameConverters.hexString(Variant.variantToBytes(variantValue)); + break; case TINYINT: Byte tinyIntValue = (Byte) value; stringPartitionKey = tinyIntValue.toString(); diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java index 04283c9d97..6d771468bb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java @@ -19,6 +19,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DecimalType; import org.apache.fluss.types.LocalZonedTimestampType; @@ -41,6 +42,8 @@ public static Object castFromString(String s, DataType type) { case BINARY: case BYTES: return s.getBytes(StandardCharsets.UTF_8); + case VARIANT: + return Variant.bytesToVariant(s.getBytes(StandardCharsets.UTF_8)); case DECIMAL: DecimalType decimalType = (DecimalType) type; return Decimal.fromBigDecimal( diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java index 12e6fb1c54..6d8cffa207 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java @@ -107,6 +107,7 @@ private static void serializeTypeWithGenericSerialization( case 
DATE: case BYTES: case STRING: + case VARIANT: // do nothing. break; case CHAR: diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/InternalArrayAssert.java b/fluss-common/src/test/java/org/apache/fluss/testutils/InternalArrayAssert.java index 6e4a7f4e1f..e5fafbb3bb 100644 --- a/fluss-common/src/test/java/org/apache/fluss/testutils/InternalArrayAssert.java +++ b/fluss-common/src/test/java/org/apache/fluss/testutils/InternalArrayAssert.java @@ -108,6 +108,15 @@ public InternalArrayAssert isEqualTo(InternalArray expected) { } } break; + case VARIANT: + for (int i = 0; i < actual.size(); i++) { + if (expected.isNullAt(i)) { + assertThat(actual.isNullAt(i)).isTrue(); + } else { + assertThat(actual.getVariant(i)).isEqualTo(expected.getVariant(i)); + } + } + break; case TINYINT: for (int i = 0; i < actual.size(); i++) { if (expected.isNullAt(i)) { @@ -245,6 +254,8 @@ static Object getValueAt(InternalArray array, int pos, DataType elementType) { return array.getBinary(pos, getLength(elementType)); case BYTES: return array.getBytes(pos); + case VARIANT: + return array.getVariant(pos); case TINYINT: return array.getByte(pos); case DECIMAL: diff --git a/fluss-common/src/test/java/org/apache/fluss/types/DataTypeVisitorTest.java b/fluss-common/src/test/java/org/apache/fluss/types/DataTypeVisitorTest.java index d7d12b3df5..29748fceb6 100644 --- a/fluss-common/src/test/java/org/apache/fluss/types/DataTypeVisitorTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/types/DataTypeVisitorTest.java @@ -46,7 +46,8 @@ void testDataTypeDefaultVisitor() { DataTypes.BYTES(), DataTypes.ARRAY(DataTypes.INT()), DataTypes.MAP(DataTypes.INT(), DataTypes.INT()), - DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT())) + DataTypes.ROW(DataTypes.FIELD("a", DataTypes.INT())), + DataTypes.VARIANT() }; String[] typeStrings = { @@ -68,7 +69,8 @@ void testDataTypeDefaultVisitor() { "BYTES", "ARRAY", "MAP", - "ROW<`a` INT>" + "ROW<`a` INT>", + "VARIANT" }; for (int i = 0; i < 
allTypes.length; i++) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussArray.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussArray.java index 32d17cf844..194c8c3212 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussArray.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussArray.java @@ -24,6 +24,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataType; import org.apache.flink.table.data.ArrayData; @@ -161,6 +162,14 @@ public byte[] getBytes(int pos) { return flinkArray.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + // TODO: Flink ArrayData has no native getVariant(); deserializing from byte[] as + // workaround. + // When Flink adds native VariantType, read value/metadata directly. 
+ return Variant.bytesToVariant(flinkArray.getBinary(pos)); + } + @Override public InternalArray getArray(int pos) { return new FlinkAsFlussArray(flinkArray.getArray(pos)); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java index eba5abe9ba..057a3d1bd8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java @@ -24,6 +24,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.RowData; @@ -139,6 +140,13 @@ public byte[] getBytes(int pos) { return flinkRow.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + // TODO: Flink RowData has no native getVariant(); deserializing from byte[] as workaround. + // When Flink adds native VariantType, read value/metadata directly. 
+ return Variant.bytesToVariant(flinkRow.getBinary(pos)); + } + @Override public InternalArray getArray(int pos) { return new FlinkAsFlussArray(flinkRow.getArray(pos)); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java index ac80f938c4..8bf4e09b14 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java @@ -294,6 +294,7 @@ private static Tuple2 calculateFixedSizeAndVariableColumnIndex( break; case STRING: case BYTES: + case VARIANT: variableSizeFields.add(i); break; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java index 7661691596..7426085faf 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverter.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.DataType; import org.apache.fluss.types.MapType; @@ -129,6 +130,11 @@ static FlussDeserializationConverter createInternalConverter(DataType flussDataT case BYTES: case BINARY: return (flussField) -> flussField; + case VARIANT: + // TODO: Converts Variant to byte[] via variantToBytes() because Flink lacks + // native Variant support. 
When Flink adds VariantType, pass Variant's + // value/metadata directly instead of serializing to a single byte[]. + return (flussField) -> Variant.variantToBytes((Variant) flussField); case DECIMAL: return (flussField) -> { Decimal decimal = (Decimal) flussField; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToJsonConverters.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToJsonConverters.java index 680e697e18..15e548445a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToJsonConverters.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussRowToJsonConverters.java @@ -21,6 +21,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; @@ -81,6 +82,14 @@ private FlussRowToJsonConverter createNotNullConverter(DataType type) { case BYTES: return ((mapper, reuse, value) -> mapper.getNodeFactory().binaryNode((byte[]) value)); + case VARIANT: + // TODO: Serializes Variant as opaque binary JSON node. Consider producing + // structured JSON (e.g., {"value": ..., "metadata": ...}) or leveraging + // Variant's binary encoding to output the actual JSON representation + // once a Variant-to-JSON decoder is available. 
+ return ((mapper, reuse, value) -> + mapper.getNodeFactory() + .binaryNode(Variant.variantToBytes((Variant) value))); case DECIMAL: return createDecimalConverter(); case TINYINT: diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussTypeToFlinkType.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussTypeToFlinkType.java index a3b527966d..6c8705fd82 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussTypeToFlinkType.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlussTypeToFlinkType.java @@ -38,6 +38,7 @@ import org.apache.fluss.types.TimeType; import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.types.VariantType; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; @@ -168,6 +169,15 @@ public DataType visit(RowType rowType) { return withNullability(DataTypes.ROW(dataFields), rowType.isNullable()); } + // TODO: Currently maps Fluss VARIANT to Flink BYTES as a workaround because Flink's type + // system does not have a native VARIANT type yet. Once Flink introduces native VariantType + // support, we should map VARIANT directly to Flink's native VariantType for structured + // value/metadata access instead of opaque byte[] serialization. + @Override + public DataType visit(VariantType variantType) { + return withNullability(DataTypes.BYTES(), variantType.isNullable()); + } + private DataType withNullability(DataType flinkType, boolean nullable) { if (flinkType.getLogicalType().isNullable() != nullable) { return nullable ? 
flinkType.nullable() : flinkType.notNull(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PojoToRowConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PojoToRowConverter.java index 81f53b183f..48e1db37a1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PojoToRowConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PojoToRowConverter.java @@ -23,6 +23,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypeRoot; import org.apache.fluss.types.DecimalType; @@ -89,6 +90,7 @@ public class PojoToRowConverter { DataTypeRoot.STRING, orderedSet(String.class, Character.class, char.class)); SUPPORTED_TYPES.put(DataTypeRoot.BINARY, orderedSet(byte[].class)); SUPPORTED_TYPES.put(DataTypeRoot.BYTES, orderedSet(byte[].class)); + SUPPORTED_TYPES.put(DataTypeRoot.VARIANT, orderedSet(Variant.class, byte[].class)); SUPPORTED_TYPES.put(DataTypeRoot.DECIMAL, orderedSet(BigDecimal.class)); SUPPORTED_TYPES.put(DataTypeRoot.DATE, orderedSet(LocalDate.class)); SUPPORTED_TYPES.put(DataTypeRoot.TIME_WITHOUT_TIME_ZONE, orderedSet(LocalTime.class)); @@ -207,6 +209,23 @@ private FieldConverter createConverterForField(DataType fieldType, Field field) case BINARY: case BYTES: return field::get; + case VARIANT: + return obj -> { + Object value = field.get(obj); + if (value == null) { + return null; + } + if (value instanceof Variant) { + return value; + } + if (value instanceof byte[]) { + return Variant.bytesToVariant((byte[]) value); + } + LOG.warn( + "Field {} is not a byte[] or Variant. 
Cannot convert to Variant.", + field.getName()); + return null; + }; case CHAR: case STRING: return obj -> { diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java index fb8f819574..32aafe848c 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java @@ -208,4 +208,9 @@ public Type visit(RowType rowType) { return Types.StructType.of(fields); } + + @Override + public Type visit(org.apache.fluss.types.VariantType variantType) { + return Types.BinaryType.get(); + } } diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java index 1b83e8f2b4..44b72447af 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.utils.BytesUtils; import java.math.BigDecimal; @@ -141,6 +142,14 @@ public byte[] getBytes(int pos) { return BytesUtils.toArray(byteBuffer); } + @Override + public Variant getVariant(int pos) { + // TODO: Iceberg does not have native Variant support yet; deserializing from byte[] + // as workaround. When Iceberg adds native VariantType, read value/metadata directly. 
+ ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos); + return Variant.bytesToVariant(BytesUtils.toArray(byteBuffer)); + } + @Override public InternalArray getArray(int pos) { List nestedList = (List) icebergList.get(pos); diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java index be990f7c64..12cd5887a7 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.fluss.utils.BytesUtils; import org.apache.iceberg.data.Record; @@ -166,6 +167,15 @@ public byte[] getBytes(int pos) { return BytesUtils.toArray(byteBuffer); } + @Override + public Variant getVariant(int pos) { + // TODO: Iceberg does not have native Variant support yet; deserializing from byte[] + // as workaround. When Iceberg adds native VariantType, read value/metadata directly. 
+ Object value = icebergRecord.get(pos); + ByteBuffer byteBuffer = (ByteBuffer) value; + return Variant.bytesToVariant(BytesUtils.toArray(byteBuffer)); + } + @Override public InternalArray getArray(int pos) { Object value = icebergRecord.get(pos); diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java index 21b2aa1ec9..5ab4ca5470 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java @@ -25,6 +25,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; @@ -107,6 +108,12 @@ private void initFieldVector(FieldVector fieldVector) { if (dataVector != null) { initFieldVector(dataVector); } + } else if (fieldVector instanceof StructVector) { + StructVector structVector = (StructVector) fieldVector; + structVector.allocateNew(); + for (FieldVector childVector : structVector.getChildrenFromFields()) { + initFieldVector(childVector); + } } else { fieldVector.allocateNew(); } diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java index 0694b7fcbb..c0a43a56b3 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java +++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java @@ -23,6 +23,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.FixedSizeListVector; import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.types.pojo.Schema; import java.nio.ByteBuffer; @@ -76,6 +77,18 @@ private static void copyVectorData( org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedVector, FieldVector nonShadedVector) { + if (shadedVector + instanceof + org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector) { + if (nonShadedVector instanceof StructVector) { + copyStructVectorData( + (org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector) + shadedVector, + (StructVector) nonShadedVector); + return; + } + } + if (shadedVector instanceof org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector) { @@ -117,6 +130,43 @@ private static void copyVectorData( } } + private static void copyStructVectorData( + org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector + shadedStructVector, + StructVector nonShadedStructVector) { + + int valueCount = shadedStructVector.getValueCount(); + + // Recursively copy each child vector + List shadedChildren = + shadedStructVector.getChildrenFromFields(); + List nonShadedChildren = nonShadedStructVector.getChildrenFromFields(); + for (int i = 0; i < Math.min(shadedChildren.size(), nonShadedChildren.size()); i++) { + copyVectorData(shadedChildren.get(i), nonShadedChildren.get(i)); + } + + // Copy the StructVector's own validity buffer + List shadedBuffers = + shadedStructVector.getFieldBuffers(); + List nonShadedBuffers = nonShadedStructVector.getFieldBuffers(); + + for (int i = 0; i < Math.min(shadedBuffers.size(), nonShadedBuffers.size()); i++) { + org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf 
shadedBuf = + shadedBuffers.get(i); + ArrowBuf nonShadedBuf = nonShadedBuffers.get(i); + + long size = Math.min(shadedBuf.capacity(), nonShadedBuf.capacity()); + if (size > 0) { + ByteBuffer srcBuffer = shadedBuf.nioBuffer(0, (int) size); + srcBuffer.position(0); + srcBuffer.limit((int) Math.min(size, Integer.MAX_VALUE)); + nonShadedBuf.setBytes(0, srcBuffer); + } + } + + nonShadedStructVector.setValueCount(valueCount); + } + private static void copyListVectorData( org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector shadedListVector, diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java index ed85b1a8c9..ccf1b56984 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java @@ -36,6 +36,7 @@ import org.apache.fluss.types.TimeType; import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.types.VariantType; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; @@ -118,7 +119,15 @@ private static Field toArrowField( } FieldType fieldType = new FieldType(logicalType.isNullable(), arrowType, null); List children = null; - if (logicalType instanceof ArrayType) { + if (logicalType instanceof VariantType) { + // VARIANT is stored as a STRUCT with two VARBINARY children: "value" and "metadata" + FieldType varBinaryFieldType = new FieldType(false, ArrowType.Binary.INSTANCE, null); + Field valueField = new Field("value", varBinaryFieldType, null); + Field metadataField = new Field("metadata", varBinaryFieldType, null); + children = new java.util.ArrayList<>(); + children.add(valueField); + children.add(metadataField); + } else if (logicalType 
instanceof ArrayType) { children = Collections.singletonList( toArrowField( @@ -151,6 +160,8 @@ private static ArrowType toArrowType(DataType dataType) { return new ArrowType.FixedSizeBinary(binaryType.getLength()); } else if (dataType instanceof BytesType) { return ArrowType.Binary.INSTANCE; + } else if (dataType instanceof VariantType) { + return ArrowType.Struct.INSTANCE; } else if (dataType instanceof DecimalType) { DecimalType decimalType = (DecimalType) dataType; return ArrowType.Decimal.createDecimal( diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java index e97b4f67a6..a8f33557f8 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/FlussDataTypeToPaimonDataType.java @@ -38,6 +38,7 @@ import org.apache.fluss.types.TimeType; import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.types.VariantType; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -160,6 +161,11 @@ public DataType visit(RowType rowType) { return withNullability(rowTypeBuilder.build(), rowType.isNullable()); } + @Override + public DataType visit(VariantType variantType) { + return withNullability(new org.apache.paimon.types.VariantType(), variantType.isNullable()); + } + private DataType withNullability(DataType paimon, boolean nullable) { if (paimon.isNullable() != nullable) { return nullable ? 
paimon.nullable() : paimon.notNull(); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonArrayAsFlussArray.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonArrayAsFlussArray.java index b5df153d74..845bb5c96c 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonArrayAsFlussArray.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonArrayAsFlussArray.java @@ -24,6 +24,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.paimon.data.Timestamp; @@ -134,6 +135,14 @@ public byte[] getBytes(int pos) { return paimonArray.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + // TODO: Paimon has native Variant support (paimonArray.getVariant(pos)), but the current + // Paimon dependency may not include it yet. Once upgraded, convert Paimon's Variant + // (value/metadata) directly to Fluss Variant instead of round-tripping through byte[]. 
+ return Variant.bytesToVariant(paimonArray.getBinary(pos)); + } + @Override public InternalArray getArray(int pos) { return new PaimonArrayAsFlussArray(paimonArray.getArray(pos)); diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java index fe956561a6..b290e44681 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java @@ -24,6 +24,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.Variant; import org.apache.paimon.data.Timestamp; @@ -142,6 +143,14 @@ public byte[] getBytes(int pos) { return paimonRow.getBinary(pos); } + @Override + public Variant getVariant(int pos) { + // TODO: Paimon has native Variant support (paimonRow.getVariant(pos)), but the current + // Paimon dependency may not include it yet. Once upgraded, convert Paimon's Variant + // (value/metadata) directly to Fluss Variant instead of round-tripping through byte[]. 
+ return Variant.bytesToVariant(paimonRow.getBinary(pos)); + } + @Override public InternalArray getArray(int pos) { return new PaimonArrayAsFlussArray(paimonRow.getArray(pos)); diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala index 17d45ec028..114f9c469a 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala @@ -17,7 +17,7 @@ package org.apache.fluss.spark.row -import org.apache.fluss.row.{BinaryString, Decimal, InternalArray => FlussInternalArray, InternalMap, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz} +import org.apache.fluss.row.{BinaryString, Decimal, InternalArray => FlussInternalArray, InternalMap, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz, Variant} import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData} import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => SparkDataType, StructType} @@ -120,6 +120,11 @@ class SparkAsFlussArray(arrayData: SparkArrayData, elementType: SparkDataType) /** Returns the binary value at the given position. */ override def getBytes(pos: Int): Array[Byte] = arrayData.getBinary(pos) + // TODO: Spark ArrayData has no native getVariant() in Spark 3.x; deserializing from byte[] + // as workaround. When adopting Spark 4 with native VariantType, use arrayData.getVariant() directly. + /** Returns the variant value at the given position. */ + override def getVariant(pos: Int): Variant = Variant.bytesToVariant(arrayData.getBinary(pos)) + /** Returns the array value at the given position. 
*/ override def getArray(pos: Int) = new SparkAsFlussArray( arrayData.getArray(pos), diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala index 3a5c9613c2..88d46604c9 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala @@ -17,7 +17,7 @@ package org.apache.fluss.spark.row -import org.apache.fluss.row.{BinaryString, Decimal, InternalMap, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz} +import org.apache.fluss.row.{BinaryString, Decimal, InternalMap, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz, Variant} import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow} import org.apache.spark.sql.types.StructType @@ -116,6 +116,11 @@ class SparkAsFlussRow(schema: StructType) extends FlussInternalRow with Serializ /** Returns the binary value at the given position. */ override def getBytes(pos: Int): Array[Byte] = row.getBinary(pos) + // TODO: Spark InternalRow has no native getVariant() in Spark 3.x; deserializing from byte[] + // as workaround. When adopting Spark 4 with native VariantType, use row.getVariant() directly. + /** Returns the variant value at the given position. */ + override def getVariant(pos: Int): Variant = Variant.bytesToVariant(row.getBinary(pos)) + /** Returns the array value at the given position. 
*/ override def getArray(pos: Int) = new SparkAsFlussArray(row.getArray(pos), schema.fields(pos).dataType) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala index f9df1e9c06..d0ac102058 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/types/FlussToSparkTypeVisitor.scala @@ -113,4 +113,13 @@ object FlussToSparkTypeVisitor extends DataTypeVisitor[SparkDataType] { } SparkDataTypes.createStructType(sparkFields.toArray) } + + // TODO: Currently maps Fluss VARIANT to Spark BinaryType as a temporary workaround. + // Once Spark 4 introduces native VariantType support, we should adopt a shim mechanism + // (similar to Paimon's SparkShim) to map VARIANT to Spark's native VariantType on Spark 4+, + // while keeping the BinaryType fallback for Spark 3.x. + // See: Paimon's SparkShimLoader / Spark4Shim for reference implementation. + override def visit(variantType: VariantType): SparkDataType = { + SparkDataTypes.BinaryType + } } diff --git a/website/docs/_configs/_partial_config.mdx b/website/docs/_configs/_partial_config.mdx index 7bb8e0a663..2080b2b9b8 100644 --- a/website/docs/_configs/_partial_config.mdx +++ b/website/docs/_configs/_partial_config.mdx @@ -78,7 +78,7 @@ | `client.lookup.batch-timeout` | `0 s` | Duration | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | | `client.lookup.max-retries` | `2147483647` | Integer | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. 
| | `client.scanner.remote-log.prefetch-num` | `4` | Integer | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. | -| `client.scanner.io.tmpdir` | `/var/folders/bp/v2l48kz51mx86d743qv0zhzh0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | +| `client.scanner.io.tmpdir` | `/var/folders/7r/lwdsh9ms4gn0fnxs8c6fcfpm0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | | `client.remote-file.download-thread-num` | `3` | Integer | The number of threads the client uses to download remote files. | | `client.filesystem.security.token.renewal.backoff` | `1 hours` | Duration | The time period how long to wait before retrying to obtain new security tokens for filesystem after a failure. | | `client.filesystem.security.token.renewal.time-ratio` | `0.75` | Double | Ratio of the token's expiration time when new credentials for access filesystem should be re-obtained. | @@ -238,9 +238,13 @@ | Key | Default | Type | Description | | :--- | :--- | :--- | :--- | -| `remote.data.dir` | `none` | String | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | +| `remote.data.dir` | `none` | String | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration. For new clusters, it is recommended to use `remote.data.dirs` instead. If `remote.data.dirs` is configured, this value will be ignored. 
| +| `remote.data.dirs` | `[]` | ArrayList | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. | +| `remote.data.dirs.strategy` | `ROUND_ROBIN` | RemoteDataDirStrategy | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN]; the default strategy is ROUND_ROBIN. ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories. WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | +| `remote.data.dirs.weights` | `[]` | ArrayList | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to the size of `remote.data.dirs`; otherwise, it will be ignored. | | `remote.fs.write-buffer-size` | `4 kb` | MemorySize | The default size of the write buffer for writing the local files to remote file systems. | | `remote.log.task-interval-duration` | `1 min` | Duration | Interval at which remote log manager runs the scheduled tasks like copy segments, clean up remote log segments, delete local log segments etc. If the value is set to 0, it means that the remote log storage is disabled. | +| `remote.log.task-max-upload-segments` | `5` | Integer | The maximum number of log segments to upload to remote storage per tiering task execution. 
This limits the upload batch size to prevent overwhelming the remote storage when there is a large backlog of segments to upload. | | `remote.log.index-file-cache-size` | `1 gb` | MemorySize | The total size of the space allocated to store index files fetched from remote storage in the local storage. | | `remote.log-manager.thread-pool-size` | `4` | Integer | Size of the thread pool used in scheduling tasks to copy segments, fetch remote log indexes and clean up remote log segments. | | `remote.log.data-transfer-thread-num` | `4` | Integer | The number of threads the server uses to transfer (download and upload) remote log file can be data file, index file and remote log metadata file. This option is deprecated. Please use server.io-pool.size instead. | @@ -284,7 +288,7 @@ | `table.auto-partition.enabled` | `false` | Boolean | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. | | `table.auto-partition.key` | `none` | String | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions.And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. | | `table.auto-partition.time-unit` | `DAY` | AutoPartitionTimeUnit | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. 
If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. | -| `table.auto-partition.time-zone` | `Europe/Paris` | String | The time zone for auto partitions, which is by default the same as the system time zone. | +| `table.auto-partition.time-zone` | `Asia/Shanghai` | String | The time zone for auto partitions, which is by default the same as the system time zone. | | `table.auto-partition.num-precreate` | `2` | Integer | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow.For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. | | `table.auto-partition.num-retention` | `7` | Integer | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7. | | `table.log.ttl` | `168 hours` | Duration | The time to live for log segments. 
The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |