From 497edc6613a6b4bbce6d11e265e5a87a2b1c84b8 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Mon, 1 Jun 2026 16:16:32 -0400 Subject: [PATCH 1/2] . --- datafusion/physical-plan/src/filter.rs | 412 ++++++++++++++---- .../test_files/parquet_statistics.slt | 14 +- 2 files changed, 346 insertions(+), 80 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index b3b107dc580df..3a05fb2d3b248 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -311,10 +311,7 @@ impl FilterExec { } /// Calculates `Statistics` for `FilterExec`, by applying selectivity - /// (either default, or estimated) to input statistics. - /// - /// Equality predicates (`col = literal`) set NDV to `Exact(1)`, or - /// `Exact(0)` when the predicate is contradictory (e.g. `a = 1 AND a = 2`). + /// (either default or estimated) to input statistics. pub(crate) fn statistics_helper( schema: &SchemaRef, input_stats: Statistics, @@ -327,8 +324,8 @@ impl FilterExec { let input_total_byte_size = input_stats.total_byte_size; let (selectivity, num_rows, column_statistics) = if is_infeasible { - // Contradictory predicate: zero rows, and null/min/max are - // undefined on an empty column. + // Contradictory predicate: no rows survive. Row-bounded counts are + // zero; value statistics are undefined on an empty column. let mut cs = input_stats.to_inexact().column_statistics; for col_stat in &mut cs { col_stat.distinct_count = Precision::Exact(0); @@ -339,43 +336,50 @@ impl FilterExec { col_stat.byte_size = Precision::Exact(0); } (0.0, Precision::Exact(0), cs) - } else if !check_support(predicate, schema) { - // Interval analysis is not applicable; fall back to the default - // selectivity but still pin NDV=1 for every `col = literal` column. - let selectivity = default_selectivity as f64 / 100.0; - let mut cs = input_stats.to_inexact().column_statistics; - for &idx in &eq_columns { - if idx < cs.len() && cs[idx].distinct_count != Precision::Exact(0) { - cs[idx].distinct_count = Precision::Exact(1); + } else { + let null_rejecting_columns = collect_null_rejecting_columns(predicate); + + if check_support(predicate, schema) { + let input_analysis_ctx = AnalysisContext::try_from_statistics( + schema, + &input_stats.column_statistics, + )?; + let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?; + let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); + let filtered_num_rows = + input_num_rows.with_estimated_selectivity(selectivity); + let cs = collect_new_statistics( + schema, + &input_stats.column_statistics, + analysis_ctx.boundaries, + selectivity, + &null_rejecting_columns, + filtered_num_rows, + ); + (selectivity, filtered_num_rows, cs) + } else { + // Without interval boundaries, use the default selectivity and + // apply the row-count constraints that still follow from the + // filter predicate. + let selectivity = default_selectivity as f64 / 100.0; + let filtered_num_rows = + input_num_rows.with_estimated_selectivity(selectivity); + let mut cs = input_stats.to_inexact().column_statistics; + for (idx, col_stat) in cs.iter_mut().enumerate() { + col_stat.byte_size = scale_byte_size(col_stat.byte_size, selectivity); + col_stat.null_count = if null_rejecting_columns.contains(&idx) { + Precision::Exact(0) + } else { + cap_at_rows(col_stat.null_count, filtered_num_rows) + }; + col_stat.distinct_count = if eq_columns.contains(&idx) { + distinct_count_for_singleton_domain(filtered_num_rows) + } else { + cap_at_rows(col_stat.distinct_count, filtered_num_rows) + }; } + (selectivity, filtered_num_rows, cs) } - ( - selectivity, - input_num_rows.with_estimated_selectivity(selectivity), - cs, - ) - } else { - // Interval-analysis path. `collect_new_statistics` already sets - // distinct_count = Exact(1) when an interval collapses to a single - // value, so no post-fix is needed here. - let input_analysis_ctx = AnalysisContext::try_from_statistics( - schema, - &input_stats.column_statistics, - )?; - let analysis_ctx = analyze(predicate, input_analysis_ctx, schema)?; - let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); - let filtered_num_rows = - input_num_rows.with_estimated_selectivity(selectivity); - let cs = collect_new_statistics( - schema, - &input_stats.column_statistics, - analysis_ctx.boundaries, - match &filtered_num_rows { - Precision::Absent => None, - p => Some(*p), - }, - ); - (selectivity, filtered_num_rows, cs) }; let total_byte_size = @@ -846,6 +850,35 @@ fn collect_equality_columns(predicate: &Arc) -> (HashSet) -> HashSet { + let mut columns = HashSet::new(); + + for expr in split_conjunction(predicate) { + let Some(binary) = expr.downcast_ref::() else { + continue; + }; + if !binary.op().returns_null_on_null() { + continue; + } + if let Some(col) = binary.left().downcast_ref::() { + columns.insert(col.index()); + } + if let Some(col) = binary.right().downcast_ref::() { + columns.insert(col.index()); + } + } + + columns +} + /// Converts an interval bound to a [`Precision`] value. NULL bounds (which /// represent "unbounded" in the interval type) map to [`Precision::Absent`]. fn interval_bound_to_precision( @@ -861,15 +894,56 @@ fn interval_bound_to_precision( } } -/// This function ensures that all bounds in the `ExprBoundaries` vector are -/// converted to closed bounds. If a lower/upper bound is initially open, it -/// is adjusted by using the next/previous value for its data type to convert -/// it into a closed bound. +/// Scales a column's `byte_size` by the estimated filter `selectivity`. An +/// exact zero is preserved: an empty column stays exactly empty after +/// filtering. +fn scale_byte_size(byte_size: Precision, selectivity: f64) -> Precision { + match byte_size { + Precision::Exact(0) => Precision::Exact(0), + byte_size => byte_size.with_estimated_selectivity(selectivity), + } +} + +/// Caps a row-bounded column statistic (a null count or distinct count) at the +/// filtered row estimate, since a column cannot have more nulls or distinct +/// values than it has rows. Known counts are demoted to inexact because the +/// filtered row count is itself an estimate. +fn cap_at_rows( + value: Precision, + filtered_num_rows: Precision, +) -> Precision { + match filtered_num_rows { + Precision::Absent => value.to_inexact(), + rows => value.to_inexact().min(&rows), + } +} + +/// Returns the NDV for a column constrained to one non-null value, such as +/// `column = literal` or a singleton interval. The constraint gives NDV at +/// most one; a zero-row output has no distinct values. +/// +/// The caller is responsible for proving the singleton domain. +fn distinct_count_for_singleton_domain( + filtered_num_rows: Precision, +) -> Precision { + match filtered_num_rows { + Precision::Exact(0) | Precision::Inexact(0) => filtered_num_rows, + _ => Precision::Exact(1), + } +} + +/// Builds output column statistics from interval-analysis boundaries. +/// +/// The interval bounds become min/max values, singleton intervals become +/// singleton NDV, and row-bounded counts are kept consistent with the filtered +/// row estimate. fn collect_new_statistics( schema: &SchemaRef, input_column_stats: &[ColumnStatistics], analysis_boundaries: Vec, - filtered_num_rows: Option>, + selectivity: f64, + null_rejecting_columns: &HashSet, + filtered_num_rows: Precision, ) -> Vec { analysis_boundaries .into_iter() @@ -904,24 +978,29 @@ fn collect_new_statistics( !lower.is_null() && !upper.is_null() && lower == upper; let min_value = interval_bound_to_precision(lower, is_single_value); let max_value = interval_bound_to_precision(upper, is_single_value); - // When the interval collapses to a single value (equality - // predicate), the column has exactly 1 distinct value. - // Otherwise, cap NDV at the filtered row count. + + // Distinct and null counts cannot exceed the number of rows + // that survive the filter. Singleton intervals and + // null-rejecting predicates provide tighter bounds. let capped_distinct_count = if is_single_value { - Precision::Exact(1) + distinct_count_for_singleton_domain(filtered_num_rows) } else { - match filtered_num_rows { - Some(rows) => distinct_count.to_inexact().min(&rows), - None => distinct_count.to_inexact(), - } + cap_at_rows(distinct_count, filtered_num_rows) + }; + let capped_null_count = if null_rejecting_columns.contains(&idx) { + Precision::Exact(0) + } else { + cap_at_rows(input_column_stats[idx].null_count, filtered_num_rows) }; + let byte_size = + scale_byte_size(input_column_stats[idx].byte_size, selectivity); ColumnStatistics { - null_count: input_column_stats[idx].null_count.to_inexact(), + null_count: capped_null_count, max_value, min_value, sum_value: Precision::Absent, distinct_count: capped_distinct_count, - byte_size: input_column_stats[idx].byte_size, + byte_size, } }, ) @@ -1237,6 +1316,8 @@ mod tests { assert_eq!( statistics.column_statistics, vec![ColumnStatistics { + // `a <= 25` rejects nulls, so the column has no surviving nulls. + null_count: Precision::Exact(0), min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(25))), ..Default::default() @@ -1283,6 +1364,8 @@ mod tests { assert_eq!( statistics.column_statistics, vec![ColumnStatistics { + // `a <= 25 AND a >= 10` rejects nulls in `a`. + null_count: Precision::Exact(0), min_value: Precision::Inexact(ScalarValue::Int32(Some(10))), max_value: Precision::Inexact(ScalarValue::Int32(Some(25))), ..Default::default() @@ -1350,11 +1433,16 @@ mod tests { statistics.column_statistics, vec![ ColumnStatistics { + // `a <= 25 AND a >= 10` rejects nulls in `a`. + null_count: Precision::Exact(0), min_value: Precision::Inexact(ScalarValue::Int32(Some(10))), max_value: Precision::Inexact(ScalarValue::Int32(Some(25))), ..Default::default() }, ColumnStatistics { + // `b > 45` in the upstream filter zeroes b's nulls; the outer + // filter then caps the (already zero) count, demoting to inexact. + null_count: Precision::Inexact(0), min_value: Precision::Inexact(ScalarValue::Int32(Some(46))), max_value: Precision::Inexact(ScalarValue::Int32(Some(50))), ..Default::default() @@ -1551,8 +1639,13 @@ mod tests { Arc::new(Column::new("b", 1)), )), )); - // Since filter predicate passes all entries, statistics after filter shouldn't change. - let expected = input.partition_statistics(None)?.column_statistics.clone(); + // The filter predicate passes all (non-null) entries, so min/max/NDV + // are unchanged. `a < 200` and `1 <= b` are null-rejecting, though, so + // both columns lose any nulls regardless of selectivity. + let mut expected = input.partition_statistics(None)?.column_statistics.clone(); + for col in &mut expected { + col.null_count = Precision::Exact(0); + } let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); let statistics = filter.partition_statistics(None)?; @@ -1742,10 +1835,14 @@ mod tests { statistics.column_statistics, vec![ ColumnStatistics { + // `a < 50` rejects nulls in `a`. + null_count: Precision::Exact(0), min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(49))), ..Default::default() }, + // `b` is not referenced by the predicate, so its stats are + // unchanged (null count stays unknown). ColumnStatistics { min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), @@ -1790,7 +1887,9 @@ mod tests { num_rows: Precision::Absent, total_byte_size: Precision::Absent, column_statistics: vec![ColumnStatistics { - null_count: Precision::Absent, + // `a <= 10` rejects nulls, so `a` has no surviving nulls even + // though the input statistics are entirely unknown. + null_count: Precision::Exact(0), min_value: Precision::Inexact(ScalarValue::Int32(Some(5))), max_value: Precision::Inexact(ScalarValue::Int32(Some(10))), sum_value: Precision::Absent, @@ -2425,7 +2524,7 @@ mod tests { vec![Precision::Exact(1)], ), ( - "OR preserves original NDV", + "OR is not collapsed to NDV=1, but NDV is capped at filtered rows", vec![Field::new("name", DataType::Utf8, false)], vec![ColumnStatistics { distinct_count: Precision::Inexact(50), @@ -2444,7 +2543,9 @@ mod tests { Arc::new(Literal::new(ScalarValue::Utf8(Some("b".to_string())))), )), )), - vec![Precision::Inexact(50)], + // Input NDV is 50, but the 20% default selectivity on 100 rows + // estimates 20 output rows, so NDV is capped at 20. + vec![Precision::Inexact(20)], ), ( "AND with mixed types (Utf8 + Int32)", @@ -2643,11 +2744,73 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_filter_statistics_empty_input_equality_ndv_zero() -> Result<()> { + let cases: Vec<(&str, Schema, Statistics, Arc)> = vec![ + ( + "fallback string equality", + Schema::new(vec![Field::new("name", DataType::Utf8, true)]), + Statistics { + num_rows: Precision::Exact(0), + total_byte_size: Precision::Exact(0), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Exact(0), + null_count: Precision::Exact(0), + byte_size: Precision::Exact(0), + ..Default::default() + }], + }, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("name", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Utf8(Some("x".to_string())))), + )), + ), + ( + "interval numeric equality", + Schema::new(vec![Field::new("a", DataType::Int32, true)]), + Statistics { + num_rows: Precision::Exact(0), + total_byte_size: Precision::Exact(0), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(10))), + distinct_count: Precision::Exact(0), + null_count: Precision::Exact(0), + byte_size: Precision::Exact(0), + ..Default::default() + }], + }, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), + )), + ), + ]; + + for (desc, schema, input_stats, predicate) in cases { + let input = Arc::new(StatisticsExec::new(input_stats, schema)); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + + assert_eq!( + statistics.num_rows, + Precision::Inexact(0), + "case '{desc}': row count mismatch" + ); + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Inexact(0), + "case '{desc}': NDV should be capped at zero rows" + ); + } + Ok(()) + } + #[tokio::test] async fn test_filter_statistics_and_equality_ndv() -> Result<()> { - // a: min=1, max=100, ndv=80 - // b: min=1, max=50, ndv=40 - // c: min=1, max=200, ndv=150 let schema = Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), @@ -2661,6 +2824,7 @@ mod tests { ColumnStatistics { min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + null_count: Precision::Inexact(80), distinct_count: Precision::Inexact(80), ..Default::default() }, @@ -2673,6 +2837,7 @@ mod tests { ColumnStatistics { min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + null_count: Precision::Inexact(90), distinct_count: Precision::Inexact(150), ..Default::default() }, @@ -2706,11 +2871,15 @@ mod tests { let filter: Arc = Arc::new(FilterExec::try_new(predicate, input)?); let statistics = filter.partition_statistics(None)?; - // a = 42 collapses to single value + // Equality predicates collapse NDV and reject nulls for their columns. assert_eq!( statistics.column_statistics[0].distinct_count, Precision::Exact(1) ); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Exact(0) + ); // b > 10 narrows to [11, 50] but doesn't collapse to a single value. // The combined selectivity of a=42 (1/80) and c=7 (1/150) on 100 rows // computes num_rows = 1, so NDV is capped at the row count: min(40, 1) = 1. @@ -2718,11 +2887,14 @@ mod tests { statistics.column_statistics[1].distinct_count, Precision::Inexact(1) ); - // c = 7 collapses to single value assert_eq!( statistics.column_statistics[2].distinct_count, Precision::Exact(1) ); + assert_eq!( + statistics.column_statistics[2].null_count, + Precision::Exact(0) + ); Ok(()) } @@ -2742,8 +2914,8 @@ mod tests { schema.clone(), )); - // a = 42: even without known bounds, interval analysis resolves - // the equality to [42, 42], so NDV is correctly set to Exact(1) + // Even without input bounds, interval analysis can derive singleton + // bounds from the equality itself. let predicate = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), Operator::Eq, @@ -3208,16 +3380,17 @@ mod tests { #[tokio::test] async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> { - // Table: a: min=1, max=100, distinct_count=80, 100 rows - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); let input = Arc::new(StatisticsExec::new( Statistics { num_rows: Precision::Inexact(100), - total_byte_size: Precision::Inexact(400), + total_byte_size: Precision::Inexact(1000), column_statistics: vec![ColumnStatistics { min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + null_count: Precision::Inexact(80), distinct_count: Precision::Inexact(80), + byte_size: Precision::Exact(1000), ..Default::default() }], }, @@ -3232,14 +3405,107 @@ mod tests { Arc::new(FilterExec::try_new(predicate, input)?); let statistics = filter.partition_statistics(None)?; - // Filter estimates ~10 rows (selectivity = 10/100) assert_eq!(statistics.num_rows, Precision::Inexact(10)); - // NDV should be capped at the filtered row count (10), not the original 80 let ndv = &statistics.column_statistics[0].distinct_count; assert!( ndv.get_value().copied() <= Some(10), "Expected NDV <= 10 (filtered row count), got {ndv:?}" ); + // `a <= 10` rejects nulls, so the 80 input nulls drop to exactly zero. + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Exact(0) + ); + // byte_size follows the same 10% selectivity estimate. + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Inexact(100) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_default_selectivity_column_stats() -> Result<()> { + let schema = Schema::new(vec![Field::new("name", DataType::Utf8, true)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1000), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(80), + distinct_count: Precision::Inexact(60), + byte_size: Precision::Exact(1000), + ..Default::default() + }], + }, + schema.clone(), + )); + + // Utf8 interval analysis is unsupported, so this exercises the default + // selectivity path. The predicate rejects nulls but does not constrain + // the column to one value. + let predicate: Arc = + binary(col("name", &schema)?, Operator::Gt, lit("m"), &schema)?; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + + let statistics = filter.partition_statistics(None)?; + assert_eq!(statistics.num_rows, Precision::Inexact(20)); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Inexact(200) + ); + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Inexact(20) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_or_does_not_reject_nulls() -> Result<()> { + let schema = Schema::new(vec![Field::new("name", DataType::Utf8, true)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1000), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(80), + distinct_count: Precision::Inexact(60), + byte_size: Precision::Exact(1000), + ..Default::default() + }], + }, + schema.clone(), + )); + + let predicate: Arc = binary( + binary(col("name", &schema)?, Operator::Gt, lit("m"), &schema)?, + Operator::Or, + is_null(col("name", &schema)?)?, + &schema, + )?; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + + let statistics = filter.partition_statistics(None)?; + assert_eq!(statistics.num_rows, Precision::Inexact(20)); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Inexact(20) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Inexact(200) + ); + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Inexact(20) + ); Ok(()) } } diff --git a/datafusion/sqllogictest/test_files/parquet_statistics.slt b/datafusion/sqllogictest/test_files/parquet_statistics.slt index 1073f60a0fef2..38cde680ee20f 100644 --- a/datafusion/sqllogictest/test_files/parquet_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_statistics.slt @@ -59,7 +59,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Exact(0) Distinct=Exact(1) ScanBytes=Inexact(10))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -84,7 +84,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Exact(0) Distinct=Exact(1) ScanBytes=Inexact(10))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -109,7 +109,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Distinct=Exact(1))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Exact(0) Distinct=Exact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] @@ -152,7 +152,7 @@ query TT EXPLAIN SELECT i8 FROM typed_table WHERE i8 = 2; ---- physical_plan -01)FilterExec: i8@0 = 2, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Int8(2)) Max=Exact(Int8(2)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(5))]] +01)FilterExec: i8@0 = 2, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Int8(2)) Max=Exact(Int8(2)) Null=Exact(0) Distinct=Exact(1) ScanBytes=Inexact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(5), [(Col[0]: Min=Inexact(Int8(1)) Max=Inexact(Int8(5)) Null=Inexact(0) ScanBytes=Inexact(5))]] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[i8], file_type=parquet, predicate=i8@0 = 2, pruning_predicate=i8_null_count@2 != row_count@3 AND i8_min@0 <= 2 AND 2 <= i8_max@1, required_guarantees=[i8 in (2)], statistics=[Rows=Inexact(5), Bytes=Inexact(5), [(Col[0]: Min=Inexact(Int8(1)) Max=Inexact(Int8(5)) Null=Inexact(0) ScanBytes=Inexact(5))]] @@ -161,7 +161,7 @@ query TT EXPLAIN SELECT i64 FROM typed_table WHERE i64 = 2; ---- physical_plan -01)FilterExec: i64@0 = 2, statistics=[Rows=Inexact(1), Bytes=Inexact(8), [(Col[0]: Min=Exact(Int64(2)) Max=Exact(Int64(2)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] +01)FilterExec: i64@0 = 2, statistics=[Rows=Inexact(1), Bytes=Inexact(8), [(Col[0]: Min=Exact(Int64(2)) Max=Exact(Int64(2)) Null=Exact(0) Distinct=Exact(1) ScanBytes=Inexact(8))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(5)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[i64], file_type=parquet, predicate=i64@1 = 2, pruning_predicate=i64_null_count@2 != row_count@3 AND i64_min@0 <= 2 AND 2 <= i64_max@1, required_guarantees=[i64 in (2)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(5)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -170,7 +170,7 @@ query TT EXPLAIN SELECT f32 FROM typed_table WHERE f32 = 2.5; ---- physical_plan -01)FilterExec: CAST(f32@0 AS Float64) = 2.5, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Float32(2.5)) Max=Exact(Float32(2.5)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(20))]] +01)FilterExec: CAST(f32@0 AS Float64) = 2.5, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Float32(2.5)) Max=Exact(Float32(2.5)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(20), [(Col[0]: Min=Inexact(Float32(1.5)) Max=Inexact(Float32(5.5)) Null=Inexact(0) ScanBytes=Inexact(20))]] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[f32], file_type=parquet, predicate=CAST(f32@2 AS Float64) = 2.5, pruning_predicate=f32_null_count@2 != row_count@3 AND CAST(f32_min@0 AS Float64) <= 2.5 AND 2.5 <= CAST(f32_max@1 AS Float64), required_guarantees=[], statistics=[Rows=Inexact(5), Bytes=Inexact(20), [(Col[0]: Min=Inexact(Float32(1.5)) Max=Inexact(Float32(5.5)) Null=Inexact(0) ScanBytes=Inexact(20))]] @@ -179,7 +179,7 @@ query TT EXPLAIN SELECT f64 FROM typed_table WHERE 2.5 = f64; ---- physical_plan -01)FilterExec: f64@0 = 2.5, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Float64(2.5)) Max=Exact(Float64(2.5)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] +01)FilterExec: f64@0 = 2.5, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Float64(2.5)) Max=Exact(Float64(2.5)) Null=Exact(0) Distinct=Exact(1) ScanBytes=Inexact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Float64(1.5)) Max=Inexact(Float64(5.5)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[f64], file_type=parquet, predicate=f64@3 = 2.5, pruning_predicate=f64_null_count@2 != row_count@3 AND f64_min@0 <= 2.5 AND 2.5 <= f64_max@1, required_guarantees=[f64 in (2.5)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Float64(1.5)) Max=Inexact(Float64(5.5)) Null=Inexact(0) ScanBytes=Inexact(40))]] From f84e55d6c9d5419bbeee3253df6c207fc6a2fca9 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Tue, 2 Jun 2026 22:50:36 -0400 Subject: [PATCH 2/2] Address review comments --- datafusion/physical-plan/src/filter.rs | 114 +++++++++++++++--- .../test_files/parquet_statistics.slt | 2 +- 2 files changed, 96 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 3a05fb2d3b248..11d36192f3aae 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -60,7 +60,9 @@ use datafusion_common::{ use datafusion_execution::TaskContext; use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::ProjectionMapping; -use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, lit}; +use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, IsNotNullExpr, Literal, lit, +}; use datafusion_physical_expr::intervals::utils::check_support; use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns}; use datafusion_physical_expr::{ @@ -310,8 +312,23 @@ impl FilterExec { &self.projection } - /// Calculates `Statistics` for `FilterExec`, by applying selectivity - /// (either default or estimated) to input statistics. + /// Calculates `Statistics` for `FilterExec` by applying the filter's + /// selectivity (default, or estimated from interval analysis) to the input + /// statistics. + /// + /// The estimated output row count is used to keep the per-column statistics + /// consistent with it: + /// - null and distinct counts are capped at the estimated row count; + /// - byte sizes (per column and total) are scaled by the selectivity; + /// - a column constrained to a single value (`col = literal`, or an + /// interval that collapses to one point) gets a distinct count of 1; + /// - a column in a null-rejecting conjunct gets a null count of 0. + /// + /// When interval analysis applies, min/max are also tightened to the + /// surviving value range. + /// + /// A contradictory predicate (e.g. `a = 1 AND a = 2`) yields zero rows and + /// empty-column statistics. pub(crate) fn statistics_helper( schema: &SchemaRef, input_stats: Statistics, @@ -852,27 +869,40 @@ fn collect_equality_columns(predicate: &Arc) -> (HashSet) -> HashSet { let mut columns = HashSet::new(); for expr in split_conjunction(predicate) { - let Some(binary) = expr.downcast_ref::() else { - continue; - }; - if !binary.op().returns_null_on_null() { + // `col IS NOT NULL` keeps only rows where `col` is non-null. + if let Some(is_not_null) = expr.downcast_ref::() { + if let Some(col) = is_not_null.arg().downcast_ref::() { + columns.insert(col.index()); + } continue; } - if let Some(col) = binary.left().downcast_ref::() { - columns.insert(col.index()); - } - if let Some(col) = binary.right().downcast_ref::() { - columns.insert(col.index()); + + // A binary operator that returns NULL on NULL input rejects rows where + // a direct column operand is NULL. + if let Some(binary) = expr.downcast_ref::() { + if !binary.op().returns_null_on_null() { + continue; + } + if let Some(col) = binary.left().downcast_ref::() { + columns.insert(col.index()); + } + if let Some(col) = binary.right().downcast_ref::() { + columns.insert(col.index()); + } } } @@ -918,9 +948,11 @@ fn cap_at_rows( } } -/// Returns the NDV for a column constrained to one non-null value, such as -/// `column = literal` or a singleton interval. The constraint gives NDV at -/// most one; a zero-row output has no distinct values. +/// Returns the NDV for a column constrained to one non-null value (e.g. +/// `column = literal` or a singleton interval), derived from the filtered row +/// estimate: zero rows means zero distinct values, a known positive row count +/// means exactly one, and an unknown row count means an inexact one (the column +/// could still be empty). /// /// The caller is responsible for proving the singleton domain. fn distinct_count_for_singleton_domain( @@ -928,6 +960,9 @@ fn distinct_count_for_singleton_domain( ) -> Precision { match filtered_num_rows { Precision::Exact(0) | Precision::Inexact(0) => filtered_num_rows, + // The row count is unknown, so the column could still be empty (zero + // distinct values); report an inexact one rather than overstating it. + Precision::Absent => Precision::Inexact(1), _ => Precision::Exact(1), } } @@ -3508,4 +3543,45 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_is_not_null_rejects_nulls() -> Result<()> { + let schema = Schema::new(vec![Field::new("name", DataType::Utf8, true)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1000), + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(80), + distinct_count: Precision::Inexact(60), + byte_size: Precision::Exact(1000), + ..Default::default() + }], + }, + schema.clone(), + )); + + // `name IS NOT NULL` keeps only non-null rows, so the surviving null + // count is exactly zero. Utf8 interval analysis is unsupported, so this + // also exercises the default-selectivity path. + let predicate: Arc = is_not_null(col("name", &schema)?)?; + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + + let statistics = filter.partition_statistics(None)?; + assert_eq!(statistics.num_rows, Precision::Inexact(20)); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Exact(0) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Inexact(200) + ); + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Inexact(20) + ); + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/parquet_statistics.slt b/datafusion/sqllogictest/test_files/parquet_statistics.slt index 38cde680ee20f..9cf6b1e0381d1 100644 --- a/datafusion/sqllogictest/test_files/parquet_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_statistics.slt @@ -109,7 +109,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Exact(0) Distinct=Exact(1))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Exact(0) Distinct=Inexact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]