Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3763370
enable CometLocalTableScanExec by default
mbutrovich May 21, 2026
6830b09
Merge branch 'main' into enable_localtablescan
mbutrovich May 22, 2026
810e5d5
add NullType to toArrowType
mbutrovich May 22, 2026
174c939
add NullType to shuffles
mbutrovich May 22, 2026
3790c10
fix windowexec test and nulltype. fix timetype issues
mbutrovich May 22, 2026
18cd14b
Fix TimeType test.
mbutrovich May 22, 2026
fc40d59
fix null value type in map in native shuffle
mbutrovich May 22, 2026
92cc260
Merge branch 'main' into enable_localtablescan
mbutrovich May 22, 2026
cf0c1df
Merge branch 'main' into enable_localtablescan
mbutrovich May 26, 2026
8c088a7
avoid reuse in LocalTableScanExec
mbutrovich May 26, 2026
0cdfafe
Merge branch 'main' into enable_localtablescan
mbutrovich May 26, 2026
bd04fb4
Replace Comet's bespoke CometBatchIterator JNI input path with the ca…
mbutrovich May 27, 2026
2742b49
Merge branch 'main' into enable_localtablescan
mbutrovich May 27, 2026
04597c0
Merge branch 'main' into enable_localtablescan
mbutrovich May 27, 2026
b6db996
Unpack dictionaries.
mbutrovich May 27, 2026
5ca923f
Merge branch 'main' into enable_localtablescan
mbutrovich May 27, 2026
cf7bb6e
Fix shading issue.
mbutrovich May 27, 2026
2560e6f
Merge remote-tracking branch 'origin/enable_localtablescan' into enab…
mbutrovich May 27, 2026
82c9a1b
Try again to fix shading issue.
mbutrovich May 27, 2026
6adf124
Fix alignment issue for FFI Decimal128 with ArrowArrayStreamReader
mbutrovich May 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions native/core/src/execution/operators/aligned_stream_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{RecordBatch, RecordBatchOptions, StructArray};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow::ffi_stream::FFI_ArrowArrayStream;
use std::ffi::CStr;
use std::sync::Arc;

/// C Stream Interface reader that calls [`arrow::array::ArrayData::align_buffers`] on every
/// imported batch before constructing typed arrays. Stock `ArrowArrayStreamReader` panics
/// when a JVM producer hands us a `Decimal128` buffer at an offset that is 8-byte but not
/// 16-byte aligned, which Java's allocator does not guarantee. Track upstream:
/// <https://github.com/apache/arrow-rs/issues/10028>.
#[derive(Debug)]
pub struct AlignedArrowStreamReader {
stream: FFI_ArrowArrayStream,
schema: SchemaRef,
}

impl AlignedArrowStreamReader {
/// # Safety
/// `raw` must point at a valid `FFI_ArrowArrayStream` whose ownership is being transferred
/// to this reader. The stream's release callback fires when the reader is dropped.
pub unsafe fn from_raw(raw: *mut FFI_ArrowArrayStream) -> Result<Self, ArrowError> {
let mut stream = FFI_ArrowArrayStream::from_raw(raw);
if stream.release.is_none() {
return Err(ArrowError::CDataInterface(
"input stream is already released".to_string(),
));
}
let schema = read_schema(&mut stream)?;
Ok(Self { stream, schema })
}

pub fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

fn last_error(&mut self) -> Option<String> {
let get = self.stream.get_last_error?;
let ptr = unsafe { get(&mut self.stream) };
if ptr.is_null() {
return None;
}
Some(
unsafe { CStr::from_ptr(ptr) }
.to_string_lossy()
.into_owned(),
)
}
}

impl Iterator for AlignedArrowStreamReader {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
let mut array = FFI_ArrowArray::empty();
let ret = unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) };
if ret != 0 {
let msg = self
.last_error()
.unwrap_or_else(|| format!("get_next returned {ret}"));
return Some(Err(ArrowError::CDataInterface(msg)));
}
if array.is_released() {
return None;
}

let dt = DataType::Struct(self.schema.fields().clone());
Some(
unsafe { from_ffi_and_data_type(array, dt) }.and_then(|mut data| {
data.align_buffers();
let len = data.len();
RecordBatch::try_new_with_options(
Arc::clone(&self.schema),
StructArray::from(data).into_parts().1,
&RecordBatchOptions::new().with_row_count(Some(len)),
)
}),
)
}
}

