diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index e3a6733532324..a767526feb930 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -79,3 +79,7 @@ insta = { workspace = true } [[bench]] name = "projection_unnecessary" harness = false + +[[bench]] +name = "optimize_projections" +harness = false diff --git a/datafusion/optimizer/benches/optimize_projections.rs b/datafusion/optimizer/benches/optimize_projections.rs new file mode 100644 index 0000000000000..d190c5ceabb2f --- /dev/null +++ b/datafusion/optimizer/benches/optimize_projections.rs @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Micro-benchmarks for the `OptimizeProjections` logical optimizer rule. +//! +//! Each case models a plan shape typical of TPC-H, TPC-DS, or ClickBench. +//! Schemas use realistic widths and the rule operates on a fresh +//! `LogicalPlan` per iteration (construction is in the criterion setup +//! closure and excluded from measurement). + +use std::hint::black_box; + +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; +use datafusion_expr::{ + JoinType, LogicalPlan, LogicalPlanBuilder, col, lit, logical_plan::table_scan, +}; +use datafusion_functions_aggregate::expr_fn::sum; +use datafusion_optimizer::optimize_projections::OptimizeProjections; +use datafusion_optimizer::{OptimizerContext, OptimizerRule}; + +fn table(name: &str, cols: usize) -> LogicalPlan { + let fields: Vec = (0..cols) + .map(|i| Field::new(format!("c{i}"), DataType::Int32, true)) + .collect(); + table_scan(Some(name), &Schema::new(fields), None) + .unwrap() + .build() + .unwrap() +} + +fn scan_with_filter(name: &str, cols: usize, filter_col: usize) -> LogicalPlan { + LogicalPlanBuilder::from(table(name, cols)) + .filter(col(format!("{name}.c{filter_col}")).gt(lit(0i32))) + .unwrap() + .build() + .unwrap() +} + +/// TPC-H Q3-like: customer ⨝ orders ⨝ lineitem with filters above each scan, +/// GROUP BY 3 keys, 1 SUM aggregate. Models the canonical filter→join→aggregate +/// analytical shape after PushDownFilter. +fn plan_tpch_q3() -> LogicalPlan { + let customer = scan_with_filter("customer", 8, 6); + let orders = scan_with_filter("orders", 9, 4); + let lineitem = scan_with_filter("lineitem", 16, 10); + + LogicalPlanBuilder::from(customer) + .join_on( + orders, + JoinType::Inner, + vec![col("customer.c0").eq(col("orders.c1"))], + ) + .unwrap() + .join_on( + lineitem, + JoinType::Inner, + vec![col("lineitem.c0").eq(col("orders.c0"))], + ) + .unwrap() + .aggregate( + vec![col("lineitem.c0"), col("orders.c4"), col("orders.c7")], + vec![sum(col("lineitem.c5") - col("lineitem.c6"))], + ) + .unwrap() + .build() + .unwrap() +} + +/// TPC-H Q5-like: 6-way join through region→nation→customer→orders→lineitem +/// →supplier, GROUP BY 1 key, 1 SUM. Exercises nested-join pruning depth. +fn plan_tpch_q5() -> LogicalPlan { + let region = scan_with_filter("region", 3, 1); + let nation = table("nation", 4); + let customer = table("customer", 8); + let orders = table("orders", 9); + let lineitem = table("lineitem", 16); + let supplier = table("supplier", 7); + + LogicalPlanBuilder::from(region) + .join_on( + nation, + JoinType::Inner, + vec![col("region.c0").eq(col("nation.c2"))], + ) + .unwrap() + .join_on( + customer, + JoinType::Inner, + vec![col("nation.c0").eq(col("customer.c3"))], + ) + .unwrap() + .join_on( + orders, + JoinType::Inner, + vec![col("customer.c0").eq(col("orders.c1"))], + ) + .unwrap() + .join_on( + lineitem, + JoinType::Inner, + vec![col("lineitem.c0").eq(col("orders.c0"))], + ) + .unwrap() + .join_on( + supplier, + JoinType::Inner, + vec![col("lineitem.c2").eq(col("supplier.c0"))], + ) + .unwrap() + .aggregate( + vec![col("nation.c1")], + vec![sum(col("lineitem.c5") - col("lineitem.c6"))], + ) + .unwrap() + .build() + .unwrap() +} + +/// ClickBench-style: single wide `hits` table (100 cols), conjunctive filter, +/// GROUP BY 2 keys, 2 SUM aggregates. Stresses wide-schema column lookup. +fn plan_clickbench_groupby() -> LogicalPlan { + let hits = table("hits", 100); + let predicate = col("hits.c5") + .gt(lit(100i32)) + .and(col("hits.c12").lt(lit(1000i32))); + LogicalPlanBuilder::from(hits) + .filter(predicate) + .unwrap() + .aggregate( + vec![col("hits.c3"), col("hits.c7")], + vec![sum(col("hits.c42")), sum(col("hits.c60"))], + ) + .unwrap() + .build() + .unwrap() +} + +/// TPC-DS-style CTE shape: a SubqueryAlias wrapping a filter+projection over +/// a wide fact table, joined back on two dimension tables and aggregated. +fn plan_tpcds_subquery() -> LogicalPlan { + let store_sales = table("store_sales", 23); + let customer = table("customer", 18); + let item = table("item", 22); + + let sub = LogicalPlanBuilder::from(store_sales) + .filter(col("store_sales.c5").gt(lit(0i32))) + .unwrap() + .project(vec![ + col("store_sales.c0"), + col("store_sales.c3"), + col("store_sales.c13"), + ]) + .unwrap() + .alias("sub") + .unwrap() + .build() + .unwrap(); + + LogicalPlanBuilder::from(customer) + .join_on( + sub, + JoinType::Inner, + vec![col("customer.c0").eq(col("sub.c3"))], + ) + .unwrap() + .join_on( + item, + JoinType::Inner, + vec![col("item.c0").eq(col("sub.c0"))], + ) + .unwrap() + .aggregate(vec![col("customer.c2")], vec![sum(col("sub.c13"))]) + .unwrap() + .build() + .unwrap() +} + +/// Narrow 10-column table, single filter, project 3 cols. Guards against +/// regressions on the common small-schema case where a lookup-map fix for +/// wide schemas might hurt by adding hashing overhead. +fn plan_small_schema() -> LogicalPlan { + LogicalPlanBuilder::from(table("t", 10)) + .filter(col("t.c3").gt(lit(0i32))) + .unwrap() + .project(vec![col("t.c0"), col("t.c1"), col("t.c5")]) + .unwrap() + .build() + .unwrap() +} + +type BenchCase = (&'static str, fn() -> LogicalPlan); + +fn bench_optimize_projections(c: &mut Criterion) { + let rule = OptimizeProjections::new(); + let config = OptimizerContext::new(); + let mut group = c.benchmark_group("optimize_projections"); + + let cases: &[BenchCase] = &[ + ("tpch_q3", plan_tpch_q3), + ("tpch_q5", plan_tpch_q5), + ("clickbench_groupby", plan_clickbench_groupby), + ("tpcds_subquery", plan_tpcds_subquery), + ("small_schema", plan_small_schema), + ]; + + for (name, build) in cases { + group.bench_function(*name, |b| { + b.iter_batched( + build, + |plan| black_box(rule.rewrite(plan, &config).unwrap()), + BatchSize::SmallInput, + ); + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_optimize_projections); +criterion_main!(benches); diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 46eb6a725a4f0..14badcf1435d5 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -21,7 +21,6 @@ mod required_indices; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use std::collections::HashSet; use std::sync::Arc; use datafusion_common::{ @@ -147,26 +146,39 @@ fn optimize_projections( // `aggregate.aggr_expr`: let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs); - // Get absolutely necessary GROUP BY fields: - let group_by_expr_existing = aggregate - .group_expr - .iter() - .map(|group_by_expr| group_by_expr.schema_name().to_string()) - .collect::>(); - - let new_group_bys = if let Some(simplest_groupby_indices) = - get_required_group_by_exprs_indices( - aggregate.input.schema(), - &group_by_expr_existing, - ) { - // Some of the fields in the GROUP BY may be required by the - // parent even if these fields are unnecessary in terms of - // functional dependency. - group_by_reqs - .append(&simplest_groupby_indices) - .get_at_indices(&aggregate.group_expr) - } else { + // Get absolutely necessary GROUP BY fields. + // + // When the input has no functional dependencies, we can + // short-circuit this analysis. + let new_group_bys = if aggregate + .input + .schema() + .functional_dependencies() + .is_empty() + { aggregate.group_expr + } else { + let group_by_expr_existing = aggregate + .group_expr + .iter() + .map(|group_by_expr| group_by_expr.schema_name().to_string()) + .collect::>(); + + if let Some(simplest_groupby_indices) = + get_required_group_by_exprs_indices( + aggregate.input.schema(), + &group_by_expr_existing, + ) + { + // Some of the fields in the GROUP BY may be required by + // the parent even if these fields are unnecessary in + // terms of functional dependency. + group_by_reqs + .append(&simplest_groupby_indices) + .get_at_indices(&aggregate.group_expr) + } else { + aggregate.group_expr + } }; // Only use the absolutely necessary aggregate expressions required @@ -682,56 +694,6 @@ fn rewrite_expr(expr: Expr, input: &Projection) -> Result> { }) } -/// Accumulates outer-referenced columns by the -/// given expression, `expr`. -/// -/// # Parameters -/// -/// * `expr` - The expression to analyze for outer-referenced columns. -/// * `columns` - A mutable reference to a `HashSet` where detected -/// columns are collected. -fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) { - // inspect_expr_pre doesn't handle subquery references, so find them explicitly - expr.apply(|expr| { - match expr { - Expr::OuterReferenceColumn(_, col) => { - columns.insert(col); - } - Expr::ScalarSubquery(subquery) => { - outer_columns_helper_multi(&subquery.outer_ref_columns, columns); - } - Expr::Exists(exists) => { - outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns); - } - Expr::InSubquery(insubquery) => { - outer_columns_helper_multi( - &insubquery.subquery.outer_ref_columns, - columns, - ); - } - _ => {} - }; - Ok(TreeNodeRecursion::Continue) - }) - // unwrap: closure above never returns Err, so can not be Err here - .unwrap(); -} - -/// A recursive subroutine that accumulates outer-referenced columns by the -/// given expressions (`exprs`). -/// -/// # Parameters -/// -/// * `exprs` - The expressions to analyze for outer-referenced columns. -/// * `columns` - A mutable reference to a `HashSet` where detected -/// columns are collected. -fn outer_columns_helper_multi<'a, 'b>( - exprs: impl IntoIterator, - columns: &'b mut HashSet<&'a Column>, -) { - exprs.into_iter().for_each(|e| outer_columns(e, columns)); -} - /// Splits requirement indices for a join into left and right children based on /// the join type. /// diff --git a/datafusion/optimizer/src/optimize_projections/required_indices.rs b/datafusion/optimizer/src/optimize_projections/required_indices.rs index c1e0885c9b5f2..5e73a9fbeceda 100644 --- a/datafusion/optimizer/src/optimize_projections/required_indices.rs +++ b/datafusion/optimizer/src/optimize_projections/required_indices.rs @@ -17,8 +17,7 @@ //! [`RequiredIndices`] helper for OptimizeProjection -use crate::optimize_projections::outer_columns; -use datafusion_common::tree_node::TreeNodeRecursion; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, DFSchemaRef, Result}; use datafusion_expr::{Expr, LogicalPlan}; @@ -105,44 +104,59 @@ impl RequiredIndices { /// Adds the indices of the fields referred to by the given expression /// `expr` within the given schema (`input_schema`). /// - /// Self is NOT compacted (and thus this method is not pub) + /// Self is NOT compacted (duplicate indices are removed by a subsequent + /// [`Self::compact`] call), and thus this method is not pub. /// /// # Parameters /// /// * `input_schema`: The input schema to analyze for index requirements. /// * `expr`: An expression for which we want to find necessary field indices. fn add_expr(&mut self, input_schema: &DFSchemaRef, expr: &Expr) { - // TODO could remove these clones (and visit the expression directly) - let mut cols = expr.column_refs(); - // Get outer-referenced (subquery) columns: - outer_columns(expr, &mut cols); - self.indices.reserve(cols.len()); - for col in cols { - if let Some(idx) = input_schema.maybe_index_of_column(col) { - self.indices.push(idx); + // `apply` does not descend into subqueries, so recurse manually to + // handle those cases. + expr.apply(|e| { + match e { + Expr::Column(c) | Expr::OuterReferenceColumn(_, c) => { + if let Some(idx) = input_schema.maybe_index_of_column(c) { + self.indices.push(idx); + } + } + Expr::ScalarSubquery(sub) => { + self.add_exprs(input_schema, &sub.outer_ref_columns); + } + Expr::Exists(ex) => { + self.add_exprs(input_schema, &ex.subquery.outer_ref_columns); + } + Expr::InSubquery(isq) => { + self.add_exprs(input_schema, &isq.subquery.outer_ref_columns); + } + _ => {} } + Ok(TreeNodeRecursion::Continue) + }) + .expect("traversal is infallible"); + } + + /// Like [`Self::add_expr`], but for multiple expressions. + fn add_exprs<'a>( + &mut self, + input_schema: &DFSchemaRef, + exprs: impl IntoIterator, + ) { + for expr in exprs { + self.add_expr(input_schema, expr); } } /// Adds the indices of the fields referred to by the given expressions - /// `within the given schema. - /// - /// # Parameters - /// - /// * `input_schema`: The input schema to analyze for index requirements. - /// * `exprs`: the expressions for which we want to find field indices. + /// within the given schema. pub fn with_exprs<'a>( - self, + mut self, schema: &DFSchemaRef, exprs: impl IntoIterator, ) -> Self { - exprs - .into_iter() - .fold(self, |mut acc, expr| { - acc.add_expr(schema, expr); - acc - }) - .compact() + self.add_exprs(schema, exprs); + self.compact() } /// Adds all `indices` into this instance.