Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.client.converter;

import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.Variant;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.DecimalType;
Expand Down Expand Up @@ -119,6 +120,8 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
case BINARY:
case BYTES:
return prop::read;
case VARIANT:
return (obj) -> convertVariantValue(prop, prop.read(obj));
case CHAR:
case STRING:
return (obj) ->
Expand Down Expand Up @@ -165,6 +168,28 @@ private static FieldToRow createFieldConverter(PojoType.Property prop, DataType
}
}

/**
* Converts a byte array value from a POJO property to a Variant. If the value is already a
* Variant, it is returned directly. If it is a byte[], it is interpreted as the combined format
* [4-byte value length (big-endian)][value][metadata] and converted to a Variant.
*/
private static @Nullable Variant convertVariantValue(
PojoType.Property prop, @Nullable Object v) {
if (v == null) {
return null;
}
if (v instanceof Variant) {
return (Variant) v;
}
if (v instanceof byte[]) {
return Variant.bytesToVariant((byte[]) v);
}
throw new IllegalArgumentException(
String.format(
"Field %s is not a byte[] or Variant. Cannot convert to Variant.",
prop.name));
}

private interface FieldToRow {
Object readAndConvert(Object pojo) throws Exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.Variant;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
Expand Down Expand Up @@ -140,6 +141,8 @@ private static RowToField createRowReader(DataType fieldType, PojoType.Property
case BINARY:
case BYTES:
return InternalRow::getBytes;
case VARIANT:
return (row, pos) -> Variant.variantToBytes(row.getVariant(pos));
case DECIMAL:
DecimalType decimalType = (DecimalType) fieldType;
return (row, pos) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ public void writeRow(int pos, InternalRow value, RowSerializer serializer) {
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

@Override
public void writeVariant(int pos, Variant variant) {
byte[] value = variant.value();
byte[] metadata = variant.metadata();
int totalSize = 4 + value.length + metadata.length;
byte[] combined = new byte[totalSize];
// Write value length as big-endian 4-byte integer
combined[0] = (byte) ((value.length >> 24) & 0xFF);
combined[1] = (byte) ((value.length >> 16) & 0xFF);
combined[2] = (byte) ((value.length >> 8) & 0xFF);
combined[3] = (byte) (value.length & 0xFF);
System.arraycopy(value, 0, combined, 4, value.length);
System.arraycopy(metadata, 0, combined, 4 + value.length, metadata.length);
writeBytes(pos, combined);
}

@Override
public void writeChar(int pos, BinaryString value, int length) {
// TODO: currently, we encoding CHAR(length) as the same with STRING, the length info can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.lang.reflect.Array;

import static org.apache.fluss.memory.MemoryUtils.UNSAFE;
import static org.apache.fluss.row.Variant.bytesToVariant;
import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
Expand Down Expand Up @@ -82,6 +83,7 @@ public static int calculateFixLengthPartSize(DataType type) {
case STRING:
case BINARY:
case BYTES:
case VARIANT:
case DECIMAL:
case BIGINT:
case DOUBLE:
Expand Down Expand Up @@ -233,6 +235,12 @@ public byte[] getBytes(int pos) {
return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
}

@Override
public Variant getVariant(int pos) {
byte[] bytes = getBytes(pos);
return bytesToVariant(bytes);
}

@Override
public TimestampNtz getTimestampNtz(int pos, int precision) {
assertIndexIsValid(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public static NullSetter createNullSetter(DataType elementType) {
case STRING:
case BINARY:
case BYTES:
case VARIANT:
case DECIMAL:
case BIGINT:
case TIMESTAMP_WITHOUT_TIME_ZONE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public interface BinaryWriter {

void writeRow(int pos, InternalRow value, RowSerializer serializer);

void writeVariant(int pos, Variant value);

/** Finally, complete write to set real size to binary. */
void complete();

Expand Down Expand Up @@ -135,6 +137,8 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(
writer.writeBinary(pos, (byte[]) value, binaryLength);
case BYTES:
return (writer, pos, value) -> writer.writeBytes(pos, (byte[]) value);
case VARIANT:
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
case DECIMAL:
final int decimalPrecision = getPrecision(elementType);
return (writer, pos, value) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public interface DataGetters {
/** Returns the binary value at the given position. */
byte[] getBytes(int pos);

/** Returns the variant value at the given position. */
Variant getVariant(int pos);

/** Returns the array value at the given position. */
InternalArray getArray(int pos);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public byte[] getBytes(int pos) {
return (byte[]) getObject(pos);
}

@Override
public Variant getVariant(int pos) {
return (Variant) getObject(pos);
}

@Override
public InternalArray getArray(int pos) {
return (InternalArray) getObject(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public byte[] getBytes(int pos) {
return (byte[]) this.fields[pos];
}

@Override
public Variant getVariant(int pos) {
return (Variant) this.fields[pos];
}

@Override
public InternalArray getArray(int pos) {
return (InternalArray) this.fields[pos];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row;

import org.apache.fluss.annotation.PublicEvolving;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

/**
* An internal data structure implementing {@link Variant}.
*
* <p>A Variant consists of two byte arrays:
*
* <ul>
* <li><b>value</b>: the binary-encoded variant value (header + data), supports nested JSON
* structures.
* <li><b>metadata</b>: the string dictionary (version number + deduplicated list of all object
* key names).
* </ul>
*
* @since 0.9
*/
@PublicEvolving
public final class GenericVariant implements Variant, Serializable {

private static final long serialVersionUID = 1L;

private final byte[] value;
private final byte[] metadata;

public GenericVariant(byte[] value, byte[] metadata) {
this.value = value;
this.metadata = metadata;
}

@Override
public byte[] value() {
return value;
}

@Override
public byte[] metadata() {
return metadata;
}

@Override
public long sizeInBytes() {
return value.length + metadata.length;
}

@Override
public Variant copy() {
return new GenericVariant(
Arrays.copyOf(value, value.length), Arrays.copyOf(metadata, metadata.length));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GenericVariant that = (GenericVariant) o;
return Objects.deepEquals(value, that.value) && Objects.deepEquals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.hashCode(value), Arrays.hashCode(metadata));
}

@Override
public String toString() {
return "GenericVariant{value="
+ Arrays.toString(value)
+ ", metadata="
+ Arrays.toString(metadata)
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ static ElementGetter createElementGetter(DataType fieldType) {
case BYTES:
elementGetter = InternalArray::getBytes;
break;
case VARIANT:
elementGetter = InternalArray::getVariant;
break;
case DECIMAL:
final int decimalPrecision = getPrecision(fieldType);
final int decimalScale = getScale(fieldType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ static Class<?> getDataClass(DataType type) {
return TimestampNtz.class;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return TimestampLtz.class;
case VARIANT:
return Variant.class;
case ARRAY:
return InternalArray.class;
case MAP:
Expand Down Expand Up @@ -181,6 +183,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
case BYTES:
fieldGetter = row -> row.getBytes(fieldPos);
break;
case VARIANT:
fieldGetter = row -> row.getVariant(fieldPos);
break;
case DECIMAL:
final int decimalPrecision = getPrecision(fieldType);
final int decimalScale = getScale(fieldType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public byte[] getBytes(int pos) {
return row.getBytes(pos);
}

@Override
public Variant getVariant(int pos) {
return row.getVariant(pos);
}

@Override
public InternalArray getArray(int pos) {
return row.getArray(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public byte[] getBytes(int pos) {
return row.getBytes(indexMapping[pos]);
}

@Override
public Variant getVariant(int pos) {
return row.getVariant(indexMapping[pos]);
}

@Override
public InternalArray getArray(int pos) {
return row.getArray(indexMapping[pos]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface SequentialBinaryWriter extends BinaryWriter {

void writeRow(InternalRow value, RowSerializer serializer);

void writeVariant(Variant value);

/** Finally, complete write to set real size to binary. */
void complete();

Expand Down Expand Up @@ -168,4 +170,9 @@ default void writeMap(int pos, InternalMap value, MapSerializer serializer) {
default void writeRow(int pos, InternalRow value, RowSerializer serializer) {
writeRow(value, serializer);
}

@Override
default void writeVariant(int pos, Variant value) {
writeVariant(value);
}
}
Loading
Loading