diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 27f479183a..d205f56a92 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -433,6 +433,21 @@ impl TableScan {
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
+        self.to_arrow_from_tasks(self.plan_files().await?)
+    }
+
+    /// Like [`TableScan::to_arrow`], but accepts a caller-supplied
+    /// [`FileScanTask`] stream instead of running [`TableScan::plan_files`]
+    /// internally.
+    ///
+    /// # Correctness
+    ///
+    /// Tasks must come from a [`TableScan`] with the same projection and
+    /// filter as `self`: predicates are baked into each task at planning
+    /// time and are not re-applied here. Reader-side configuration
+    /// (concurrency, batch size, row-group filtering, row selection) is
+    /// taken from `self` and may differ from the planning scan.
+    pub fn to_arrow_from_tasks(&self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
             .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
             .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
@@ -442,10 +457,7 @@ impl TableScan {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
         }
 
-        arrow_reader_builder
-            .build()
-            .read(self.plan_files().await?)
-            .map(|result| result.stream())
+        arrow_reader_builder.build().read(tasks).map(|r| r.stream())
     }
 
     /// Returns a reference to the column names of the table scan.
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index aeac30de32..5a9845cde0 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -26,6 +26,5 @@ pub(crate) mod write;
 
 pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
 
-pub use expr_to_predicate::convert_filters_to_predicate;
 pub use project::project_with_partition;
 pub use scan::IcebergTableScan;
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index 36539ae503..0a386bded7 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -18,11 +18,10 @@
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::vec;
 
 use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError, Result as DFResult};
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -31,46 +30,113 @@ use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
 use datafusion::prelude::Expr;
 use futures::{Stream, TryStreamExt};
 use iceberg::expr::Predicate;
+use iceberg::scan::{FileScanTask, TableScan};
 use iceberg::table::Table;
 
 use super::expr_to_predicate::convert_filters_to_predicate;
 use crate::to_datafusion_error;
 
-/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
-/// necessary details and computed properties required for execution planning.
+/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
+///
+/// Has two construction modes: [`IcebergTableScan::new`] for a lazy
+/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an
+/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets.
+///
+/// Note: in eager mode the underlying `TableScan` is rebuilt on every
+/// `execute(partition)` call. The per-build cost is bounded (no I/O) and
+/// keeps the plan free of `Arc`-shared evaluator caches that are awkward to
+/// serialize across workers.
 #[derive(Debug)]
 pub struct IcebergTableScan {
     /// A table in the catalog.
     table: Table,
     /// Snapshot of the table to scan.
     snapshot_id: Option<i64>,
-    /// Stores certain, often expensive to compute,
-    /// plan properties used in query optimization.
+    /// Cached plan properties used by query optimization.
     plan_properties: Arc<PlanProperties>,
-    /// Projection column names, None means all columns
+    /// Projection column names, None means all columns.
     projection: Option<Vec<String>>,
-    /// Filters to apply to the table scan
+    /// Filters to apply to the table scan.
     predicates: Option<Predicate>,
-    /// Optional limit on the number of rows to return
+    /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
+    buckets: Option<Vec<Vec<FileScanTask>>>,
+    /// Optional limit on the number of rows to return.
     limit: Option<usize>,
 }
 
 impl IcebergTableScan {
-    /// Creates a new [`IcebergTableScan`] object.
-    pub(crate) fn new(
+    /// Creates a lazy single-partition scan that plans and reads all tasks
+    /// inside `execute(0)`. Used by
+    /// [`IcebergStaticTableProvider`][crate::table::IcebergStaticTableProvider].
+    pub fn new(
+        table: Table,
+        snapshot_id: Option<i64>,
+        schema: ArrowSchemaRef,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Self {
+        Self::new_inner(
+            table,
+            snapshot_id,
+            schema,
+            projection,
+            filters,
+            limit,
+            Partitioning::UnknownPartitioning(1),
+            None,
+        )
+    }
+
+    /// Creates an eager multi-partition scan over pre-planned task buckets.
+    /// Partition `i` streams `buckets[i]`. The caller is responsible for
+    /// ensuring `partitioning` matches the bucketing. Used by
+    /// [`IcebergTableProvider`][crate::table::IcebergTableProvider].
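+    ///
+    /// # Example
+    ///
+    /// A minimal sketch, assuming `table`, `arrow_schema`, and two pre-planned
+    /// task buckets `b0`/`b1` are already in scope (the names are illustrative):
+    ///
+    /// ```ignore
+    /// let scan = IcebergTableScan::new_with_tasks(
+    ///     table,
+    ///     None,                                 // current snapshot
+    ///     arrow_schema,
+    ///     None,                                 // keep all columns
+    ///     &[],                                  // no pushed-down filters
+    ///     None,                                 // no limit
+    ///     vec![b0, b1],                         // partition i streams buckets[i]
+    ///     Partitioning::UnknownPartitioning(2), // must agree with the bucket count
+    /// );
+    /// ```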
+    #[allow(clippy::too_many_arguments)]
+    pub fn new_with_tasks(
+        table: Table,
+        snapshot_id: Option<i64>,
+        schema: ArrowSchemaRef,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+        buckets: Vec<Vec<FileScanTask>>,
+        partitioning: Partitioning,
+    ) -> Self {
+        Self::new_inner(
+            table,
+            snapshot_id,
+            schema,
+            projection,
+            filters,
+            limit,
+            partitioning,
+            Some(buckets),
+        )
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn new_inner(
         table: Table,
         snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
+        partitioning: Partitioning,
+        buckets: Option<Vec<Vec<FileScanTask>>>,
     ) -> Self {
         let output_schema = match projection {
             None => schema.clone(),
             Some(projection) => Arc::new(schema.project(projection).unwrap()),
         };
-        let plan_properties = Self::compute_properties(output_schema.clone());
-        let projection = get_column_names(schema.clone(), projection);
+        let plan_properties = Arc::new(PlanProperties::new(
+            EquivalenceProperties::new(output_schema),
+            partitioning,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        ));
+        let projection = get_column_names(schema, projection);
         let predicates = convert_filters_to_predicate(filters);
 
         Self {
@@ -79,6 +145,7 @@ impl IcebergTableScan {
             plan_properties,
             projection,
             predicates,
+            buckets,
             limit,
         }
     }
@@ -99,21 +166,17 @@ impl IcebergTableScan {
         self.predicates.as_ref()
     }
 
+    /// Returns the pre-planned file task buckets, or an empty slice in lazy mode.
+    pub fn buckets(&self) -> &[Vec<FileScanTask>] {
+        self.buckets.as_deref().unwrap_or(&[])
+    }
+
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
 
-    /// Computes [`PlanProperties`] used in query optimization.
-    fn compute_properties(schema: ArrowSchemaRef) -> Arc<PlanProperties> {
-        // TODO:
-        // This is more or less a placeholder, to be replaced
-        // once we support output-partitioning
-        Arc::new(PlanProperties::new(
-            EquivalenceProperties::new(schema),
-            Partitioning::UnknownPartitioning(1),
-            EmissionType::Incremental,
-            Boundedness::Bounded,
-        ))
+    fn total_file_count(&self) -> usize {
+        self.buckets().iter().map(|b| b.len()).sum()
     }
 }
 
@@ -132,8 +195,15 @@ impl ExecutionPlan for IcebergTableScan {
     fn with_new_children(
         self: Arc<Self>,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        if !children.is_empty() {
+            return Err(DataFusionError::Internal(format!(
+                "{} is a leaf node and expects no children, but {} were provided",
+                self.name(),
+                children.len()
+            )));
+        }
         Ok(self)
     }
 
@@ -143,36 +213,34 @@
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        let fut = get_batch_stream(
+        let bucket = match &self.buckets {
+            Some(buckets) => Some(buckets.get(partition).cloned().ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "{}: partition index {partition} is out of bounds (total buckets: {})",
+                    self.name(),
+                    buckets.len()
+                ))
+            })?),
+            None => None,
+        };
+
+        let fut = build_record_batch_stream(
             self.table.clone(),
             self.snapshot_id,
             self.projection.clone(),
             self.predicates.clone(),
+            bucket,
         );
-        let stream = futures::stream::once(fut).try_flatten();
-
-        // Apply limit if specified
-        let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
-            if let Some(limit) = self.limit {
-                let mut remaining = limit;
-                Box::pin(stream.try_filter_map(move |batch| {
-                    futures::future::ready(if remaining == 0 {
-                        Ok(None)
-                    } else if batch.num_rows() <= remaining {
-                        remaining -= batch.num_rows();
-                        Ok(Some(batch))
-                    } else {
-                        let limited_batch = batch.slice(0, remaining);
-                        remaining = 0;
-                        Ok(Some(limited_batch))
-                    })
-                }))
-            } else {
-                Box::pin(stream)
-            };
+        let stream = Box::pin(futures::stream::once(fut).try_flatten())
+            as Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>;
+
+        let limited_stream = match self.limit {
+            Some(limit) => apply_limit(stream, limit),
+            None => stream,
+        };
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
@@ -187,16 +255,25 @@ impl DisplayAs for IcebergTableScan {
         _t: datafusion::physical_plan::DisplayFormatType,
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
+        let projection = self
+            .projection
+            .as_deref()
+            .map_or(String::new(), |v| v.join(","));
+        let predicate = self
+            .predicates
+            .as_ref()
+            .map_or(String::new(), |p| p.to_string());
+
         write!(
             f,
-            "IcebergTableScan projection:[{}] predicate:[{}]",
-            self.projection
-                .clone()
-                .map_or(String::new(), |v| v.join(",")),
-            self.predicates
-                .clone()
-                .map_or(String::from(""), |p| format!("{p}"))
+            "{} projection:[{projection}] predicate:[{predicate}]",
+            self.name()
         )?;
+        if let Some(buckets) = &self.buckets {
+            let file_count = self.total_file_count();
+            let bucket_count = buckets.len();
+            write!(f, " buckets:[{bucket_count}] file_count:[{file_count}]")?;
+        }
         if let Some(limit) = self.limit {
             write!(f, " limit:[{limit}]")?;
         }
@@ -204,40 +281,81 @@ impl DisplayAs for IcebergTableScan {
     }
 }
 
-/// Asynchronously retrieves a stream of [`RecordBatch`] instances
-/// from a given table.
-///
-/// This function initializes a [`TableScan`], builds it,
-/// and then converts it into a stream of Arrow [`RecordBatch`]es.
-async fn get_batch_stream(
+fn build_table_scan(
     table: Table,
     snapshot_id: Option<i64>,
     column_names: Option<Vec<String>>,
     predicates: Option<Predicate>,
-) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+) -> DFResult<TableScan> {
     let scan_builder = match snapshot_id {
-        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
+        Some(id) => table.scan().snapshot_id(id),
         None => table.scan(),
     };
-
     let mut scan_builder = match column_names {
-        Some(column_names) => scan_builder.select(column_names),
+        Some(names) => scan_builder.select(names),
         None => scan_builder.select_all(),
     };
     if let Some(pred) = predicates {
         scan_builder = scan_builder.with_filter(pred);
     }
-    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
+    scan_builder.build().map_err(to_datafusion_error)
+}
+
+/// Builds the `RecordBatch` stream for a single partition. When `bucket` is
+/// `Some`, streams the pre-planned tasks via `to_arrow_from_tasks`; when
+/// `None`, plans and reads the full scan via `to_arrow`.
+async fn build_record_batch_stream(
+    table: Table,
+    snapshot_id: Option<i64>,
+    column_names: Option<Vec<String>>,
+    predicates: Option<Predicate>,
+    bucket: Option<Vec<FileScanTask>>,
+) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
+    let table_scan = build_table_scan(table, snapshot_id, column_names, predicates)?;
+    let stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> = match bucket {
+        Some(bucket) => {
+            let task_stream = Box::pin(futures::stream::iter(
+                bucket.into_iter().map(Ok::<_, iceberg::Error>),
+            ));
+            Box::pin(
+                table_scan
+                    .to_arrow_from_tasks(task_stream)
+                    .map_err(to_datafusion_error)?
+                    .map_err(to_datafusion_error),
+            )
+        }
+        None => Box::pin(
+            table_scan
+                .to_arrow()
+                .await
+                .map_err(to_datafusion_error)?
+                .map_err(to_datafusion_error),
+        ),
+    };
+    Ok(stream)
+}
 
-    let stream = table_scan
-        .to_arrow()
-        .await
-        .map_err(to_datafusion_error)?
-        .map_err(to_datafusion_error);
-    Ok(Box::pin(stream))
+/// Truncates a stream of `RecordBatch` to at most `limit` rows.
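+///
+/// For example, with `limit = 250` and incoming batches of 100/100/100 rows,
+/// the output is 100/100/50: the third batch is sliced to the remaining 50
+/// rows and every batch after that is dropped.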
+fn apply_limit(
+    stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
+    limit: usize,
+) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
+    let mut remaining = limit;
+    Box::pin(stream.try_filter_map(move |batch| {
+        futures::future::ready(if remaining == 0 {
+            Ok(None)
+        } else if batch.num_rows() <= remaining {
+            remaining -= batch.num_rows();
+            Ok(Some(batch))
+        } else {
+            let limited_batch = batch.slice(0, remaining);
+            remaining = 0;
+            Ok(Some(limited_batch))
+        })
+    }))
 }
 
-fn get_column_names(
+pub(super) fn get_column_names(
     schema: ArrowSchemaRef,
     projection: Option<&Vec<usize>>,
 ) -> Option<Vec<String>> {
diff --git a/crates/integrations/datafusion/src/table/bucketing.rs b/crates/integrations/datafusion/src/table/bucketing.rs
new file mode 100644
index 0000000000..55ab23b6fd
--- /dev/null
+++ b/crates/integrations/datafusion/src/table/bucketing.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{
+    ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray,
+};
+use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
+use datafusion::common::hash_utils::create_hashes;
+use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
+use iceberg::scan::FileScanTask;
+use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
+use iceberg::table::Table;
+
+/// Identity-partitioned column that is also present in the output projection
+/// and whose Arrow type can be reconstructed from a `Literal` for hashing.
+pub(super) struct IdentityCol {
+    pub(super) name: String,
+    /// Position of this column in the *output* schema (after projection).
+    pub(super) output_idx: usize,
+    /// Position of this column inside the partition spec's `fields()` slice,
+    /// matching the slot order of `FileScanTask::partition`.
+    pub(super) spec_field_idx: usize,
+    pub(super) output_dtype: DataType,
+}
+
+/// Inspect the table's default partition spec and return the list of identity
+/// columns that can support a [`Partitioning::Hash`] declaration. Returns
+/// `None` if any condition is violated:
+/// - the source column for an identity field is not in the output projection
+/// - the source column's Arrow type is not currently supported by
+///   [`literal_to_array`]
+/// - the table has spec evolution (>1 historical specs), since older files
+///   may carry a partition tuple that does not align with the default spec
+///
+/// Returning `None` forces the scan to declare `UnknownPartitioning` even if
+/// bucketing succeeds.
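+///
+/// For example (illustrative): a default spec `identity(region)` over a
+/// `region: Utf8` column that survives projection yields
+/// `Some(vec![IdentityCol { name: "region", .. }])`; a `bucket(16, id)` field
+/// alongside it is simply skipped, since non-identity transforms are ignored
+/// rather than rejected.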
+pub(super) fn compute_identity_cols(
+    table: &Table,
+    output_schema: &ArrowSchema,
+) -> Option<Vec<IdentityCol>> {
+    let metadata = table.metadata();
+    if metadata.partition_specs_iter().len() > 1 {
+        return None;
+    }
+    let spec = metadata.default_partition_spec();
+    let table_schema = metadata.current_schema();
+
+    let mut cols = Vec::new();
+    for (spec_field_idx, pf) in spec.fields().iter().enumerate() {
+        if pf.transform != Transform::Identity {
+            continue;
+        }
+        let source_field = table_schema.field_by_id(pf.source_id)?;
+        let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
+        let output_dtype = output_schema.field(output_idx).data_type().clone();
+        if !is_supported_dtype(&output_dtype) {
+            return None;
+        }
+        cols.push(IdentityCol {
+            name: source_field.name.clone(),
+            output_idx,
+            spec_field_idx,
+            output_dtype,
+        });
+    }
+    Some(cols)
+}
+
+fn is_supported_dtype(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Boolean
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::Date32
+    )
+}
+
+/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
+/// describes a non-empty, hashable identity key, each task is hashed on
+/// that key using DataFusion's repartition hash so the resulting partitioning
+/// matches what `RepartitionExec` would produce on the same data. Tasks
+/// missing partition data fall back to hashing `data_file_path`, which still
+/// distributes evenly but breaks the `Hash` contract; the second tuple
+/// element flags whether every task supplied a full identity key.
+pub(super) fn bucket_tasks(
+    tasks: Vec<FileScanTask>,
+    n_partitions: usize,
+    identity_cols: Option<&[IdentityCol]>,
+) -> (Vec<Vec<FileScanTask>>, bool) {
+    if n_partitions == 0 {
+        return (Vec::new(), tasks.is_empty());
+    }
+    let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
+    let mut all_full_key = true;
+    let cols = identity_cols.unwrap_or(&[]);
+
+    for task in tasks {
+        let bucket_idx = match identity_hash(&task, cols) {
+            Some(h) => (h % n_partitions as u64) as usize,
+            None => {
+                all_full_key = false;
+                fallback_hash(&task) as usize % n_partitions
+            }
+        };
+        buckets[bucket_idx].push(task);
+    }
+    (buckets, all_full_key)
+}
+
+/// Hash the identity-partition values of `task` using
+/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
+/// hash-repartition convention. Returns `None` if the task lacks partition
+/// data or any required slot is null/unsupported.
+fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {
+    if cols.is_empty() {
+        return None;
+    }
+    let partition = task.partition.as_ref()?;
+    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(cols.len());
+    for col in cols {
+        let lit = partition.fields().get(col.spec_field_idx)?.as_ref()?;
+        arrays.push(literal_to_array(lit, &col.output_dtype)?);
+    }
+    let mut hashes = vec![0u64; 1];
+    create_hashes(
+        &arrays,
+        REPARTITION_RANDOM_STATE.random_state(),
+        &mut hashes,
+    )
+    .ok()?;
+    Some(hashes[0])
+}
+
+/// Deterministic per-file fallback used when `identity_hash` cannot produce a
+/// bucket. The hash function does not need to match DataFusion's because any
+/// task taking this path causes the scan to drop to `UnknownPartitioning`.
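+/// (`DefaultHasher` is deterministic within a process but not guaranteed
+/// stable across Rust releases; that is fine here because buckets are
+/// computed once per scan and never compared across builds.)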
+fn fallback_hash(task: &FileScanTask) -> u64 {
+    use std::collections::hash_map::DefaultHasher;
+    use std::hash::{Hash, Hasher};
+    let mut hasher = DefaultHasher::new();
+    task.data_file_path.hash(&mut hasher);
+    hasher.finish()
+}
+
+/// Materialize a single-element Arrow array of `dt` holding the value of
+/// `lit`. The Arrow type must match what DataFusion will see for this column
+/// at scan time, otherwise `create_hashes` would dispatch on a different type
+/// and produce a hash that disagrees with DataFusion's row-wise hashing.
+fn literal_to_array(lit: &Literal, dt: &DataType) -> Option<ArrayRef> {
+    let prim = match lit {
+        Literal::Primitive(p) => p,
+        _ => return None,
+    };
+    Some(match (prim, dt) {
+        (PrimitiveLiteral::Boolean(v), DataType::Boolean) => Arc::new(BooleanArray::from(vec![*v])),
+        (PrimitiveLiteral::Int(v), DataType::Int32) => Arc::new(Int32Array::from(vec![*v])),
+        (PrimitiveLiteral::Int(v), DataType::Date32) => Arc::new(Date32Array::from(vec![*v])),
+        (PrimitiveLiteral::Long(v), DataType::Int64) => Arc::new(Int64Array::from(vec![*v])),
+        (PrimitiveLiteral::Float(v), DataType::Float32) => Arc::new(Float32Array::from(vec![v.0])),
+        (PrimitiveLiteral::Double(v), DataType::Float64) => Arc::new(Float64Array::from(vec![v.0])),
+        (PrimitiveLiteral::String(v), DataType::Utf8) => {
+            Arc::new(StringArray::from(vec![v.as_str()]))
+        }
+        _ => return None,
+    })
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 75b7988d8d..14aff7af3a 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -25,6 +25,7 @@
 //! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific
 //!   table snapshot. Use for consistent analytical queries or time-travel scenarios.
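+//!
+//! A minimal sketch of the catalog-backed read path (`catalog` and `ctx` are
+//! assumed to be an `Arc<dyn Catalog>` and a DataFusion `SessionContext`):
+//!
+//! ```ignore
+//! let provider = IcebergTableProvider::try_new(catalog, namespace, "t").await?;
+//! ctx.register_table("iceberg_t", Arc::new(provider))?;
+//! let batches = ctx.sql("SELECT * FROM iceberg_t").await?.collect().await?;
+//! ```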
+mod bucketing;
 pub mod metadata_table;
 pub mod table_provider_factory;
 
@@ -40,10 +41,14 @@ use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_expr::expressions::Column;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use futures::TryStreamExt;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::inspect::MetadataTableType;
+use iceberg::scan::FileScanTask;
 use iceberg::spec::TableProperties;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
@@ -51,6 +56,7 @@ use metadata_table::IcebergMetadataTableProvider;
 
 use crate::error::to_datafusion_error;
 use crate::physical_plan::commit::IcebergCommitExec;
+use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate;
 use crate::physical_plan::project::project_with_partition;
 use crate::physical_plan::repartition::repartition;
 use crate::physical_plan::scan::IcebergTableScan;
@@ -87,7 +93,6 @@ impl IcebergTableProvider {
     ) -> Result<Self> {
         let table_ident = TableIdent::new(namespace, name.into());
 
-        // Load table once to get initial schema
         let table = catalog.load_table(&table_ident).await?;
         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
 
@@ -102,7 +107,6 @@ impl IcebergTableProvider {
         &self,
         r#type: MetadataTableType,
     ) -> Result<IcebergMetadataTableProvider> {
-        // Load fresh table metadata for metadata table access
         let table = self.catalog.load_table(&self.table_ident).await?;
         Ok(IcebergMetadataTableProvider { table, r#type })
     }
@@ -124,26 +128,93 @@ impl TableProvider for IcebergTableProvider {
 
     async fn scan(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Load fresh table metadata from catalog
+        // Second load: fetch the latest snapshot so scans always reflect current table state.
         let table = self
            .catalog
            .load_table(&self.table_ident)
            .await
            .map_err(to_datafusion_error)?;
 
-        // Create scan with fresh metadata (always use current snapshot)
-        Ok(Arc::new(IcebergTableScan::new(
+        // Build a TableScan mirroring the inputs we'll hand to IcebergTableScan,
+        // so plan_files() uses the same projection/filters the scan will replay in execute().
+        let col_names = projection.map(|indices| {
+            indices
+                .iter()
+                .map(|&i| self.schema.field(i).name().clone())
+                .collect::<Vec<String>>()
+        });
+
+        let predicate = convert_filters_to_predicate(filters);
+
+        let mut builder = table.scan();
+        builder = match col_names {
+            Some(names) => builder.select(names),
+            None => builder.select_all(),
+        };
+        if let Some(pred) = predicate {
+            builder = builder.with_filter(pred);
+        }
+
+        let tasks: Vec<FileScanTask> = builder
+            .build()
+            .map_err(to_datafusion_error)?
+            .plan_files()
+            .await
+            .map_err(to_datafusion_error)?
+            .try_collect::<Vec<_>>()
+            .await
+            .map_err(to_datafusion_error)?;
+
+        // Output schema after projection: column indices in `Hash` exprs and any
+        // Arrow array we hash must reference this schema, not the full table schema.
+        let output_schema = match projection {
+            None => self.schema.clone(),
+            Some(p) => Arc::new(self.schema.project(p).map_err(|e| {
+                to_datafusion_error(Error::new(ErrorKind::DataInvalid, e.to_string()))
+            })?),
+        };
+
+        let target_partitions = state.config().target_partitions();
+        // Always produce at least 1 partition so that DataFusion can schedule
+        // the plan normally and callers can safely call execute(0). An empty
+        // bucket simply yields an empty record-batch stream.
+        let n_partitions = target_partitions.min(tasks.len()).max(1);
+
+        // identity_cols is Some(non-empty) iff every condition for declaring
+        // Partitioning::Hash is met: the table's default spec has identity-transform
+        // fields, every such source column is present in the output projection, and
+        // every column type is supported by literal_to_array. Any miss collapses to
+        // None, which forces UnknownPartitioning regardless of bucketing strategy.
+        let identity_cols = bucketing::compute_identity_cols(&table, &output_schema);
+
+        let (buckets, all_had_full_key) =
+            bucketing::bucket_tasks(tasks, n_partitions, identity_cols.as_deref());
+
+        let partitioning = match identity_cols {
+            Some(cols) if !cols.is_empty() && all_had_full_key && n_partitions > 0 => {
+                let exprs: Vec<Arc<dyn PhysicalExpr>> = cols
+                    .iter()
+                    .map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
+                    .collect();
+                Partitioning::Hash(exprs, n_partitions)
+            }
+            _ => Partitioning::UnknownPartitioning(n_partitions),
+        };
+
+        Ok(Arc::new(IcebergTableScan::new_with_tasks(
             table,
             None, // Always use current snapshot for catalog-backed provider
             self.schema.clone(),
             projection,
             filters,
             limit,
+            buckets,
+            partitioning,
         )))
     }
 
@@ -161,7 +232,6 @@ impl TableProvider for IcebergTableProvider {
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Load fresh table metadata from catalog
         let table = self
            .catalog
            .load_table(&self.table_ident)
@@ -188,7 +258,6 @@ impl TableProvider for IcebergTableProvider {
         let repartitioned_plan =
             repartition(plan_with_partition, table.metadata_ref(), target_partitions)?;
 
-        // Apply sort node when it's not fanout mode
         let fanout_enabled = table
            .metadata()
            .properties()
@@ -314,7 +383,6 @@ impl TableProvider for IcebergStaticTableProvider {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        // Use cached table (no refresh)
         Ok(Arc::new(IcebergTableScan::new(
             self.table.clone(),
             self.snapshot_id,
@@ -865,4 +933,382 @@ mod tests {
             "Limit should be None when not specified"
         );
     }
+
+    // ── Bucketed scan tests ──────────────────────────────────────────────────
+
+    async fn make_catalog_and_table_for_bucketing()
+    -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+        use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+        use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
+        use iceberg::{CatalogBuilder, TableCreation};
+
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    std::collections::HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        warehouse.clone(),
+                    )]),
+                )
+                .await
+                .unwrap(),
+        );
+
+        let namespace = NamespaceIdent::new("ns".to_string());
+        catalog
+            .create_namespace(&namespace, std::collections::HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        catalog
+            .create_table(
+                &namespace,
+                TableCreation::builder()
+                    .name("t".to_string())
+                    .location(format!("{warehouse}/t"))
+                    .schema(schema)
+                    .properties(std::collections::HashMap::new())
+                    .build(),
+            )
+            .await
+            .unwrap();
+
+        (catalog, namespace, "t".to_string(), temp_dir)
+    }
+
+    /// Registers `n` synthetic data files in the table metadata via the iceberg
+    /// transaction API. No actual parquet files are written, only the metadata
+    /// entries that `plan_files()` reads are created.
+    async fn append_fake_data_files(
+        catalog: &Arc<dyn Catalog>,
+        namespace: &NamespaceIdent,
+        table_name: &str,
+        n: usize,
+    ) {
+        use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat};
+        use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+        let table = catalog
+            .load_table(&TableIdent::new(namespace.clone(), table_name.to_string()))
+            .await
+            .unwrap();
+
+        let data_files = (0..n)
+            .map(|i| {
+                DataFileBuilder::default()
+                    .content(DataContentType::Data)
+                    .file_path(format!(
+                        "{}/data/fake_{i}.parquet",
+                        table.metadata().location()
+                    ))
+                    .file_format(DataFileFormat::Parquet)
+                    .file_size_in_bytes(128)
+                    .record_count(1)
+                    .partition_spec_id(table.metadata().default_partition_spec_id())
+                    .build()
+                    .unwrap()
+            })
+            .collect::<Vec<_>>();
+
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(data_files);
+        action
+            .apply(tx)
+            .unwrap()
+            .commit(catalog.as_ref())
+            .await
+            .unwrap();
+    }
+
+    fn ctx_with_target_partitions(n: usize) -> SessionContext {
+        use datafusion::prelude::SessionConfig;
+        SessionContext::new_with_config(SessionConfig::new().with_target_partitions(n))
+    }
+
+    /// An empty table must produce a single empty-bucket scan so that DataFusion
+    /// can schedule the plan normally. execute(0) on an empty bucket simply
+    /// returns an empty record-batch stream.
+    #[tokio::test]
+    async fn test_empty_table_single_empty_bucket() {
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        // no files appended
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(8).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert_eq!(scan.buckets().len(), 1);
+        assert_eq!(scan.buckets()[0].len(), 0);
+        assert_eq!(scan.properties().partitioning.partition_count(), 1);
+    }
+
+    /// When the table has no identity-partition columns, every task takes the
+    /// fallback (file_path) bucket path, so the declaration must drop to
+    /// `UnknownPartitioning`. The bucket count should still equal
+    /// min(target_partitions, num_files).
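+    /// Here: 5 files and target_partitions = 3, so 3 buckets and
+    /// `UnknownPartitioning(3)`.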
+    #[tokio::test]
+    async fn test_unpartitioned_falls_back_to_unknown() {
+        use datafusion::physical_plan::Partitioning;
+
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        append_fake_data_files(&catalog, &namespace, &table_name, 5).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(3).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum();
+        assert_eq!(total_files, 5);
+        assert_eq!(scan.buckets().len(), 3);
+        assert!(matches!(
+            scan.properties().partitioning,
+            Partitioning::UnknownPartitioning(3)
+        ));
+    }
+
+    /// Bucket count must be capped at the number of files: spinning up more
+    /// DataFusion partitions than there are tasks would just leave empty
+    /// streams, wasting scheduler slots.
+    #[tokio::test]
+    async fn test_bucket_count_capped_at_file_count() {
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        append_fake_data_files(&catalog, &namespace, &table_name, 2).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(16).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert_eq!(scan.buckets().len(), 2);
+    }
+
+    /// target_partitions = 1 collapses every task into a single bucket, giving
+    /// the same execution profile as a single-partition scan.
+    #[tokio::test]
+    async fn test_single_target_partition_single_bucket() {
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_catalog_and_table_for_bucketing().await;
+        append_fake_data_files(&catalog, &namespace, &table_name, 4).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(1).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert_eq!(scan.buckets().len(), 1);
+        assert_eq!(scan.buckets()[0].len(), 4);
+    }
+
+    async fn make_partitioned_catalog_and_table_for_bucketing()
+    -> (Arc<dyn Catalog>, NamespaceIdent, String, tempfile::TempDir) {
+        use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+        use iceberg::spec::{
+            NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec,
+        };
+        use iceberg::{CatalogBuilder, TableCreation};
+
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let warehouse = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    std::collections::HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        warehouse.clone(),
+                    )]),
+                )
+                .await
+                .unwrap(),
+        );
+
+        let namespace = NamespaceIdent::new("ns".to_string());
+        catalog
+            .create_namespace(&namespace, std::collections::HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = UnboundPartitionSpec::builder()
+            .with_spec_id(0)
+            .add_partition_field(2, "name_part", Transform::Identity)
+            .unwrap()
+            .build();
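+        // The spec above identity-partitions on source column 2 ("name"),
+        // giving identity_hash a hashable key at spec_field_idx 0.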
+
+        catalog
+            .create_table(
+                &namespace,
+                TableCreation::builder()
+                    .name("t".to_string())
+                    .location(format!("{warehouse}/t"))
+                    .schema(schema)
+                    .partition_spec(partition_spec)
+                    .properties(std::collections::HashMap::new())
+                    .build(),
+            )
+            .await
+            .unwrap();
+
+        (catalog, namespace, "t".to_string(), temp_dir)
+    }
+
+    /// Like [`append_fake_data_files`] but each file carries a partition tuple
+    /// matching the table's identity-partition spec on `name`.
+    async fn append_partitioned_fake_data_files(
+        catalog: &Arc<dyn Catalog>,
+        namespace: &NamespaceIdent,
+        table_name: &str,
+        partition_values: Vec<&str>,
+    ) {
+        use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
+        use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+        let table = catalog
+            .load_table(&TableIdent::new(namespace.clone(), table_name.to_string()))
+            .await
+            .unwrap();
+
+        let data_files = partition_values
+            .iter()
+            .enumerate()
+            .map(|(i, value)| {
+                DataFileBuilder::default()
+                    .content(DataContentType::Data)
+                    .file_path(format!(
+                        "{}/data/fake_{i}.parquet",
+                        table.metadata().location()
+                    ))
+                    .file_format(DataFileFormat::Parquet)
+                    .file_size_in_bytes(128)
+                    .record_count(1)
+                    .partition_spec_id(table.metadata().default_partition_spec_id())
+                    .partition(Struct::from_iter(vec![Some(Literal::string(*value))]))
+                    .build()
+                    .unwrap()
+            })
+            .collect::<Vec<_>>();
+
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(data_files);
+        action
+            .apply(tx)
+            .unwrap()
+            .commit(catalog.as_ref())
+            .await
+            .unwrap();
+    }
+
+    /// Identity-partitioned table whose source column is in the projection
+    /// must produce `Partitioning::Hash` referencing that column.
+    #[tokio::test]
+    async fn test_identity_partitioned_declares_hash() {
+        use datafusion::physical_expr::expressions::Column;
+        use datafusion::physical_plan::Partitioning;
+
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_partitioned_catalog_and_table_for_bucketing().await;
+        append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec![
+            "a", "b", "c", "a", "b", "c",
+        ])
+        .await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        let plan = provider
+            .scan(&ctx_with_target_partitions(3).state(), None, &[], None)
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        let total_files: usize = scan.buckets().iter().map(|b| b.len()).sum();
+        assert_eq!(total_files, 6);
+
+        match &scan.properties().partitioning {
+            Partitioning::Hash(exprs, n) => {
+                assert_eq!(*n, 3);
+                assert_eq!(exprs.len(), 1);
+                let col = exprs[0]
+                    .as_any()
+                    .downcast_ref::<Column>()
+                    .expect("expected Column expr");
+                assert_eq!(col.name(), "name");
+            }
+            other => panic!("expected Partitioning::Hash, got {other:?}"),
+        }
+    }
+
+    /// A projection that omits the partition source column drops
+    /// `compute_identity_cols` to `None`, collapsing to `UnknownPartitioning`.
+    #[tokio::test]
+    async fn test_projection_without_partition_col_falls_back_to_unknown() {
+        use datafusion::physical_plan::Partitioning;
+
+        let (catalog, namespace, table_name, _temp_dir) =
+            make_partitioned_catalog_and_table_for_bucketing().await;
+        append_partitioned_fake_data_files(&catalog, &namespace, &table_name, vec!["a", "b"]).await;
+
+        let provider = IcebergTableProvider::try_new(catalog, namespace, table_name)
+            .await
+            .unwrap();
+        // Project only "id" (idx 0), excluding the partition column "name" (idx 1).
+        let projection = vec![0_usize];
+        let plan = provider
+            .scan(
+                &ctx_with_target_partitions(3).state(),
+                Some(&projection),
+                &[],
+                None,
+            )
+            .await
+            .unwrap();
+        let scan = plan.as_any().downcast_ref::<IcebergTableScan>().unwrap();
+
+        assert!(matches!(
+            scan.properties().partitioning,
+            Partitioning::UnknownPartitioning(_)
+        ));
+    }
 }
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index cebac75dd9..7603c8b7ab 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -131,7 +131,7 @@ async fn test_provider_plan_stream_schema() -> Result<()> {
     let task_ctx = Arc::new(df.task_ctx());
     let plan = df.create_physical_plan().await.unwrap();
 
-    let stream = plan.execute(1, task_ctx).unwrap();
+    let stream = plan.execute(0, task_ctx).unwrap();
 
     // Ensure both the plan and the stream conform to the same schema
     assert_eq!(plan.schema(), stream.schema());
@@ -600,8 +600,8 @@ async fn test_insert_into_nested() -> Result<()> {
     // Insert data with nested structs
     let insert_sql = r#"
         INSERT INTO catalog.test_insert_nested.nested_table
-        SELECT 
-            1 as id, 
+        SELECT
+            1 as id,
             'Alice' as name,
             named_struct(
                 'address', named_struct(
@@ -615,8 +615,8 @@ async fn test_insert_into_nested() -> Result<()> {
                 )
             ) as profile
         UNION ALL
-        SELECT 
-            2 as id, 
+        SELECT
+            2 as id,
             'Bob' as name,
             named_struct(
                 'address', named_struct(
@@ -738,15 +738,15 @@ async fn test_insert_into_nested() -> Result<()> {
     let df = ctx
        .sql(
            r#"
-            SELECT 
-                id, 
+            SELECT
+                id,
                 name,
                 profile.address.street,
                 profile.address.city,
                 profile.address.zip,
                 profile.contact.email,
                 profile.contact.phone
-            FROM catalog.test_insert_nested.nested_table 
+            FROM catalog.test_insert_nested.nested_table
             ORDER BY id
             "#,
        )
@@ -852,8 +852,8 @@ async fn test_insert_into_partitioned() -> Result<()> {
     let df = ctx
        .sql(
            r#"
-            INSERT INTO catalog.test_partitioned_write.partitioned_table 
-            VALUES 
+            INSERT INTO catalog.test_partitioned_write.partitioned_table
+            VALUES
                 (1, 'electronics', 'laptop'),
                 (2, 'electronics', 'phone'),
                 (3, 'books', 'novel'),
diff --git a/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt b/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt
index a5ca4de46a..d9933e0f87 100644
--- a/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/basic_queries.slt
@@ -53,7 +53,7 @@ logical_plan
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=3
 02)--CooperativeExec
-03)----IcebergTableScan projection:[id,name,score,category] predicate:[] limit:[3]
+03)----IcebergTableScan projection:[id,name,score,category] predicate:[] buckets:[1] file_count:[1] limit:[3]
 
 # Test SELECT * with ORDER BY and LIMIT
 query ITRT
diff --git a/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
index aa68ab2762..249d52edd0 100644
--- a/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/binary_predicate_pushdown.slt
@@ -28,7 +28,7 @@ logical_plan
 physical_plan
 01)FilterExec: data@1 = 0102
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,data] predicate:[data = 0102]
+03)----IcebergTableScan projection:[id,data] predicate:[data = 0102] buckets:[1] file_count:[0]
 
 # Verify empty result from empty table
 query I?
diff --git a/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt
index 496f719261..b4596ba6ba 100644
--- a/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/boolean_predicate_pushdown.slt
@@ -39,7 +39,7 @@ logical_plan
 physical_plan
 01)FilterExec: is_active@1
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = true]
+03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = true] buckets:[1] file_count:[1]
 
 # Query with is_active = true
 query ITT rowsort
@@ -59,7 +59,7 @@ logical_plan
 physical_plan
 01)FilterExec: NOT is_active@1
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false]
+03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false] buckets:[1] file_count:[1]
 
 # Query with is_active = false
 query ITT rowsort
@@ -78,7 +78,7 @@ logical_plan
 physical_plan
 01)FilterExec: NOT is_active@1
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false]
+03)----IcebergTableScan projection:[id,is_active,description] predicate:[is_active = false] buckets:[1] file_count:[1]
 
 # Query with is_active != true (includes false and NULL)
 query ITT rowsort
diff --git a/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt
index 3d8b151aa9..698046046a 100644
--- a/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/like_predicate_pushdown.slt
@@ -36,8 +36,8 @@ logical_plan
 02)--TableScan: default.default.test_unpartitioned_table projection=[id, name], partial_filters=[default.default.test_unpartitioned_table.name LIKE Utf8("Al%")]
 physical_plan
 01)FilterExec: name@1 LIKE Al%
-02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,name] predicate:[name STARTS WITH "Al"]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+03)----IcebergTableScan projection:[id,name] predicate:[name STARTS WITH "Al"] buckets:[2] file_count:[2]
 
 # Test LIKE filtering with case-sensitive match
 query IT rowsort
@@ -55,8 +55,8 @@ logical_plan
 02)--TableScan: default.default.test_unpartitioned_table projection=[id, name], partial_filters=[default.default.test_unpartitioned_table.name NOT LIKE Utf8("Al%")]
 physical_plan
 01)FilterExec: name@1 NOT LIKE Al%
-02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,name] predicate:[name NOT STARTS WITH "Al"]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+03)----IcebergTableScan projection:[id,name] predicate:[name NOT STARTS WITH "Al"] buckets:[3] file_count:[3]
 
 # Test NOT LIKE filtering
 query IT rowsort
diff --git a/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt
index ffa74173dc..47100cc36d 100644
--- a/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt
+++ b/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt
@@ -50,7 +50,7 @@ logical_plan
 physical_plan
 01)FilterExec: ts@1 = 1672921800000000000
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00]
+03)----IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00] buckets:[1] file_count:[1]
 
 # Verify timestamp equality filtering works
 query I?
@@ -68,7 +68,7 @@ logical_plan
 physical_plan
 01)FilterExec: ts@1 > 1673308800000000000
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00]
+03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00] buckets:[1] file_count:[1]
 
 # Verify timestamp greater than filtering
 query I? rowsort
@@ -97,7 +97,7 @@ logical_plan
 physical_plan
 01)FilterExec: ts@1 >= 1672876800000000000 AND ts@1 <= 1673827199000000000
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)]
+03)----IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)] buckets:[1] file_count:[1]
 
 # Test timestamp range predicate filtering
 query I? rowsort
@@ -162,7 +162,7 @@ logical_plan
 physical_plan
 01)FilterExec: ts@1 > 1672531200000000
 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-01 00:00:00]
+03)----IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-01 00:00:00] buckets:[1] file_count:[1]
 
 query I?
 SELECT * FROM default.default.test_timestamp_micros WHERE ts > CAST('2023-01-01 00:00:00' AS TIMESTAMP)