diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 7ee743a6abe71..dd3675bd2b39d 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -32,7 +32,7 @@ use datafusion_datasource::file_sink_config::{FileOutputMode, FileSinkConfig}; #[expect(deprecated)] use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ - ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics, + ListingTableUrl, PartitionedFile, TableSchemaBuilder, compute_all_files_statistics, }; use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; @@ -321,14 +321,15 @@ impl ListingTable { /// Creates a file source for this table fn create_file_source(&self) -> Arc { - let table_schema = TableSchema::new( - Arc::clone(&self.file_schema), - self.options - .table_partition_cols - .iter() - .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false))) - .collect(), - ); + let table_schema = TableSchemaBuilder::from(&self.file_schema) + .with_table_partition_cols( + self.options + .table_partition_cols + .iter() + .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false))) + .collect::>(), + ) + .build(); self.options.format.file_source(table_schema) } diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index b04238ebc9b37..c46b472bd6404 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -67,7 +67,7 @@ pub(crate) mod test_util { .await? }; - let table_schema = TableSchema::new(file_schema.clone(), vec![]); + let table_schema = TableSchema::from(&file_schema); let statistics = format .infer_stats(state, &store, file_schema.clone(), &meta) diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 2954a47403299..c9ee2cc407783 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -34,7 +34,7 @@ mod tests { use datafusion_common::{Result, ScalarValue, test_util}; use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; - use datafusion_datasource::{PartitionedFile, TableSchema}; + use datafusion_datasource::{PartitionedFile, TableSchemaBuilder}; use datafusion_datasource_avro::AvroFormat; use datafusion_datasource_avro::source::AvroSource; use datafusion_execution::object_store::ObjectStoreUrl; @@ -223,10 +223,13 @@ mod tests { partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")]; let projection = Some(vec![0, 1, file_schema.fields().len(), 2]); - let table_schema = TableSchema::new( - file_schema.clone(), - vec![Arc::new(Field::new("date", DataType::Utf8, false))], - ); + let table_schema = TableSchemaBuilder::from(file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "date", + DataType::Utf8, + false, + ))]) + .build(); let source = Arc::new(AvroSource::new(table_schema.clone())); let conf = FileScanConfigBuilder::new(object_store_url, source) // select specific columns of the files as well as the partitioning diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 82c47b6c7281c..56642d583e414 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -122,7 +122,7 @@ mod tests { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)); + let table_schema = TableSchema::from(&file_schema); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = @@ -194,7 +194,7 @@ mod tests { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)); + let table_schema = TableSchema::from(&file_schema); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = @@ -265,7 +265,7 @@ mod tests { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)); + let table_schema = TableSchema::from(&file_schema); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = @@ -335,7 +335,7 @@ mod tests { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)); + let table_schema = TableSchema::from(&file_schema); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = @@ -371,7 +371,7 @@ mod tests { file_compression_type: FileCompressionType, ) -> Result<()> { use datafusion_common::ScalarValue; - use datafusion_datasource::TableSchema; + use datafusion_datasource::TableSchemaBuilder; let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -400,10 +400,13 @@ mod tests { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::new( - Arc::clone(&file_schema), - vec![Arc::new(Field::new("date", DataType::Utf8, false))], - ); + let table_schema = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "date", + DataType::Utf8, + false, + ))]) + .build(); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = @@ -508,7 +511,7 @@ mod tests { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::from_file_schema(Arc::clone(&file_schema)); + let table_schema = TableSchema::from(&file_schema); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 6f38df46e3d2e..87e7fb1af4dd5 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -54,7 +54,7 @@ mod tests { use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::file::FileSource; - use datafusion_datasource::{PartitionedFile, TableSchema}; + use datafusion_datasource::{PartitionedFile, TableSchemaBuilder}; use datafusion_datasource_parquet::source::ParquetSource; use datafusion_datasource_parquet::{ DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat, @@ -1642,9 +1642,8 @@ mod tests { ), ]); - let table_schema = TableSchema::new( - Arc::clone(&schema), - vec![ + let table_schema = TableSchemaBuilder::from(&schema) + .with_table_partition_cols(vec![ Arc::new(Field::new("year", DataType::Utf8, false)), Arc::new(Field::new("month", DataType::UInt8, false)), Arc::new(Field::new( @@ -1655,8 +1654,8 @@ mod tests { ), false, )), - ], - ); + ]) + .build(); let source = Arc::new(ParquetSource::new(table_schema.clone())); let config = FileScanConfigBuilder::new(object_store_url, source) .with_file(partitioned_file) diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 717182f1d3d5b..f46a5a0749065 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -103,7 +103,7 @@ pub fn scan_partitioned_csv( quote: b'"', ..Default::default() }; - let table_schema = TableSchema::from_file_schema(schema); + let table_schema = TableSchema::from(schema); let source = Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options)); let config = FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?) diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index 6f88e01059fc9..9f83f070d0286 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -25,7 +25,7 @@ use datafusion::datasource::physical_plan::CsvSource; use datafusion::datasource::source::DataSourceExec; use datafusion_common::config::{ConfigOptions, CsvOptions}; use datafusion_common::{JoinSide, JoinType, NullEquality, Result, ScalarValue}; -use datafusion_datasource::TableSchema; +use datafusion_datasource::TableSchemaBuilder; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; @@ -1574,10 +1574,13 @@ fn partitioned_data_source() -> Arc { quote: b'"', ..Default::default() }; - let table_schema = TableSchema::new( - Arc::clone(&file_schema), - vec![Arc::new(Field::new("partition_col", DataType::Utf8, true))], - ); + let table_schema = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "partition_col", + DataType::Utf8, + true, + ))]) + .build(); let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(CsvSource::new(table_schema).with_csv_options(options)), diff --git a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs index 61fd0a45952ba..2ffd1899b3c1d 100644 --- a/datafusion/core/tests/physical_optimizer/pushdown_utils.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -111,7 +111,7 @@ pub struct TestSource { impl TestSource { pub fn new(schema: SchemaRef, support: bool, batches: Vec) -> Self { - let table_schema = datafusion_datasource::TableSchema::new(schema, vec![]); + let table_schema = datafusion_datasource::TableSchema::from(schema); Self { support, metrics: ExecutionPlanMetricsSet::new(), diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 9297486ad66e7..1a3e0210145f8 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -37,7 +37,6 @@ use datafusion_common::{ internal_datafusion_err, not_impl_err, }; use datafusion_common_runtime::{JoinSet, SpawnedTask}; -use datafusion_datasource::TableSchema; use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; @@ -45,6 +44,7 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_datasource::write::{ ObjectWriterBuilder, SharedBuffer, get_writer_schema, }; +use datafusion_datasource::{TableSchema, TableSchemaBuilder}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; @@ -197,10 +197,9 @@ impl FileFormat for ArrowFormat { .object_meta .location; - let table_schema = TableSchema::new( - Arc::clone(conf.file_schema()), - conf.table_partition_cols().clone(), - ); + let table_schema = TableSchemaBuilder::from(conf.file_schema()) + .with_table_partition_cols(conf.table_partition_cols().clone()) + .build(); let mut source: Arc = match is_object_in_arrow_ipc_file_format(object_store, object_location).await diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index 5b40a947d9ea4..09e77638776e5 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -1415,7 +1415,7 @@ mod test { stats::Precision, }; use datafusion_datasource::morsel::{Morsel, Morselizer}; - use datafusion_datasource::{PartitionedFile, TableSchema}; + use datafusion_datasource::{PartitionedFile, TableSchema, TableSchemaBuilder}; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ PhysicalExpr, @@ -1495,7 +1495,7 @@ mod test { /// Create a simple table schema from a file schema (for files without partition columns). fn with_schema(mut self, file_schema: SchemaRef) -> Self { - self.table_schema = Some(TableSchema::from_file_schema(file_schema)); + self.table_schema = Some(TableSchema::from(file_schema)); self } @@ -1882,10 +1882,13 @@ mod test { Field::new("a", DataType::Int32, false), ])); - let table_schema_for_opener = TableSchema::new( - file_schema.clone(), - vec![Arc::new(Field::new("part", DataType::Int32, false))], - ); + let table_schema_for_opener = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))]) + .build(); let make_opener = |predicate| { ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) @@ -1951,10 +1954,13 @@ mod test { Field::new("a", DataType::Int32, false), Field::new("b", DataType::Float32, true), ])); - let table_schema_for_opener = TableSchema::new( - file_schema.clone(), - vec![Arc::new(Field::new("part", DataType::Int32, false))], - ); + let table_schema_for_opener = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))]) + .build(); let make_opener = |predicate| { ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) @@ -2023,10 +2029,13 @@ mod test { Field::new("a", DataType::Int32, false), ])); - let table_schema_for_opener = TableSchema::new( - file_schema.clone(), - vec![Arc::new(Field::new("part", DataType::Int32, false))], - ); + let table_schema_for_opener = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))]) + .build(); let make_opener = |predicate| { ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) @@ -2104,10 +2113,13 @@ mod test { Field::new("part", DataType::Int32, false), ])); - let table_schema_for_opener = TableSchema::new( - file_schema.clone(), - vec![Arc::new(Field::new("part", DataType::Int32, false))], - ); + let table_schema_for_opener = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "part", + DataType::Int32, + false, + ))]) + .build(); let make_opener = |predicate| { ParquetMorselizerBuilder::new() .with_store(Arc::clone(&store)) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 2e2d0be0da507..8952666491517 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -1287,7 +1287,9 @@ mod tests { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); let partition_b = Arc::new(Field::new("b", DataType::Int32, true)); - let table_schema = TableSchema::new(file_schema, vec![partition_b]); + let table_schema = TableSchema::builder(file_schema) + .with_table_partition_cols(vec![partition_b]) + .build(); let source = ParquetSource::new(table_schema); // EquivalenceProperties is built on the *full* table schema so diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 4bf86e17d387d..3ebd588a0770f 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -27,7 +27,7 @@ use crate::{ file_stream::work_source::SharedWorkSource, source::DataSource, statistics::MinMaxStatistics, }; -use arrow::datatypes::FieldRef; +use arrow::datatypes::Fields; use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::{ @@ -238,7 +238,9 @@ pub struct FileScanConfig { /// ]; /// /// // Create table schema with file schema and partition columns -/// let table_schema = TableSchema::new(file_schema, partition_cols); +/// let table_schema = TableSchema::builder(file_schema) +/// .with_table_partition_cols(partition_cols) +/// .build(); /// /// // Create a builder for scanning Parquet files from a local filesystem /// let config = FileScanConfigBuilder::new( @@ -1095,7 +1097,7 @@ impl FileScanConfig { } /// Get the table partition columns - pub fn table_partition_cols(&self) -> &Vec { + pub fn table_partition_cols(&self) -> &Fields { self.file_source.table_schema().table_partition_cols() } @@ -1423,9 +1425,9 @@ mod tests { use std::collections::HashMap; use super::*; - use crate::TableSchema; use crate::source::DataSourceExec; use crate::test_util::col; + use crate::{TableSchema, TableSchemaBuilder}; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, @@ -1850,10 +1852,14 @@ mod tests { statistics: Statistics, table_partition_cols: Vec, ) -> FileScanConfig { - let table_schema = TableSchema::new( - file_schema, - table_partition_cols.into_iter().map(Arc::new).collect(), - ); + let table_schema = TableSchema::builder(file_schema) + .with_table_partition_cols( + table_partition_cols + .into_iter() + .map(Arc::new) + .collect::(), + ) + .build(); FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema.clone())), @@ -1869,14 +1875,13 @@ mod tests { let file_schema = aggr_test_schema(); let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); - let table_schema = TableSchema::new( - Arc::clone(&file_schema), - vec![Arc::new(Field::new( + let table_schema = TableSchemaBuilder::from(&file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( "date", wrap_partition_type_in_dict(DataType::Utf8), false, - ))], - ); + ))]) + .build(); let file_source: Arc = Arc::new(MockSource::new(table_schema.clone())); @@ -1938,7 +1943,7 @@ mod tests { let file_schema = aggr_test_schema(); let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); // Create a file source with a filter let file_source: Arc = Arc::new( @@ -1991,7 +1996,7 @@ mod tests { let file_schema = aggr_test_schema(); let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source: Arc = Arc::new(MockSource::new(table_schema.clone())); @@ -2053,10 +2058,14 @@ mod tests { )]; let file = PartitionedFile::new("test_file.parquet", 100); - let table_schema = TableSchema::new( - Arc::clone(&schema), - partition_cols.iter().map(|f| Arc::new(f.clone())).collect(), - ); + let table_schema = TableSchemaBuilder::from(&schema) + .with_table_partition_cols( + partition_cols + .iter() + .map(|f| Arc::new(f.clone())) + .collect::(), + ) + .build(); let file_source: Arc = Arc::new(MockSource::new(table_schema.clone())); @@ -2092,7 +2101,10 @@ mod tests { Some(vec![0, 2]) ); assert_eq!(new_config.limit, Some(10)); - assert_eq!(*new_config.table_partition_cols(), partition_cols); + assert_eq!( + *new_config.table_partition_cols(), + Fields::from(partition_cols) + ); assert_eq!(new_config.file_groups.len(), 1); assert_eq!(new_config.file_groups[0].len(), 1); assert_eq!( @@ -2302,7 +2314,7 @@ mod tests { let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)]) .with_statistics(Arc::new(file_group_stats)); - let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let table_schema = TableSchema::from(&schema); // Create a FileScanConfig with projection: only keep columns 0 and 2 let config = FileScanConfigBuilder::new( @@ -2533,7 +2545,7 @@ mod tests { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); let file_groups = vec![FileGroup::new(vec![ @@ -2648,7 +2660,7 @@ mod tests { fn sort_pushdown_unsupported_source_files_get_sorted() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let file_groups = vec![FileGroup::new(vec![ @@ -2682,7 +2694,7 @@ mod tests { fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let file_groups = vec![FileGroup::new(vec![ @@ -2706,7 +2718,7 @@ mod tests { fn sort_pushdown_unsupported_source_descending_sort() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let file_groups = vec![FileGroup::new(vec![ @@ -2745,7 +2757,7 @@ mod tests { fn sort_pushdown_exact_source_non_overlapping_returns_exact() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); @@ -2779,7 +2791,7 @@ mod tests { fn sort_pushdown_exact_source_overlapping_downgraded_to_inexact() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); @@ -2813,7 +2825,7 @@ mod tests { fn sort_pushdown_exact_source_out_of_order_returns_exact() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); @@ -2851,7 +2863,7 @@ mod tests { fn sort_pushdown_unsupported_source_single_file_groups() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let file_groups = vec![ @@ -2877,7 +2889,7 @@ mod tests { fn sort_pushdown_unsupported_source_multiple_groups() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let file_groups = vec![ @@ -2917,7 +2929,7 @@ mod tests { fn sort_pushdown_unsupported_source_partial_statistics() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let file_groups = vec![ @@ -2957,7 +2969,7 @@ mod tests { fn sort_pushdown_inexact_source_with_statistics_sorting() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); let file_groups = vec![FileGroup::new(vec![ @@ -2994,7 +3006,7 @@ mod tests { // time (all values in group 0 < group 1), degrading to single-threaded I/O. let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(ExactSortPushdownSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); @@ -3052,7 +3064,7 @@ mod tests { // sorting (which would undo the reversal). The result is Inexact. let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(InexactSortPushdownSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); @@ -3119,7 +3131,7 @@ mod tests { // Should NOT upgrade to Exact — NULLs would appear in wrong position. let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); @@ -3152,7 +3164,7 @@ mod tests { // Files are non-overlapping, no NULLs → should upgrade to Exact let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); - let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(MockSource::new(table_schema)); let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0))); diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index e277690cff810..d976bf955dbb2 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -315,7 +315,7 @@ mod tests { let on_error = self.on_error; - let table_schema = TableSchema::new(file_schema, vec![]); + let table_schema = TableSchema::from(file_schema); let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -352,7 +352,7 @@ mod tests { /// Create the smallest valid file scan config for builder validation tests. fn builder_test_config() -> FileScanConfig { - let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]); + let table_schema = TableSchema::from(Arc::new(Schema::empty())); FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), @@ -1575,10 +1575,12 @@ mod tests { }) .collect::>(); - let table_schema = TableSchema::new( - Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), - vec![], - ); + let table_schema = + TableSchema::from(Arc::new(Schema::new(vec![Field::new( + "i", + DataType::Int32, + false, + )]))); FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 84daf608b5182..b92b4b454676f 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -62,7 +62,7 @@ use datafusion_physical_expr::LexOrdering; use futures::{Stream, StreamExt}; use object_store::{GetOptions, GetRange, ObjectStore}; use object_store::{ObjectMeta, path::Path}; -pub use table_schema::TableSchema; +pub use table_schema::{TableSchema, TableSchemaBuilder}; // Remove when add_row_stats is remove #[expect(deprecated)] pub use statistics::add_row_stats; diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs index aa2204e3b8e9b..8b6d18b0e5058 100644 --- a/datafusion/datasource/src/table_schema.rs +++ b/datafusion/datasource/src/table_schema.rs @@ -17,7 +17,7 @@ //! Helper struct to manage table schemas with partition columns -use arrow::datatypes::{FieldRef, SchemaBuilder, SchemaRef}; +use arrow::datatypes::{FieldRef, Fields, SchemaBuilder, SchemaRef}; use std::sync::Arc; /// The overall schema for potentially partitioned data sources. @@ -70,7 +70,11 @@ pub struct TableSchema { /// /// These columns are NOT present in the data files but are appended to each /// row during query execution based on the file's location. - table_partition_cols: Arc>, + /// + /// Stored as [`Fields`] (an immutable `Arc<[FieldRef]>`) so that cloning a + /// `TableSchema` is cheap and the partition columns can be shared zero-copy + /// with an existing schema. + table_partition_cols: Fields, /// The complete table schema: file_schema columns followed by partition columns. /// @@ -80,20 +84,12 @@ pub struct TableSchema { } impl TableSchema { - /// Create a new TableSchema from a file schema and partition columns. - /// - /// The table schema is automatically computed by appending the partition columns - /// to the file schema. + /// Start building a [`TableSchema`] from its (required) file schema. /// - /// You should prefer calling this method over - /// chaining [`TableSchema::from_file_schema`] and [`TableSchema::with_table_partition_cols`] - /// if you have both the file schema and partition columns available at construction time - /// since it avoids re-computing the table schema. - /// - /// # Arguments - /// - /// * `file_schema` - Schema of the data files (without partition columns) - /// * `table_partition_cols` - Partition columns to append to each row + /// Partition columns are optional and added with + /// [`TableSchemaBuilder::with_table_partition_cols`]; the full table schema + /// is computed once by [`TableSchemaBuilder::build`]. This is the preferred + /// way to construct a `TableSchema`. /// /// # Example /// @@ -106,50 +102,53 @@ impl TableSchema { /// Field::new("amount", DataType::Float64, false), /// ])); /// - /// let partition_cols = vec![ - /// Arc::new(Field::new("date", DataType::Utf8, false)), - /// Arc::new(Field::new("region", DataType::Utf8, false)), - /// ]; - /// - /// let table_schema = TableSchema::new(file_schema, partition_cols); + /// let table_schema = TableSchema::builder(file_schema) + /// .with_table_partition_cols(vec![ + /// Arc::new(Field::new("date", DataType::Utf8, false)), + /// Arc::new(Field::new("region", DataType::Utf8, false)), + /// ]) + /// .build(); /// /// // Table schema will have 4 columns: user_id, amount, date, region /// assert_eq!(table_schema.table_schema().fields().len(), 4); /// ``` + pub fn builder(file_schema: SchemaRef) -> TableSchemaBuilder { + TableSchemaBuilder::new(file_schema) + } + + /// Create a new TableSchema from a file schema and partition columns. + /// + /// This is a convenience for + /// `TableSchema::builder(file_schema).with_table_partition_cols(cols).build()`. + #[deprecated( + since = "55.0.0", + note = "use TableSchema::builder(file_schema).with_table_partition_cols(cols).build() (or TableSchema::from(file_schema) for no partition columns)" + )] pub fn new(file_schema: SchemaRef, table_partition_cols: Vec) -> Self { - let mut builder = SchemaBuilder::from(file_schema.as_ref()); - builder.extend(table_partition_cols.iter().cloned()); - Self { - file_schema, - table_partition_cols: Arc::new(table_partition_cols), - table_schema: Arc::new(builder.finish()), - } + TableSchemaBuilder::new(file_schema) + .with_table_partition_cols(table_partition_cols) + .build() } /// Create a new TableSchema with no partition columns. - /// - /// You should prefer calling [`TableSchema::new`] if you have partition columns at - /// construction time since it avoids re-computing the table schema. + #[deprecated( + since = "55.0.0", + note = "use TableSchema::from(file_schema) / file_schema.into()" + )] pub fn from_file_schema(file_schema: SchemaRef) -> Self { - Self::new(file_schema, vec![]) + TableSchemaBuilder::new(file_schema).build() } - /// Add partition columns to an existing TableSchema, returning a new instance. - /// - /// You should prefer calling [`TableSchema::new`] instead of chaining [`TableSchema::from_file_schema`] - /// into [`TableSchema::with_table_partition_cols`] if you have partition columns at construction time - /// since it avoids re-computing the table schema. - pub fn with_table_partition_cols(mut self, partition_cols: Vec) -> Self { - // Append to existing partition columns. `Arc::make_mut` copies the - // inner `Vec` if the `Arc` is shared (e.g. with a clone of this - // `TableSchema`) and otherwise mutates in place. The previous - // `Arc::get_mut().expect()` panicked whenever the `Arc` was shared: - // owning `self` does not imply sole ownership of the inner `Arc`. - Arc::make_mut(&mut self.table_partition_cols).extend(partition_cols); - let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); - builder.extend(self.table_partition_cols.iter().cloned()); - self.table_schema = Arc::new(builder.finish()); - self + /// Return a new `TableSchema` with `partition_cols` as its partition columns, + /// replacing any existing ones. + #[deprecated( + since = "55.0.0", + note = "use TableSchema::builder(file_schema).with_table_partition_cols(cols).build()" + )] + pub fn with_table_partition_cols(self, partition_cols: Vec) -> Self { + TableSchemaBuilder::new(self.file_schema) + .with_table_partition_cols(partition_cols) + .build() } /// Get the file schema (without partition columns). @@ -163,7 +162,7 @@ impl TableSchema { /// /// These are the columns derived from the directory structure that /// will be appended to each row during query execution. - pub fn table_partition_cols(&self) -> &Vec { + pub fn table_partition_cols(&self) -> &Fields { &self.table_partition_cols } @@ -178,13 +177,87 @@ impl TableSchema { impl From for TableSchema { fn from(schema: SchemaRef) -> Self { - Self::from_file_schema(schema) + TableSchemaBuilder::new(schema).build() + } +} + +impl From<&SchemaRef> for TableSchema { + fn from(schema: &SchemaRef) -> Self { + TableSchemaBuilder::new(Arc::clone(schema)).build() + } +} + +/// Builder for [`TableSchema`]. +/// +/// The file schema is the only required input; partition columns are optional. +/// Unlike calling [`TableSchema`]'s setters repeatedly, the builder computes the +/// concatenated table schema exactly once, in [`TableSchemaBuilder::build`]. +/// +/// ``` +/// # use std::sync::Arc; +/// # use arrow::datatypes::{Schema, Field, DataType}; +/// # use datafusion_datasource::TableSchemaBuilder; +/// # let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])); +/// let table_schema = TableSchemaBuilder::new(file_schema) +/// .with_table_partition_cols(vec![Arc::new(Field::new("date", DataType::Utf8, false))]) +/// .build(); +/// assert_eq!(table_schema.table_partition_cols().len(), 1); +/// ``` +#[derive(Debug, Clone)] +pub struct TableSchemaBuilder { + file_schema: SchemaRef, + table_partition_cols: Fields, +} + +impl TableSchemaBuilder { + /// Create a builder for a `TableSchema` over the given file schema, with no + /// partition columns yet. + pub fn new(file_schema: SchemaRef) -> Self { + Self { + file_schema, + table_partition_cols: Fields::empty(), + } + } + + /// Set the partition columns, replacing any previously set. + /// + /// Accepts anything convertible into [`Fields`] (e.g. `Vec` or an + /// existing schema's `Fields`, which is shared zero-copy). + pub fn with_table_partition_cols( + mut self, + table_partition_cols: impl Into, + ) -> Self { + self.table_partition_cols = table_partition_cols.into(); + self + } + + /// Build the [`TableSchema`], computing the full `file + partition` schema once. + pub fn build(self) -> TableSchema { + let mut builder = SchemaBuilder::from(self.file_schema.as_ref()); + builder.extend(self.table_partition_cols.iter().cloned()); + TableSchema { + file_schema: self.file_schema, + table_partition_cols: self.table_partition_cols, + table_schema: Arc::new(builder.finish()), + } + } +} + +impl From for TableSchemaBuilder { + fn from(schema: SchemaRef) -> Self { + TableSchemaBuilder::new(schema) + } +} + +impl From<&SchemaRef> for TableSchemaBuilder { + fn from(schema: &SchemaRef) -> Self { + TableSchemaBuilder::new(Arc::clone(schema)) } } #[cfg(test)] mod tests { - use super::TableSchema; + use super::{TableSchema, TableSchemaBuilder}; use arrow::datatypes::{DataType, Field, Schema}; use std::sync::Arc; @@ -200,7 +273,9 @@ mod tests { Arc::new(Field::new("region", DataType::Utf8, false)), ]; - let table_schema = TableSchema::new(file_schema.clone(), partition_cols.clone()); + let table_schema = TableSchema::builder(file_schema.clone()) + .with_table_partition_cols(partition_cols.clone()) + .build(); // Verify file schema assert_eq!(table_schema.file_schema().as_ref(), file_schema.as_ref()); @@ -222,84 +297,99 @@ mod tests { } #[test] - fn test_add_multiple_partition_columns() { + fn test_builder_with_partition_cols() { let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let initial_partition_cols = - vec![Arc::new(Field::new("country", DataType::Utf8, false))]; + let table_schema = TableSchemaBuilder::new(Arc::clone(&file_schema)) + .with_table_partition_cols(vec![ + Arc::new(Field::new("country", DataType::Utf8, false)), + Arc::new(Field::new("year", DataType::Int32, false)), + ]) + .build(); - let table_schema = TableSchema::new(file_schema.clone(), initial_partition_cols); + // File schema is preserved and the partition columns are appended. + assert_eq!(table_schema.file_schema().as_ref(), file_schema.as_ref()); + assert_eq!(table_schema.table_partition_cols().len(), 2); + assert_eq!(table_schema.table_partition_cols()[0].name(), "country"); + assert_eq!(table_schema.table_partition_cols()[1].name(), "year"); - let additional_partition_cols = vec![ - Arc::new(Field::new("city", DataType::Utf8, false)), - Arc::new(Field::new("year", DataType::Int32, false)), - ]; + let expected_schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("country", DataType::Utf8, false), + Field::new("year", DataType::Int32, false), + ]); + assert_eq!(table_schema.table_schema().as_ref(), &expected_schema); + } - let updated_table_schema = - table_schema.with_table_partition_cols(additional_partition_cols); + #[test] + fn test_builder_with_table_partition_cols_replaces() { + // Calling the setter more than once replaces rather than appends. + let file_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - // Verify file schema remains unchanged - assert_eq!( - updated_table_schema.file_schema().as_ref(), - file_schema.as_ref() - ); + let table_schema = TableSchemaBuilder::new(file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "country", + DataType::Utf8, + false, + ))]) + .with_table_partition_cols(vec![Arc::new(Field::new( + "city", + DataType::Utf8, + false, + ))]) + .build(); - // Verify partition columns - assert_eq!(updated_table_schema.table_partition_cols().len(), 3); - assert_eq!( - updated_table_schema.table_partition_cols()[0].name(), - "country" - ); - assert_eq!( - updated_table_schema.table_partition_cols()[1].name(), - "city" - ); - assert_eq!( - updated_table_schema.table_partition_cols()[2].name(), - "year" - ); + assert_eq!(table_schema.table_partition_cols().len(), 1); + assert_eq!(table_schema.table_partition_cols()[0].name(), "city"); + } - // Verify full table schema - let expected_fields = vec![ - Field::new("id", DataType::Int32, false), - Field::new("country", DataType::Utf8, false), - Field::new("city", DataType::Utf8, false), - Field::new("year", DataType::Int32, false), - ]; - let expected_schema = Schema::new(expected_fields); - assert_eq!( - updated_table_schema.table_schema().as_ref(), - &expected_schema - ); + #[test] + fn test_builder_accepts_fields_zero_copy() { + // `with_table_partition_cols` accepts an existing schema's `Fields` + // directly (shared via `Arc`, no `Vec` round-trip). + let file_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let partition_schema = + Schema::new(vec![Field::new("date", DataType::Utf8, false)]); + + let table_schema = TableSchemaBuilder::new(file_schema) + .with_table_partition_cols(partition_schema.fields().clone()) + .build(); + + assert_eq!(table_schema.table_partition_cols().len(), 1); + assert_eq!(table_schema.table_partition_cols()[0].name(), "date"); } #[test] - fn test_with_table_partition_cols_after_clone_does_not_panic() { - // `TableSchema` is cheaply cloneable because its partition columns are - // stored behind an `Arc`. Appending more partition columns to a clone - // must not panic just because the `Arc` is shared, and must not mutate - // the other clone (copy-on-write isolation). + #[expect(deprecated)] + fn test_deprecated_with_table_partition_cols_replaces() { + // The deprecated setter still works and replaces the partition columns. + // It is safe on a shared clone because partition columns are immutable. let file_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); - let original = TableSchema::new( - file_schema, - vec![Arc::new(Field::new("country", DataType::Utf8, false))], - ); - - let cloned = original.clone(); - let extended = cloned.with_table_partition_cols(vec![Arc::new(Field::new( - "city", - DataType::Utf8, - false, - ))]); - - // The extended schema sees both partition columns... - assert_eq!(extended.table_partition_cols().len(), 2); - assert_eq!(extended.table_partition_cols()[0].name(), "country"); - assert_eq!(extended.table_partition_cols()[1].name(), "city"); - - // ...while the original clone is left untouched. + let original = TableSchema::builder(file_schema) + .with_table_partition_cols(vec![Arc::new(Field::new( + "country", + DataType::Utf8, + false, + ))]) + .build(); + + let replaced = + original + .clone() + .with_table_partition_cols(vec![Arc::new(Field::new( + "city", + DataType::Utf8, + false, + ))]); + + assert_eq!(replaced.table_partition_cols().len(), 1); + assert_eq!(replaced.table_partition_cols()[0].name(), "city"); + + // The original is untouched. assert_eq!(original.table_partition_cols().len(), 1); assert_eq!(original.table_partition_cols()[0].name(), "country"); } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index d211319629878..d35ed5feb51de 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -40,7 +40,7 @@ pub(crate) struct MockSource { impl Default for MockSource { fn default() -> Self { let table_schema = - crate::table_schema::TableSchema::new(Arc::new(Schema::empty()), vec![]); + crate::table_schema::TableSchema::from(Arc::new(Schema::empty())); Self { metrics: ExecutionPlanMetricsSet::new(), filter: None, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..55022608e5a70 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -613,7 +613,9 @@ pub fn parse_table_schema_from_proto( .with_metadata(schema.metadata.clone()), ); - Ok(TableSchema::new(file_schema, table_partition_cols)) + Ok(TableSchema::builder(file_schema) + .with_table_partition_cols(table_partition_cols) + .build()) } pub fn parse_protobuf_file_scan_config( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index f28d3a1f5b4de..d88a360422b05 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -103,8 +103,8 @@ use datafusion_common::{ DataFusionError, NullEquality, Result, UnnestOptions, exec_datafusion_err, internal_datafusion_err, internal_err, not_impl_err, }; -use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; +use datafusion_datasource::{TableSchema, TableSchemaBuilder}; use datafusion_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ @@ -1008,7 +1008,7 @@ fn roundtrip_arrow_scan() -> Result<()> { let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); - let table_schema = TableSchema::new(file_schema.clone(), vec![]); + let table_schema = TableSchema::from(&file_schema); let file_source = Arc::new(ArrowSource::new_file_source(table_schema)); let scan_config = @@ -1035,14 +1035,13 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { vec![wrap_partition_value_in_dict(ScalarValue::Int64(Some(0)))]; let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); - let table_schema = TableSchema::new( - schema.clone(), - vec![Arc::new(Field::new( + let table_schema = TableSchemaBuilder::from(&schema) + .with_table_partition_cols(vec![Arc::new(Field::new( "part".to_string(), wrap_partition_type_in_dict(DataType::Int16), false, - ))], - ); + ))]) + .build(); let file_source = Arc::new(ParquetSource::new(table_schema.clone())); let scan_config =