Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2400,7 +2400,10 @@ mod row_reader {
validate(row, columns, field, "get_array", |dt| {
matches!(dt, fcore::metadata::DataType::Array(_))
})?;
row.get_array(field).map_err(|e| e.to_string())
row.get_array(field)
.map_err(|e| e.to_string())?
.try_into_binary()
.map_err(|e| e.to_string())
}

pub fn get_array_element_type(
Expand Down
6 changes: 4 additions & 2 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,10 @@ pub fn compacted_row_to_owned(
fcore::metadata::DataType::Binary(dt) => {
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?),
fcore::metadata::DataType::Array(_) => {
Datum::Array(row.get_array(i)?.try_into_binary()?)
}
fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?.try_into_binary()?),
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};

Expand Down
2 changes: 2 additions & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,8 @@ pub fn datum_to_python_value(
DataType::Array(array_type) => {
let array_data = row
.get_array(pos)
.map_err(|e| FlussError::from_core_error(&e))?
.try_into_binary()
.map_err(|e| FlussError::from_core_error(&e))?;

let element_type = array_type.get_element_type();
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use fluss::row::{DataGetters, GenericRow};

#[tokio::main]
#[allow(dead_code)]
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_partitioned_kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fluss::client::{FlussAdmin, FlussConnection};
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use fluss::row::{DataGetters, GenericRow};
use std::collections::HashMap;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_partitioned_prefix_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fluss::client::{FlussAdmin, FlussConnection};
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use fluss::row::{DataGetters, GenericRow};
use std::collections::HashMap;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_prefix_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use fluss::row::{DataGetters, GenericRow};

#[tokio::main]
#[allow(dead_code)]
Expand Down
2 changes: 1 addition & 1 deletion crates/examples/src/example_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use fluss::row::{DataGetters, GenericRow};
use std::time::Duration;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl AppendWriter {
Arc::new(self.table_info.row_type.clone()),
0,
None,
);
)?;
Arc::new(get_physical_path(
&self.table_path,
self.partition_getter.as_ref(),
Expand Down
1 change: 1 addition & 0 deletions crates/fluss/src/client/table/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ impl PrefixKeyLookuper {
mod tests {
use super::*;
use crate::metadata::{Column, DataTypes, Schema};
use crate::row::DataGetters;
use crate::row::binary::BinaryWriter;
use crate::row::compacted::CompactedRowWriter;
use arrow::array::Int32Array;
Expand Down
38 changes: 10 additions & 28 deletions crates/fluss/src/record/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use crate::compression::{
use crate::error::{Error, Result};
use crate::metadata::{DataField, DataType, RowType};
use crate::record::{ChangeType, ScanRecord};
use crate::row::column_vector::TypedBatch;
use crate::row::column_writer::{ColumnWriter, round_up_to_8};
use crate::row::{ColumnarRow, InternalRow, arrow_row_column_indices, fluss_row_column_indices};
use crate::row::{ColumnarRow, InternalRow};
use arrow::array::{ArrayBuilder, ArrayRef};
use arrow::{
array::RecordBatch,
Expand Down Expand Up @@ -1667,52 +1668,33 @@ impl Iterator for ArrowLogRecordIterator {
}

pub struct ArrowReader {
record_batch: Arc<RecordBatch>,
row_type: Arc<RowType>,
fluss_row_type: Option<Arc<RowType>>,
row_column_indices: Arc<[usize]>,
batch: Arc<TypedBatch>,
}

impl ArrowReader {
pub fn new(record_batch: Arc<RecordBatch>, row_type: Arc<RowType>) -> Self {
let row_column_indices = arrow_row_column_indices(&record_batch);
ArrowReader {
record_batch,
row_type,
fluss_row_type: None,
row_column_indices,
}
Self::new_with_fluss_row_type(record_batch, row_type, None)
}

pub fn new_with_fluss_row_type(
record_batch: Arc<RecordBatch>,
row_type: Arc<RowType>,
fluss_row_type: Option<Arc<RowType>>,
) -> Self {
let row_column_indices = match &fluss_row_type {
Some(rt) => fluss_row_column_indices(rt),
None => arrow_row_column_indices(&record_batch),
};
let schema = fluss_row_type.as_deref().unwrap_or(&row_type);
let typed = TypedBatch::build(&record_batch, schema)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so now build is fallible, and a malformed batch panics the scan instead of surfacing error?

.expect("ArrowReader: TypedBatch::build failed — schema mismatch in scan setup");
ArrowReader {
record_batch,
row_type,
fluss_row_type,
row_column_indices,
batch: Arc::new(typed),
}
}

pub fn row_count(&self) -> usize {
self.record_batch.num_rows()
self.batch.num_rows
}

pub fn read(&self, row_id: usize) -> ColumnarRow {
ColumnarRow::with_indices(
self.record_batch.clone(),
self.row_type.clone(),
row_id,
self.fluss_row_type.clone(),
self.row_column_indices.clone(),
)
ColumnarRow::from_typed_batch(Arc::clone(&self.batch), row_id)
}
}
pub struct MyVec<T>(pub StreamReader<T>);
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/record/kv/kv_record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ mod tests {
use crate::metadata::{DataTypes, KvFormat};
use crate::record::kv::test_util::TestReadContext;
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
use crate::row::InternalRow;
use crate::row::DataGetters;
use crate::row::binary::BinaryWriter;

use bytes::{BufMut, BytesMut};
Expand Down
5 changes: 2 additions & 3 deletions crates/fluss/src/record/kv/kv_record_batch_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ impl Drop for KvRecordBatchBuilder {
mod tests {
use super::*;
use crate::metadata::{DataTypes, RowType};
use crate::record::kv::KvRecordBatch;
use crate::row::DataGetters;
use crate::row::binary::BinaryWriter;
use crate::row::compacted::{CompactedRow, CompactedRowWriter};
use std::sync::LazyLock;
Expand Down Expand Up @@ -497,9 +499,6 @@ mod tests {

#[test]
fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
use crate::record::kv::KvRecordBatch;
use crate::row::InternalRow;

let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED);
builder.set_writer_state(100, 5);

Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ mod tests {
let row_type = Arc::new(RowType::with_data_types(vec![
crate::metadata::DataType::Int(crate::metadata::IntType::new()),
]));
ColumnarRow::new(Arc::new(batch), row_type, row_id, None)
ColumnarRow::new(Arc::new(batch), row_type, row_id, None).expect("ColumnarRow")
}

#[test]
Expand Down
83 changes: 48 additions & 35 deletions crates/fluss/src/row/binary_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::{DataType, RowType};
use crate::row::Decimal;
use crate::row::InternalRow;
use crate::row::binary::{BinaryRowFormat, ValueWriter};
use crate::row::binary_map::FlussMap;
use crate::row::compacted::{CompactedRow, CompactedRowWriter, calculate_bit_set_width_in_bytes};
use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz};
use crate::row::field_getter::FieldGetter;
use crate::row::view::ArrayView;
use crate::row::{DataGetters, InternalArray, InternalRow};
use bytes::Bytes;
use serde::Serialize;
use std::fmt;
Expand Down Expand Up @@ -768,79 +769,74 @@ impl InternalRow for FlussArray {
fn get_field_count(&self) -> usize {
self.size()
}
}

impl InternalArray for FlussArray {
fn size(&self) -> usize {
FlussArray::size(self)
}
}

impl DataGetters for FlussArray {
fn is_null_at(&self, pos: usize) -> Result<bool> {
Ok(self.is_null_at(pos))
Ok(FlussArray::is_null_at(self, pos))
}

fn get_boolean(&self, pos: usize) -> Result<bool> {
self.get_boolean(pos)
FlussArray::get_boolean(self, pos)
}
fn get_byte(&self, pos: usize) -> Result<i8> {
self.get_byte(pos)
FlussArray::get_byte(self, pos)
}
fn get_short(&self, pos: usize) -> Result<i16> {
self.get_short(pos)
FlussArray::get_short(self, pos)
}
fn get_int(&self, pos: usize) -> Result<i32> {
self.get_int(pos)
FlussArray::get_int(self, pos)
}
fn get_long(&self, pos: usize) -> Result<i64> {
self.get_long(pos)
FlussArray::get_long(self, pos)
}
fn get_float(&self, pos: usize) -> Result<f32> {
self.get_float(pos)
FlussArray::get_float(self, pos)
}
fn get_double(&self, pos: usize) -> Result<f64> {
self.get_double(pos)
FlussArray::get_double(self, pos)
}

fn get_char(&self, pos: usize, _length: usize) -> Result<&str> {
self.get_string(pos)
FlussArray::get_string(self, pos)
}

fn get_string(&self, pos: usize) -> Result<&str> {
self.get_string(pos)
FlussArray::get_string(self, pos)
}

fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> Result<Decimal> {
self.get_decimal(pos, precision as u32, scale as u32)
FlussArray::get_decimal(self, pos, precision as u32, scale as u32)
}

fn get_date(&self, pos: usize) -> Result<Date> {
self.get_date(pos)
FlussArray::get_date(self, pos)
}
fn get_time(&self, pos: usize) -> Result<Time> {
self.get_time(pos)
FlussArray::get_time(self, pos)
}
fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> Result<TimestampNtz> {
self.get_timestamp_ntz(pos, precision)
FlussArray::get_timestamp_ntz(self, pos, precision)
}
fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> Result<TimestampLtz> {
self.get_timestamp_ltz(pos, precision)
FlussArray::get_timestamp_ltz(self, pos, precision)
}

fn get_binary(&self, pos: usize, _length: usize) -> Result<&[u8]> {
self.get_binary(pos)
FlussArray::get_binary(self, pos)
}

fn get_bytes(&self, pos: usize) -> Result<&[u8]> {
self.get_binary(pos)
FlussArray::get_binary(self, pos)
}

fn get_array(&self, pos: usize) -> Result<FlussArray> {
self.get_array(pos)
}

fn get_map(&self, pos: usize) -> Result<FlussMap> {
// FlussArray carries no schema; nested map reads must go through the
// inherent FlussArray::get_map(pos, key_type, value_type).
Err(IllegalArgument {
message: format!(
"InternalRow::get_map is not supported on FlussArray (pos {pos}); \
use FlussArray::get_map(pos, key_type, value_type) directly"
),
})
fn get_array(&self, pos: usize) -> Result<ArrayView<'_>> {
Ok(ArrayView::Binary(FlussArray::get_array(self, pos)?))
}
}

Expand All @@ -852,6 +848,23 @@ mod tests {
use crate::row::compacted::CompactedRowWriter;
use crate::row::{Datum, GenericRow};

#[test]
fn fluss_array_dispatches_through_internal_array_trait() {
let mut writer = FlussArrayWriter::new(3, &DataTypes::int());
writer.write_int(0, 10);
writer.set_null_at(1);
writer.write_int(2, 30);
let arr = writer.complete().unwrap();

let view: &dyn InternalArray = &arr;
assert_eq!(view.size(), 3);
assert!(!view.is_null_at(0).unwrap());
assert!(view.is_null_at(1).unwrap());
assert!(!view.is_null_at(2).unwrap());
assert_eq!(view.get_int(0).unwrap(), 10);
assert_eq!(view.get_int(2).unwrap(), 30);
}

#[test]
fn test_header_calculation() {
assert_eq!(calculate_header_in_bytes(0), 4);
Expand Down Expand Up @@ -1169,7 +1182,7 @@ mod tests {
let r1_tags = r1.get_array(0).unwrap();
assert_eq!(r1_tags.size(), 3);
assert_eq!(r1_tags.get_string(0).unwrap(), "x");
assert!(r1_tags.is_null_at(1));
assert!(r1_tags.is_null_at(1).unwrap());
assert_eq!(r1_tags.get_string(2).unwrap(), "z");
}

Expand Down Expand Up @@ -1207,7 +1220,7 @@ mod tests {
let bytes = writer.to_bytes();

let outer_compacted = CompactedRow::from_bytes(outer_row_type, &bytes);
let recovered_arr = outer_compacted.get_array(0).unwrap();
let recovered_arr = outer_compacted.get_array(0).unwrap().expect_binary();
assert_eq!(recovered_arr.size(), 2);

let recovered_r0 = recovered_arr.get_row(0, inner_row_type).unwrap();
Expand Down
Loading
Loading