feat(vector): Support writing VECTOR to parquet and avro formats using Spark #18328
rahil-c wants to merge 18 commits into apache:master from
Conversation
79398b2 to 8adeccb
@rahil-c to update PR overview
…tion test

- Write path (HoodieRowParquetWriteSupport.makeWriter) now switches on VectorElementType (FLOAT/DOUBLE/INT8) instead of hardcoding float, matching the read paths
- Remove redundant detectVectorColumns call in readBaseFile by reusing vectorCols from requiredSchema for requestedSchema
- Add testColumnProjectionWithVector covering 3 scenarios: exclude vector, vector-only, and all columns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Use VectorLogicalType.VECTOR_BYTE_ORDER instead of hardcoded ByteOrder.LITTLE_ENDIAN in all 4 locations (write support, reader, Scala reader context, file group format)
- Add Math.multiplyExact overflow guard for dimension * elementSize in HoodieRowParquetWriteSupport
- Remove unnecessary array clone in HoodieSparkParquetReader
- Add clarifying comment on non-vector column else branch
- Fix misleading "float arrays" comment to "typed arrays"
- Move inline JavaConverters import to top-level in SparkFileFormatInternalRowReaderContext
- Import Metadata at top level instead of fully-qualified reference
- Consolidate duplicate detectVectorColumns, replaceVectorColumnsWithBinary, and convertBinaryToVectorArray into SparkFileFormatInternalRowReaderContext companion object; HoodieFileGroupReaderBasedFileFormat now delegates
- Add Javadoc on VectorType explaining it's needed for InternalSchema type hierarchy (cannot reuse HoodieSchema.Vector)
- Clean up unused imports (ByteOrder, ByteBuffer, GenericArrayData, StructField, BinaryType, HoodieSchemaType)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…e types

New tests added to TestVectorDataSource:
- testDoubleVectorRoundTrip: DOUBLE element type end-to-end (64-dim)
- testInt8VectorRoundTrip: INT8/byte element type end-to-end (256-dim)
- testMultipleVectorColumns: two vector columns (float + double) in same schema with selective nulls and per-column projection
- testMorTableWithVectors: MOR table type with insert + upsert, verifying merged view returns correct vectors
- testCowUpsertWithVectors: COW upsert (update existing + insert new) verifying vector values after merge
- testLargeDimensionVector: 1536-dim float vectors (OpenAI embedding size) to exercise large buffer allocation
- testSmallDimensionVector: 2-dim vectors with edge values (Float.MaxValue) to verify boundary handling
- testVectorWithNonVectorArrayColumn: vector column alongside a regular ArrayType(StringType) to ensure non-vector arrays are not incorrectly treated as vectors
- testMorWithMultipleUpserts: MOR with 3 successive upsert batches of DOUBLE vectors, verifying the latest value wins per key

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ix hot-path allocation

- Create shared VectorConversionUtils utility class to eliminate duplicated vector conversion logic across HoodieSparkParquetReader, SparkFileFormatInternalRowReaderContext, and HoodieFileGroupReaderBasedFileFormat
- Add explicit dimension validation in HoodieRowParquetWriteSupport to prevent silent data corruption when array length doesn't match declared vector dimension
- Reuse GenericInternalRow in HoodieSparkParquetReader's vector post-processing loop to reduce GC pressure on large scans
…eSchema.Vector] to fix Scala 2.12 type inference error
52f6db8 to 959bcd8
3f7e2d0 to f8ce228
- Move VectorConversionUtils import into hudi group (was misplaced in 3rdParty)
- Add blank line between hudi and 3rdParty import groups
- Add blank line between java and scala import groups

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@yihua @voonhous @balaji-varadarajan-ai will need a review from one of you guys if possible
balaji-varadarajan-ai left a comment

Still reviewing the PR. Here are the initial comments.
StructType structSchema = HoodieInternalRowUtils.getCachedSchema(nonNullSchema);

// Detect vector columns: ordinal → Vector schema
Map<Integer, HoodieSchema.Vector> vectorColumnInfo = VectorConversionUtils.detectVectorColumns(nonNullSchema);
Seeing this pattern:
- Detecting vector columns
- Replacing the schema
- Post-processing rows
in HoodieSparkParquetReader, SparkFileFormatInternalRowReaderContext and HoodieFileGroupReaderBasedFileFormat. Wondering if you can bring them under one common method with a specific callback.
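For illustration, a rough sketch of what that could look like (names, signatures, and the StructType-based detectVectorColumns overload are illustrative, not the actual Hudi API):

import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

// Detect vector columns once, rewrite the read schema, and let each caller plug in
// only its own base reader; the per-row post-processing stays in one place.
static Iterator<InternalRow> readWithVectorHandling(
    StructType requiredSchema,
    Function<StructType, Iterator<InternalRow>> baseReader) {
  Map<Integer, HoodieSchema.Vector> vectorCols = detectVectorColumns(requiredSchema); // hypothetical StructType overload
  if (vectorCols.isEmpty()) {
    return baseReader.apply(requiredSchema);
  }
  StructType binarySchema = replaceVectorColumnsWithBinary(requiredSchema, vectorCols);
  Iterator<InternalRow> rows = baseReader.apply(binarySchema);
  return new Iterator<InternalRow>() {
    @Override public boolean hasNext() { return rows.hasNext(); }
    // shared binary-to-array post-processing; name is illustrative
    @Override public InternalRow next() { return postProcessVectorRow(rows.next(), vectorCols, requiredSchema); }
  };
}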
can look into this
 * @param schema a HoodieSchema of type RECORD (or null)
 * @return map from field index to Vector schema; empty map if schema is null or has no vectors
 */
public static Map<Integer, HoodieSchema.Vector> detectVectorColumns(HoodieSchema schema)
Just checking: since we are using the integer ordinal position in the schema, can you check that things work end to end with projections and schema evolution?
I believe I have tests for this in the PR but will check.
HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
int fixedSize = vectorSchema.getDimension()
    * vectorSchema.getVectorElementType().getElementSize();
return Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
The vectors are stored as bare FIXED_LEN_BYTE_ARRAY in Parquet with no logical type annotation or key-value metadata on the Parquet column. I think it would be useful to track this. Any thoughts?
@balaji-varadarajan-ai so you mean we want to keep track of the Hudi type info around VECTOR within Parquet itself? If so I think I can look into this.
@balaji-varadarajan-ai my question is what benefit do we get from keeping this info in the file footer or as a column annotation, since no other reader would be able to interpret this?
My intuition is that, generally, keeping this metadata (to disambiguate) would be helpful in the data path for future scenarios.
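For example, one option would be to surface it as Parquet file-footer key-value metadata from WriteSupport.init (a minimal sketch; the key name and value format below are made up, not existing Hudi configs, and parquetMessageType is assumed to be the schema the write support already builds):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;

@Override
public WriteSupport.WriteContext init(Configuration configuration) {
  Map<String, String> extraMetadata = new HashMap<>();
  // Hypothetical key/value recording which columns are Hudi vectors and their shape.
  extraMetadata.put("hoodie.vector.columns", "embedding:VECTOR(128, FLOAT32)");
  return new WriteSupport.WriteContext(parquetMessageType, extraMetadata);
}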
  }
  return new GenericArrayData(doubles);
case INT8:
  byte[] int8s = new byte[dim];
FYI: GenericArrayData(byte[]) is kinda inefficient — it actually boxes every byte into a Byte and stores it as an Object[]. So for a 1536-dim INT8 vector, that’s 1536 tiny allocations per row 😬. FLOAT/DOUBLE don’t have this issue since they use optimized primitive array constructors. If this becomes a bottleneck, consider using UnsafeArrayData.fromPrimitiveArray(int8s) to avoid all that boxing.
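A minimal illustration of the two options (assuming the byte[] overload of UnsafeArrayData.fromPrimitiveArray is available in the Spark versions Hudi builds against):

import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;

class Int8VectorArrayExample {
  // Boxes every byte into a java.lang.Byte stored in an Object[] (1536 allocations for a 1536-dim vector).
  static ArrayData boxed(byte[] int8s) {
    return new GenericArrayData(int8s);
  }

  // Copies the primitive bytes once into an UnsafeArrayData, no per-element boxing.
  static ArrayData unboxed(byte[] int8s) {
    return UnsafeArrayData.fromPrimitiveArray(int8s);
  }
}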
default:
  throw new UnsupportedOperationException("Unsupported vector element type: " + elemType);
}
recordConsumer.addBinary(Binary.fromReusedByteArray(buffer.array()));
Note: We reuse the same buffer for every row without copying. Make sure ColumnWriteStoreV2 doesn't hold references between writes, or consecutive rows will overwrite each other!
The decimal path does this too so it's probably safe, but keep in mind the vector buffer is much larger (~6KB vs ~16B).
    && resolvedSchema != null
    && resolvedSchema.getType() == HoodieSchemaType.VECTOR) {
  HoodieSchema.Vector vectorSchema = (HoodieSchema.Vector) resolvedSchema;
  int fixedSize = vectorSchema.getDimension()
Use Math.multiplyExact() here to prevent silent overflow with massive vectors!
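Something like this (sketch, using the getters shown in the diff above):

int fixedSize = Math.multiplyExact(
    vectorSchema.getDimension(),
    vectorSchema.getVectorElementType().getElementSize()); // throws ArithmeticException instead of silently wrapping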
StructField[] newFields = new StructField[fields.length];
for (int i = 0; i < fields.length; i++) {
  if (vectorColumns.containsKey(i)) {
    newFields[i] = new StructField(fields[i].name(), BinaryType$.MODULE$, fields[i].nullable(), Metadata.empty());
Is Metadata.empty() correct here?
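If the intent is to keep the original field metadata (including hudi_type), a one-line alternative would be (sketch):

newFields[i] = new StructField(fields[i].name(), BinaryType$.MODULE$, fields[i].nullable(), fields[i].metadata()); // carries over the existing field metadata instead of dropping it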
    result.update(i, convertBinaryToVectorArray(row.getBinary(i), vectorColumns.get(i)));
  } else {
    // Non-vector column: copy value as-is using the read schema's data type
    result.update(i, row.get(i, readSchema.apply(i).dataType()));
Instead of copying every field through GenericInternalRow, consider a byte-level copy of the UnsafeRow with surgical replacement of only the vector column offsets. It might be faster; you can micro-benchmark this separately and test.
voonhous left a comment

Added some minor nit comments.
case VECTOR: {
  Types.VectorType vector = (Types.VectorType) primitive;
  return HoodieSchema.createVector(
      vector.getDimension(),
      HoodieSchema.Vector.VectorElementType.fromString(vector.getElementType()));
}
The storageBacking is lost in the InternalSchema round-trip here, IIUC. Types.VectorType stores storageBacking and VectorType.get() accepts it, but this conversion back to HoodieSchema doesn't pass it through.
This is fine for now since only PARQUET_FIXED_LEN_BYTE_ARRAY exists, but it'll silently lose data when new backing types are added. Maybe we should pass it through or add a comment noting the assumption?
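For the comment option, something along these lines (sketch):

// NOTE: storageBacking is intentionally not propagated here; PARQUET_FIXED_LEN_BYTE_ARRAY is the
// only backing today. Revisit this conversion when additional storage backings are introduced.
return HoodieSchema.createVector(
    vector.getDimension(),
    HoodieSchema.Vector.VectorElementType.fromString(vector.getElementType()));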
nice catch thanks voon
private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkColumnarFileReader, requestedSchema: StructType,
                         remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
                         partitionSchema: StructType, outputSchema: StructType, filters: Seq[Filter],
                         storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
Possible to reduce boilerplate in this function to reduce its complexity?
There are 3 separate detectVectorColumns + replaceVectorFieldsWithBinary calls. We can add:
private def withVectorRewrite(schema: StructType): (StructType, Map[Int, HoodieSchema.Vector]) = {
  val vecs = detectVectorColumns(schema)
  if (vecs.nonEmpty) (replaceVectorFieldsWithBinary(schema, vecs), vecs) else (schema, vecs)
}
  LOCAL_TIMESTAMP_MILLIS(Long.class),
- LOCAL_TIMESTAMP_MICROS(Long.class);
+ LOCAL_TIMESTAMP_MICROS(Long.class),
+ VECTOR(ByteBuffer.class);
Don't quite understand this: vectors aren't accessed as ByteBuffer through the InternalSchema API; we are using byte[].class, right?
Possible to add a comment here explaining the choice of ByteBuffer?
Will leave a comment for this explaining why it's ByteBuffer.
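For example, the comment could read something like this (wording assumed, based on the Avro bytes mapping this PR describes; worth verifying it matches the actual reason before committing):

// VECTOR values are carried through the InternalSchema/Avro layer as Avro `bytes`,
// which Avro's Java API surfaces as java.nio.ByteBuffer; Spark-facing code converts
// to/from byte[] at the edges.
VECTOR(ByteBuffer.class);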
@balaji-varadarajan-ai @voonhous could you take a look again?
Describe the issue this Pull Request addresses
Builds on #18146 (VECTOR type in HoodieSchema) and #18190 (Spark↔HoodieSchema converters) to complete the full read/write pipeline for vector columns in Apache Hudi backed by Parquet.
Vectors are stored as Parquet FIXED_LEN_BYTE_ARRAY (little-endian, IEEE-754) rather than repeated groups.

Summary and Changelog
Write path
- HoodieRowParquetWriteSupport: detects ArrayType columns annotated with hudi_type=VECTOR(dim, elementType) metadata and serialises them as FIXED_LEN_BYTE_ARRAY instead of a Parquet list. Dimension mismatch at write time throws HoodieException to prevent silent data corruption.
- Supported element types: FLOAT32, FLOAT64, INT8.
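For illustration only (not code from this PR), a Spark schema carrying the annotation might look like the sketch below; the exact hudi_type string syntax, column names, and dimensions are assumptions:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

static StructType vectorTableSchema() {
  Metadata vectorMeta = new MetadataBuilder()
      .putString("hudi_type", "VECTOR(128, FLOAT32)") // dimension and element type, per the format above
      .build();
  return new StructType(new StructField[] {
      new StructField("id", DataTypes.StringType, false, Metadata.empty()),
      // ArrayType column whose field metadata marks it as a Hudi vector
      new StructField("embedding", DataTypes.createArrayType(DataTypes.FloatType, false), true, vectorMeta)
  });
}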
Read path
- HoodieSparkParquetReader and SparkFileFormatInternalRowReaderContext: detect FIXED_LEN_BYTE_ARRAY columns carrying hudi_type metadata and deserialise them back to Spark ArrayData.
- HoodieFileGroupReaderBasedFileFormat: propagates vector column metadata through the file-group reader so the schema is not lost during Spark's internal schema resolution.
- VectorConversionUtils (new): shared utility extracted to eliminate duplicated byte-buffer decode logic across the two reader paths.

Schema / compatibility
- InternalSchemaConverter: maps VectorType to/from Avro bytes with the hudi_type prop, preserving dimension and element-type metadata through the Avro layer.
- HoodieSchemaCompatibilityChecker: rejects illegal vector evolution (e.g. dimension change) rather than silently coercing.
- HoodieSchemaComparatorForSchemaEvolution: treats vector columns as incompatible when dimension or element type differs.
- HoodieTableMetadataUtil: skips column statistics for vector columns (min/max on raw bytes is meaningless).
- AvroSchemaConverterWithTimestampNTZ: passes through the hudi_type property on bytes fields so vector metadata survives Avro↔Spark schema round-trips.
- Types.VectorType: adds a byteSize() helper used by the write path to compute FIXED_LEN_BYTE_ARRAY length.

Tests
- TestVectorDataSource (808 lines): end-to-end Spark functional tests covering FLOAT32, FLOAT64, INT8 across COPY_ON_WRITE and MERGE_ON_READ table types; includes column projection, schema evolution rejection, and multi-batch upsert round-trips.
- TestHoodieSchemaCompatibility, TestHoodieSchemaComparatorForSchemaEvolution, TestHoodieTableMetadataUtil: unit tests for schema-layer changes.

Impact
- Tables with vector columns write them as Parquet FIXED_LEN_BYTE_ARRAY. Reading those files with an older Hudi version will surface raw bytes rather than a float array; users should upgrade readers alongside writers.

Risk Level
Low. All changes are gated behind hudi_type=VECTOR(...) metadata presence. Tables that do not use vector columns are unaffected. New paths are covered by functional tests across both table types.

Documentation Update
A follow-up website doc page covering vector column usage (schema annotation, supported element types, Parquet layout) will be raised separately. Config changes: none.
Contributor's checklist