Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 116 additions & 48 deletions crates/fluss/src/row/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

use crate::row::InternalRow;
use arrow::array::{
Array, AsArray, BinaryArray, Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array,
Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray,
Array, AsArray, BinaryArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, Float32Array,
Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use std::sync::Arc;
Expand Down Expand Up @@ -67,35 +70,110 @@ impl ColumnarRow {
) -> T {
let schema = self.record_batch.schema();
let arrow_field = schema.field(pos);
let value = self.get_long(pos);
let column = self.record_batch.column(pos);

match arrow_field.data_type() {
ArrowDataType::Timestamp(time_unit, _) => {
// Convert based on Arrow TimeUnit
let (millis, nanos) = match time_unit {
TimeUnit::Second => (value * 1000, 0),
TimeUnit::Millisecond => (value, 0),
TimeUnit::Microsecond => {
let millis = value / 1000;
let nanos = ((value % 1000) * 1000) as i32;
(millis, nanos)
}
TimeUnit::Nanosecond => {
let millis = value / 1_000_000;
let nanos = (value % 1_000_000) as i32;
(millis, nanos)
}
};

if nanos == 0 {
construct_compact(millis)
} else {
// nanos is guaranteed to be in valid range [0, 999_999] by arithmetic
construct_with_nanos(millis, nanos)
.expect("nanos in valid range by construction")
// Read value based on the actual Arrow timestamp type
let value = match arrow_field.data_type() {
ArrowDataType::Timestamp(TimeUnit::Second, _) => column
.as_any()
.downcast_ref::<TimestampSecondArray>()
.expect("Expected TimestampSecondArray")
.value(self.row_id),
ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.expect("Expected TimestampMillisecondArray")
.value(self.row_id),
ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.expect("Expected TimestampMicrosecondArray")
.value(self.row_id),
ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.expect("Expected TimestampNanosecondArray")
.value(self.row_id),
other => panic!("Expected Timestamp column at position {pos}, got {other:?}"),
};

// Convert based on Arrow TimeUnit
let (millis, nanos) = match arrow_field.data_type() {
ArrowDataType::Timestamp(time_unit, _) => match time_unit {
TimeUnit::Second => (value * 1000, 0),
TimeUnit::Millisecond => (value, 0),
TimeUnit::Microsecond => {
// Use Euclidean division so that nanos is always non-negative,
// even for timestamps before the Unix epoch.
let millis = value.div_euclid(1000);
let nanos = (value.rem_euclid(1000) * 1000) as i32;
(millis, nanos)
}
TimeUnit::Nanosecond => {
// Use Euclidean division so that nanos is always in [0, 999_999].
let millis = value.div_euclid(1_000_000);
let nanos = value.rem_euclid(1_000_000) as i32;
(millis, nanos)
}
},
_ => unreachable!(),
};

if nanos == 0 {
construct_compact(millis)
} else {
// nanos is guaranteed to be in valid range [0, 999_999] by arithmetic
construct_with_nanos(millis, nanos).expect("nanos in valid range by construction")
}
}

/// Read a DATE value from the Arrow `Date32Array` at column `pos`.
///
/// Returns the raw stored `i32` for this row (per Arrow's Date32
/// encoding this is the number of days since the Unix epoch).
///
/// Panics if the column is not a `Date32Array`.
fn read_date_from_arrow(&self, pos: usize) -> i32 {
    let column = self.record_batch.column(pos);
    let dates = column
        .as_any()
        .downcast_ref::<Date32Array>()
        .expect("Expected Date32Array");
    dates.value(self.row_id)
}

/// Read a time-of-day value from an Arrow `Time32`/`Time64` column,
/// normalizing it to milliseconds.
///
/// Supports all four Arrow time encodings (seconds, milliseconds,
/// microseconds, nanoseconds). Sub-millisecond precision is truncated
/// by integer division.
///
/// # Panics
/// Panics if the column at `pos` is not a Time32/Time64 array of the
/// unit declared in the schema.
fn read_time_from_arrow(&self, pos: usize) -> i32 {
    let schema = self.record_batch.schema();
    let arrow_field = schema.field(pos);
    let column = self.record_batch.column(pos);

    match arrow_field.data_type() {
        ArrowDataType::Time32(TimeUnit::Second) => {
            let value = column
                .as_any()
                .downcast_ref::<Time32SecondArray>()
                .expect("Expected Time32SecondArray")
                .value(self.row_id);
            value * 1000 // Convert seconds to milliseconds
        }
        ArrowDataType::Time32(TimeUnit::Millisecond) => column
            .as_any()
            .downcast_ref::<Time32MillisecondArray>()
            .expect("Expected Time32MillisecondArray")
            .value(self.row_id),
        ArrowDataType::Time64(TimeUnit::Microsecond) => {
            let value = column
                .as_any()
                .downcast_ref::<Time64MicrosecondArray>()
                .expect("Expected Time64MicrosecondArray")
                .value(self.row_id);
            // Time-of-day values are non-negative, so truncating
            // division is safe here.
            (value / 1000) as i32 // Convert microseconds to milliseconds
        }
        ArrowDataType::Time64(TimeUnit::Nanosecond) => {
            let value = column
                .as_any()
                .downcast_ref::<Time64NanosecondArray>()
                .expect("Expected Time64NanosecondArray")
                .value(self.row_id);
            (value / 1_000_000) as i32 // Convert nanoseconds to milliseconds
        }
        other => panic!("Expected Time column at position {pos}, got {other:?}"),
    }
}
}
Expand Down Expand Up @@ -220,11 +298,11 @@ impl InternalRow for ColumnarRow {
}

/// Return the DATE value at column `pos`, read from the underlying
/// Arrow `Date32Array` (not via a plain Int32 read, which would panic
/// on a Date32-typed column).
fn get_date(&self, pos: usize) -> crate::row::datum::Date {
    crate::row::datum::Date::new(self.read_date_from_arrow(pos))
}

/// Return the TIME value at column `pos`, read from the underlying
/// Arrow Time32/Time64 array and normalized to milliseconds.
fn get_time(&self, pos: usize) -> crate::row::datum::Time {
    crate::row::datum::Time::new(self.read_time_from_arrow(pos))
}

fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> crate::row::datum::TimestampNtz {
Expand All @@ -250,16 +328,12 @@ impl InternalRow for ColumnarRow {
}

fn get_char(&self, pos: usize, _length: usize) -> &str {
let array = self
.record_batch
self.record_batch
.column(pos)
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.expect("Expected fixed-size binary array for char type");

let bytes = array.value(self.row_id);
// don't check length, following java client
std::str::from_utf8(bytes).expect("Invalid UTF-8 in char field")
.downcast_ref::<StringArray>()
.expect("Expected String array for char type")
.value(self.row_id)
}

fn get_string(&self, pos: usize) -> &str {
Expand Down Expand Up @@ -294,8 +368,8 @@ impl InternalRow for ColumnarRow {
mod tests {
use super::*;
use arrow::array::{
BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array,
Int16Array, Int32Array, Int64Array, StringArray,
BinaryArray, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array,
Int64Array, StringArray,
};
use arrow::datatypes::{DataType, Field, Schema};

Expand All @@ -311,7 +385,7 @@ mod tests {
Field::new("f64", DataType::Float64, false),
Field::new("s", DataType::Utf8, false),
Field::new("bin", DataType::Binary, false),
Field::new("char", DataType::FixedSizeBinary(2), false),
Field::new("char", DataType::Utf8, false),
]));

let batch = RecordBatch::try_new(
Expand All @@ -326,13 +400,7 @@ mod tests {
Arc::new(Float64Array::from(vec![2.5])),
Arc::new(StringArray::from(vec!["hello"])),
Arc::new(BinaryArray::from(vec![b"data".as_slice()])),
Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter_with_size(
vec![Some(b"ab".as_slice())].into_iter(),
2,
)
.expect("fixed array"),
),
Arc::new(StringArray::from(vec!["ab"])),
],
)
.expect("record batch");
Expand Down
Loading
Loading