From 9a6ac4a447af3b03d14468065f44173c3f23e8fd Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 29 May 2026 08:00:48 +0100 Subject: [PATCH] [rust][row] typed column-vector dispatch with lazy nested ARRAY/MAP/ROW views --- bindings/cpp/src/lib.rs | 5 +- bindings/cpp/src/types.rs | 6 +- bindings/python/src/table.rs | 2 + crates/examples/src/example_kv_table.rs | 2 +- .../src/example_partitioned_kv_table.rs | 2 +- .../src/example_partitioned_prefix_lookup.rs | 2 +- crates/examples/src/example_prefix_lookup.rs | 2 +- crates/examples/src/example_table.rs | 2 +- crates/fluss/src/client/table/append.rs | 2 +- crates/fluss/src/client/table/lookup.rs | 1 + crates/fluss/src/record/arrow.rs | 38 +- crates/fluss/src/record/kv/kv_record_batch.rs | 2 +- .../src/record/kv/kv_record_batch_builder.rs | 5 +- crates/fluss/src/record/mod.rs | 2 +- crates/fluss/src/row/binary_array.rs | 83 +- crates/fluss/src/row/binary_map.rs | 33 + crates/fluss/src/row/column.rs | 1336 +++-------------- crates/fluss/src/row/column_vector.rs | 650 ++++++++ crates/fluss/src/row/column_writer.rs | 369 ++++- crates/fluss/src/row/columnar.rs | 364 +++++ .../fluss/src/row/compacted/compacted_row.rs | 36 +- .../src/row/compacted/compacted_row_reader.rs | 2 +- crates/fluss/src/row/datum.rs | 19 +- crates/fluss/src/row/field_getter.rs | 20 +- crates/fluss/src/row/fixed_schema_decoder.rs | 2 +- crates/fluss/src/row/lookup_row.rs | 19 +- crates/fluss/src/row/mod.rs | 104 +- crates/fluss/src/row/projected_row.rs | 23 +- crates/fluss/src/row/row_decoder.rs | 2 +- crates/fluss/src/row/view.rs | 670 +++++++++ crates/fluss/tests/integration/kv_table.rs | 23 +- crates/fluss/tests/integration/log_table.rs | 46 +- .../tests/integration/table_remote_scan.rs | 2 +- 33 files changed, 2420 insertions(+), 1456 deletions(-) create mode 100644 crates/fluss/src/row/column_vector.rs create mode 100644 crates/fluss/src/row/columnar.rs create mode 100644 crates/fluss/src/row/view.rs diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index ed575244..b9d5a03e 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -2400,7 +2400,10 @@ mod row_reader { validate(row, columns, field, "get_array", |dt| { matches!(dt, fcore::metadata::DataType::Array(_)) })?; - row.get_array(field).map_err(|e| e.to_string()) + row.get_array(field) + .map_err(|e| e.to_string())? + .try_into_binary() + .map_err(|e| e.to_string()) } pub fn get_array_element_type( diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 23ac636d..8836e25e 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -588,8 +588,10 @@ pub fn compacted_row_to_owned( fcore::metadata::DataType::Binary(dt) => { Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec())) } - fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?), - fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?), + fcore::metadata::DataType::Array(_) => { + Datum::Array(row.get_array(i)?.try_into_binary()?) + } + fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?.try_into_binary()?), other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")), }; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index b30baeb5..04f65a8b 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -1519,6 +1519,8 @@ pub fn datum_to_python_value( DataType::Array(array_type) => { let array_data = row .get_array(pos) + .map_err(|e| FlussError::from_core_error(&e))? + .try_into_binary() .map_err(|e| FlussError::from_core_error(&e))?; let element_type = array_type.get_element_type(); diff --git a/crates/examples/src/example_kv_table.rs b/crates/examples/src/example_kv_table.rs index ad12ed79..d3367490 100644 --- a/crates/examples/src/example_kv_table.rs +++ b/crates/examples/src/example_kv_table.rs @@ -20,7 +20,7 @@ use fluss::client::FlussConnection; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; +use fluss::row::{DataGetters, GenericRow}; #[tokio::main] #[allow(dead_code)] diff --git a/crates/examples/src/example_partitioned_kv_table.rs b/crates/examples/src/example_partitioned_kv_table.rs index 944d8d49..c9231120 100644 --- a/crates/examples/src/example_partitioned_kv_table.rs +++ b/crates/examples/src/example_partitioned_kv_table.rs @@ -20,7 +20,7 @@ use fluss::client::{FlussAdmin, FlussConnection}; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; +use fluss::row::{DataGetters, GenericRow}; use std::collections::HashMap; #[tokio::main] diff --git a/crates/examples/src/example_partitioned_prefix_lookup.rs b/crates/examples/src/example_partitioned_prefix_lookup.rs index b212b0fd..009afbc4 100644 --- a/crates/examples/src/example_partitioned_prefix_lookup.rs +++ b/crates/examples/src/example_partitioned_prefix_lookup.rs @@ -20,7 +20,7 @@ use fluss::client::{FlussAdmin, FlussConnection}; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; +use fluss::row::{DataGetters, GenericRow}; use std::collections::HashMap; #[tokio::main] diff --git a/crates/examples/src/example_prefix_lookup.rs b/crates/examples/src/example_prefix_lookup.rs index 12fc76dc..2bd583a1 100644 --- a/crates/examples/src/example_prefix_lookup.rs +++ b/crates/examples/src/example_prefix_lookup.rs @@ -20,7 +20,7 @@ use fluss::client::FlussConnection; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; +use fluss::row::{DataGetters, GenericRow}; #[tokio::main] #[allow(dead_code)] diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs index 1f751f3c..db042c0c 100644 --- a/crates/examples/src/example_table.rs +++ b/crates/examples/src/example_table.rs @@ -27,7 +27,7 @@ use fluss::client::FlussConnection; use fluss::config::Config; use fluss::error::Result; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; -use fluss::row::{GenericRow, InternalRow}; +use fluss::row::{DataGetters, GenericRow}; use std::time::Duration; #[tokio::main] diff --git a/crates/fluss/src/client/table/append.rs b/crates/fluss/src/client/table/append.rs index 562e8ea7..dd54a5b1 100644 --- a/crates/fluss/src/client/table/append.rs +++ b/crates/fluss/src/client/table/append.rs @@ -131,7 +131,7 @@ impl AppendWriter { Arc::new(self.table_info.row_type.clone()), 0, None, - ); + )?; Arc::new(get_physical_path( &self.table_path, self.partition_getter.as_ref(), diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs index 51a0a071..8bf2d0bb 100644 --- a/crates/fluss/src/client/table/lookup.rs +++ b/crates/fluss/src/client/table/lookup.rs @@ -626,6 +626,7 @@ impl PrefixKeyLookuper { mod tests { use super::*; use crate::metadata::{Column, DataTypes, Schema}; + use crate::row::DataGetters; use crate::row::binary::BinaryWriter; use crate::row::compacted::CompactedRowWriter; use arrow::array::Int32Array; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index b97fc120..57e93ebf 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -22,8 +22,9 @@ use crate::compression::{ use crate::error::{Error, Result}; use crate::metadata::{DataField, DataType, RowType}; use crate::record::{ChangeType, ScanRecord}; +use crate::row::column_vector::TypedBatch; use crate::row::column_writer::{ColumnWriter, round_up_to_8}; -use crate::row::{ColumnarRow, InternalRow, arrow_row_column_indices, fluss_row_column_indices}; +use crate::row::{ColumnarRow, InternalRow}; use arrow::array::{ArrayBuilder, ArrayRef}; use arrow::{ array::RecordBatch, @@ -1667,21 +1668,12 @@ impl Iterator for ArrowLogRecordIterator { } pub struct ArrowReader { - record_batch: Arc, - row_type: Arc, - fluss_row_type: Option>, - row_column_indices: Arc<[usize]>, + batch: Arc, } impl ArrowReader { pub fn new(record_batch: Arc, row_type: Arc) -> Self { - let row_column_indices = arrow_row_column_indices(&record_batch); - ArrowReader { - record_batch, - row_type, - fluss_row_type: None, - row_column_indices, - } + Self::new_with_fluss_row_type(record_batch, row_type, None) } pub fn new_with_fluss_row_type( @@ -1689,30 +1681,20 @@ impl ArrowReader { row_type: Arc, fluss_row_type: Option>, ) -> Self { - let row_column_indices = match &fluss_row_type { - Some(rt) => fluss_row_column_indices(rt), - None => arrow_row_column_indices(&record_batch), - }; + let schema = fluss_row_type.as_deref().unwrap_or(&row_type); + let typed = TypedBatch::build(&record_batch, schema) + .expect("ArrowReader: TypedBatch::build failed — schema mismatch in scan setup"); ArrowReader { - record_batch, - row_type, - fluss_row_type, - row_column_indices, + batch: Arc::new(typed), } } pub fn row_count(&self) -> usize { - self.record_batch.num_rows() + self.batch.num_rows } pub fn read(&self, row_id: usize) -> ColumnarRow { - ColumnarRow::with_indices( - self.record_batch.clone(), - self.row_type.clone(), - row_id, - self.fluss_row_type.clone(), - self.row_column_indices.clone(), - ) + ColumnarRow::from_typed_batch(Arc::clone(&self.batch), row_id) } } pub struct MyVec(pub StreamReader); diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index 14ff2e91..d28302a1 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -373,7 +373,7 @@ mod tests { use crate::metadata::{DataTypes, KvFormat}; use crate::record::kv::test_util::TestReadContext; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder}; - use crate::row::InternalRow; + use crate::row::DataGetters; use crate::row::binary::BinaryWriter; use bytes::{BufMut, BytesMut}; diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index 0e806337..a2e472a3 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -319,6 +319,8 @@ impl Drop for KvRecordBatchBuilder { mod tests { use super::*; use crate::metadata::{DataTypes, RowType}; + use crate::record::kv::KvRecordBatch; + use crate::row::DataGetters; use crate::row::binary::BinaryWriter; use crate::row::compacted::{CompactedRow, CompactedRowWriter}; use std::sync::LazyLock; @@ -497,9 +499,6 @@ mod tests { #[test] fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> { - use crate::record::kv::KvRecordBatch; - use crate::row::InternalRow; - let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED); builder.set_writer_state(100, 5); diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index 462bdebb..7e548b2c 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -253,7 +253,7 @@ mod tests { let row_type = Arc::new(RowType::with_data_types(vec![ crate::metadata::DataType::Int(crate::metadata::IntType::new()), ])); - ColumnarRow::new(Arc::new(batch), row_type, row_id, None) + ColumnarRow::new(Arc::new(batch), row_type, row_id, None).expect("ColumnarRow") } #[test] diff --git a/crates/fluss/src/row/binary_array.rs b/crates/fluss/src/row/binary_array.rs index b987cec8..db15b082 100644 --- a/crates/fluss/src/row/binary_array.rs +++ b/crates/fluss/src/row/binary_array.rs @@ -28,12 +28,13 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::{DataType, RowType}; use crate::row::Decimal; -use crate::row::InternalRow; use crate::row::binary::{BinaryRowFormat, ValueWriter}; use crate::row::binary_map::FlussMap; use crate::row::compacted::{CompactedRow, CompactedRowWriter, calculate_bit_set_width_in_bytes}; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; use crate::row::field_getter::FieldGetter; +use crate::row::view::ArrayView; +use crate::row::{DataGetters, InternalArray, InternalRow}; use bytes::Bytes; use serde::Serialize; use std::fmt; @@ -768,79 +769,74 @@ impl InternalRow for FlussArray { fn get_field_count(&self) -> usize { self.size() } +} +impl InternalArray for FlussArray { + fn size(&self) -> usize { + FlussArray::size(self) + } +} + +impl DataGetters for FlussArray { fn is_null_at(&self, pos: usize) -> Result { - Ok(self.is_null_at(pos)) + Ok(FlussArray::is_null_at(self, pos)) } fn get_boolean(&self, pos: usize) -> Result { - self.get_boolean(pos) + FlussArray::get_boolean(self, pos) } fn get_byte(&self, pos: usize) -> Result { - self.get_byte(pos) + FlussArray::get_byte(self, pos) } fn get_short(&self, pos: usize) -> Result { - self.get_short(pos) + FlussArray::get_short(self, pos) } fn get_int(&self, pos: usize) -> Result { - self.get_int(pos) + FlussArray::get_int(self, pos) } fn get_long(&self, pos: usize) -> Result { - self.get_long(pos) + FlussArray::get_long(self, pos) } fn get_float(&self, pos: usize) -> Result { - self.get_float(pos) + FlussArray::get_float(self, pos) } fn get_double(&self, pos: usize) -> Result { - self.get_double(pos) + FlussArray::get_double(self, pos) } fn get_char(&self, pos: usize, _length: usize) -> Result<&str> { - self.get_string(pos) + FlussArray::get_string(self, pos) } - fn get_string(&self, pos: usize) -> Result<&str> { - self.get_string(pos) + FlussArray::get_string(self, pos) } fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Result { - self.get_decimal(pos, precision as u32, scale as u32) + FlussArray::get_decimal(self, pos, precision as u32, scale as u32) } fn get_date(&self, pos: usize) -> Result { - self.get_date(pos) + FlussArray::get_date(self, pos) } fn get_time(&self, pos: usize) -> Result