Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,21 @@ impl TableScan {

/// Returns an [`ArrowRecordBatchStream`].
///
/// Convenience entry point: plans this scan's file tasks via
/// [`TableScan::plan_files`] and delegates the reading to
/// [`TableScan::to_arrow_from_tasks`], so projection/filter baked in at
/// planning time always match this scan's reader configuration.
///
/// # Errors
///
/// Propagates any error from planning the files or from constructing
/// the Arrow reader.
pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
self.to_arrow_from_tasks(self.plan_files().await?)
}

/// Like [`TableScan::to_arrow`], but accepts a caller-supplied
/// [`FileScanTask`] stream instead of running [`TableScan::plan_files`]
/// internally.
///
/// # Correctness
///
/// Tasks must come from a [`TableScan`] with the same projection and
/// filter as `self`: predicates are baked into each task at planning
/// time and are not re-applied here. Reader-side configuration
/// (concurrency, batch size, row-group filtering, row selection) is
/// taken from `self` and may differ from the planning scan.
pub fn to_arrow_from_tasks(&self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
Expand All @@ -442,10 +457,7 @@ impl TableScan {
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
}

arrow_reader_builder
.build()
.read(self.plan_files().await?)
.map(|result| result.stream())
arrow_reader_builder.build().read(tasks).map(|r| r.stream())
}

/// Returns a reference to the column names of the table scan.
Expand Down
1 change: 0 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ pub(crate) mod write;

pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";

pub use expr_to_predicate::convert_filters_to_predicate;
pub use project::project_with_partition;
pub use scan::IcebergTableScan;
Loading
Loading