fn read_schema(stream: &mut FFI_ArrowArrayStream) -> Result<SchemaRef, ArrowError> {
let mut schema = FFI_ArrowSchema::empty();
let ret = unsafe { stream.get_schema.unwrap()(stream, &mut schema) };
if ret != 0 {
return Err(ArrowError::CDataInterface(format!(
"Cannot get schema from input stream. Error code: {ret}"
)));
}
Ok(Arc::new(Schema::try_from(&schema)?))
}
2 changes: 2 additions & 0 deletions native/core/src/execution/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
pub use crate::errors::ExecutionError;

pub use aligned_stream_reader::*;
pub use copy::*;
pub use iceberg_scan::*;
pub use scan::*;

mod aligned_stream_reader;
mod copy;
mod expand;
pub use expand::ExpandExec;
Expand Down
184 changes: 38 additions & 146 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode};
use crate::{
errors::CometError,
execution::{
operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, utils::SparkArrowConvert,
},
jvm_bridge::JVMClasses,
};
use arrow::array::{make_array, ArrayData, ArrayRef, RecordBatch, RecordBatchOptions};
use crate::execution::operators::{copy_or_unpack_array, AlignedArrowStreamReader, CopyMode};
use crate::{errors::CometError, execution::planner::TEST_EXEC_CONTEXT_ID};
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::ffi::FFI_ArrowArray;
use arrow::ffi::FFI_ArrowSchema;
use datafusion::common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Expand All @@ -40,52 +32,41 @@ use datafusion::{
};
use futures::Stream;
use itertools::Itertools;
use jni::objects::{Global, JObject, JValue};
use std::rc::Rc;
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};

/// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file
/// scan or the result of reading a broadcast or shuffle exchange. ScanExec isn't invoked
/// until the data is already available in the JVM. When CometExecIterator invokes
/// Native.executePlan, it passes in the memory addresses of the input batches.
/// `ScanExec` reads batches of data from Spark over the Arrow C Stream Interface. The
/// `input_source` is moved out of the JVM-exported `ArrowArrayStream` at plan-construction time;
/// dropping the reader (when this exec drops) fires the stream's release callback, which closes
/// the JVM-side `ArrowReader` and its `VectorSchemaRoot`.
#[derive(Debug, Clone)]
pub struct ScanExec {
/// The ID of the execution context that owns this subquery. We use this ID to retrieve the JVM
/// environment `JNIEnv` from the execution context.
/// JVM execution-context id used to look up the `JNIEnv` for callbacks.
pub exec_context_id: i64,
/// The input source of scan node. It is a global reference of JVM `CometBatchIterator` object.
pub input_source: Option<Arc<Global<JObject<'static>>>>,
/// A description of the input source for informational purposes
/// The C Stream Interface reader. `None` only in unit tests that seed input via
/// `set_input_batch`.
pub input_source: Option<Arc<Mutex<AlignedArrowStreamReader>>>,
pub input_source_description: String,
/// The data types of columns of the input batch. Converted from Spark schema.
pub data_types: Vec<DataType>,
/// Schema of first batch
pub schema: SchemaRef,
/// The input batch of input data. Used to determine the schema of the input data.
/// It is also used in unit test to mock the input data from JVM.
/// Used in unit tests to mock the input batch; otherwise written by `pull_next` on each
/// poll.
pub batch: Arc<Mutex<Option<InputBatch>>>,
/// Cache of expensive-to-compute plan properties
cache: Arc<PlanProperties>,
/// Metrics collector
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
/// Whether native code can assume ownership of batches that it receives
arrow_ffi_safe: bool,
}

impl ScanExec {
pub fn new(
exec_context_id: i64,
input_source: Option<Arc<Global<JObject<'static>>>>,
input_source: Option<Arc<Mutex<AlignedArrowStreamReader>>>,
input_source_description: &str,
data_types: Vec<DataType>,
arrow_ffi_safe: bool,
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
Expand All @@ -112,7 +93,6 @@ impl ScanExec {
metrics: metrics_set,
baseline_metrics,
schema,
arrow_ffi_safe,
})
}

Expand All @@ -131,22 +111,18 @@ impl ScanExec {
*self.batch.try_lock().unwrap() = Some(input);
}

