diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs
index adaf3e5911e9b..c3b0f658528de 100644
--- a/datafusion/core/benches/sql_planner_extended.rs
+++ b/datafusion/core/benches/sql_planner_extended.rs
@@ -18,7 +18,7 @@
 use arrow::array::{ArrayRef, RecordBatch};
 use arrow_schema::DataType;
 use arrow_schema::TimeUnit::Nanosecond;
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
 use datafusion::prelude::{DataFrame, SessionContext};
 use datafusion_catalog::MemTable;
 use datafusion_common::ScalarValue;
@@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
 use datafusion_functions::expr_fn::{
     btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
 };
+use std::fmt::Write;
 use std::hint::black_box;
 use std::ops::Rem;
 use std::sync::Arc;
@@ -212,14 +213,74 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
     })
 }
 
-fn criterion_benchmark(c: &mut Criterion) {
+/// Build a CASE-heavy dataframe over a non-inner join to stress
+/// planner-time filter pushdown and nullability/type inference.
+fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
+    register_string_table(ctx, 100, 1000);
+    let query = build_case_heavy_left_join_query(30, 1);
+    rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String {
+    let mut query = String::from(
+        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ",
+    );
+
+    if predicate_count == 0 {
+        query.push_str("TRUE");
+        return query;
+    }
+
+    // Keep this deterministic so comparisons between profiles are stable.
+    for i in 0..predicate_count {
+        if i > 0 {
+            query.push_str(" AND ");
+        }
+
+        let mut expr = format!("length(l.c{})", i % 20);
+        for depth in 0..case_depth {
+            let left_col = (i + depth + 1) % 20;
+            let right_col = (i + depth + 2) % 20;
+            expr = format!(
+                "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END"
+            );
+        }
+
+        let _ = write!(&mut query, "{expr} > 2");
+    }
+
+    query
+}
+
+fn build_case_heavy_left_join_df_with_push_down_filter(
+    rt: &Runtime,
+    predicate_count: usize,
+    case_depth: usize,
+    push_down_filter_enabled: bool,
+) -> DataFrame {
     let ctx = SessionContext::new();
+    register_string_table(&ctx, 100, 1000);
+    if !push_down_filter_enabled {
+        // Remove the rule OUTSIDE debug_assert!: its arguments are not evaluated
+        // in release builds (where criterion benches run), so the removal must
+        // not live inside the assertion or the A/B comparison is a no-op.
+        let removed = ctx.remove_optimizer_rule("push_down_filter");
+        debug_assert!(
+            removed,
+            "push_down_filter rule should be present in the default optimizer"
+        );
+    }
+
+    let query = build_case_heavy_left_join_query(predicate_count, case_depth);
+    rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let baseline_ctx = SessionContext::new();
+    let case_heavy_ctx = SessionContext::new();
     let rt = Runtime::new().unwrap();
 
     // validate logical plan optimize performance
     // https://github.com/apache/datafusion/issues/17261
-    let df = build_test_data_frame(&ctx, &rt);
+    let df = build_test_data_frame(&baseline_ctx, &rt);
+    let case_heavy_left_join_df = build_case_heavy_left_join_df(&case_heavy_ctx, &rt);
 
     c.bench_function("logical_plan_optimize", |b| {
         b.iter(|| {
@@ -227,6 +288,68 @@ fn criterion_benchmark(c: &mut Criterion) {
             black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
         })
     });
+
+    c.bench_function("logical_plan_optimize_case_heavy_left_join", |b| {
+        b.iter(|| {
+            let df_clone = case_heavy_left_join_df.clone();
+            black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
+        })
+    });
+
+    let mut group = c.benchmark_group("push_down_filter_case_heavy_left_join_ab");
+    let predicate_sweep = [10, 20, 30, 40, 60];
+    let case_depth_sweep = [1, 2, 3];
+
+    for case_depth in case_depth_sweep {
+        for predicate_count in predicate_sweep {
+            let with_push_down_filter =
+                build_case_heavy_left_join_df_with_push_down_filter(
+                    &rt,
+                    predicate_count,
+                    case_depth,
+                    true,
+                );
+            let without_push_down_filter =
+                build_case_heavy_left_join_df_with_push_down_filter(
+                    &rt,
+                    predicate_count,
+                    case_depth,
+                    false,
+                );
+
+            let input_label =
+                format!("predicates={predicate_count},case_depth={case_depth}");
+            group.bench_with_input(
+                BenchmarkId::new("with_push_down_filter", &input_label),
+                &with_push_down_filter,
+                |b, df| {
+                    b.iter(|| {
+                        let df_clone = df.clone();
+                        black_box(
+                            rt.block_on(async {
+                                df_clone.into_optimized_plan().unwrap()
+                            }),
+                        );
+                    })
+                },
+            );
+            group.bench_with_input(
+                BenchmarkId::new("without_push_down_filter", &input_label),
+                &without_push_down_filter,
+                |b, df| {
+                    b.iter(|| {
+                        let df_clone = df.clone();
+                        black_box(
+                            rt.block_on(async {
+                                df_clone.into_optimized_plan().unwrap()
+                            }),
+                        );
+                    })
+                },
+            );
+        }
+    }
+    group.finish();
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index b1c0960386c2c..b94597fd42958 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
 use arrow::datatypes::DataType;
 use indexmap::IndexSet;
 use itertools::Itertools;
+use log::{Level, debug, log_enabled};
 
+use datafusion_common::instant::Instant;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
@@ -543,8 +545,19 @@ fn push_down_join(
         .map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone()));
 
     // Are there any new join predicates that can be inferred from the filter expressions?
-    let inferred_join_predicates =
-        infer_join_predicates(&join, &predicates, &on_filters)?;
+    let inferred_join_predicates = with_debug_timing("infer_join_predicates", || {
+        infer_join_predicates(&join, &predicates, &on_filters)
+    })?;
+
+    if log_enabled!(Level::Debug) {
+        debug!(
+            "push_down_filter: join_type={:?}, parent_predicates={}, on_filters={}, inferred_join_predicates={}",
+            join.join_type,
+            predicates.len(),
+            on_filters.len(),
+            inferred_join_predicates.len()
+        );
+    }
 
     if on_filters.is_empty()
         && predicates.is_empty()
@@ -783,7 +796,15 @@ impl OptimizerRule for PushDownFilter {
         let predicate = split_conjunction_owned(filter.predicate.clone());
 
         let old_predicate_len = predicate.len();
-        let new_predicates = simplify_predicates(predicate)?;
+        let new_predicates =
+            with_debug_timing("simplify_predicates", || simplify_predicates(predicate))?;
+        if log_enabled!(Level::Debug) {
+            debug!(
+                "push_down_filter: simplify_predicates old_count={}, new_count={}",
+                old_predicate_len,
+                new_predicates.len()
+            );
+        }
         if old_predicate_len != new_predicates.len() {
             let Some(new_predicate) = conjunction(new_predicates) else {
                 // new_predicates is empty - remove the filter entirely
@@ -1395,6 +1416,22 @@ impl PushDownFilter {
     }
 }
 
+// Run `f`, logging its wall-clock duration at debug level; zero cost when
+// debug logging is disabled (the timing branch is skipped entirely).
+fn with_debug_timing<T, F>(label: &'static str, f: F) -> Result<T>
+where
+    F: FnOnce() -> Result<T>,
+{
+    if !log_enabled!(Level::Debug) {
+        return f();
+    }
+    let start = Instant::now();
+    let result = f();
+    debug!(
+        "push_down_filter_timing: section={label}, elapsed_us={}",
+        start.elapsed().as_micros()
+    );
+    result
+}
+
 /// replaces columns by its name on the projection.
 pub fn replace_cols_by_name(
     e: Expr,