diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 181b7de7d9f71..4fba94ec3a1dc 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -661,39 +661,36 @@ mod test { let full_statistics = nested_loop_join.partition_statistics(None)?; // With empty join columns, estimate_join_statistics returns Inexact row count // based on the outer side (right side for RightSemi) - let mut expected_full_statistics = create_partition_statistics( + let expected_full_statistics = create_partition_statistics( 4, 32, 1, 4, Some((DATE_2025_03_01, DATE_2025_03_04)), - ); - expected_full_statistics.num_rows = Precision::Inexact(4); - expected_full_statistics.total_byte_size = Precision::Absent; + ) + .to_inexact(); assert_eq!(*full_statistics, expected_full_statistics); // Test partition_statistics(Some(idx)) - returns partition-specific statistics // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02] - let mut expected_statistic_partition_1 = create_partition_statistics( + let expected_statistic_partition_1 = create_partition_statistics( 2, 16, 3, 4, Some((DATE_2025_03_01, DATE_2025_03_02)), - ); - expected_statistic_partition_1.num_rows = Precision::Inexact(2); - expected_statistic_partition_1.total_byte_size = Precision::Absent; + ) + .to_inexact(); // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04] - let mut expected_statistic_partition_2 = create_partition_statistics( + let expected_statistic_partition_2 = create_partition_statistics( 2, 16, 1, 2, Some((DATE_2025_03_03, DATE_2025_03_04)), - ); - expected_statistic_partition_2.num_rows = Precision::Inexact(2); - expected_statistic_partition_2.total_byte_size = Precision::Absent; + ) + .to_inexact(); let statistics = (0..nested_loop_join.output_partitioning().partition_count()) .map(|idx| nested_loop_join.partition_statistics(Some(idx))) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 03387c316b8e1..3774a300209d0 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1448,6 +1448,7 @@ impl ExecutionPlan for HashJoinExec { Arc::unwrap_or_clone(left_stats), Arc::unwrap_or_clone(right_stats), &self.on, + self.null_equality, &self.join_type, &self.join_schema, )? @@ -1463,6 +1464,7 @@ impl ExecutionPlan for HashJoinExec { Arc::unwrap_or_clone(left_stats), Arc::unwrap_or_clone(right_stats), &self.on, + self.null_equality, &self.join_type, &self.join_schema, )? @@ -1480,6 +1482,7 @@ impl ExecutionPlan for HashJoinExec { Arc::unwrap_or_clone(left_stats), Arc::unwrap_or_clone(right_stats), &self.on, + self.null_equality, &self.join_type, &self.join_schema, )? diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 15af23b447836..a18ec0cbe4504 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -61,8 +61,9 @@ use arrow::record_batch::RecordBatch; use arrow_schema::DataType; use datafusion_common::cast::as_boolean_array; use datafusion_common::{ - JoinSide, Result, ScalarValue, Statistics, arrow_err, assert_eq_or_internal_err, - internal_datafusion_err, internal_err, project_schema, unwrap_or_internal_err, + JoinSide, NullEquality, Result, ScalarValue, Statistics, arrow_err, + assert_eq_or_internal_err, internal_datafusion_err, internal_err, project_schema, + unwrap_or_internal_err, }; use datafusion_execution::TaskContext; use datafusion_execution::disk_manager::RefCountedTempFile; @@ -713,6 +714,7 @@ impl ExecutionPlan for NestedLoopJoinExec { left_stats, right_stats, &join_columns, + NullEquality::NullEqualsNothing, &self.join_type, &self.join_schema, )?; diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 9e87b52696a57..a86cb647e4bff 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -582,6 +582,7 @@ impl ExecutionPlan for SortMergeJoinExec { left_stats, right_stats, &self.on, + self.null_equality, &self.join_type, &self.schema, )?)) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 5918097194959..8cc93dee578d4 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -417,6 +417,7 @@ impl Clone for OnceFut { #[derive(Clone, Debug, Default)] struct PartialJoinStatistics { pub num_rows: usize, + pub total_byte_size: Precision, pub column_statistics: Vec, } @@ -430,9 +431,11 @@ struct PartialJoinStatistics { /// column-level statistics (distinct counts, min/max values) of the join keys. /// - **Column statistics**: Combines column statistics from both inputs. For join types /// that preserve all columns (Inner, Left, Right, Full), statistics from both sides -/// are concatenated. For semi/anti joins, only the relevant side's statistics are kept. -/// - **Byte size**: Always returns `Precision::Absent` as join output size is difficult -/// to estimate without knowing the actual data. +/// are concatenated. For semi/anti joins, the preserved side's statistics are +/// normalized as subset estimates. +/// - **Byte size**: For semi/anti joins, sums normalized column byte-size estimates +/// when every output column has one. Other join types return `Precision::Absent` +/// because join output size is difficult to estimate without knowing the actual data. /// /// # The `on` Parameter /// @@ -446,24 +449,34 @@ struct PartialJoinStatistics { /// - Does not account for selectivity of arbitrary join filter expressions /// (e.g., `(t1.v1 + t2.v1) % 2 = 0`). Such filters, common in NestedLoopJoinExec, /// are not factored into the cardinality estimation. -/// - Column statistics for the output are simply combined from inputs without -/// adjusting for join selectivity (acknowledged in the code as needing -/// "filter selectivity analysis"). +/// - Column statistics for inner/outer joins are simply combined from inputs +/// without adjusting for join selectivity (acknowledged in the code as +/// needing "filter selectivity analysis"). pub(crate) fn estimate_join_statistics( left_stats: Statistics, right_stats: Statistics, on: &JoinOn, + null_equality: NullEquality, join_type: &JoinType, schema: &Schema, ) -> Result { - let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, on); - let (num_rows, column_statistics) = match join_stats { - Some(stats) => (Precision::Inexact(stats.num_rows), stats.column_statistics), - None => (Precision::Absent, Statistics::unknown_column(schema)), + let join_stats = + estimate_join_cardinality(join_type, left_stats, right_stats, on, null_equality); + let (num_rows, total_byte_size, column_statistics) = match join_stats { + Some(stats) => ( + Precision::Inexact(stats.num_rows), + stats.total_byte_size, + stats.column_statistics, + ), + None => ( + Precision::Absent, + Precision::Absent, + Statistics::unknown_column(schema), + ), }; Ok(Statistics { num_rows, - total_byte_size: Precision::Absent, + total_byte_size, column_statistics, }) } @@ -474,23 +487,24 @@ fn estimate_join_cardinality( left_stats: Statistics, right_stats: Statistics, on: &JoinOn, + null_equality: NullEquality, ) -> Option { - let (left_key_stats, right_key_stats) = on + let on_column_indices = on .iter() - .map(|(left, right)| { - match ( - left.downcast_ref::(), - right.downcast_ref::(), - ) { - (Some(left), Some(right)) => ( - left_stats.column_statistics[left.index()].clone(), - right_stats.column_statistics[right.index()].clone(), - ), - _ => ( - ColumnStatistics::new_unknown(), - ColumnStatistics::new_unknown(), - ), - } + .map(|(left, right)| equijoin_column_indices(left, right)) + .collect::>(); + + let (left_key_stats, right_key_stats) = on_column_indices + .iter() + .map(|indices| match indices { + Some((left_index, right_index)) => ( + left_stats.column_statistics[*left_index].clone(), + right_stats.column_statistics[*right_index].clone(), + ), + None => ( + ColumnStatistics::new_unknown(), + ColumnStatistics::new_unknown(), + ), }) .unzip::<_, _, Vec<_>, Vec<_>>(); @@ -526,6 +540,7 @@ fn estimate_join_cardinality( Some(PartialJoinStatistics { num_rows: *cardinality.get_value()?, + total_byte_size: Precision::Absent, // We don't do anything specific here, just combine the existing // statistics which might yield subpar results (although it is // true, esp regarding min/max). For a better estimation, we need @@ -547,9 +562,9 @@ fn estimate_join_cardinality( let (outer_stats, inner_stats, outer_key_stats, inner_key_stats) = if is_left { - (&left_stats, &right_stats, &left_key_stats, &right_key_stats) + (left_stats, right_stats, left_key_stats, right_key_stats) } else { - (&right_stats, &left_stats, &right_key_stats, &left_key_stats) + (right_stats, left_stats, right_key_stats, left_key_stats) }; let outer_rows = *outer_stats.num_rows.get_value()?; @@ -575,8 +590,9 @@ fn estimate_join_cardinality( estimate_semi_join_cardinality( &outer_stats.num_rows, &inner_stats.num_rows, - outer_key_stats, - inner_key_stats, + &outer_key_stats, + &inner_key_stats, + null_equality, ) }; @@ -588,10 +604,37 @@ fn estimate_join_cardinality( (None, _) => outer_rows, }; - let outer_stats = if is_left { left_stats } else { right_stats }; + // The outer side is the one whose columns a semi/anti join emits, so + // its statistics are the ones to normalize into the subset estimate. + let Statistics { + num_rows: preserved_num_rows, + column_statistics: preserved_column_statistics, + .. + } = outer_stats; + let preserved_join_key_indices = on_column_indices + .iter() + .filter_map(|&indices| { + indices.map( + |(left_index, right_index)| { + if is_left { left_index } else { right_index } + }, + ) + }) + .collect::>(); + let column_statistics = normalize_semi_anti_join_column_statistics( + preserved_column_statistics, + &preserved_num_rows, + cardinality, + &preserved_join_key_indices, + is_anti, + null_equality, + ); + let total_byte_size = + total_byte_size_from_column_statistics(&column_statistics); Some(PartialJoinStatistics { num_rows: cardinality, - column_statistics: outer_stats.column_statistics, + total_byte_size, + column_statistics, }) } @@ -601,6 +644,7 @@ fn estimate_join_cardinality( column_statistics.push(ColumnStatistics::new_unknown()); Some(PartialJoinStatistics { num_rows, + total_byte_size: Precision::Absent, column_statistics, }) } @@ -610,12 +654,132 @@ fn estimate_join_cardinality( column_statistics.push(ColumnStatistics::new_unknown()); Some(PartialJoinStatistics { num_rows, + total_byte_size: Precision::Absent, column_statistics, }) } } } +fn equijoin_column_indices( + left: &PhysicalExprRef, + right: &PhysicalExprRef, +) -> Option<(usize, usize)> { + Some(( + left.downcast_ref::()?.index(), + right.downcast_ref::()?.index(), + )) +} + +/// Adjusts the preserved input's column statistics to describe the subset of +/// rows a semi or anti join emits. Most values become estimates (marked +/// inexact) bounded by the smaller output row count: +/// +/// - `null_count` and `byte_size` are scaled by the output/input row ratio. +/// - `distinct_count` is capped at the number of non-null output rows. +/// - `sum_value` is dropped, since the input sum does not apply to the subset. +/// +/// Join-key columns are the exception for `null_count`: under regular SQL +/// equality, null keys never match, so a semi join keeps none of those rows and +/// an anti join keeps all of them. Under null-equal joins, null keys can match +/// and are treated like the rest of the subset. +fn normalize_semi_anti_join_column_statistics( + column_statistics: Vec, + input_num_rows: &Precision, + output_num_rows: usize, + join_key_indices: &[usize], + is_anti: bool, + null_equality: NullEquality, +) -> Vec { + let input_num_rows = input_num_rows.get_value().copied().unwrap_or(0); + + column_statistics + .into_iter() + .enumerate() + .map(|(idx, stats)| { + let mut stats = stats.to_inexact(); + stats.null_count = if join_key_indices.contains(&idx) { + normalize_semi_anti_join_key_null_count( + stats.null_count, + input_num_rows, + output_num_rows, + is_anti, + null_equality, + ) + } else { + scale_subset_count(stats.null_count, input_num_rows, output_num_rows) + .min(&Precision::Inexact(output_num_rows)) + }; + let max_distinct_count = stats + .null_count + .get_value() + .map(|null_count| output_num_rows.saturating_sub(*null_count)) + .unwrap_or(output_num_rows); + stats.distinct_count = stats + .distinct_count + .min(&Precision::Inexact(max_distinct_count)); + stats.byte_size = + scale_subset_count(stats.byte_size, input_num_rows, output_num_rows); + stats.sum_value = Precision::Absent; + stats + }) + .collect() +} + +fn normalize_semi_anti_join_key_null_count( + null_count: Precision, + input_num_rows: usize, + output_num_rows: usize, + is_anti: bool, + null_equality: NullEquality, +) -> Precision { + match (is_anti, null_equality) { + (false, NullEquality::NullEqualsNothing) => Precision::Exact(0), + (true, NullEquality::NullEqualsNothing) => null_count + .to_inexact() + .min(&Precision::Inexact(output_num_rows)), + (_, NullEquality::NullEqualsNull) => { + scale_subset_count(null_count, input_num_rows, output_num_rows) + .min(&Precision::Inexact(output_num_rows)) + } + } +} + +// Scale a column-level count to an estimated row subset. Rounding up keeps a +// small non-zero count from disappearing solely because the subset is small. +fn scale_subset_count( + count: Precision, + input_num_rows: usize, + output_num_rows: usize, +) -> Precision { + let scaled = match count { + Precision::Exact(count) | Precision::Inexact(count) => { + if input_num_rows == 0 { + 0 + } else { + (count as u128 * output_num_rows as u128).div_ceil(input_num_rows as u128) + as usize + } + } + Precision::Absent => return Precision::Absent, + }; + + Precision::Inexact(scaled) +} + +fn total_byte_size_from_column_statistics( + column_statistics: &[ColumnStatistics], +) -> Precision { + column_statistics + .iter() + .map(|stats| stats.byte_size.get_value().copied()) + .try_fold(0usize, |acc, byte_size| { + byte_size.map(|byte_size| acc.saturating_add(byte_size)) + }) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent) +} + /// Estimate the inner join cardinality by using the basic building blocks of /// column-level statistics and the total row count. This is a very naive and /// a very conservative implementation that can quickly give up if there is not @@ -640,6 +804,13 @@ fn estimate_inner_join_cardinality( .. } = right_stats; + if left_num_rows == Precision::Exact(0) || right_num_rows == Precision::Exact(0) { + return Some(Precision::Exact(0)); + } + if left_num_rows == Precision::Inexact(0) || right_num_rows == Precision::Inexact(0) { + return Some(Precision::Inexact(0)); + } + // Follow Spark Catalyst's conservative NDV join estimate: for multi-key // joins, use the most selective key instead of multiplying all key denominators. let mut join_selectivity = Precision::Absent; @@ -743,8 +914,8 @@ fn estimate_disjoint_inputs( /// Under the uniformity assumption (each distinct value contributes /// equally to row counts), the surviving fraction of outer rows is: /// -/// Null rows cannot match, so each column's selectivity is further -/// reduced by the outer null fraction: +/// Under regular SQL equality, null rows cannot match, so each column's +/// selectivity is further reduced by the outer null fraction: /// /// ```text /// null_frac_i = outer_null_count_i / outer_rows @@ -761,7 +932,7 @@ fn estimate_disjoint_inputs( /// Anti join cardinality is derived as the complement: /// `outer_rows - semi_cardinality`. /// -/// Boundary cases: +/// With `NullEqualsNothing`, boundary cases are: /// * `inner_ndv >= outer_ndv` → selectivity = `1.0 - null_frac` /// * `null_frac = 1.0` → selectivity = 0.0 (no non-null rows can match) /// * Missing NDV statistics → returns `None` (fallback to `outer_rows`) @@ -776,6 +947,7 @@ fn estimate_semi_join_cardinality( inner_num_rows: &Precision, outer_key_stats: &[ColumnStatistics], inner_key_stats: &[ColumnStatistics], + null_equality: NullEquality, ) -> Option { let outer_rows = *outer_num_rows.get_value()?; if outer_rows == 0 { @@ -806,11 +978,21 @@ fn estimate_semi_join_cardinality( if let (Some(&o), Some(&i)) = (outer_ndv.get_value(), inner_ndv.get_value()) && o > 0 { - let null_frac = outer_stat - .null_count - .get_value() - .map(|&nc| nc as f64 / outer_rows as f64) - .unwrap_or(0.0); + let null_frac = if null_equality == NullEquality::NullEqualsNothing { + outer_stat + .null_count + .get_value() + .map(|&nc| { + if nc > outer_rows { + 0.0 + } else { + nc as f64 / outer_rows as f64 + } + }) + .unwrap_or(0.0) + } else { + 0.0 + }; selectivity *= (o.min(i) as f64) / (o as f64) * (1.0 - null_frac); has_selectivity_estimate = true; } @@ -2590,6 +2772,7 @@ mod tests { create_stats(Some(left_num_rows), left_col_stats.clone(), false), create_stats(Some(right_num_rows), right_col_stats.clone(), false), &join_on, + NullEquality::NullEqualsNothing, ); assert_eq!( @@ -2722,6 +2905,7 @@ mod tests { create_stats(Some(1000), left_col_stats.clone(), false), create_stats(Some(2000), right_col_stats.clone(), false), &join_on, + NullEquality::NullEqualsNothing, ) .unwrap(); assert_eq!(partial_join_stats.num_rows, expected_num_rows); @@ -2775,6 +2959,7 @@ mod tests { create_stats(Some(1000), left_col_stats.clone(), false), create_stats(Some(2000), right_col_stats.clone(), false), &join_on_ab, + NullEquality::NullEqualsNothing, ) .unwrap(); let stats_ba = estimate_join_cardinality( @@ -2782,6 +2967,7 @@ mod tests { create_stats(Some(1000), left_col_stats.clone(), false), create_stats(Some(2000), right_col_stats.clone(), false), &join_on_ba, + NullEquality::NullEqualsNothing, ) .unwrap(); @@ -2855,6 +3041,7 @@ mod tests { create_stats(Some(1000), left_col_stats.clone(), true), create_stats(Some(2000), right_col_stats.clone(), true), &join_on, + NullEquality::NullEqualsNothing, ) .unwrap(); assert_eq!(partial_join_stats.num_rows, expected_num_rows); @@ -3090,6 +3277,7 @@ mod tests { column_statistics: inner_col_stats, }, &join_on, + NullEquality::NullEqualsNothing, ) .map(|cardinality| cardinality.num_rows); @@ -3124,6 +3312,7 @@ mod tests { column_statistics: dummy_column_stats.clone(), }, &join_on, + NullEquality::NullEqualsNothing, ); assert!( absent_outer_estimation.is_none(), @@ -3143,6 +3332,7 @@ mod tests { column_statistics: dummy_column_stats.clone(), }, &join_on, + NullEquality::NullEqualsNothing, ).expect("Expected non-empty PartialJoinStatistics for SemiJoin with absent inner num_rows"); assert_eq!( @@ -3163,6 +3353,7 @@ mod tests { column_statistics: dummy_column_stats, }, &join_on, + NullEquality::NullEqualsNothing, ); assert!( absent_inner_estimation.is_none(), @@ -3209,6 +3400,7 @@ mod tests { ], }, &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!(result, Some(13), "multi-column semi join"); @@ -3233,6 +3425,7 @@ mod tests { ], }, &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!(result, Some(87), "multi-column anti join"); @@ -3260,6 +3453,7 @@ mod tests { ], }, &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!(result, Some(50), "mixed stats: col1 skipped"); @@ -3284,6 +3478,7 @@ mod tests { ], }, &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!(result, Some(100), "no column has stats on both sides"); @@ -3312,6 +3507,7 @@ mod tests { ], }, &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!( @@ -3353,6 +3549,7 @@ mod tests { left_stats.clone(), right_stats.clone(), &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!(left_semi, Some(50)); @@ -3362,11 +3559,305 @@ mod tests { left_stats, right_stats, &join_on, + NullEquality::NullEqualsNothing, ) .map(|c| c.num_rows); assert_eq!(left_anti, Some(0)); } + #[test] + fn test_semi_join_scales_preserved_column_statistics() { + let join_on = vec![( + Arc::new(Column::new("l_key", 0)) as _, + Arc::new(Column::new("r_key", 0)) as _, + )]; + + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(432_187), + total_byte_size: Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Exact(7_196), + min_value: Exact(ScalarValue::from(1_i64)), + max_value: Exact(ScalarValue::from(432_187_i64)), + sum_value: Absent, + distinct_count: Absent, + byte_size: Exact(3_457_496), + }, + ColumnStatistics { + null_count: Exact(7_196), + min_value: Exact(ScalarValue::from(1_i64)), + max_value: Exact(ScalarValue::from(432_187_i64)), + sum_value: Exact(ScalarValue::from(1_000_000_i64)), + distinct_count: Exact(500_000), + byte_size: Exact(3_457_496), + }, + ], + }, + Statistics { + num_rows: Inexact(32), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Inexact(1), + Inexact(32), + Absent, + Absent, + )], + }, + &join_on, + NullEquality::NullEqualsNothing, + ) + .expect("semi join cardinality should be estimated"); + + assert_eq!(result.num_rows, 32); + assert_eq!(result.total_byte_size, Inexact(512)); + assert_eq!(result.column_statistics[0].null_count, Exact(0)); + assert_eq!(result.column_statistics[0].distinct_count, Absent); + assert_eq!( + result.column_statistics[0].min_value, + Inexact(ScalarValue::from(1_i64)) + ); + assert_eq!( + result.column_statistics[0].max_value, + Inexact(ScalarValue::from(432_187_i64)) + ); + assert_eq!(result.column_statistics[0].byte_size, Inexact(256)); + assert_eq!(result.column_statistics[1].null_count, Inexact(1)); + // distinct_count is capped at the non-null output rows (32 - 1). + assert_eq!(result.column_statistics[1].distinct_count, Inexact(31)); + assert_eq!(result.column_statistics[1].sum_value, Absent); + assert_eq!(result.column_statistics[1].byte_size, Inexact(256)); + } + + #[test] + fn test_semi_join_null_equals_null_scales_join_key_nulls() { + let join_on = vec![( + Arc::new(Column::new("l_key", 0)) as _, + Arc::new(Column::new("r_key", 0)) as _, + )]; + + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Absent, + Absent, + Inexact(100), + Exact(20), + )], + }, + Statistics { + num_rows: Inexact(10), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Absent, + Absent, + Inexact(10), + Absent, + )], + }, + &join_on, + NullEquality::NullEqualsNull, + ) + .expect("semi join cardinality should be estimated"); + + assert_eq!(result.num_rows, 10); + assert_eq!(result.column_statistics[0].null_count, Inexact(2)); + assert_eq!(result.column_statistics[0].distinct_count, Inexact(8)); + } + + #[test] + fn test_semi_join_total_byte_size_absent_if_any_column_byte_size_absent() { + let join_on = vec![( + Arc::new(Column::new("l_key", 0)) as _, + Arc::new(Column::new("r_key", 0)) as _, + )]; + + let result = estimate_join_cardinality( + &JoinType::LeftSemi, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Exact(0), + min_value: Exact(ScalarValue::from(1_i64)), + max_value: Exact(ScalarValue::from(100_i64)), + sum_value: Absent, + distinct_count: Absent, + byte_size: Exact(800), + }, + ColumnStatistics { + null_count: Exact(0), + min_value: Absent, + max_value: Absent, + sum_value: Absent, + distinct_count: Absent, + byte_size: Absent, + }, + ], + }, + Statistics { + num_rows: Inexact(10), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Inexact(1), + Inexact(10), + Absent, + Absent, + )], + }, + &join_on, + NullEquality::NullEqualsNothing, + ) + .expect("semi join cardinality should be estimated"); + + assert_eq!(result.num_rows, 10); + assert_eq!(result.total_byte_size, Absent); + } + + #[test] + fn test_anti_join_preserves_join_key_nulls() { + let join_on = vec![( + Arc::new(Column::new("l_key", 0)) as _, + Arc::new(Column::new("r_key", 0)) as _, + )]; + + let result = estimate_join_cardinality( + &JoinType::LeftAnti, + Statistics { + num_rows: Inexact(1_000_000), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Absent, + Absent, + Inexact(900_000), + Exact(100_000), + )], + }, + Statistics { + num_rows: Inexact(900_000), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Absent, + Absent, + Inexact(900_000), + Absent, + )], + }, + &join_on, + NullEquality::NullEqualsNothing, + ) + .expect("anti join cardinality should be estimated"); + + assert_eq!(result.num_rows, 100_000); + assert_eq!(result.column_statistics[0].null_count, Inexact(100_000)); + assert_eq!(result.column_statistics[0].distinct_count, Inexact(0)); + } + + #[test] + fn test_anti_join_null_equals_null_scales_join_key_nulls() { + let join_on = vec![( + Arc::new(Column::new("l_key", 0)) as _, + Arc::new(Column::new("r_key", 0)) as _, + )]; + + let result = estimate_join_cardinality( + &JoinType::LeftAnti, + Statistics { + num_rows: Inexact(100), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Absent, + Absent, + Inexact(100), + Exact(20), + )], + }, + Statistics { + num_rows: Inexact(10), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Absent, + Absent, + Inexact(10), + Absent, + )], + }, + &join_on, + NullEquality::NullEqualsNull, + ) + .expect("anti join cardinality should be estimated"); + + assert_eq!(result.num_rows, 90); + assert_eq!(result.column_statistics[0].null_count, Inexact(18)); + assert_eq!(result.column_statistics[0].distinct_count, Inexact(72)); + } + + #[test] + fn test_right_semi_join_scales_preserved_column_statistics() { + let join_on = vec![( + Arc::new(Column::new("l_key", 0)) as _, + Arc::new(Column::new("r_key", 0)) as _, + )]; + + // For a right semi join the right input is preserved, so its column + // statistics (and right join-key index) are the ones normalized. + let result = estimate_join_cardinality( + &JoinType::RightSemi, + Statistics { + num_rows: Inexact(32), + total_byte_size: Absent, + column_statistics: vec![create_column_stats( + Inexact(1), + Inexact(32), + Absent, + Absent, + )], + }, + Statistics { + num_rows: Inexact(432_187), + total_byte_size: Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Exact(7_196), + min_value: Exact(ScalarValue::from(1_i64)), + max_value: Exact(ScalarValue::from(432_187_i64)), + sum_value: Absent, + distinct_count: Absent, + byte_size: Exact(3_457_496), + }, + ColumnStatistics { + null_count: Exact(7_196), + min_value: Exact(ScalarValue::from(1_i64)), + max_value: Exact(ScalarValue::from(432_187_i64)), + sum_value: Exact(ScalarValue::from(1_000_000_i64)), + distinct_count: Exact(500_000), + byte_size: Exact(3_457_496), + }, + ], + }, + &join_on, + NullEquality::NullEqualsNothing, + ) + .expect("right semi join cardinality should be estimated"); + + assert_eq!(result.num_rows, 32); + // Join-key column: null counts collapse to exact zero (null keys never match). + assert_eq!(result.column_statistics[0].null_count, Exact(0)); + assert_eq!(result.column_statistics[0].byte_size, Inexact(256)); + // Non-key column: counts scaled to the subset, sum dropped, distinct + // capped at the non-null output rows (32 - 1). + assert_eq!(result.column_statistics[1].null_count, Inexact(1)); + assert_eq!(result.column_statistics[1].distinct_count, Inexact(31)); + assert_eq!(result.column_statistics[1].sum_value, Absent); + assert_eq!(result.column_statistics[1].byte_size, Inexact(256)); + } + #[test] fn test_calculate_join_output_ordering() -> Result<()> { let left_ordering = LexOrdering::new(vec![