/// Pull next input batch from JVM.
/// Pull next input batch from the upstream `ArrowArrayStreamReader`.
pub fn get_next_batch(&mut self) -> Result<(), CometError> {
if self.input_source.is_none() {
// This is a unit test. We don't need to call JNI.
// This is a unit test. Input batches are seeded via `set_input_batch`.
return Ok(());
}
let mut timer = self.baseline_metrics.elapsed_compute().timer();

let mut current_batch = self.batch.try_lock().unwrap();
if current_batch.is_none() {
let next_batch = ScanExec::get_next(
self.exec_context_id,
self.input_source.as_ref().unwrap().as_obj(),
self.data_types.len(),
self.arrow_ffi_safe,
)?;
let next_batch =
ScanExec::pull_next(self.exec_context_id, self.input_source.as_ref().unwrap())?;
*current_batch = Some(next_batch);
}

Expand All @@ -155,119 +131,35 @@ impl ScanExec {
Ok(())
}

/// Invokes JNI call to get next batch.
fn get_next(
/// Pull the next `RecordBatch` from the stream and convert it to an `InputBatch`. Dictionary
/// columns are unpacked because Comet's downstream operators do not handle them.
fn pull_next(
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
arrow_ffi_safe: bool,
reader: &Arc<Mutex<AlignedArrowStreamReader>>,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
// This is a unit test. We don't need to call JNI.
// Unit test path; input batches are seeded directly.
return Ok(InputBatch::EOF);
}

if iter.is_null() {
return Err(CometError::from(ExecutionError::GeneralError(format!(
"Null batch iterator object. Plan id: {exec_context_id}"
))));
}

JVMClasses::with_env(|env| {
let num_rows: i32 = unsafe {
jni_call!(env,
comet_batch_iterator(iter).has_next() -> i32)?
};

if num_rows == -1 {
return Ok(InputBatch::EOF);
}

// fetch batch data from JVM via FFI
let (num_rows, array_addrs, schema_addrs) =
Self::allocate_and_fetch_batch(env, iter, num_cols)?;

let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);

// Process each column
for i in 0..num_cols {
let array_ptr = array_addrs[i];
let schema_ptr = schema_addrs[i];
let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;

// TODO: validate array input data
// array_data.validate_full()?;

let array = make_array(array_data);

let array = if arrow_ffi_safe {
// ownership of this array has been transferred to native
// but we still need to unpack dictionary arrays
copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
} else {
// it is necessary to copy the array because the contents may be
// overwritten on the JVM side in the future
copy_array(&array)
};

inputs.push(array);

// Drop the Arcs to avoid memory leak
unsafe {
Rc::from_raw(array_ptr as *const FFI_ArrowArray);
Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
let mut reader = reader
.try_lock()
.map_err(|_| CometError::Internal("AlignedArrowStreamReader contended".to_string()))?;

let next = reader.next();
match next {
None => Ok(InputBatch::EOF),
Some(Err(e)) => Err(CometError::from(e)),
Some(Ok(record_batch)) => {
let num_rows = record_batch.num_rows();
let columns = record_batch.columns();
let mut inputs: Vec<ArrayRef> = Vec::with_capacity(columns.len());
for col in columns {
inputs.push(copy_or_unpack_array(col, &CopyMode::UnpackOrClone)?);
}
Ok(InputBatch::new(inputs, Some(num_rows)))
}

Ok(InputBatch::new(inputs, Some(num_rows as usize)))
})
}

/// Allocates Arrow FFI structures and calls JNI to get the next batch data.
/// Returns the number of rows and the allocated array/schema addresses.
fn allocate_and_fetch_batch(
env: &mut jni::Env,
iter: &JObject,
num_cols: usize,
) -> Result<(i32, Vec<i64>, Vec<i64>), CometError> {
let mut array_addrs = Vec::with_capacity(num_cols);
let mut schema_addrs = Vec::with_capacity(num_cols);

for _ in 0..num_cols {
let arrow_array = Rc::new(FFI_ArrowArray::empty());
let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
let (array_ptr, schema_ptr) = (
Rc::into_raw(arrow_array) as i64,
Rc::into_raw(arrow_schema) as i64,
);

array_addrs.push(array_ptr);
schema_addrs.push(schema_ptr);
}

// Prepare the java array parameters
let long_array_addrs = env.new_long_array(num_cols)?;
let long_schema_addrs = env.new_long_array(num_cols)?;

long_array_addrs.set_region(env, 0, &array_addrs)?;
long_schema_addrs.set_region(env, 0, &schema_addrs)?;

let array_obj = JObject::from(long_array_addrs);
let schema_obj = JObject::from(long_schema_addrs);

let array_obj = JValue::Object(array_obj.as_ref());
let schema_obj = JValue::Object(schema_obj.as_ref());

let num_rows: i32 = unsafe {
jni_call!(env,
comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)?
};

// we already checked for end of results on call to has_next() so should always
// have a valid row count when calling next()
assert!(num_rows != -1);

Ok((num_rows, array_addrs, schema_addrs))
}
}

Expand Down
Loading
Loading