diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c3e5cabce7bc..fc2f060d3157 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -596,7 +596,8 @@ impl DataSource for FileScanConfig { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; @@ -635,7 +636,7 @@ impl DataSource for FileScanConfig { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -929,16 +930,69 @@ impl FileScanConfig { /// Returns only the output orderings that are validated against actual /// file group statistics. /// + /// The various listing tables do not attempt to read all files + /// concurrently, instead they read files in sequence within a + /// partition. This is an important property as it allows plans to + /// run against 1000s of files and not try to open them all + /// concurrently. + /// + /// However, it means if we assign more than one file to a partition + /// the output sort order will not be preserved unless the files' + /// min/max statistics prove the combined stream is still ordered. + /// + /// When only 1 file is assigned to each partition, each partition is + /// correctly sorted on `(A, B, C)`: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ + /// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// Partition 1 Partition 2 Partition 3 Partition 4 + /// ┃ ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// DataSourceExec + /// ``` + /// + /// However, when more than 1 file is assigned to each partition, each + /// partition is NOT necessarily sorted on `(A, B, C)`. Once the second + /// file is scanned, the same values for A, B and C can be repeated in + /// the same sorted stream: + /// + /// ```text + /// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ┌───────────────┐ ┌──────────────┐ │ + /// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ + /// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ + /// │ └───────────────┘ │ │ └──────────────┘ ┃ + /// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ + /// Partition 1 Partition 2 ┃ + /// ┃ + /// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ + /// DataSourceExec + /// ``` + /// /// For example, individual files may be ordered by `col1 ASC`, - /// but if we have files with these min/max statistics in a single partition / file group: + /// but if we have files with these min/max statistics in a single + /// partition / file group: /// /// - file1: min(col1) = 10, max(col1) = 20 /// - file2: min(col1) = 5, max(col1) = 15 /// - /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap - /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering. + /// Because reading file1 followed by file2 would produce out-of-order + /// output (there is overlap in the ranges), we cannot retain `col1 ASC` + /// as a valid output ordering. /// - /// Similarly this would not be a valid order (non-overlapping ranges but not ordered): + /// Similarly this would not be a valid order (non-overlapping ranges + /// but not ordered): /// /// - file1: min(col1) = 20, max(col1) = 30 /// - file2: min(col1) = 10, max(col1) = 15 @@ -948,13 +1002,14 @@ impl FileScanConfig { /// - file1: min(col1) = 5, max(col1) = 15 /// - file2: min(col1) = 16, max(col1) = 25 /// - /// Then we know that reading file1 followed by file2 will produce ordered output, - /// so `col1 ASC` would be retained. + /// Then we know that reading file1 followed by file2 will produce + /// ordered output, so `col1 ASC` would be retained. /// - /// Note that we are checking for ordering *within* *each* file group / partition, - /// files in different partitions are read independently and do not affect each other's ordering. - /// Merging of the multiple partition streams into a single ordered stream is handled - /// upstream e.g. by `SortPreservingMergeExec`. + /// Note that we are checking for ordering *within* *each* file group / + /// partition — files in different partitions are read independently and + /// do not affect each other's ordering. Merging of the multiple + /// partition streams into a single ordered stream is handled upstream + /// e.g. by `SortPreservingMergeExec`. fn validated_output_ordering(&self) -> Vec { let schema = self.file_source.table_schema().table_schema(); validate_orderings(&self.output_ordering, schema, &self.file_groups, None) @@ -1295,13 +1350,15 @@ impl Debug for FileScanConfig { impl DisplayAs for FileScanConfig { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult { - let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?; - let orderings = get_projected_output_ordering(self, &schema); + let eq_props = self.eq_properties(); + let orderings = eq_props.oeq_class(); write!(f, "file_groups=")?; FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?; - if !schema.fields().is_empty() { + if let Ok(schema) = self.projected_schema() + && !schema.fields().is_empty() + { write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; } @@ -1309,7 +1366,7 @@ impl DisplayAs for FileScanConfig { write!(f, ", limit={limit}")?; } - display_orderings(f, &orderings)?; + display_orderings(f, orderings)?; if !self.constraints.is_empty() { write!(f, ", {}", self.constraints)?; @@ -1319,21 +1376,6 @@ impl DisplayAs for FileScanConfig { } } -/// Get the indices of columns in a projection if the projection is a simple -/// list of columns. -/// If there are any expressions other than columns, returns None. -fn ordered_column_indices_from_projection( - projection: &ProjectionExprs, -) -> Option> { - projection - .expr_iter() - .map(|e| { - let index = e.as_any().downcast_ref::()?.index(); - Some(index) - }) - .collect::>>() -} - /// Check whether a given ordering is valid for all file groups by verifying /// that files within each group are sorted according to their min/max statistics. /// @@ -1379,115 +1421,6 @@ fn validate_orderings( .collect() } -/// The various listing tables does not attempt to read all files -/// concurrently, instead they will read files in sequence within a -/// partition. This is an important property as it allows plans to -/// run against 1000s of files and not try to open them all -/// concurrently. -/// -/// However, it means if we assign more than one file to a partition -/// the output sort order will not be preserved as illustrated in the -/// following diagrams: -/// -/// When only 1 file is assigned to each partition, each partition is -/// correctly sorted on `(A, B, C)` -/// -/// ```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃ -/// │ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ │ │ ┃ -/// │ │ │ │ │ │ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// DataFusion DataFusion DataFusion DataFusion -/// ┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃ -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// -/// DataSourceExec -/// ``` -/// -/// However, when more than 1 file is assigned to each partition, each -/// partition is NOT correctly sorted on `(A, B, C)`. Once the second -/// file is scanned, the same values for A, B and C can be repeated in -/// the same sorted stream -/// -///```text -/// ┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 1.parquet │ │ │ │ 2.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ ┌───────────────┐ ┌──────────────┐ │ -/// │ │ 3.parquet │ │ │ │ 4.parquet │ ┃ -/// ┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ -/// │ └───────────────┘ │ │ └──────────────┘ ┃ -/// ┃ │ -/// │ │ │ ┃ -/// ┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ -/// DataFusion DataFusion ┃ -/// ┃ Partition 1 Partition 2 -/// ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛ -/// -/// DataSourceExec -/// ``` -fn get_projected_output_ordering( - base_config: &FileScanConfig, - projected_schema: &SchemaRef, -) -> Vec { - let projected_orderings = - project_orderings(&base_config.output_ordering, projected_schema); - - let indices = base_config - .file_source - .projection() - .as_ref() - .map(|p| ordered_column_indices_from_projection(p)); - - match indices { - Some(Some(indices)) => { - // Simple column projection — validate with statistics - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - Some(indices.as_slice()), - ) - } - None => { - // No projection — validate with statistics (no remapping needed) - validate_orderings( - &projected_orderings, - projected_schema, - &base_config.file_groups, - None, - ) - } - Some(None) => { - // Complex projection (expressions, not simple columns) — can't - // determine column indices for statistics. Still valid if all - // file groups have at most one file. - if base_config.file_groups.iter().all(|g| g.len() <= 1) { - projected_orderings - } else { - debug!( - "Skipping specified output orderings. \ - Some file groups couldn't be determined to be sorted: {:?}", - base_config.file_groups - ); - vec![] - } - } - } -} - /// Convert type to a type suitable for use as a `ListingTable` /// partition column. Returns `Dictionary(UInt16, val_type)`, which is /// a reasonable trade off between a reasonable number of partition @@ -2583,4 +2516,247 @@ mod tests { Ok(()) } + + /// Helper: create a `PartitionedFile` with min/max stats for the given columns. + fn partitioned_file_with_stats( + name: &str, + col_stats: Vec<(ScalarValue, ScalarValue)>, + ) -> PartitionedFile { + let column_statistics: Vec = col_stats + .into_iter() + .map(|(min, max)| ColumnStatistics { + min_value: Precision::Exact(min), + max_value: Precision::Exact(max), + null_count: Precision::Exact(0), + ..ColumnStatistics::new_unknown() + }) + .collect(); + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics, + }; + PartitionedFile::new(name, 1024).with_statistics(Arc::new(stats)) + } + + /// Regression test: with a complex projection like `a + 1`, the display + /// path should still show orderings (it delegates to `eq_properties()` + /// which validates at table-schema level, then projects correctly). + #[test] + fn test_display_ordering_with_complex_projection_multi_file() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, non-overlapping b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), // b + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), // a + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), // b + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Push a complex projection: [a + 1 AS x, b] + let expr_a_plus_1: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(expr_a_plus_1, "x"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Format via DisplayAs and verify ordering is present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + assert!( + display.contains("output_ordering="), + "Expected output_ordering in display, but got: {display}" + ); + assert!( + display.contains("b@1 ASC"), + "Expected 'b@1 ASC' in display, but got: {display}" + ); + } + + /// Verify orderings ARE dropped when file statistics overlap + /// (ordering is genuinely invalid for multi-file groups). + #[test] + fn test_display_ordering_dropped_for_overlapping_stats() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Two files in one group, OVERLAPPING b statistics + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(20))), // overlaps! + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Format and verify ordering is NOT present + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(config.clone()) + ); + assert!( + !display.contains("output_ordering"), + "Expected no output_ordering for overlapping stats, but got: {display}" + ); + } + + /// Verify the display path and optimization path agree: orderings from + /// `eq_properties().oeq_class()` match what appears in `fmt_as()` output. + #[test] + fn test_display_ordering_matches_eq_properties() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let table_schema = TableSchema::new(Arc::clone(&schema), vec![]); + let file_source: Arc = + Arc::new(MockSource::new(table_schema.clone())); + + // Non-overlapping b statistics across two files + let file1 = partitioned_file_with_stats( + "file1", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))), + ], + ); + let file2 = partitioned_file_with_stats( + "file2", + vec![ + (ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(100))), + (ScalarValue::Int32(Some(11)), ScalarValue::Int32(Some(20))), + ], + ); + + let sort_b_asc = PhysicalSortExpr::new( + Arc::new(Column::new("b", 1)), + arrow::compute::SortOptions { + descending: false, + nulls_first: false, + }, + ); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&file_source), + ) + .with_file_groups(vec![FileGroup::new(vec![file1, file2])]) + .with_output_ordering(vec![LexOrdering::new(vec![sort_b_asc]).unwrap()]) + .build(); + + // Simple projection [a, b] + let exprs = ProjectionExprs::new(vec![ + ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"), + ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"), + ]); + + let data_source = config + .try_swapping_with_projection(&exprs) + .unwrap() + .expect("projection swap should succeed"); + let new_config = data_source + .as_any() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + // Get orderings from eq_properties (what the optimizer sees) + let eq_props = new_config.eq_properties(); + let oeq_orderings = eq_props.oeq_class(); + assert!( + !oeq_orderings.is_empty(), + "eq_properties should report orderings for valid non-overlapping files" + ); + + // Get display output + let display = format!( + "{}", + datafusion_physical_plan::display::VerboseDisplay(new_config.clone()) + ); + + // Verify they agree: each ordering from eq_properties should appear in display + for ordering in oeq_orderings.iter() { + let ordering_str = format!("{ordering}"); + assert!( + display.contains(&ordering_str), + "Display should contain ordering '{ordering_str}', but got: {display}" + ); + } + } } diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 294841552a66..539a6d96b174 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2274,7 +2274,7 @@ logical_plan 02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT a, b, ARRAY_AGG(d ORDER BY d) @@ -5001,7 +5001,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan 01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[array_agg(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query II? SELECT a, b, ARRAY_AGG(c ORDER BY c DESC) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 2fb544a638d6..8b7073ef6c56 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3456,7 +3456,7 @@ logical_plan 05)----TableScan: annotated_data projection=[a0, a, b, c, d] physical_plan 01)NestedLoopJoinExec: join_type=Inner, filter=example(join_proj_push_down_1@0, join_proj_push_down_2@1) > 3, projection=[a0@0, a@1, b@2, c@3, d@4, a0@6, a@7, b@8, c@9, d@10] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d, CAST(a@1 AS Float64) as join_proj_push_down_1], output_orderings=[[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [join_proj_push_down_1@5 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]], file_type=csv, has_header=true 03)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, CAST(a@1 AS Float64) as join_proj_push_down_2] 04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, maintains_sort_order=true 05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 7feefc169fca..3144977b678f 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -97,7 +97,7 @@ logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN @@ -109,7 +109,7 @@ logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b 03)----TableScan: multiple_ordered_table projection=[a, b] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_ordering=[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], file_type=csv, has_header=true +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, a@1 as a_big, b], output_orderings=[[a@0 ASC NULLS LAST, b@2 ASC NULLS LAST], [a_big@1 ASC NULLS LAST]], file_type=csv, has_header=true # test for cast Utf8 diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 7b741579cb13..131f0bc74ca4 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -404,7 +404,7 @@ logical_plan 01)Sort: timeseries_parquet.period_end ASC NULLS LAST, fetch=2 02)--Filter: timeseries_parquet.timeframe = Utf8View("quarterly") 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], limit=2, output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly, pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Test 2.4: Verify ASC results query TIR @@ -458,7 +458,7 @@ logical_plan 03)----TableScan: timeseries_parquet projection=[timeframe, period_end, value], partial_filters=[timeseries_parquet.timeframe = Utf8View("quarterly")] physical_plan 01)SortExec: TopK(fetch=2), expr=[period_end@1 DESC], preserve_partitioning=[false] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[timeframe@0 ASC NULLS LAST, period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/timeseries_sorted.parquet]]}, projection=[timeframe, period_end, value], output_ordering=[period_end@1 ASC NULLS LAST], file_type=parquet, predicate=timeframe@0 = quarterly AND DynamicFilter [ empty ], pruning_predicate=timeframe_null_count@2 != row_count@3 AND timeframe_min@0 <= quarterly AND quarterly <= timeframe_max@1, required_guarantees=[timeframe in (quarterly)] # Results should still be correct query TIR diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a1fef072229..3f135c25c86e 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -371,7 +371,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa ---- physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age, number@0 as column4, letter@1 as column5], output_orderings=[[number@0 DESC, column5@4 ASC NULLS LAST], [column4@3 DESC], [number@0 DESC, letter@1 ASC NULLS LAST]], file_type=parquet, predicate=DynamicFilter [ empty ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index d858d0ae3ea4..d158fa50a107 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -592,7 +592,7 @@ physical_plan 01)SortPreservingMergeExec: [c1@0 ASC NULLS LAST] 02)--UnionExec 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], file_type=csv, has_header=true +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1a@0 as c1], output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok drop table t1 diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 8ac8724683a8..8584208ca806 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3145,7 +3145,7 @@ physical_plan 11)--------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b, annotated_data_finite2.d] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND CURRENT ROW], mode=[Sorted] 12)----------------------SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST, c@3 ASC NULLS LAST], preserve_partitioning=[false] 13)------------------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING, sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Field { "sum(annotated_data_finite2.c) PARTITION BY [annotated_data_finite2.a, annotated_data_finite2.b] ORDER BY [annotated_data_finite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true +14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[CAST(c@3 AS Int64) as __common_expr_1, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, __common_expr_1@0 ASC NULLS LAST]], file_type=csv, has_header=true query IIIIIIIIIIIIIII SELECT a, b, c, @@ -3507,7 +3507,7 @@ logical_plan physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--FilterExec: b@2 = 0 -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Since column b is constant after filter b=0, # window requirement b ASC, d ASC can be satisfied @@ -3525,7 +3525,7 @@ physical_plan 01)BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 02)--SortExec: expr=[d@4 ASC NULLS LAST], preserve_partitioning=[false] 03)----FilterExec: b@2 = 0 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[c@3 ASC NULLS LAST], [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]], file_type=csv, has_header=true # Create an unbounded source where there is multiple orderings. @@ -3561,7 +3561,7 @@ physical_plan 02)--BoundedWindowAggExec: wdw=[min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "min(multiple_ordered_table.d) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] 03)----ProjectionExec: expr=[c@2 as c, d@3 as d, max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 04)------BoundedWindowAggExec: wdw=[max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "max(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.b, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int32 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query TT EXPLAIN SELECT MAX(c) OVER(PARTITION BY d ORDER BY c ASC) as max_c @@ -3605,7 +3605,7 @@ logical_plan physical_plan 01)ProjectionExec: expr=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@4 as sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] 02)--BoundedWindowAggExec: wdw=[sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "sum(multiple_ordered_table.d) PARTITION BY [multiple_ordered_table.c, multiple_ordered_table.a] ORDER BY [multiple_ordered_table.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": nullable Int64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], file_type=csv, has_header=true +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_orderings=[[c@2 ASC NULLS LAST], [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST]], file_type=csv, has_header=true query I SELECT SUM(d) OVER(PARTITION BY c, a ORDER BY b ASC)