From b3e6f7025035d8440f1c04ab9635dcf839a1ce78 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Wed, 3 Jun 2026 22:01:00 +0530 Subject: [PATCH 1/4] Fix volatile scalar subquery deduplication --- datafusion/core/src/physical_planner.rs | 77 +++++++-- .../core/tests/scalar_subquery_dedup.rs | 157 ++++++++++++++++++ datafusion/expr/src/execution_props.rs | 33 +++- datafusion/expr/src/expr.rs | 107 +++++++++++- datafusion/expr/src/logical_plan/plan.rs | 22 +++ .../optimizer/src/common_subexpr_eliminate.rs | 2 + datafusion/physical-expr/src/planner.rs | 4 +- .../sqllogictest/test_files/subquery.slt | 82 +++++++++ .../library-user-guide/upgrading/54.0.0.md | 1 + 9 files changed, 465 insertions(+), 20 deletions(-) create mode 100644 datafusion/core/tests/scalar_subquery_dedup.rs diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index dd741ee6ff12e..51f4d7efa06db 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -79,7 +79,9 @@ use datafusion_common::{ use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::memory::MemorySourceConfig; use datafusion_expr::dml::{CopyTo, InsertOp}; -use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; +use datafusion_expr::execution_props::{ + ScalarSubqueryResults, SubqueryIndex, SubqueryKey, +}; use datafusion_expr::expr::{ Alias, GroupingSet, NullTreatment, WindowFunction, WindowFunctionParams, physical_name, @@ -112,7 +114,7 @@ use datafusion_physical_plan::unnest::ListUnnest; use async_trait::async_trait; use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper}; use futures::{StreamExt, TryStreamExt}; -use indexmap::IndexSet; +use indexmap::IndexMap; use itertools::{Itertools, multiunzip}; use log::debug; use tokio::sync::Mutex; @@ -391,14 +393,16 @@ impl DefaultPhysicalPlanner { /// at its level and then recurses in order to handle nested subqueries. #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Subquery contains Arc with interior mutability but is intentionally used as hash key fn collect_scalar_subqueries(plan: &LogicalPlan) -> Vec { - let mut subqueries = IndexSet::new(); + let mut subqueries: IndexMap = IndexMap::new(); plan.apply(|node| { for expr in node.expressions() { expr.apply(|e| { if let Expr::ScalarSubquery(sq) = e && sq.outer_ref_columns.is_empty() { - subqueries.insert(sq.clone()); + subqueries + .entry(SubqueryKey::new(sq)) + .or_insert_with(|| sq.clone()); } Ok(TreeNodeRecursion::Continue) }) @@ -407,7 +411,35 @@ impl DefaultPhysicalPlanner { Ok(TreeNodeRecursion::Continue) }) .expect("infallible"); - subqueries.into_iter().collect() + subqueries.into_values().collect() + } + + /// Give each volatile uncorrelated scalar subquery occurrence a fresh plan + /// pointer so [`SubqueryKey`] can keep otherwise-identical occurrences + /// distinct during physical planning. + fn freshen_volatile_subqueries(plan: &LogicalPlan) -> Result { + Ok(plan + .clone() + .transform_down(|node| { + node.map_expressions(|expr| { + expr.transform_down(|e| { + if let Expr::ScalarSubquery(sq) = &e + && sq.outer_ref_columns.is_empty() + && sq.is_volatile() + { + let fresh = Subquery { + subquery: Arc::new(sq.subquery.as_ref().clone()), + outer_ref_columns: sq.outer_ref_columns.clone(), + spans: sq.spans.clone(), + }; + Ok(Transformed::yes(Expr::ScalarSubquery(fresh))) + } else { + Ok(Transformed::no(e)) + } + }) + }) + })? + .data) } /// Create a physical plan from a logical plan. @@ -442,15 +474,30 @@ impl DefaultPhysicalPlanner { // scalar subqueries to joins, so none should reach this point. // Skip collection in that case to avoid creating a no-op // `ScalarSubqueryExec` wrapper. - let all_subqueries = if session_state + if !session_state .config_options() .optimizer .enable_physical_uncorrelated_scalar_subquery { - Self::collect_scalar_subqueries(logical_plan) + return self + .create_initial_plan_inner(logical_plan, session_state) + .await; + } + + let mut all_subqueries = Self::collect_scalar_subqueries(logical_plan); + let freshened = if all_subqueries.iter().any(Subquery::is_volatile) { + Some(Self::freshen_volatile_subqueries(logical_plan)?) } else { - Vec::new() + None }; + let logical_plan = match &freshened { + Some(freshened) => { + all_subqueries = Self::collect_scalar_subqueries(freshened); + freshened + } + None => logical_plan, + }; + let (links, index_map) = self .plan_scalar_subqueries(all_subqueries, session_state) .await?; @@ -2932,18 +2979,22 @@ impl DefaultPhysicalPlanner { } /// Build physical plans for scalar subqueries and assign each an ordinal - /// `SubqueryIndex`. Returns the links (plan + index) and a map from logical - /// `Subquery` to its index. + /// `SubqueryIndex`. Returns the links (plan + index) and a map from each + /// subquery's `SubqueryKey` to its index. async fn plan_scalar_subqueries( &self, subqueries: Vec, session_state: &SessionState, - ) -> Result<(Vec, DFHashMap)> { + ) -> Result<( + Vec, + DFHashMap, + )> { let mut links = Vec::with_capacity(subqueries.len()); let mut index_map = DFHashMap::with_capacity(subqueries.len()); for sq in subqueries { + let key = SubqueryKey::new(&sq); // Callers deduplicate, but guard against accidental double-planning. - if index_map.contains_key(&sq) { + if index_map.contains_key(&key) { continue; } let physical_plan = self @@ -2954,7 +3005,7 @@ impl DefaultPhysicalPlanner { plan: physical_plan, index, }); - index_map.insert(sq, index); + index_map.insert(key, index); } Ok((links, index_map)) } diff --git a/datafusion/core/tests/scalar_subquery_dedup.rs b/datafusion/core/tests/scalar_subquery_dedup.rs new file mode 100644 index 0000000000000..77bf482c5445e --- /dev/null +++ b/datafusion/core/tests/scalar_subquery_dedup.rs @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Regression tests for volatile uncorrelated scalar subquery deduplication. + +use std::hash::{Hash, Hasher}; +use std::sync::Arc; +use std::sync::atomic::{AtomicI64, Ordering}; + +use arrow::array::{Array, Int64Array}; +use arrow::datatypes::DataType; +use arrow::record_batch::RecordBatch; +use datafusion::logical_expr::{ + ColumnarValue, LogicalPlanBuilder, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, + Signature, Volatility, scalar_subquery, +}; +use datafusion::prelude::SessionContext; +use datafusion_common::{Result, ScalarValue}; + +#[derive(Debug)] +struct VolatileCounter { + signature: Signature, + counter: Arc, +} + +impl VolatileCounter { + fn new(counter: Arc) -> Self { + Self { + signature: Signature::nullary(Volatility::Volatile), + counter, + } + } +} + +impl PartialEq for VolatileCounter { + fn eq(&self, _other: &Self) -> bool { + true + } +} +impl Eq for VolatileCounter {} +impl Hash for VolatileCounter { + fn hash(&self, _state: &mut H) {} +} + +impl ScalarUDFImpl for VolatileCounter { + fn name(&self) -> &str { + "volatile_counter" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int64) + } + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + let value = self.counter.fetch_add(1, Ordering::SeqCst); + Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(value)))) + } +} + +fn ctx_with_counter() -> (SessionContext, Arc) { + let ctx = SessionContext::new(); + let counter = Arc::new(AtomicI64::new(0)); + ctx.register_udf(ScalarUDF::new_from_impl(VolatileCounter::new(Arc::clone( + &counter, + )))); + (ctx, counter) +} + +async fn collect_sql(ctx: &SessionContext, sql: &str) -> Result> { + ctx.sql(sql).await?.collect().await +} + +fn int64_value(batches: &[RecordBatch], col: usize) -> i64 { + batches[0] + .column(col) + .as_any() + .downcast_ref::() + .expect("Int64Array") + .value(0) +} + +fn assert_counter(counter: &Arc, expected: i64) { + assert_eq!(counter.load(Ordering::SeqCst), expected); +} + +#[tokio::test] +async fn same_node_volatile_subqueries_are_evaluated_independently() -> Result<()> { + let (ctx, counter) = ctx_with_counter(); + let batches = collect_sql( + &ctx, + "SELECT (SELECT volatile_counter()) AS a, (SELECT volatile_counter()) AS b", + ) + .await?; + + let a = int64_value(&batches, 0); + let b = int64_value(&batches, 1); + assert_ne!(a, b, "each volatile subquery must produce its own value"); + assert_counter(&counter, 2); + Ok(()) +} + +#[tokio::test] +async fn cross_node_volatile_subqueries_are_evaluated_independently() -> Result<()> { + let (ctx, counter) = ctx_with_counter(); + let batches = collect_sql( + &ctx, + "SELECT (SELECT volatile_counter()) AS a \ + FROM (SELECT 1) t \ + WHERE (SELECT volatile_counter()) >= 0", + ) + .await?; + + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); + assert_counter(&counter, 2); + Ok(()) +} + +#[tokio::test] +async fn shared_volatile_subquery_expr_is_evaluated_per_occurrence() -> Result<()> { + let (ctx, counter) = ctx_with_counter(); + + let subquery_plan = ctx + .sql("SELECT volatile_counter()") + .await? + .into_unoptimized_plan(); + let subquery = scalar_subquery(Arc::new(subquery_plan)); + + let plan = LogicalPlanBuilder::empty(true) + .project(vec![subquery.clone().alias("a"), subquery.alias("b")])? + .build()?; + + let batches = ctx.execute_logical_plan(plan).await?.collect().await?; + + let a = int64_value(&batches, 0); + let b = int64_value(&batches, 1); + assert_ne!( + a, b, + "a shared volatile subquery Expr must be evaluated per occurrence" + ); + assert_counter(&counter, 2); + Ok(()) +} diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 649f74ed3997c..331f769e4d131 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -64,9 +64,10 @@ pub struct ExecutionProps { pub config_options: Option>, /// Providers for scalar variables pub var_providers: Option>>, - /// Maps each logical `Subquery` to its index in `subquery_results`. - /// Populated by the physical planner before calling `create_physical_expr`. - pub subquery_indexes: HashMap, + /// Maps each uncorrelated scalar [`SubqueryKey`] to its index in + /// `subquery_results`. Populated by the physical planner before calling + /// `create_physical_expr`. + pub subquery_indexes: HashMap, /// Shared results container for uncorrelated scalar subquery values. /// Populated at execution time by `ScalarSubqueryExec`. pub subquery_results: ScalarSubqueryResults, @@ -169,6 +170,32 @@ impl ExecutionProps { } } +/// Identity used to deduplicate uncorrelated scalar subqueries during physical +/// planning. +/// +/// Non-volatile subqueries are deduplicated by structure, so two textually +/// identical subqueries share a single execution. Volatile subqueries (whose +/// plan contains a function such as `random()`) are keyed by occurrence, using +/// a plan pointer freshened by the physical planner when needed. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum SubqueryKey { + /// A non-volatile subquery, deduplicated by structural identity. + Shared(crate::logical_plan::Subquery), + /// A volatile subquery occurrence. + Unique(usize), +} + +impl SubqueryKey { + /// Builds the deduplication key for `subquery`. + pub fn new(subquery: &crate::logical_plan::Subquery) -> Self { + if subquery.is_volatile() { + SubqueryKey::Unique(Arc::as_ptr(&subquery.subquery) as usize) + } else { + SubqueryKey::Shared(subquery.clone()) + } + } +} + /// Index of a scalar subquery within a [`ScalarSubqueryResults`] container. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct SubqueryIndex(usize); diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 98d355fad800e..417d9c964002b 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2145,10 +2145,25 @@ impl Expr { /// For example the function call `RANDOM()` is volatile as each call will /// return a different value. /// + /// This also descends into subquery-bearing expressions, so + /// `(SELECT random())` is volatile even though the scalar subquery node is + /// not itself a volatile function. + /// /// See [`Volatility`] for more information. pub fn is_volatile(&self) -> bool { - self.exists(|expr| Ok(expr.is_volatile_node())) - .expect("exists closure is infallible") + self.exists(|expr| { + let subquery_is_volatile = match expr { + Expr::ScalarSubquery(subquery) + | Expr::Exists(Exists { subquery, .. }) + | Expr::InSubquery(InSubquery { subquery, .. }) + | Expr::SetComparison(SetComparison { subquery, .. }) => { + subquery.is_volatile() + } + _ => false, + }; + Ok(expr.is_volatile_node() || subquery_is_volatile) + }) + .expect("exists closure is infallible") } /// Recursively find all [`Expr::Placeholder`] expressions, and @@ -4185,6 +4200,94 @@ mod test { assert_eq!(udf.signature().volatility, Volatility::Volatile); } + #[test] + fn test_is_volatile_subquery() -> Result<()> { + use crate::logical_plan::{LogicalPlanBuilder, Subquery}; + use datafusion_common::Spans; + + #[derive(Debug, PartialEq, Eq, Hash)] + struct VolatileUdf { + signature: Signature, + } + impl ScalarUDFImpl for VolatileUdf { + fn name(&self) -> &str { + "volatile_udf" + } + fn signature(&self) -> &Signature { + &self.signature + } + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int64) + } + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(0)))) + } + } + + let vol_udf = Arc::new(ScalarUDF::from(VolatileUdf { + signature: Signature::nullary(Volatility::Volatile), + })); + + let volatile_plan = Arc::new( + LogicalPlanBuilder::empty(true) + .project(vec![vol_udf.call(vec![])])? + .build()?, + ); + let stable_plan = Arc::new( + LogicalPlanBuilder::empty(true) + .project(vec![lit(1i64)])? + .build()?, + ); + + let subquery = |plan: &Arc| Subquery { + subquery: Arc::clone(plan), + outer_ref_columns: vec![], + spans: Spans::new(), + }; + + // Every subquery-bearing expression kind must surface the volatility of + // its subquery plan. + assert!(Expr::ScalarSubquery(subquery(&volatile_plan)).is_volatile()); + assert!(Expr::Exists(Exists::new(subquery(&volatile_plan), false)).is_volatile()); + assert!( + Expr::InSubquery(InSubquery::new( + Box::new(lit(1i64)), + subquery(&volatile_plan), + false + )) + .is_volatile() + ); + assert!( + Expr::SetComparison(SetComparison::new( + Box::new(lit(1i64)), + subquery(&volatile_plan), + Operator::Eq, + SetQuantifier::Any + )) + .is_volatile() + ); + + // Non-volatile subqueries are not reported as volatile. + assert!(!Expr::ScalarSubquery(subquery(&stable_plan)).is_volatile()); + assert!(!Expr::Exists(Exists::new(subquery(&stable_plan), false)).is_volatile()); + + // Volatility hidden behind a nested subquery is still detected. + let nested_plan = Arc::new( + LogicalPlanBuilder::empty(true) + .project(vec![Expr::Exists(Exists::new( + subquery(&volatile_plan), + false, + ))])? + .build()?, + ); + assert!(Expr::ScalarSubquery(subquery(&nested_plan)).is_volatile()); + + Ok(()) + } + use super::*; use crate::logical_plan::{EmptyRelation, LogicalPlan}; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1bfecd06c2228..633b5934e8dbd 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -4389,6 +4389,28 @@ impl Subquery { spans: Spans::new(), } } + + /// Returns `true` if this subquery's plan contains a volatile expression. + pub fn is_volatile(&self) -> bool { + plan_contains_volatile(&self.subquery) + } +} + +/// Returns `true` if any expression in `plan` is volatile. +fn plan_contains_volatile(plan: &LogicalPlan) -> bool { + plan.exists(|node| { + let mut found = false; + node.apply_expressions(|expr| { + if expr.is_volatile() { + found = true; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + })?; + Ok(found) + }) + .expect("infallible") } impl Debug for Subquery { diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 2775d62144c56..5a54240d3a47b 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -701,6 +701,8 @@ impl CSEController for ExprCSEController<'_> { fn is_valid(node: &Expr) -> bool { !node.is_volatile_node() && !matches!(node, Expr::Lambda(_) | Expr::LambdaVariable(_)) + // Volatile scalar subqueries must be evaluated per occurrence. + && !matches!(node, Expr::ScalarSubquery(sq) if sq.is_volatile()) } fn is_ignored(&self, node: &Expr) -> bool { diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index d0d0508a106a5..e23fbb8f600d8 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -32,7 +32,7 @@ use datafusion_common::{ DFSchema, Result, ScalarValue, TableReference, ToDFSchema, exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, }; -use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::execution_props::{ExecutionProps, SubqueryKey}; use datafusion_expr::expr::{ Alias, Cast, HigherOrderFunction, InList, Lambda, LambdaVariable, Placeholder, ScalarFunction, @@ -403,7 +403,7 @@ pub fn create_physical_expr( } }, Expr::ScalarSubquery(sq) => { - match execution_props.subquery_indexes.get(sq) { + match execution_props.subquery_indexes.get(&SubqueryKey::new(sq)) { Some(&index) => { let schema = sq.subquery.schema(); if schema.fields().len() != 1 { diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 3d6f8027454c7..b4de240c65759 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -2031,6 +2031,88 @@ physical_plan 05)--AggregateExec: mode=Single, gby=[], aggr=[max(sq_values.v)] 06)----DataSourceExec: partitions=1, partition_sizes=[1] +# Volatile scalar subqueries are evaluated per occurrence. +query TT +explain SELECT (SELECT random()) - (SELECT random()) AS diff; +---- +logical_plan +01)Projection: () - () AS diff +02)--Subquery: +03)----Projection: random() +04)------EmptyRelation: rows=1 +05)--Subquery: +06)----Projection: random() +07)------EmptyRelation: rows=1 +08)--EmptyRelation: rows=1 +physical_plan +01)ScalarSubqueryExec: subqueries=2 +02)--ProjectionExec: expr=[scalar_subquery() - scalar_subquery() as diff] +03)----PlaceholderRowExec +04)--ProjectionExec: expr=[random()] +05)----PlaceholderRowExec +06)--ProjectionExec: expr=[random()] +07)----PlaceholderRowExec + +# Cross-node volatile scalar subqueries are not deduplicated. +query TT +explain SELECT (SELECT random()) AS r FROM (SELECT 1) t WHERE (SELECT random()) >= 0.0; +---- +logical_plan +01)Projection: () AS r +02)--Subquery: +03)----Projection: random() +04)------EmptyRelation: rows=1 +05)--SubqueryAlias: t +06)----Filter: () >= Float64(0) +07)------Subquery: +08)--------Projection: random() +09)----------EmptyRelation: rows=1 +10)------EmptyRelation: rows=1 +physical_plan +01)ScalarSubqueryExec: subqueries=2 +02)--ProjectionExec: expr=[scalar_subquery() as r] +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------FilterExec: scalar_subquery() >= 0 +05)--------PlaceholderRowExec +06)--ProjectionExec: expr=[random()] +07)----PlaceholderRowExec +08)--ProjectionExec: expr=[random()] +09)----PlaceholderRowExec + +# Volatility from a nested scalar subquery is still detected. +query TT +explain SELECT (SELECT (SELECT random())) - (SELECT (SELECT random())) AS d; +---- +logical_plan +01)Projection: () - () AS d +02)--Subquery: +03)----Projection: () +04)------Subquery: +05)--------Projection: random() +06)----------EmptyRelation: rows=1 +07)------EmptyRelation: rows=1 +08)--Subquery: +09)----Projection: () +10)------Subquery: +11)--------Projection: random() +12)----------EmptyRelation: rows=1 +13)------EmptyRelation: rows=1 +14)--EmptyRelation: rows=1 +physical_plan +01)ScalarSubqueryExec: subqueries=2 +02)--ProjectionExec: expr=[scalar_subquery() - scalar_subquery() as d] +03)----PlaceholderRowExec +04)--ScalarSubqueryExec: subqueries=1 +05)----ProjectionExec: expr=[scalar_subquery() as random()] +06)------PlaceholderRowExec +07)----ProjectionExec: expr=[random()] +08)------PlaceholderRowExec +09)--ScalarSubqueryExec: subqueries=1 +10)----ProjectionExec: expr=[scalar_subquery() as random()] +11)------PlaceholderRowExec +12)----ProjectionExec: expr=[random()] +13)------PlaceholderRowExec + statement ok RESET datafusion.explain.logical_plan_only; diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 8245793ec07de..8e69eccdbeb46 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -399,6 +399,7 @@ boundary with `PhysicalPlanDecodeContext::new(task_ctx, codec)`. `ExecutionProps` gained new public fields. Code that constructs it via a struct literal, or pattern-matches it without `..`, no longer compiles. Use `ExecutionProps::new()` and include `..` in exhaustive patterns. +The `subquery_indexes` field is keyed by `SubqueryKey`. ### Items in `datafusion_functions::strings` are no longer public From 6f64f51de3c7fd1916b22665999c847abcffcbd6 Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 4 Jun 2026 11:29:09 +0530 Subject: [PATCH 2/4] one planner pass and check volatility --- datafusion/core/src/physical_planner.rs | 119 ++++++++---------- .../core/tests/scalar_subquery_dedup.rs | 52 ++++++++ datafusion/expr/src/execution_props.rs | 35 +----- datafusion/expr/src/expr.rs | 76 ++++++----- datafusion/expr/src/expr_fn.rs | 30 +---- datafusion/expr/src/expr_schema.rs | 1 + datafusion/expr/src/logical_plan/plan.rs | 41 +++++- datafusion/expr/src/logical_plan/tree_node.rs | 2 + .../optimizer/src/analyzer/type_coercion.rs | 23 ++-- .../optimizer/src/common_subexpr_eliminate.rs | 17 ++- datafusion/optimizer/src/push_down_filter.rs | 43 +++++-- .../optimizer/src/rewrite_set_comparison.rs | 1 + .../simplify_expressions/expr_simplifier.rs | 9 +- .../simplify_expressions/linear_aggregates.rs | 2 +- .../src/simplify_expressions/utils.rs | 5 +- datafusion/optimizer/src/unions_to_filter.rs | 4 +- datafusion/physical-expr/src/planner.rs | 12 +- .../proto/src/logical_plan/from_proto.rs | 8 +- datafusion/sql/src/expr/subquery.rs | 24 +--- datafusion/sql/src/relation/mod.rs | 16 +-- .../sqllogictest/test_files/subquery.slt | 24 ++++ .../logical_plan/consumer/expr/subquery.rs | 32 +++-- .../src/logical_plan/consumer/rel/mod.rs | 4 +- .../tests/cases/roundtrip_logical_plan.rs | 22 ++-- .../library-user-guide/upgrading/54.0.0.md | 4 +- 25 files changed, 347 insertions(+), 259 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 51f4d7efa06db..d47abd3919a54 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -79,9 +79,7 @@ use datafusion_common::{ use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::memory::MemorySourceConfig; use datafusion_expr::dml::{CopyTo, InsertOp}; -use datafusion_expr::execution_props::{ - ScalarSubqueryResults, SubqueryIndex, SubqueryKey, -}; +use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; use datafusion_expr::expr::{ Alias, GroupingSet, NullTreatment, WindowFunction, WindowFunctionParams, physical_name, @@ -388,58 +386,53 @@ impl DefaultPhysicalPlanner { Ok(()) } - /// Collect uncorrelated scalar subqueries. We don't descend into nested - /// subqueries here: each call to `create_initial_plan` handles subqueries - /// at its level and then recurses in order to handle nested subqueries. + /// Assign indexes to uncorrelated scalar subqueries at this plan level. + /// + /// Non-volatile subqueries reuse an index by structure. Volatile subqueries + /// always get a fresh index. #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Subquery contains Arc with interior mutability but is intentionally used as hash key - fn collect_scalar_subqueries(plan: &LogicalPlan) -> Vec { - let mut subqueries: IndexMap = IndexMap::new(); - plan.apply(|node| { - for expr in node.expressions() { - expr.apply(|e| { - if let Expr::ScalarSubquery(sq) = e - && sq.outer_ref_columns.is_empty() - { - subqueries - .entry(SubqueryKey::new(sq)) - .or_insert_with(|| sq.clone()); - } - Ok(TreeNodeRecursion::Continue) - }) - .expect("infallible"); - } - Ok(TreeNodeRecursion::Continue) - }) - .expect("infallible"); - subqueries.into_values().collect() - } + fn assign_scalar_subquery_indexes( + plan: &LogicalPlan, + ) -> Result<(LogicalPlan, Vec)> { + let mut dedup: IndexMap = IndexMap::new(); + let mut subqueries = Vec::new(); - /// Give each volatile uncorrelated scalar subquery occurrence a fresh plan - /// pointer so [`SubqueryKey`] can keep otherwise-identical occurrences - /// distinct during physical planning. - fn freshen_volatile_subqueries(plan: &LogicalPlan) -> Result { - Ok(plan + let plan = plan .clone() .transform_down(|node| { node.map_expressions(|expr| { expr.transform_down(|e| { - if let Expr::ScalarSubquery(sq) = &e + if let Expr::ScalarSubquery(ref sq) = e && sq.outer_ref_columns.is_empty() - && sq.is_volatile() { - let fresh = Subquery { - subquery: Arc::new(sq.subquery.as_ref().clone()), - outer_ref_columns: sq.outer_ref_columns.clone(), - spans: sq.spans.clone(), + let volatile = sq.is_volatile(); + let dedup_key = sq.clone(); + let (index, is_new) = if volatile { + (SubqueryIndex::new(subqueries.len()), true) + } else if let Some(index) = dedup.get(&dedup_key) { + (*index, false) + } else { + (SubqueryIndex::new(subqueries.len()), true) }; - Ok(Transformed::yes(Expr::ScalarSubquery(fresh))) + + let sq = sq.with_scalar_subquery_index(index); + if is_new { + if !volatile { + dedup.insert(dedup_key, index); + } + subqueries.push(sq.clone()); + } + + Ok(Transformed::yes(Expr::ScalarSubquery(sq))) } else { Ok(Transformed::no(e)) } }) }) })? - .data) + .data; + + Ok((plan, subqueries)) } /// Create a physical plan from a logical plan. @@ -484,19 +477,8 @@ impl DefaultPhysicalPlanner { .await; } - let mut all_subqueries = Self::collect_scalar_subqueries(logical_plan); - let freshened = if all_subqueries.iter().any(Subquery::is_volatile) { - Some(Self::freshen_volatile_subqueries(logical_plan)?) - } else { - None - }; - let logical_plan = match &freshened { - Some(freshened) => { - all_subqueries = Self::collect_scalar_subqueries(freshened); - freshened - } - None => logical_plan, - }; + let (logical_plan, all_subqueries) = + Self::assign_scalar_subquery_indexes(logical_plan)?; let (links, index_map) = self .plan_scalar_subqueries(all_subqueries, session_state) @@ -504,7 +486,7 @@ impl DefaultPhysicalPlanner { if links.is_empty() { return self - .create_initial_plan_inner(logical_plan, session_state) + .create_initial_plan_inner(&logical_plan, session_state) .await; } @@ -524,7 +506,7 @@ impl DefaultPhysicalPlanner { let session_state = Cow::Owned(owned); let plan = self - .create_initial_plan_inner(logical_plan, &session_state) + .create_initial_plan_inner(&logical_plan, &session_state) .await?; Ok(Arc::new(ScalarSubqueryExec::new(plan, links, results))) }) @@ -2978,34 +2960,35 @@ impl DefaultPhysicalPlanner { Ok(mem_exec) } - /// Build physical plans for scalar subqueries and assign each an ordinal - /// `SubqueryIndex`. Returns the links (plan + index) and a map from each - /// subquery's `SubqueryKey` to its index. + /// Build physical plans for scalar subqueries and return their links plus + /// an index map for direct physical expression planning of non-volatile + /// subqueries. Volatile subqueries are only resolved by their assigned + /// occurrence index so unindexed copies cannot accidentally share a value. async fn plan_scalar_subqueries( &self, subqueries: Vec, session_state: &SessionState, - ) -> Result<( - Vec, - DFHashMap, - )> { + ) -> Result<(Vec, DFHashMap)> { let mut links = Vec::with_capacity(subqueries.len()); let mut index_map = DFHashMap::with_capacity(subqueries.len()); + let mut indexes = HashSet::with_capacity(subqueries.len()); for sq in subqueries { - let key = SubqueryKey::new(&sq); - // Callers deduplicate, but guard against accidental double-planning. - if index_map.contains_key(&key) { - continue; + let Some(index) = sq.scalar_subquery_index else { + return internal_err!("Scalar subquery missing planner-assigned index"); + }; + if !indexes.insert(index) { + return internal_err!("Duplicate scalar subquery index {index:?}"); } let physical_plan = self .create_initial_plan(&sq.subquery, session_state) .await?; - let index = SubqueryIndex::new(links.len()); links.push(ScalarSubqueryLink { plan: physical_plan, index, }); - index_map.insert(key, index); + if !sq.is_volatile() { + index_map.insert(sq.without_scalar_subquery_index(), index); + } } Ok((links, index_map)) } diff --git a/datafusion/core/tests/scalar_subquery_dedup.rs b/datafusion/core/tests/scalar_subquery_dedup.rs index 77bf482c5445e..0154401c30e60 100644 --- a/datafusion/core/tests/scalar_subquery_dedup.rs +++ b/datafusion/core/tests/scalar_subquery_dedup.rs @@ -98,6 +98,10 @@ fn assert_counter(counter: &Arc, expected: i64) { assert_eq!(counter.load(Ordering::SeqCst), expected); } +fn num_rows(batches: &[RecordBatch]) -> usize { + batches.iter().map(|b| b.num_rows()).sum() +} + #[tokio::test] async fn same_node_volatile_subqueries_are_evaluated_independently() -> Result<()> { let (ctx, counter) = ctx_with_counter(); @@ -114,6 +118,23 @@ async fn same_node_volatile_subqueries_are_evaluated_independently() -> Result<( Ok(()) } +#[tokio::test] +async fn wrapped_volatile_subqueries_are_not_cse_candidates() -> Result<()> { + let (ctx, counter) = ctx_with_counter(); + let batches = collect_sql( + &ctx, + "SELECT (SELECT volatile_counter()) + 1 AS a, \ + (SELECT volatile_counter()) + 1 AS b", + ) + .await?; + + let a = int64_value(&batches, 0); + let b = int64_value(&batches, 1); + assert_ne!(a, b, "wrapped volatile subqueries must not be shared"); + assert_counter(&counter, 2); + Ok(()) +} + #[tokio::test] async fn cross_node_volatile_subqueries_are_evaluated_independently() -> Result<()> { let (ctx, counter) = ctx_with_counter(); @@ -130,6 +151,37 @@ async fn cross_node_volatile_subqueries_are_evaluated_independently() -> Result< Ok(()) } +#[tokio::test] +async fn union_filter_does_not_duplicate_volatile_scalar_subquery() -> Result<()> { + let (ctx, counter) = ctx_with_counter(); + let batches = collect_sql( + &ctx, + "SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t \ + WHERE (SELECT volatile_counter()) = 0", + ) + .await?; + + assert_eq!(num_rows(&batches), 2); + assert_counter(&counter, 1); + Ok(()) +} + +#[tokio::test] +async fn join_or_extraction_does_not_duplicate_volatile_scalar_subquery() -> Result<()> { + let (ctx, counter) = ctx_with_counter(); + let batches = collect_sql( + &ctx, + "SELECT l.a, r.b \ + FROM (SELECT 1 AS a) l JOIN (SELECT 2 AS b) r ON true \ + WHERE (l.a = 1 AND (SELECT volatile_counter()) = 0) OR r.b = 3", + ) + .await?; + + assert_eq!(num_rows(&batches), 1); + assert_counter(&counter, 1); + Ok(()) +} + #[tokio::test] async fn shared_volatile_subquery_expr_is_evaluated_per_occurrence() -> Result<()> { let (ctx, counter) = ctx_with_counter(); diff --git a/datafusion/expr/src/execution_props.rs b/datafusion/expr/src/execution_props.rs index 331f769e4d131..eaa0f59b5d3c4 100644 --- a/datafusion/expr/src/execution_props.rs +++ b/datafusion/expr/src/execution_props.rs @@ -64,10 +64,9 @@ pub struct ExecutionProps { pub config_options: Option>, /// Providers for scalar variables pub var_providers: Option>>, - /// Maps each uncorrelated scalar [`SubqueryKey`] to its index in - /// `subquery_results`. Populated by the physical planner before calling - /// `create_physical_expr`. - pub subquery_indexes: HashMap, + /// Maps each logical `Subquery` to its index in `subquery_results`. + /// Populated by the physical planner before calling `create_physical_expr`. + pub subquery_indexes: HashMap, /// Shared results container for uncorrelated scalar subquery values. /// Populated at execution time by `ScalarSubqueryExec`. pub subquery_results: ScalarSubqueryResults, @@ -170,34 +169,8 @@ impl ExecutionProps { } } -/// Identity used to deduplicate uncorrelated scalar subqueries during physical -/// planning. -/// -/// Non-volatile subqueries are deduplicated by structure, so two textually -/// identical subqueries share a single execution. Volatile subqueries (whose -/// plan contains a function such as `random()`) are keyed by occurrence, using -/// a plan pointer freshened by the physical planner when needed. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub enum SubqueryKey { - /// A non-volatile subquery, deduplicated by structural identity. - Shared(crate::logical_plan::Subquery), - /// A volatile subquery occurrence. - Unique(usize), -} - -impl SubqueryKey { - /// Builds the deduplication key for `subquery`. - pub fn new(subquery: &crate::logical_plan::Subquery) -> Self { - if subquery.is_volatile() { - SubqueryKey::Unique(Arc::as_ptr(&subquery.subquery) as usize) - } else { - SubqueryKey::Shared(subquery.clone()) - } - } -} - /// Index of a scalar subquery within a [`ScalarSubqueryResults`] container. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)] pub struct SubqueryIndex(usize); impl SubqueryIndex { diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 417d9c964002b..d4dd530cdd195 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2145,12 +2145,15 @@ impl Expr { /// For example the function call `RANDOM()` is volatile as each call will /// return a different value. /// - /// This also descends into subquery-bearing expressions, so - /// `(SELECT random())` is volatile even though the scalar subquery node is - /// not itself a volatile function. - /// /// See [`Volatility`] for more information. pub fn is_volatile(&self) -> bool { + self.exists(|expr| Ok(expr.is_volatile_node())) + .expect("exists closure is infallible") + } + + /// Like [`Self::is_volatile`], but also descends into subquery-bearing + /// expressions. + pub fn is_volatile_including_subqueries(&self) -> bool { self.exists(|expr| { let subquery_is_volatile = match expr { Expr::ScalarSubquery(subquery) @@ -3869,14 +3872,14 @@ mod test { ) .unwrap(), ); - let subquery = Subquery { - subquery: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + let subquery = Subquery::new( + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: subquery_schema, })), - outer_ref_columns: vec![], - spans: Spans::new(), - }; + vec![], + Spans::new(), + ); let in_subquery = Expr::InSubquery(InSubquery { expr: Box::new(Expr::Placeholder(Placeholder { @@ -3917,14 +3920,14 @@ mod test { ) .unwrap(), ); - let subquery = Subquery { - subquery: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + let subquery = Subquery::new( + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: subquery_schema, })), - outer_ref_columns: vec![], - spans: Spans::new(), - }; + vec![], + Spans::new(), + ); let not_in_subquery = Expr::InSubquery(InSubquery { expr: Box::new(Expr::Placeholder(Placeholder { @@ -4242,23 +4245,27 @@ mod test { .build()?, ); - let subquery = |plan: &Arc| Subquery { - subquery: Arc::clone(plan), - outer_ref_columns: vec![], - spans: Spans::new(), + let subquery = |plan: &Arc| { + Subquery::new(Arc::clone(plan), vec![], Spans::new()) }; // Every subquery-bearing expression kind must surface the volatility of // its subquery plan. - assert!(Expr::ScalarSubquery(subquery(&volatile_plan)).is_volatile()); - assert!(Expr::Exists(Exists::new(subquery(&volatile_plan), false)).is_volatile()); + assert!( + Expr::ScalarSubquery(subquery(&volatile_plan)) + .is_volatile_including_subqueries() + ); + assert!( + Expr::Exists(Exists::new(subquery(&volatile_plan), false)) + .is_volatile_including_subqueries() + ); assert!( Expr::InSubquery(InSubquery::new( Box::new(lit(1i64)), subquery(&volatile_plan), false )) - .is_volatile() + .is_volatile_including_subqueries() ); assert!( Expr::SetComparison(SetComparison::new( @@ -4267,12 +4274,18 @@ mod test { Operator::Eq, SetQuantifier::Any )) - .is_volatile() + .is_volatile_including_subqueries() ); // Non-volatile subqueries are not reported as volatile. - assert!(!Expr::ScalarSubquery(subquery(&stable_plan)).is_volatile()); - assert!(!Expr::Exists(Exists::new(subquery(&stable_plan), false)).is_volatile()); + assert!( + !Expr::ScalarSubquery(subquery(&stable_plan)) + .is_volatile_including_subqueries() + ); + assert!( + !Expr::Exists(Exists::new(subquery(&stable_plan), false)) + .is_volatile_including_subqueries() + ); // Volatility hidden behind a nested subquery is still detected. let nested_plan = Arc::new( @@ -4283,7 +4296,10 @@ mod test { ))])? .build()?, ); - assert!(Expr::ScalarSubquery(subquery(&nested_plan)).is_volatile()); + assert!( + Expr::ScalarSubquery(subquery(&nested_plan)) + .is_volatile_including_subqueries() + ); Ok(()) } @@ -4382,14 +4398,14 @@ mod test { #[test] fn test_display_set_comparison() { - let subquery = Subquery { - subquery: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + let subquery = Subquery::new( + Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: Arc::new(DFSchema::empty()), })), - outer_ref_columns: vec![], - spans: Spans::new(), - }; + vec![], + Spans::new(), + ); let expr = Expr::SetComparison(SetComparison::new( Box::new(Expr::Column(Column::from_name("a"))), diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 9d711113e4f74..244ce69ef4994 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -251,11 +251,7 @@ pub fn in_list(expr: Expr, list: Vec, negated: bool) -> Expr { pub fn exists(subquery: Arc) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::Exists(Exists { - subquery: Subquery { - subquery, - outer_ref_columns, - spans: Spans::new(), - }, + subquery: Subquery::new(subquery, outer_ref_columns, Spans::new()), negated: false, }) } @@ -264,11 +260,7 @@ pub fn exists(subquery: Arc) -> Expr { pub fn not_exists(subquery: Arc) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::Exists(Exists { - subquery: Subquery { - subquery, - outer_ref_columns, - spans: Spans::new(), - }, + subquery: Subquery::new(subquery, outer_ref_columns, Spans::new()), negated: true, }) } @@ -278,11 +270,7 @@ pub fn in_subquery(expr: Expr, subquery: Arc) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::InSubquery(InSubquery::new( Box::new(expr), - Subquery { - subquery, - outer_ref_columns, - spans: Spans::new(), - }, + Subquery::new(subquery, outer_ref_columns, Spans::new()), false, )) } @@ -292,11 +280,7 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); Expr::InSubquery(InSubquery::new( Box::new(expr), - Subquery { - subquery, - outer_ref_columns, - spans: Spans::new(), - }, + Subquery::new(subquery, outer_ref_columns, Spans::new()), true, )) } @@ -304,11 +288,7 @@ pub fn not_in_subquery(expr: Expr, subquery: Arc) -> Expr { /// Create a scalar subquery expression pub fn scalar_subquery(subquery: Arc) -> Expr { let outer_ref_columns = subquery.all_out_ref_exprs(); - Expr::ScalarSubquery(Subquery { - subquery, - outer_ref_columns, - spans: Spans::new(), - }) + Expr::ScalarSubquery(Subquery::new(subquery, outer_ref_columns, Spans::new())) } /// Create a grouping set diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 039bbad65a660..6fccb705f8687 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -788,6 +788,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result { self.assert_no_expressions(expr)?; @@ -968,6 +970,7 @@ impl LogicalPlan { subquery: Arc::new(subquery), outer_ref_columns: outer_ref_columns.clone(), spans: spans.clone(), + scalar_subquery_index: *scalar_subquery_index, })) } LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { @@ -4352,6 +4355,8 @@ pub struct Subquery { pub outer_ref_columns: Vec, /// Span information for subquery projection columns pub spans: Spans, + /// Index assigned by the physical planner for uncorrelated scalar subqueries. + pub scalar_subquery_index: Option, } impl Normalizeable for Subquery { @@ -4370,10 +4375,25 @@ impl NormalizeEq for Subquery { .iter() .zip(other.outer_ref_columns.iter()) .all(|(a, b)| a.normalize_eq(b)) + && self.scalar_subquery_index == other.scalar_subquery_index } } impl Subquery { + /// Creates a subquery without a physical planner index. + pub fn new( + subquery: Arc, + outer_ref_columns: Vec, + spans: Spans, + ) -> Self { + Self { + subquery, + outer_ref_columns, + spans, + scalar_subquery_index: None, + } + } + pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> { match plan { Expr::ScalarSubquery(it) => Ok(it), @@ -4387,6 +4407,25 @@ impl Subquery { subquery: plan, outer_ref_columns: self.outer_ref_columns.clone(), spans: Spans::new(), + scalar_subquery_index: self.scalar_subquery_index, + } + } + + pub fn with_scalar_subquery_index(&self, index: SubqueryIndex) -> Subquery { + Subquery { + subquery: Arc::clone(&self.subquery), + outer_ref_columns: self.outer_ref_columns.clone(), + spans: self.spans.clone(), + scalar_subquery_index: Some(index), + } + } + + pub fn without_scalar_subquery_index(&self) -> Subquery { + Subquery { + subquery: Arc::clone(&self.subquery), + outer_ref_columns: self.outer_ref_columns.clone(), + spans: self.spans.clone(), + scalar_subquery_index: None, } } @@ -4401,7 +4440,7 @@ fn plan_contains_volatile(plan: &LogicalPlan) -> bool { plan.exists(|node| { let mut found = false; node.apply_expressions(|expr| { - if expr.is_volatile() { + if expr.is_volatile_including_subqueries() { found = true; Ok(TreeNodeRecursion::Stop) } else { diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index e0cdec9e2c088..48c1be47ebe75 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -154,11 +154,13 @@ impl TreeNode for LogicalPlan { subquery, outer_ref_columns, spans, + scalar_subquery_index, }) => subquery.map_elements(f)?.update_data(|subquery| { LogicalPlan::Subquery(Subquery { subquery, outer_ref_columns, spans, + scalar_subquery_index, }) }), LogicalPlan::SubqueryAlias(SubqueryAlias { diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 7b81feab47a99..279bd47f18cde 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -456,6 +456,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery, outer_ref_columns, spans, + scalar_subquery_index, }) => { let new_plan = analyze_internal(self.schema, Arc::unwrap_or_clone(subquery))?.data; @@ -463,6 +464,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery: Arc::new(new_plan), outer_ref_columns, spans, + scalar_subquery_index, }))) } Expr::Exists(Exists { subquery, negated }) => { @@ -476,6 +478,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, spans: subquery.spans, + scalar_subquery_index: subquery.scalar_subquery_index, }, negated, }))) @@ -501,6 +504,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, spans: subquery.spans, + scalar_subquery_index: subquery.scalar_subquery_index, }; Ok(Transformed::yes(Expr::InSubquery(InSubquery::new( Box::new(expr.cast_to(&common_type, self.schema)?), @@ -537,6 +541,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { subquery: Arc::new(new_plan), outer_ref_columns: subquery.outer_ref_columns, spans: subquery.spans, + scalar_subquery_index: subquery.scalar_subquery_index, }; Ok(Transformed::yes(Expr::SetComparison(SetComparison::new( Box::new(expr.cast_to(&common_type, self.schema)?), @@ -2724,11 +2729,7 @@ mod test { let in_subquery_expr = Expr::InSubquery(InSubquery::new( Box::new(col("a")), - Subquery { - subquery: empty_int32, - outer_ref_columns: vec![], - spans: Spans::new(), - }, + Subquery::new(empty_int32, vec![], Spans::new()), false, )); let plan = LogicalPlan::Filter(Filter::try_new(in_subquery_expr, empty_int64)?); @@ -2753,11 +2754,7 @@ mod test { let in_subquery_expr = Expr::InSubquery(InSubquery::new( Box::new(col("a")), - Subquery { - subquery: empty_int64, - outer_ref_columns: vec![], - spans: Spans::new(), - }, + Subquery::new(empty_int64, vec![], Spans::new()), false, )); let plan = LogicalPlan::Filter(Filter::try_new(in_subquery_expr, empty_int32)?); @@ -2781,11 +2778,7 @@ mod test { let in_subquery_expr = Expr::InSubquery(InSubquery::new( Box::new(col("a")), - Subquery { - subquery: empty_inside, - outer_ref_columns: vec![], - spans: Spans::new(), - }, + Subquery::new(empty_inside, vec![], Spans::new()), false, )); let plan = LogicalPlan::Filter(Filter::try_new(in_subquery_expr, empty_outside)?); diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 5a54240d3a47b..0c1f81857aeaf 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -30,7 +30,9 @@ use datafusion_common::alias::AliasGenerator; use datafusion_common::cse::{CSE, CSEController, FoundCommonNodes}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, qualified_name}; -use datafusion_expr::expr::{Alias, HigherOrderFunction, ScalarFunction}; +use datafusion_expr::expr::{ + Alias, Exists, HigherOrderFunction, InSubquery, ScalarFunction, SetComparison, +}; use datafusion_expr::logical_plan::{ Aggregate, Filter, LogicalPlan, Projection, Sort, Window, }; @@ -699,10 +701,19 @@ impl CSEController for ExprCSEController<'_> { } fn is_valid(node: &Expr) -> bool { + let subquery_is_volatile = match node { + Expr::ScalarSubquery(subquery) + | Expr::Exists(Exists { subquery, .. }) + | Expr::InSubquery(InSubquery { subquery, .. }) + | Expr::SetComparison(SetComparison { subquery, .. }) => { + subquery.is_volatile() + } + _ => false, + }; + !node.is_volatile_node() + && !subquery_is_volatile && !matches!(node, Expr::Lambda(_) | Expr::LambdaVariable(_)) - // Volatile scalar subqueries must be evaluated per occurrence. - && !matches!(node, Expr::ScalarSubquery(sq) if sq.is_volatile()) } fn is_ignored(&self, node: &Expr) -> bool { diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 54878d2f542c0..8ae852d2b40aa 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -388,7 +388,9 @@ fn extract_or_clause( } } _ => { - if has_all_column_refs(expr, schema_columns) { + if !expr.is_volatile_including_subqueries() + && has_all_column_refs(expr, schema_columns) + { predicate = Some(expr.clone()); } } @@ -953,6 +955,18 @@ impl OptimizerRule for PushDownFilter { ))) } LogicalPlan::Union(mut union) => { + let (keep_predicates, push_predicates): (Vec<_>, Vec<_>) = + split_conjunction_owned(filter.predicate) + .into_iter() + .partition(|expr| expr.is_volatile_including_subqueries()); + + if push_predicates.is_empty() { + return Ok(Transformed::yes(with_filters( + keep_predicates, + LogicalPlan::Union(union), + ))); + } + let mut inputs = Vec::with_capacity(union.inputs.len()); for input in union.inputs { let mut replace_map = HashMap::new(); @@ -965,16 +979,25 @@ impl OptimizerRule for PushDownFilter { ); } - let push_predicate = - replace_cols_by_name(filter.predicate.clone(), &replace_map)?; - inputs.push(Arc::new(LogicalPlan::Filter(Filter::new( - push_predicate, - input, - )))) + let push_predicates = push_predicates + .iter() + .cloned() + .map(|predicate| replace_cols_by_name(predicate, &replace_map)) + .collect::>>()?; + let input = if let Some(push_predicate) = conjunction(push_predicates) + { + Arc::new(LogicalPlan::Filter(Filter::new(push_predicate, input))) + } else { + input + }; + inputs.push(input); } union.inputs = inputs; - Ok(Transformed::yes(LogicalPlan::Union(union))) + Ok(Transformed::yes(with_filters( + keep_predicates, + LogicalPlan::Union(union), + ))) } LogicalPlan::Aggregate(mut agg) => { // We can push down Predicate which in groupby_expr. @@ -1109,7 +1132,7 @@ impl OptimizerRule for PushDownFilter { let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = pushdown_candidates .into_iter() - .partition(|pred| pred.is_volatile()); + .partition(|pred| pred.is_volatile_including_subqueries()); // Check which non-volatile filters are supported by source let supported_filters = scan @@ -1291,7 +1314,7 @@ fn rewrite_projection( (qualified_name(qualifier, field.name()), unalias(expr)) }) .partition(|(_, value)| { - value.is_volatile() + value.is_volatile_including_subqueries() || value.placement() == ExpressionPlacement::MoveTowardsLeafNodes }); diff --git a/datafusion/optimizer/src/rewrite_set_comparison.rs b/datafusion/optimizer/src/rewrite_set_comparison.rs index c8c35b518743a..2f62114f7aad7 100644 --- a/datafusion/optimizer/src/rewrite_set_comparison.rs +++ b/datafusion/optimizer/src/rewrite_set_comparison.rs @@ -150,6 +150,7 @@ fn exists_subquery(subquery: &Subquery, filter: Expr) -> Result { subquery: Arc::new(plan), outer_ref_columns, spans: subquery.spans.clone(), + scalar_subquery_index: subquery.scalar_subquery_index, }, negated: false, })) diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index 39c8541b51b2f..9239dec5e6bcd 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -854,7 +854,7 @@ impl TreeNodeRewriter for Simplifier<'_> { left, op: Eq, right, - }) if (left == right) & !left.is_volatile() => { + }) if (left == right) & !left.is_volatile_including_subqueries() => { Transformed::yes(match !info.nullable(&left)? { true => lit(true), false => Expr::BinaryExpr(BinaryExpr { @@ -974,7 +974,9 @@ impl TreeNodeRewriter for Simplifier<'_> { }) if has_common_conjunction(&left, &right) => { let lhs: IndexSet = iter_conjunction_owned(*left).collect(); let (common, rhs): (Vec<_>, Vec<_>) = iter_conjunction_owned(*right) - .partition(|e| lhs.contains(e) && !e.is_volatile()); + .partition(|e| { + lhs.contains(e) && !e.is_volatile_including_subqueries() + }); let new_rhs = rhs.into_iter().reduce(and); let new_lhs = lhs.into_iter().filter(|e| !common.contains(e)).reduce(and); @@ -2185,7 +2187,8 @@ impl<'a> StringScalar<'a> { #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Expr contains Arc with interior mutability but is intentionally used as hash key fn has_common_conjunction(lhs: &Expr, rhs: &Expr) -> bool { let lhs_set: HashSet<&Expr> = iter_conjunction(lhs).collect(); - iter_conjunction(rhs).any(|e| lhs_set.contains(&e) && !e.is_volatile()) + iter_conjunction(rhs) + .any(|e| lhs_set.contains(&e) && !e.is_volatile_including_subqueries()) } // TODO: We might not need this after defer pattern for Box is stabilized. https://github.com/rust-lang/rust/issues/87121 diff --git a/datafusion/optimizer/src/simplify_expressions/linear_aggregates.rs b/datafusion/optimizer/src/simplify_expressions/linear_aggregates.rs index 21389cf326c24..2dbdf81fb7b21 100644 --- a/datafusion/optimizer/src/simplify_expressions/linear_aggregates.rs +++ b/datafusion/optimizer/src/simplify_expressions/linear_aggregates.rs @@ -147,7 +147,7 @@ fn candidate_linear_param(params: &AggregateFunctionParams) -> Option<&Expr> { return None; } let arg = args.first()?; - if arg.is_volatile() { + if arg.is_volatile_including_subqueries() { return None; }; Some(arg) diff --git a/datafusion/optimizer/src/simplify_expressions/utils.rs b/datafusion/optimizer/src/simplify_expressions/utils.rs index b0908b47602f7..4e9824e078146 100644 --- a/datafusion/optimizer/src/simplify_expressions/utils.rs +++ b/datafusion/optimizer/src/simplify_expressions/utils.rs @@ -80,7 +80,8 @@ fn expr_contains_inner(expr: &Expr, needle: &Expr, search_op: Operator) -> bool /// check volatile calls and return if expr contains needle pub fn expr_contains(expr: &Expr, needle: &Expr, search_op: Operator) -> bool { - expr_contains_inner(expr, needle, search_op) && !needle.is_volatile() + expr_contains_inner(expr, needle, search_op) + && !needle.is_volatile_including_subqueries() } /// Deletes all 'needles' or remains one 'needle' that are found in a chain of xor @@ -224,7 +225,7 @@ pub fn is_false(expr: &Expr) -> bool { /// returns true if `haystack` looks like (needle OP X) or (X OP needle) pub fn is_op_with(target_op: Operator, haystack: &Expr, needle: &Expr) -> bool { - matches!(haystack, Expr::BinaryExpr(BinaryExpr { left, op, right }) if op == &target_op && (needle == left.as_ref() || needle == right.as_ref()) && !needle.is_volatile()) + matches!(haystack, Expr::BinaryExpr(BinaryExpr { left, op, right }) if op == &target_op && (needle == left.as_ref() || needle == right.as_ref()) && !needle.is_volatile_including_subqueries()) } pub fn can_reduce_to_equal_statement(haystack: &Expr, needle: &Expr) -> bool { diff --git a/datafusion/optimizer/src/unions_to_filter.rs b/datafusion/optimizer/src/unions_to_filter.rs index 158fd358287fe..069a5c59d04d9 100644 --- a/datafusion/optimizer/src/unions_to_filter.rs +++ b/datafusion/optimizer/src/unions_to_filter.rs @@ -319,7 +319,7 @@ fn align_plan_to_schema( } fn is_mergeable_predicate(expr: &Expr) -> bool { - !expr.is_volatile() && !expr_contains_subquery(expr) + !expr.is_volatile_including_subqueries() && !expr_contains_subquery(expr) } /// Returns `true` when every projection expression in `wrappers` is both @@ -333,7 +333,7 @@ fn wrapper_projections_are_safe(wrappers: &[Wrapper]) -> bool { wrappers.iter().all(|w| match w { Wrapper::Projection { expr, .. } => expr .iter() - .all(|e| !e.is_volatile() && !expr_contains_subquery(e)), + .all(|e| !e.is_volatile_including_subqueries() && !expr_contains_subquery(e)), Wrapper::SubqueryAlias { .. } => true, }) } diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index e23fbb8f600d8..88f92019ba609 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -32,7 +32,7 @@ use datafusion_common::{ DFSchema, Result, ScalarValue, TableReference, ToDFSchema, exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, }; -use datafusion_expr::execution_props::{ExecutionProps, SubqueryKey}; +use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::{ Alias, Cast, HigherOrderFunction, InList, Lambda, LambdaVariable, Placeholder, ScalarFunction, @@ -403,8 +403,14 @@ pub fn create_physical_expr( } }, Expr::ScalarSubquery(sq) => { - match execution_props.subquery_indexes.get(&SubqueryKey::new(sq)) { - Some(&index) => { + let index = sq.scalar_subquery_index.or_else(|| { + execution_props + .subquery_indexes + .get(&sq.without_scalar_subquery_index()) + .copied() + }); + match index { + Some(index) => { let schema = sq.subquery.schema(); if schema.fields().len() != 1 { return plan_err!( diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index c68b83964f4cf..aba5df14920b5 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -692,11 +692,11 @@ fn parse_subquery( .ok_or_else(|| Error::required("SubqueryNode.subquery"))?; let plan = plan_node.try_into_logical_plan(ctx, codec)?; let outer_ref_columns = parse_exprs(&proto.outer_ref_columns, ctx, codec)?; - Ok(Subquery { - subquery: Arc::new(plan), + Ok(Subquery::new( + Arc::new(plan), outer_ref_columns, - spans: Default::default(), - }) + Default::default(), + )) } /// Parse a vector of `protobuf::LogicalExprNode`s. diff --git a/datafusion/sql/src/expr/subquery.rs b/datafusion/sql/src/expr/subquery.rs index 662c44f6f2620..313935272109a 100644 --- a/datafusion/sql/src/expr/subquery.rs +++ b/datafusion/sql/src/expr/subquery.rs @@ -36,11 +36,7 @@ impl SqlToRel<'_, S> { let outer_ref_columns = sub_plan.all_out_ref_exprs(); planner_context.pop_outer_query_schema(); Ok(Expr::Exists(Exists { - subquery: Subquery { - subquery: Arc::new(sub_plan), - outer_ref_columns, - spans: Spans::new(), - }, + subquery: Subquery::new(Arc::new(sub_plan), outer_ref_columns, Spans::new()), negated, })) } @@ -81,11 +77,7 @@ impl SqlToRel<'_, S> { Ok(Expr::InSubquery(InSubquery::new( Box::new(expr_obj), - Subquery { - subquery: Arc::new(sub_plan), - outer_ref_columns, - spans, - }, + Subquery::new(Arc::new(sub_plan), outer_ref_columns, spans), negated, ))) } @@ -118,11 +110,11 @@ impl SqlToRel<'_, S> { "Select only one column in the subquery", )?; - Ok(Expr::ScalarSubquery(Subquery { - subquery: Arc::new(sub_plan), + Ok(Expr::ScalarSubquery(Subquery::new( + Arc::new(sub_plan), outer_ref_columns, spans, - })) + ))) } fn validate_single_column( @@ -196,11 +188,7 @@ impl SqlToRel<'_, S> { let expr_obj = self.sql_to_expr(left_expr, input_schema, planner_context)?; Ok(Expr::SetComparison(SetComparison::new( Box::new(expr_obj), - Subquery { - subquery: Arc::new(sub_plan), - outer_ref_columns, - spans, - }, + Subquery::new(Arc::new(sub_plan), outer_ref_columns, spans), self.parse_sql_binary_op(compare_op)?, quantifier, ))) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 08a292475fd72..dc0d8af8b6dc7 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -347,19 +347,19 @@ impl SqlToRel<'_, S> { match plan { LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { subquery_alias( - LogicalPlan::Subquery(Subquery { - subquery: input, + LogicalPlan::Subquery(Subquery::new( + input, outer_ref_columns, - spans: Spans::new(), - }), + Spans::new(), + )), alias, ) } - plan => Ok(LogicalPlan::Subquery(Subquery { - subquery: Arc::new(plan), + plan => Ok(LogicalPlan::Subquery(Subquery::new( + Arc::new(plan), outer_ref_columns, - spans: Spans::new(), - })), + Spans::new(), + ))), } } } diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index b4de240c65759..c2191ed47d2ba 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -2031,6 +2031,30 @@ physical_plan 05)--AggregateExec: mode=Single, gby=[], aggr=[max(sq_values.v)] 06)----DataSourceExec: partitions=1, partition_sizes=[1] +# Cross-node non-volatile scalar subqueries are deduplicated. +query TT +explain SELECT (SELECT max(v) FROM sq_values) AS r FROM (SELECT 1) t WHERE (SELECT max(v) FROM sq_values) IS NOT NULL; +---- +logical_plan +01)Projection: () AS r +02)--Subquery: +03)----Aggregate: groupBy=[[]], aggr=[[max(sq_values.v)]] +04)------TableScan: sq_values projection=[v] +05)--SubqueryAlias: t +06)----Filter: () IS NOT NULL +07)------Subquery: +08)--------Aggregate: groupBy=[[]], aggr=[[max(sq_values.v)]] +09)----------TableScan: sq_values projection=[v] +10)------EmptyRelation: rows=1 +physical_plan +01)ScalarSubqueryExec: subqueries=1 +02)--ProjectionExec: expr=[scalar_subquery() as r] +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +04)------FilterExec: scalar_subquery() IS NOT NULL +05)--------PlaceholderRowExec +06)--AggregateExec: mode=Single, gby=[], aggr=[max(sq_values.v)] +07)----DataSourceExec: partitions=1, partition_sizes=[1] + # Volatile scalar subqueries are evaluated per occurrence. query TT explain SELECT (SELECT random()) - (SELECT random()) AS diff; diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs index 83cf8400eebfc..9dbcb4aee325f 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/subquery.rs @@ -69,11 +69,11 @@ pub async fn from_subquery( .consume_expression(needle_expr, input_schema) .await?, ), - subquery: Subquery { - subquery: Arc::new(haystack_expr), - outer_ref_columns: outer_refs, - spans: Spans::new(), - }, + subquery: Subquery::new( + Arc::new(haystack_expr), + outer_refs, + Spans::new(), + ), negated: false, })) } else { @@ -91,11 +91,11 @@ pub async fn from_subquery( ) .await?; let outer_ref_columns = plan.all_out_ref_exprs(); - Ok(Expr::ScalarSubquery(Subquery { - subquery: Arc::new(plan), + Ok(Expr::ScalarSubquery(Subquery::new( + Arc::new(plan), outer_ref_columns, - spans: Spans::new(), - })) + Spans::new(), + ))) } SubqueryType::SetPredicate(predicate) => { match predicate.predicate_op() { @@ -110,11 +110,11 @@ pub async fn from_subquery( .await?; let outer_ref_columns = plan.all_out_ref_exprs(); Ok(Expr::Exists(Exists::new( - Subquery { - subquery: Arc::new(plan), + Subquery::new( + Arc::new(plan), outer_ref_columns, - spans: Spans::new(), - }, + Spans::new(), + ), false, ))) } @@ -162,11 +162,7 @@ pub async fn from_subquery( Ok(Expr::SetComparison(SetComparison::new( Box::new(left_expr), - Subquery { - subquery: Arc::new(plan), - outer_ref_columns, - spans: Spans::new(), - }, + Subquery::new(Arc::new(plan), outer_ref_columns, Spans::new()), comparison_op, reduction_op, ))) diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs b/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs index 038ada115b9d8..25844004c2d66 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/mod.rs @@ -169,5 +169,7 @@ fn retrieve_emit_kind(rel_common: Option<&RelCommon>) -> EmitKind { } fn contains_volatile_expr(proj: &Projection) -> bool { - proj.expr.iter().any(|e| e.is_volatile()) + proj.expr + .iter() + .any(|e| e.is_volatile_including_subqueries()) } diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 018e1aef80ea1..5bf4b5c190cb1 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -2391,11 +2391,7 @@ async fn build_set_comparison_plan( .build()?; let predicate = Expr::SetComparison(SetComparison::new( Box::new(col("data.a")), - Subquery { - subquery: Arc::new(subquery_plan), - outer_ref_columns: vec![], - spans: Spans::new(), - }, + Subquery::new(Arc::new(subquery_plan), vec![], Spans::new()), op, quantifier, )); @@ -2415,11 +2411,11 @@ async fn build_scalar_subquery_projection_plan( .limit(0, Some(1))? .build()?; - let scalar_subquery = Expr::ScalarSubquery(Subquery { - subquery: Arc::new(subquery_plan), - outer_ref_columns: vec![], - spans: Spans::new(), - }); + let scalar_subquery = Expr::ScalarSubquery(Subquery::new( + Arc::new(subquery_plan), + vec![], + Spans::new(), + )); let outer_empty_relation = LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: true, @@ -2442,11 +2438,7 @@ async fn build_exists_filter_plan( .build()?; let predicate = Expr::Exists(Exists::new( - Subquery { - subquery: Arc::new(subquery_plan), - outer_ref_columns: vec![], - spans: Spans::new(), - }, + Subquery::new(Arc::new(subquery_plan), vec![], Spans::new()), negated, )); diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index 8e69eccdbeb46..8006c4a713bec 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -399,7 +399,9 @@ boundary with `PhysicalPlanDecodeContext::new(task_ctx, codec)`. `ExecutionProps` gained new public fields. Code that constructs it via a struct literal, or pattern-matches it without `..`, no longer compiles. Use `ExecutionProps::new()` and include `..` in exhaustive patterns. -The `subquery_indexes` field is keyed by `SubqueryKey`. +`Subquery` gained a `scalar_subquery_index` field used during physical +planning. Use `Subquery::new(...)` when constructing unplanned subqueries, +and include `..` in exhaustive `Subquery` patterns. ### Items in `datafusion_functions::strings` are no longer public From a415cc1e023a7e2f1d757f1831f8389d5449091b Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Thu, 4 Jun 2026 12:23:44 +0530 Subject: [PATCH 3/4] fix regression on extension-node --- datafusion/core/src/physical_planner.rs | 237 ++++++++++++++++++++---- 1 file changed, 203 insertions(+), 34 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d47abd3919a54..4feed45987fc7 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -394,43 +394,118 @@ impl DefaultPhysicalPlanner { fn assign_scalar_subquery_indexes( plan: &LogicalPlan, ) -> Result<(LogicalPlan, Vec)> { - let mut dedup: IndexMap = IndexMap::new(); - let mut subqueries = Vec::new(); + fn register_subquery( + sq: &Subquery, + dedup: &mut IndexMap, + subqueries: &mut Vec, + ) -> Option { + if !sq.outer_ref_columns.is_empty() { + return None; + } - let plan = plan - .clone() - .transform_down(|node| { - node.map_expressions(|expr| { - expr.transform_down(|e| { - if let Expr::ScalarSubquery(ref sq) = e - && sq.outer_ref_columns.is_empty() - { - let volatile = sq.is_volatile(); - let dedup_key = sq.clone(); - let (index, is_new) = if volatile { - (SubqueryIndex::new(subqueries.len()), true) - } else if let Some(index) = dedup.get(&dedup_key) { - (*index, false) - } else { - (SubqueryIndex::new(subqueries.len()), true) - }; + let dedup_key = sq.without_scalar_subquery_index(); + if let Some(index) = dedup.get(&dedup_key) { + return Some(*index); + } - let sq = sq.with_scalar_subquery_index(index); - if is_new { - if !volatile { - dedup.insert(dedup_key, index); - } - subqueries.push(sq.clone()); - } + let index = SubqueryIndex::new(subqueries.len()); + dedup.insert(dedup_key, index); + // This helper owns registration for non-volatile subqueries: + // callers must not push the returned index again. + subqueries.push(sq.with_scalar_subquery_index(index)); + Some(index) + } - Ok(Transformed::yes(Expr::ScalarSubquery(sq))) - } else { - Ok(Transformed::no(e)) - } - }) - }) - })? - .data; + fn collect_extension_subqueries( + plan: &LogicalPlan, + dedup: &mut IndexMap, + subqueries: &mut Vec, + ) -> Result<()> { + for expr in plan.expressions() { + expr.apply(|e| { + if let Expr::ScalarSubquery(sq) = e + && !sq.is_volatile() + { + register_subquery(sq, dedup, subqueries); + } + Ok(TreeNodeRecursion::Continue) + })?; + } + + for input in plan.inputs() { + collect_extension_subqueries(input, dedup, subqueries)?; + } + + Ok(()) + } + + fn assign_expr_indexes( + expr: Expr, + dedup: &mut IndexMap, + subqueries: &mut Vec, + ) -> Result> { + expr.transform_down(|e| { + if let Expr::ScalarSubquery(ref sq) = e { + let volatile = sq.is_volatile(); + let index = if volatile && sq.outer_ref_columns.is_empty() { + (SubqueryIndex::new(subqueries.len()), true) + } else if let Some(index) = register_subquery(sq, dedup, subqueries) { + (index, false) + } else { + return Ok(Transformed::no(e)); + }; + + let (index, is_new) = index; + let sq = sq.with_scalar_subquery_index(index); + if is_new { + subqueries.push(sq.clone()); + } + + Ok(Transformed::yes(Expr::ScalarSubquery(sq))) + } else { + Ok(Transformed::no(e)) + } + }) + } + + fn assign_plan_indexes( + plan: LogicalPlan, + dedup: &mut IndexMap, + subqueries: &mut Vec, + ) -> Result { + let plan = match plan { + // Some extension nodes do not support reconstruction via + // `with_exprs_and_inputs`. Read them to preserve non-volatile + // subquery planning, but leave them untouched for their + // extension planner. Volatile subqueries in extension nodes + // are not planned because they need per-occurrence indexes. + LogicalPlan::Extension(_) => { + collect_extension_subqueries(&plan, dedup, subqueries)?; + plan + } + _ => { + plan.map_expressions(|expr| { + assign_expr_indexes(expr, dedup, subqueries) + })? + .data + } + }; + + match plan { + LogicalPlan::Extension(_) => Ok(plan), + _ => Ok(plan + .map_children(|child| { + assign_plan_indexes(child, dedup, subqueries) + .map(Transformed::yes) + })? + .data), + } + } + + let mut dedup: IndexMap = IndexMap::new(); + let mut subqueries = Vec::new(); + + let plan = assign_plan_indexes(plan.clone(), &mut dedup, &mut subqueries)?; Ok((plan, subqueries)) } @@ -4539,6 +4614,100 @@ mod tests { } } + /// Extension node that passes through its input but does not support + /// optimizer-style reconstruction. + #[derive(Debug, PartialEq, Eq, Hash)] + struct PassthroughExtensionNode { + input: LogicalPlan, + schema: DFSchemaRef, + } + + impl PartialOrd for PassthroughExtensionNode { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } + } + + impl UserDefinedLogicalNodeCore for PassthroughExtensionNode { + fn name(&self) -> &str { + "Passthrough" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Passthrough") + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + _inputs: Vec, + ) -> Result { + unimplemented!("Passthrough"); + } + } + + struct PassthroughExtensionPlanner; + + #[async_trait] + impl ExtensionPlanner for PassthroughExtensionPlanner { + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + _logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> Result>> { + if node.as_any().is::() { + Ok(Some(Arc::clone(&physical_inputs[0]))) + } else { + Ok(None) + } + } + } + + #[tokio::test] + async fn scalar_subquery_below_extension_plans() -> Result<()> { + let session_state = make_session_state(); + let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( + PassthroughExtensionPlanner, + )]); + + let subquery_plan = LogicalPlanBuilder::empty(true) + .project(vec![lit(1i64)])? + .build()?; + let input = LogicalPlanBuilder::empty(true) + .project(vec![ + datafusion_expr::expr_fn::scalar_subquery(Arc::new(subquery_plan)) + .alias("sq"), + ])? + .build()?; + let logical_plan = LogicalPlan::Extension(Extension { + node: Arc::new(PassthroughExtensionNode { + schema: input.schema().clone(), + input, + }), + }); + + let plan = planner + .create_physical_plan(&logical_plan, &session_state) + .await?; + assert!(format!("{plan:?}").contains("ScalarSubqueryExec")); + Ok(()) + } + // Produces an execution plan where the schema is mismatched from // the logical plan node. struct BadExtensionPlanner {} From efa35ade1b79ba9fde0f2ed9b09671873a60cc7b Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Fri, 5 Jun 2026 09:21:28 +0530 Subject: [PATCH 4/4] make volatile exp public --- datafusion/expr/src/logical_plan/plan.rs | 46 ++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 0f460507ace27..667f0f8392086 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -420,6 +420,15 @@ impl LogicalPlan { exprs } + /// Returns `true` if any expression in this plan contains a volatile + /// function. + /// + /// This descends into subquery expressions, but does not model volatility + /// of table providers or external state. + pub fn contains_volatile_expr(&self) -> bool { + plan_contains_volatile(self) + } + /// Returns all the out reference(correlated) expressions (recursively) in the current /// logical plan nodes and all its descendant nodes. pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec { @@ -4431,7 +4440,7 @@ impl Subquery { /// Returns `true` if this subquery's plan contains a volatile expression. pub fn is_volatile(&self) -> bool { - plan_contains_volatile(&self.subquery) + self.subquery.contains_volatile_expr() } } @@ -4790,8 +4799,8 @@ mod tests { use crate::select_expr::SelectExpr; use crate::test::function_stub::{count, count_udaf}; use crate::{ - GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder, - scalar_subquery, + ColumnarValue, GroupingSet, Volatility, binary_expr, col, create_udf, exists, + in_subquery, lit, placeholder, scalar_subquery, }; use datafusion_common::metadata::ScalarAndMetadata; use datafusion_common::tree_node::{ @@ -4827,6 +4836,37 @@ mod tests { )) } + fn volatile_expr() -> Expr { + create_udf( + "volatile_udf", + vec![], + DataType::Int64, + Volatility::Volatile, + Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(0))))), + ) + .call(vec![]) + } + + #[test] + fn contains_volatile_expr_detects_plan_expressions() -> Result<()> { + let stable_plan = LogicalPlanBuilder::empty(true) + .project(vec![lit(1i64)])? + .build()?; + assert!(!stable_plan.contains_volatile_expr()); + + let volatile_plan = LogicalPlanBuilder::empty(true) + .project(vec![volatile_expr()])? + .build()?; + assert!(volatile_plan.contains_volatile_expr()); + + let nested_plan = LogicalPlanBuilder::empty(true) + .project(vec![scalar_subquery(Arc::new(volatile_plan))])? + .build()?; + assert!(nested_plan.contains_volatile_expr()); + + Ok(()) + } + #[test] fn recursive_query_widens_nullability_per_column() -> Result<()> { // Column `a` is non-nullable in both terms and must stay non-nullable;