Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 126 additions & 3 deletions datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
use criterion::{Criterion, criterion_group, criterion_main};
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
Expand All @@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
Expand Down Expand Up @@ -212,21 +213,143 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
})
}

fn criterion_benchmark(c: &mut Criterion) {
/// Build a CASE-heavy dataframe over a non-inner join to stress
/// planner-time filter pushdown and nullability/type inference.
fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
    register_string_table(ctx, 100, 1000);
    let sql = build_case_heavy_left_join_query(30, 1);
    // `ctx.sql` is async; drive it to completion on the benchmark runtime.
    rt.block_on(ctx.sql(&sql)).unwrap()
}

/// Build a SQL string with `predicate_count` conjunctive predicates over a
/// LEFT JOIN, each wrapped in `case_depth` nested CASE expressions.
///
/// Output is fully deterministic so comparisons between profiles are stable.
fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String {
    let mut query = String::from(
        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ",
    );

    // No predicates requested: keep the WHERE clause syntactically valid.
    if predicate_count == 0 {
        query.push_str("TRUE");
        return query;
    }

    for i in 0..predicate_count {
        if i != 0 {
            query.push_str(" AND ");
        }

        // Start from a simple length() call, then wrap it in `case_depth`
        // layers of CASE expressions, cycling through the 20 columns.
        let seed = format!("length(l.c{})", i % 20);
        let expr = (0..case_depth).fold(seed, |inner, depth| {
            let left_col = (i + depth + 1) % 20;
            let right_col = (i + depth + 2) % 20;
            format!(
                "CASE WHEN l.c{left_col} IS NOT NULL THEN {inner} ELSE length(r.c{right_col}) END"
            )
        });

        // Writing into a String is infallible; ignore the Result.
        let _ = write!(&mut query, "{expr} > 2");
    }

    query
}

/// Build the CASE-heavy left-join dataframe on a fresh context, optionally
/// removing the `push_down_filter` optimizer rule so A/B runs can compare
/// plan-optimization cost with and without the rule.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);
    if !push_down_filter_enabled {
        // Keep the side effect outside `debug_assert!`: its argument is not
        // evaluated in release builds (the mode `cargo bench` uses), so
        // removing the rule inside the assertion would silently be a no-op
        // and both benchmark arms would measure the same thing.
        let removed = ctx.remove_optimizer_rule("push_down_filter");
        debug_assert!(
            removed,
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let query = build_case_heavy_left_join_query(predicate_count, case_depth);
    rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

/// Register all planner benchmarks: the baseline logical-plan optimization,
/// the CASE-heavy left-join variant, and an A/B sweep of `push_down_filter`.
fn criterion_benchmark(c: &mut Criterion) {
    let baseline_ctx = SessionContext::new();
    let case_heavy_ctx = SessionContext::new();
    let rt = Runtime::new().unwrap();

    // validate logical plan optimize performance
    // https://github.com/apache/datafusion/issues/17261
    let df = build_test_data_frame(&baseline_ctx, &rt);
    let case_heavy_left_join_df = build_case_heavy_left_join_df(&case_heavy_ctx, &rt);

    c.bench_function("logical_plan_optimize", |b| {
        b.iter(|| {
            let df_clone = df.clone();
            black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
        })
    });

    c.bench_function("logical_plan_optimize_case_heavy_left_join", |b| {
        b.iter(|| {
            let df_clone = case_heavy_left_join_df.clone();
            black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
        })
    });

    // A/B comparison: optimize the same CASE-heavy plan with and without the
    // push_down_filter rule, sweeping predicate count and CASE nesting depth.
    let mut group = c.benchmark_group("push_down_filter_case_heavy_left_join_ab");
    let predicate_sweep = [10, 20, 30, 40, 60];
    let case_depth_sweep = [1, 2, 3];

    for case_depth in case_depth_sweep {
        for predicate_count in predicate_sweep {
            let input_label =
                format!("predicates={predicate_count},case_depth={case_depth}");
            // One identical measurement body for both arms; only the rule
            // set of the SessionContext used to build the plan differs.
            for (arm, push_down_filter_enabled) in [
                ("with_push_down_filter", true),
                ("without_push_down_filter", false),
            ] {
                let df = build_case_heavy_left_join_df_with_push_down_filter(
                    &rt,
                    predicate_count,
                    case_depth,
                    push_down_filter_enabled,
                );
                group.bench_with_input(
                    BenchmarkId::new(arm, &input_label),
                    &df,
                    |b, df| {
                        b.iter(|| {
                            let df_clone = df.clone();
                            black_box(
                                rt.block_on(async {
                                    df_clone.into_optimized_plan().unwrap()
                                }),
                            );
                        })
                    },
                );
            }
        }
    }
    group.finish();
}

criterion_group!(benches, criterion_benchmark);
Expand Down
43 changes: 40 additions & 3 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow::datatypes::DataType;
use indexmap::IndexSet;
use itertools::Itertools;
use log::{Level, debug, log_enabled};

use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
Expand Down Expand Up @@ -543,8 +545,19 @@ fn push_down_join(
.map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone()));

// Are there any new join predicates that can be inferred from the filter expressions?
let inferred_join_predicates =
infer_join_predicates(&join, &predicates, &on_filters)?;
let inferred_join_predicates = with_debug_timing("infer_join_predicates", || {
infer_join_predicates(&join, &predicates, &on_filters)
})?;

if log_enabled!(Level::Debug) {
debug!(
"push_down_filter: join_type={:?}, parent_predicates={}, on_filters={}, inferred_join_predicates={}",
join.join_type,
predicates.len(),
on_filters.len(),
inferred_join_predicates.len()
);
}

if on_filters.is_empty()
&& predicates.is_empty()
Expand Down Expand Up @@ -783,7 +796,15 @@ impl OptimizerRule for PushDownFilter {

let predicate = split_conjunction_owned(filter.predicate.clone());
let old_predicate_len = predicate.len();
let new_predicates = simplify_predicates(predicate)?;
let new_predicates =
with_debug_timing("simplify_predicates", || simplify_predicates(predicate))?;
if log_enabled!(Level::Debug) {
debug!(
"push_down_filter: simplify_predicates old_count={}, new_count={}",
old_predicate_len,
new_predicates.len()
);
}
if old_predicate_len != new_predicates.len() {
let Some(new_predicate) = conjunction(new_predicates) else {
// new_predicates is empty - remove the filter entirely
Expand Down Expand Up @@ -1395,6 +1416,22 @@ impl PushDownFilter {
}
}

/// Run `f`, logging its elapsed time (in microseconds) under `label` when
/// debug logging is enabled; otherwise run it with zero timing overhead.
fn with_debug_timing<T, F>(label: &'static str, f: F) -> Result<T>
where
    F: FnOnce() -> Result<T>,
{
    if log_enabled!(Level::Debug) {
        let start = Instant::now();
        let result = f();
        debug!(
            "push_down_filter_timing: section={label}, elapsed_us={}",
            start.elapsed().as_micros()
        );
        result
    } else {
        // Debug logging is off: skip the clock entirely.
        f()
    }
}

/// replaces columns by its name on the projection.
pub fn replace_cols_by_name(
e: Expr,
Expand Down
Loading