Skip to content
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<td>String</td>
<td>Specifies column names that should be stored as blob type. This is used when you want to treat a BYTES column as a BLOB.</td>
</tr>
<tr>
<td><h5>blob-ref-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specifies column names that should be stored as blob reference type. This is used when you want to treat a BYTES column as a BLOB_REF.</td>
</tr>
<tr>
<td><h5>blob.split-by-file-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
25 changes: 24 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,14 @@ public InlineElement getDescription() {
"Specifies column names that should be stored as blob type. "
+ "This is used when you want to treat a BYTES column as a BLOB.");

public static final ConfigOption<String> BLOB_REF_FIELD =
key("blob-ref-field")
.stringType()
.noDefaultValue()
.withDescription(
"Specifies column names that should be stored as blob reference type. "
+ "This is used when you want to treat a BYTES column as a BLOB_REF.");

@Immutable
public static final ConfigOption<String> BLOB_DESCRIPTOR_FIELD =
key("blob-descriptor-field")
Expand Down Expand Up @@ -2935,7 +2943,13 @@ public Set<String> blobExternalStorageField() {
* subset of descriptor fields and therefore are also updatable.
*/
public Set<String> updatableBlobFields() {
    // Updatable blobs are the union of descriptor fields and reference fields.
    Set<String> updatable = new HashSet<>();
    updatable.addAll(blobDescriptorField());
    updatable.addAll(blobRefField());
    return updatable;
}

/**
 * Returns the column names configured via {@code blob-ref-field} as a set. Parsing is
 * delegated to {@code parseCommaSeparatedSet}; presumably an empty set when the option is
 * unset — mirrors the other blob field accessors.
 */
public Set<String> blobRefField() {
    return parseCommaSeparatedSet(BLOB_REF_FIELD);
}

/**
Expand Down Expand Up @@ -3274,6 +3288,15 @@ public static List<String> blobField(Map<String, String> options) {
return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
}

/**
 * Parses the {@code blob-ref-field} option from a raw options map into a list of trimmed
 * column names; an empty list when the option is absent.
 */
public static List<String> blobRefField(Map<String, String> options) {
    String raw = options.get(BLOB_REF_FIELD.key());
    return raw == null
            ? Collections.emptyList()
            : Arrays.stream(raw.split(",")).map(String::trim).collect(Collectors.toList());
}

/** Whether the configured sequence-field sort order is {@code ASCENDING}. */
public boolean sequenceFieldSortOrderIsAscending() {
    return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}
Expand Down
65 changes: 65 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/types/BlobRefType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.types;

import org.apache.paimon.annotation.Public;

/**
 * Data type of blob reference.
 *
 * <p>A {@link BlobRefType} column carries serialized blob-reference bytes inline in data files,
 * rather than having Paimon write the blob payloads to managed {@code .blob} files.
 *
 * @since 1.5.0
 */
@Public
public final class BlobRefType extends DataType {

    private static final long serialVersionUID = 1L;

    /** SQL name rendered by {@link #asSQLString()}. */
    private static final String SQL_NAME = "BLOB_REF";

    public BlobRefType(boolean isNullable) {
        super(isNullable, DataTypeRoot.BLOB_REF);
    }

    /** Creates a nullable blob reference type. */
    public BlobRefType() {
        this(true);
    }

    @Override
    public int defaultSize() {
        // References have no fixed width; reuse the blob type's default size estimate.
        return BlobType.DEFAULT_SIZE;
    }

    @Override
    public DataType copy(boolean isNullable) {
        return new BlobRefType(isNullable);
    }

    @Override
    public String asSQLString() {
        return withNullability(SQL_NAME);
    }

    @Override
    public <R> R accept(DataTypeVisitor<R> visitor) {
        return visitor.visit(this);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public R visit(BlobType blobType) {
return defaultMethod(blobType);
}

@Override
public R visit(BlobRefType blobRefType) {
    // Route to the visitor's default handling, consistent with the other leaf types here.
    return defaultMethod(blobRefType);
}

@Override
public R visit(ArrayType arrayType) {
return defaultMethod(arrayType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ private enum Keyword {
LEGACY,
VARIANT,
BLOB,
BLOB_REF,
NOT
}

Expand Down Expand Up @@ -549,6 +550,8 @@ private DataType parseTypeByKeyword() {
return new VariantType();
case BLOB:
return new BlobType();
case BLOB_REF:
return new BlobRefType();
case VECTOR:
return parseVectorType();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public enum DataTypeRoot {

BLOB(DataTypeFamily.PREDEFINED),

BLOB_REF(DataTypeFamily.PREDEFINED),

ARRAY(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),

VECTOR(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public interface DataTypeVisitor<R> {

R visit(BlobType blobType);

R visit(BlobRefType blobRefType);

R visit(ArrayType arrayType);

R visit(VectorType vectorType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public static BlobType BLOB() {
return new BlobType();
}

/** Creates a nullable {@code BLOB_REF} data type. */
public static BlobRefType BLOB_REF() {
    return new BlobRefType();
}

public static OptionalInt getPrecision(DataType dataType) {
return dataType.accept(PRECISION_EXTRACTOR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BlobRefType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
Expand Down Expand Up @@ -163,6 +164,12 @@ public FieldType visit(BlobType blobType) {
throw new UnsupportedOperationException();
}

@Override
public FieldType visit(BlobRefType blobRefType) {
    // BLOB_REF values are serialized reference bytes, so they map to Arrow VARBINARY
    // (no dictionary encoding, no children).
    return new FieldType(
            blobRefType.isNullable(), Types.MinorType.VARBINARY.getType(), null);
}

private TimeUnit getTimeUnit(int precision) {
if (precision == 0) {
return TimeUnit.SECOND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BlobRefType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
Expand Down Expand Up @@ -447,6 +448,11 @@ public Arrow2PaimonVectorConverter visit(BlobType blobType) {
throw new UnsupportedOperationException();
}

@Override
public Arrow2PaimonVectorConverter visit(BlobRefType blobRefType) {
    // A BLOB_REF arrives from Arrow as plain VARBINARY bytes, so reuse that converter.
    // NOTE(review): blobRefType's nullability is dropped here — presumably the converter
    // does not depend on it; confirm against the VarBinaryType visit path.
    return visit(new VarBinaryType());
}

@Override
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
final Arrow2PaimonVectorConverter arrowVectorConvertor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BlobRefType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
Expand Down Expand Up @@ -156,6 +157,11 @@ public ArrowFieldWriterFactory visit(BlobType blobType) {
throw new UnsupportedOperationException("Doesn't support BlobType.");
}

@Override
public ArrowFieldWriterFactory visit(BlobRefType blobRefType) {
    // BLOB_REF columns are written through the dedicated BlobRefWriter.
    return ArrowFieldWriters.BlobRefWriter::new;
}

@Override
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,42 @@ protected void doWrite(int rowIndex, DataGetters getters, int pos) {
}
}

/**
 * Writer for BLOB_REF columns. The batch path copies the already-serialized reference bytes
 * straight out of the columnar vector (identical to a plain binary writer). The row-by-row
 * path serializes the row's blob reference via {@code getBlobRef(pos).reference().serialize()}.
 */
public static class BlobRefWriter extends ArrowFieldWriter {

    public BlobRefWriter(FieldVector fieldVector, boolean isNullable) {
        super(fieldVector, isNullable);
    }

    @Override
    protected void doWrite(
            ColumnVector columnVector,
            @Nullable int[] pickedInColumn,
            int startIndex,
            int batchRows) {
        VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
        for (int i = 0; i < batchRows; i++) {
            // pickedInColumn (when non-null) maps output row i to a picked source row.
            int row = getRowNumber(startIndex, i, pickedInColumn);
            if (columnVector.isNullAt(row)) {
                varBinaryVector.setNull(i);
            } else {
                byte[] value = ((BytesColumnVector) columnVector).getBytes(row).getBytes();
                varBinaryVector.setSafe(i, value);
            }
        }
    }

    @Override
    protected void doWrite(int rowIndex, DataGetters getters, int pos) {
        // Serialize the blob reference carried by the row into raw bytes for VARBINARY.
        byte[] bytes = getters.getBlobRef(pos).reference().serialize();
        ((VarBinaryVector) fieldVector).setSafe(rowIndex, bytes);
    }
}

/** Writer for DECIMAL. */
public static class DecimalWriter extends ArrowFieldWriter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@

import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.writer.ArrowFieldWriter;
import org.apache.paimon.data.BlobReference;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarVec;
import org.apache.paimon.data.columnar.VecColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.heap.HeapBytesVector;
import org.apache.paimon.data.columnar.heap.HeapFloatVector;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -129,6 +132,78 @@ public void testVectorColumnWriteWithPickedInColumn() {
}
}

@Test
public void testBlobRefColumnWriteAndReadBack() {
    // End-to-end check: columnar BLOB_REF bytes -> Arrow VARBINARY -> deserialized back.
    RowType rowType = RowType.of(DataTypes.BLOB_REF());
    try (RootAllocator allocator = new RootAllocator()) {
        VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, allocator);
        ArrowFieldWriter[] fieldWriters = ArrowUtils.createArrowFieldWriters(vsr, rowType);

        // Prepare serialized BlobReference bytes.
        BlobReference ref0 = new BlobReference("default.upstream", 7, 100L);
        BlobReference ref1 = new BlobReference("default.upstream", 8, 200L);
        byte[] bytes0 = ref0.serialize();
        byte[] bytes1 = ref1.serialize();

        int rows = 3; // row 0 = ref0, row 1 = null, row 2 = ref1
        HeapBytesVector bytesVector = new HeapBytesVector(rows);
        bytesVector.appendByteArray(bytes0, 0, bytes0.length);
        bytesVector.appendByteArray(new byte[0], 0, 0); // placeholder for null
        bytesVector.appendByteArray(bytes1, 0, bytes1.length);
        bytesVector.setNullAt(1);

        VectorizedColumnBatch batch =
                new VectorizedColumnBatch(new ColumnVector[] {bytesVector});
        batch.setNumRows(rows);

        // Drive the converter with a minimal iterator that exposes the single batch.
        ArrowVectorizedBatchConverter converter =
                new ArrowVectorizedBatchConverter(vsr, fieldWriters);
        converter.reset(
                new VectorizedRecordIterator() {
                    @Override
                    public VectorizedColumnBatch batch() {
                        return batch;
                    }

                    @Override
                    public InternalRow next() {
                        return null;
                    }

                    @Override
                    public void releaseBatch() {}
                });
        converter.next(rows);

        // Verify the Arrow vector carries the correct null mask.
        VarBinaryVector arrowVector = (VarBinaryVector) vsr.getVector(0);
        assertThat(arrowVector.isNull(0)).isFalse();
        assertThat(arrowVector.isNull(1)).isTrue();
        assertThat(arrowVector.isNull(2)).isFalse();

        // Read back and verify the bytes can be deserialized to BlobReference.
        BlobReference readRef0 = BlobReference.deserialize(arrowVector.getObject(0));
        assertThat(readRef0.tableName()).isEqualTo("default.upstream");
        assertThat(readRef0.fieldId()).isEqualTo(7);
        assertThat(readRef0.rowId()).isEqualTo(100L);

        BlobReference readRef2 = BlobReference.deserialize(arrowVector.getObject(2));
        assertThat(readRef2.tableName()).isEqualTo("default.upstream");
        assertThat(readRef2.fieldId()).isEqualTo(8);
        assertThat(readRef2.rowId()).isEqualTo(200L);

        // Also verify the Arrow2Paimon round-trip preserves the null mask.
        Arrow2PaimonVectorConverter paimonConverter =
                Arrow2PaimonVectorConverter.construct(DataTypes.BLOB_REF());
        ColumnVector paimonVector = paimonConverter.convertVector(arrowVector);
        assertThat(paimonVector.isNullAt(0)).isFalse();
        assertThat(paimonVector.isNullAt(1)).isTrue();
        assertThat(paimonVector.isNullAt(2)).isFalse();

        converter.close();
    }
}

private static class TestVecColumnVectorWithNulls implements VecColumnVector {

private final ColumnVector data;
Expand Down
Loading
Loading