From 054607b9bd8e5af599e6f90cb408513f73903b0b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 2 Mar 2026 21:20:21 +0800 Subject: [PATCH 1/4] feat: add benchmarking for case-heavy left join and improve debug logging in push down filter --- .../core/benches/sql_planner_extended.rs | 32 ++++++++++++++ datafusion/optimizer/src/push_down_filter.rs | 43 +++++++++++++++++-- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index adaf3e5911e9b..6c1105d392c33 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when}; use datafusion_functions::expr_fn::{ btrim, length, regexp_like, regexp_replace, to_timestamp, upper, }; +use std::fmt::Write; use std::hint::black_box; use std::ops::Rem; use std::sync::Arc; @@ -212,6 +213,29 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { }) } +/// Build a CASE-heavy dataframe over a non-inner join to stress +/// planner-time filter pushdown and nullability/type inference. +fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { + let mut query = String::from( + "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ", + ); + + // Keep this deterministic so comparisons between profiles are stable. + for i in 1..=30 { + if i > 1 { + query.push_str(" AND "); + } + let left_col = i % 20; + let right_col = (i + 1) % 20; + let _ = write!( + &mut query, + "CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END > 2" + ); + } + + rt.block_on(async { ctx.sql(&query).await.unwrap() }) +} + fn criterion_benchmark(c: &mut Criterion) { let ctx = SessionContext::new(); let rt = Runtime::new().unwrap(); @@ -220,6 +244,7 @@ fn criterion_benchmark(c: &mut Criterion) { // https://github.com/apache/datafusion/issues/17261 let df = build_test_data_frame(&ctx, &rt); + let case_heavy_left_join_df = build_case_heavy_left_join_df(&ctx, &rt); c.bench_function("logical_plan_optimize", |b| { b.iter(|| { @@ -227,6 +252,13 @@ fn criterion_benchmark(c: &mut Criterion) { black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() })); }) }); + + c.bench_function("logical_plan_optimize_case_heavy_left_join", |b| { + b.iter(|| { + let df_clone = case_heavy_left_join_df.clone(); + black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() })); + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index b1c0960386c2c..b94597fd42958 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -23,7 +23,9 @@ use std::sync::Arc; use arrow::datatypes::DataType; use indexmap::IndexSet; use itertools::Itertools; +use log::{Level, debug, log_enabled}; +use datafusion_common::instant::Instant; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; @@ -543,8 +545,19 @@ fn push_down_join( .map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone())); // Are there any new join predicates that can be inferred from the filter expressions? - let inferred_join_predicates = - infer_join_predicates(&join, &predicates, &on_filters)?; + let inferred_join_predicates = with_debug_timing("infer_join_predicates", || { + infer_join_predicates(&join, &predicates, &on_filters) + })?; + + if log_enabled!(Level::Debug) { + debug!( + "push_down_filter: join_type={:?}, parent_predicates={}, on_filters={}, inferred_join_predicates={}", + join.join_type, + predicates.len(), + on_filters.len(), + inferred_join_predicates.len() + ); + } if on_filters.is_empty() && predicates.is_empty() @@ -783,7 +796,15 @@ impl OptimizerRule for PushDownFilter { let predicate = split_conjunction_owned(filter.predicate.clone()); let old_predicate_len = predicate.len(); - let new_predicates = simplify_predicates(predicate)?; + let new_predicates = + with_debug_timing("simplify_predicates", || simplify_predicates(predicate))?; + if log_enabled!(Level::Debug) { + debug!( + "push_down_filter: simplify_predicates old_count={}, new_count={}", + old_predicate_len, + new_predicates.len() + ); + } if old_predicate_len != new_predicates.len() { let Some(new_predicate) = conjunction(new_predicates) else { // new_predicates is empty - remove the filter entirely @@ -1395,6 +1416,22 @@ impl PushDownFilter { } } +fn with_debug_timing(label: &'static str, f: F) -> Result +where + F: FnOnce() -> Result, +{ + if !log_enabled!(Level::Debug) { + return f(); + } + let start = Instant::now(); + let result = f(); + debug!( + "push_down_filter_timing: section={label}, elapsed_us={}", + start.elapsed().as_micros() + ); + result +} + /// replaces columns by its name on the projection. pub fn replace_cols_by_name( e: Expr, From b927fe7c1717006b0339c20f73c6c584ad76a7c0 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 13:08:35 +0800 Subject: [PATCH 2/4] feat: enhance benchmarking for case-heavy left join with push down filter --- .../core/benches/sql_planner_extended.rs | 97 +++++++++++++++++-- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 6c1105d392c33..34763e1503c30 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -18,7 +18,7 @@ use arrow::array::{ArrayRef, RecordBatch}; use arrow_schema::DataType; use arrow_schema::TimeUnit::Nanosecond; -use criterion::{Criterion, criterion_group, criterion_main}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_catalog::MemTable; use datafusion_common::ScalarValue; @@ -216,23 +216,60 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { /// Build a CASE-heavy dataframe over a non-inner join to stress /// planner-time filter pushdown and nullability/type inference. fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { + let query = build_case_heavy_left_join_query(30, 1); + rt.block_on(async { ctx.sql(&query).await.unwrap() }) +} + +fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String { let mut query = String::from( "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ", ); + if predicate_count == 0 { + query.push_str("TRUE"); + return query; + } + // Keep this deterministic so comparisons between profiles are stable. - for i in 1..=30 { - if i > 1 { + for i in 0..predicate_count { + if i > 0 { query.push_str(" AND "); } - let left_col = i % 20; - let right_col = (i + 1) % 20; + + let mut expr = format!("length(l.c{})", i % 20); + for depth in 0..case_depth { + let left_col = (i + depth + 1) % 20; + let right_col = (i + depth + 2) % 20; + expr = format!( + "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END" + ); + } + let _ = write!( &mut query, - "CASE WHEN l.c{left_col} IS NOT NULL THEN length(l.c{left_col}) ELSE length(r.c{right_col}) END > 2" + "{expr} > 2" ); } + query +} + +fn build_case_heavy_left_join_df_with_push_down_filter( + rt: &Runtime, + predicate_count: usize, + case_depth: usize, + push_down_filter_enabled: bool, +) -> DataFrame { + let ctx = SessionContext::new(); + register_string_table(&ctx, 100, 1000); + if !push_down_filter_enabled { + debug_assert!( + ctx.remove_optimizer_rule("push_down_filter"), + "push_down_filter rule should be present in the default optimizer" + ); + } + + let query = build_case_heavy_left_join_query(predicate_count, case_depth); rt.block_on(async { ctx.sql(&query).await.unwrap() }) } @@ -259,6 +296,54 @@ fn criterion_benchmark(c: &mut Criterion) { black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() })); }) }); + + let mut group = c.benchmark_group("push_down_filter_case_heavy_left_join_ab"); + let predicate_sweep = [10, 20, 30, 40, 60]; + let case_depth_sweep = [1, 2, 3]; + + for case_depth in case_depth_sweep { + for predicate_count in predicate_sweep { + let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + true, + ); + let without_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + false, + ); + + let input_label = format!("predicates={predicate_count},case_depth={case_depth}"); + group.bench_with_input( + BenchmarkId::new("with_push_down_filter", &input_label), + &with_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box(rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + })); + }) + }, + ); + group.bench_with_input( + BenchmarkId::new("without_push_down_filter", &input_label), + &without_push_down_filter, + |b, df| { + b.iter(|| { + let df_clone = df.clone(); + black_box(rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + })); + }) + }, + ); + } + } + group.finish(); } criterion_group!(benches, criterion_benchmark); From 8ea18ede5589d8d14a8f0d34a59657b98f7526c7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 17:42:51 +0800 Subject: [PATCH 3/4] Align benchmark helper setup --- datafusion/core/benches/sql_planner_extended.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 34763e1503c30..7329e9b76bd07 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -216,6 +216,7 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame { /// Build a CASE-heavy dataframe over a non-inner join to stress /// planner-time filter pushdown and nullability/type inference. fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame { + register_string_table(ctx, 100, 1000); let query = build_case_heavy_left_join_query(30, 1); rt.block_on(async { ctx.sql(&query).await.unwrap() }) } @@ -274,14 +275,15 @@ fn build_case_heavy_left_join_df_with_push_down_filter( } fn criterion_benchmark(c: &mut Criterion) { - let ctx = SessionContext::new(); + let baseline_ctx = SessionContext::new(); + let case_heavy_ctx = SessionContext::new(); let rt = Runtime::new().unwrap(); // validate logical plan optimize performance // https://github.com/apache/datafusion/issues/17261 - let df = build_test_data_frame(&ctx, &rt); - let case_heavy_left_join_df = build_case_heavy_left_join_df(&ctx, &rt); + let df = build_test_data_frame(&baseline_ctx, &rt); + let case_heavy_left_join_df = build_case_heavy_left_join_df(&case_heavy_ctx, &rt); c.bench_function("logical_plan_optimize", |b| { b.iter(|| { From 7f6512bf8c20a3659f8a52e5c9388aa25c154c58 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 3 Mar 2026 18:12:46 +0800 Subject: [PATCH 4/4] cargo fmt --- .../core/benches/sql_planner_extended.rs | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/datafusion/core/benches/sql_planner_extended.rs b/datafusion/core/benches/sql_planner_extended.rs index 7329e9b76bd07..c3b0f658528de 100644 --- a/datafusion/core/benches/sql_planner_extended.rs +++ b/datafusion/core/benches/sql_planner_extended.rs @@ -246,10 +246,7 @@ fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) - ); } - let _ = write!( - &mut query, - "{expr} > 2" - ); + let _ = write!(&mut query, "{expr} > 2"); } query @@ -305,29 +302,34 @@ fn criterion_benchmark(c: &mut Criterion) { for case_depth in case_depth_sweep { for predicate_count in predicate_sweep { - let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - true, - ); - let without_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter( - &rt, - predicate_count, - case_depth, - false, - ); + let with_push_down_filter = + build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + true, + ); + let without_push_down_filter = + build_case_heavy_left_join_df_with_push_down_filter( + &rt, + predicate_count, + case_depth, + false, + ); - let input_label = format!("predicates={predicate_count},case_depth={case_depth}"); + let input_label = + format!("predicates={predicate_count},case_depth={case_depth}"); group.bench_with_input( BenchmarkId::new("with_push_down_filter", &input_label), &with_push_down_filter, |b, df| { b.iter(|| { let df_clone = df.clone(); - black_box(rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - })); + black_box( + rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + }), + ); }) }, ); @@ -337,9 +339,11 @@ fn criterion_benchmark(c: &mut Criterion) { |b, df| { b.iter(|| { let df_clone = df.clone(); - black_box(rt.block_on(async { - df_clone.into_optimized_plan().unwrap() - })); + black_box( + rt.block_on(async { + df_clone.into_optimized_plan().unwrap() + }), + ); }) }, );