Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 234 additions & 31 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ use datafusion_physical_plan::unnest::ListUnnest;
use async_trait::async_trait;
use datafusion_physical_plan::async_func::{AsyncFuncExec, AsyncMapper};
use futures::{StreamExt, TryStreamExt};
use indexmap::IndexSet;
use indexmap::IndexMap;
use itertools::{Itertools, multiunzip};
use log::debug;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -386,28 +386,128 @@ impl DefaultPhysicalPlanner {
Ok(())
}

/// Collect uncorrelated scalar subqueries. We don't descend into nested
/// subqueries here: each call to `create_initial_plan` handles subqueries
/// at its level and then recurses in order to handle nested subqueries.
/// Assign indexes to uncorrelated scalar subqueries at this plan level.
///
/// Non-volatile subqueries reuse an index by structure. Volatile subqueries
/// always get a fresh index.
#[allow(clippy::allow_attributes, clippy::mutable_key_type)] // Subquery contains Arc with interior mutability but is intentionally used as hash key
fn collect_scalar_subqueries(plan: &LogicalPlan) -> Vec<Subquery> {
let mut subqueries = IndexSet::new();
plan.apply(|node| {
for expr in node.expressions() {
fn assign_scalar_subquery_indexes(
plan: &LogicalPlan,
) -> Result<(LogicalPlan, Vec<Subquery>)> {
/// Look up, or allocate, the index for an uncorrelated scalar subquery.
///
/// Returns `None` when `sq` has outer reference columns. Otherwise the
/// subquery, with any existing index stripped, is used as the dedup key:
/// a known key yields its stored index, and an unknown key is given the
/// next position in `subqueries`, recorded in `dedup`, and pushed onto
/// `subqueries` with that index attached.
fn register_subquery(
    sq: &Subquery,
    dedup: &mut IndexMap<Subquery, SubqueryIndex>,
    subqueries: &mut Vec<Subquery>,
) -> Option<SubqueryIndex> {
    let is_correlated = !sq.outer_ref_columns.is_empty();
    if is_correlated {
        return None;
    }

    let key = sq.without_scalar_subquery_index();
    let assigned = match dedup.get(&key).copied() {
        Some(known) => known,
        None => {
            let fresh = SubqueryIndex::new(subqueries.len());
            dedup.insert(key, fresh);
            // Registration happens here, once per distinct key; callers
            // must not push the subquery for the returned index again.
            subqueries.push(sq.with_scalar_subquery_index(fresh));
            fresh
        }
    };
    Some(assigned)
}

/// Read-only walk over `plan` and, recursively, every plan returned by
/// `plan.inputs()`, handing scalar subqueries found in node expressions to
/// `register_subquery`. The plan itself is never rewritten.
///
/// NOTE(review): this span looks like a rendered diff with removed and added
/// lines interleaved (e.g. both `subqueries.insert(sq.clone());` and the
/// `register_subquery` call, both `.expect("infallible");` and `})?;`, and a
/// stray `subqueries.into_iter().collect()`). Confirm against the real file
/// before relying on the exact statement sequence below.
fn collect_extension_subqueries(
plan: &LogicalPlan,
dedup: &mut IndexMap<Subquery, SubqueryIndex>,
subqueries: &mut Vec<Subquery>,
) -> Result<()> {
// Inspect every expression attached to this node.
for expr in plan.expressions() {
expr.apply(|e| {
// Only scalar subqueries that pass the guards below are registered.
if let Expr::ScalarSubquery(sq) = e
&& sq.outer_ref_columns.is_empty()
&& !sq.is_volatile()
{
subqueries.insert(sq.clone());
register_subquery(sq, dedup, subqueries);
}
Ok(TreeNodeRecursion::Continue)
})
.expect("infallible");
})?;
}
Ok(TreeNodeRecursion::Continue)
})
.expect("infallible");
subqueries.into_iter().collect()

// Recurse into each input so subqueries below this node are visited too.
for input in plan.inputs() {
collect_extension_subqueries(input, dedup, subqueries)?;
}

Ok(())
}

/// Rewrite `expr` top-down so that every uncorrelated scalar subquery in it
/// carries a `SubqueryIndex`.
///
/// * Volatile, uncorrelated subqueries always receive the next free
///   position in `subqueries` and are pushed there by this function.
/// * All other subqueries go through `register_subquery`, which either
///   reuses an index, allocates one (and pushes the subquery itself), or
///   returns `None`, in which case the expression is left untouched.
fn assign_expr_indexes(
    expr: Expr,
    dedup: &mut IndexMap<Subquery, SubqueryIndex>,
    subqueries: &mut Vec<Subquery>,
) -> Result<Transformed<Expr>> {
    expr.transform_down(|node| {
        let Expr::ScalarSubquery(ref subquery) = node else {
            return Ok(Transformed::no(node));
        };

        let is_volatile = subquery.is_volatile();
        let indexed = if is_volatile && subquery.outer_ref_columns.is_empty() {
            // Per-occurrence index: never shared through `dedup`.
            let fresh = SubqueryIndex::new(subqueries.len());
            let indexed = subquery.with_scalar_subquery_index(fresh);
            subqueries.push(indexed.clone());
            indexed
        } else {
            match register_subquery(subquery, dedup, subqueries) {
                // `register_subquery` already recorded the subquery if needed.
                Some(shared) => subquery.with_scalar_subquery_index(shared),
                None => return Ok(Transformed::no(node)),
            }
        };

        Ok(Transformed::yes(Expr::ScalarSubquery(indexed)))
    })
}

/// Assign scalar subquery indexes throughout `plan`, returning the
/// rewritten plan.
///
/// Extension nodes are returned unchanged: their subtree is only read via
/// `collect_extension_subqueries`. Every other node has its expressions
/// rewritten with `assign_expr_indexes` and its children processed
/// recursively.
fn assign_plan_indexes(
    plan: LogicalPlan,
    dedup: &mut IndexMap<Subquery, SubqueryIndex>,
    subqueries: &mut Vec<Subquery>,
) -> Result<LogicalPlan> {
    let plan = if matches!(plan, LogicalPlan::Extension(_)) {
        // Some extension nodes do not support reconstruction via
        // `with_exprs_and_inputs`, so they are inspected but left as-is
        // for their extension planner. Volatile subqueries inside them
        // are not registered because they need per-occurrence indexes.
        collect_extension_subqueries(&plan, dedup, subqueries)?;
        plan
    } else {
        let rewritten = plan
            .map_expressions(|expr| assign_expr_indexes(expr, dedup, subqueries))?;
        rewritten.data
    };

    // Do not descend through an extension node: its inputs were already
    // visited read-only above.
    if matches!(plan, LogicalPlan::Extension(_)) {
        return Ok(plan);
    }

    let with_children = plan.map_children(|child| {
        let child = assign_plan_indexes(child, dedup, subqueries)?;
        Ok(Transformed::yes(child))
    })?;
    Ok(with_children.data)
}

let mut dedup: IndexMap<Subquery, SubqueryIndex> = IndexMap::new();
let mut subqueries = Vec::new();

let plan = assign_plan_indexes(plan.clone(), &mut dedup, &mut subqueries)?;

Ok((plan, subqueries))
}

/// Create a physical plan from a logical plan.
Expand Down Expand Up @@ -442,22 +542,26 @@ impl DefaultPhysicalPlanner {
// scalar subqueries to joins, so none should reach this point.
// Skip collection in that case to avoid creating a no-op
// `ScalarSubqueryExec` wrapper.
let all_subqueries = if session_state
if !session_state
.config_options()
.optimizer
.enable_physical_uncorrelated_scalar_subquery
{
Self::collect_scalar_subqueries(logical_plan)
} else {
Vec::new()
};
return self
.create_initial_plan_inner(logical_plan, session_state)
.await;
}

let (logical_plan, all_subqueries) =
Self::assign_scalar_subquery_indexes(logical_plan)?;

let (links, index_map) = self
.plan_scalar_subqueries(all_subqueries, session_state)
.await?;

if links.is_empty() {
return self
.create_initial_plan_inner(logical_plan, session_state)
.create_initial_plan_inner(&logical_plan, session_state)
.await;
}

Expand All @@ -477,7 +581,7 @@ impl DefaultPhysicalPlanner {
let session_state = Cow::Owned(owned);

let plan = self
.create_initial_plan_inner(logical_plan, &session_state)
.create_initial_plan_inner(&logical_plan, &session_state)
.await?;
Ok(Arc::new(ScalarSubqueryExec::new(plan, links, results)))
})
Expand Down Expand Up @@ -2931,30 +3035,35 @@ impl DefaultPhysicalPlanner {
Ok(mem_exec)
}

/// Build physical plans for scalar subqueries and assign each an ordinal
/// `SubqueryIndex`. Returns the links (plan + index) and a map from logical
/// `Subquery` to its index.
/// Build physical plans for scalar subqueries and return their links plus
/// an index map for direct physical expression planning of non-volatile
/// subqueries. Volatile subqueries are only resolved by their assigned
/// occurrence index so unindexed copies cannot accidentally share a value.
// NOTE(review): this span looks like a rendered diff with removed and added
// lines interleaved (the `index_map.contains_key` guard, the second
// `let index = SubqueryIndex::new(links.len());` and the bare
// `index_map.insert(sq, index);` sit next to what appear to be their
// replacements). Confirm against the real file before relying on the exact
// statement sequence below.
async fn plan_scalar_subqueries(
&self,
subqueries: Vec<Subquery>,
session_state: &SessionState,
) -> Result<(Vec<ScalarSubqueryLink>, DFHashMap<Subquery, SubqueryIndex>)> {
// All three collections are pre-sized to the number of incoming subqueries.
let mut links = Vec::with_capacity(subqueries.len());
let mut index_map = DFHashMap::with_capacity(subqueries.len());
// Tracks indexes already seen so a repeated index can be reported.
let mut indexes = HashSet::with_capacity(subqueries.len());
for sq in subqueries {
// Callers deduplicate, but guard against accidental double-planning.
if index_map.contains_key(&sq) {
continue;
// A subquery without `scalar_subquery_index` is reported as an internal error.
let Some(index) = sq.scalar_subquery_index else {
return internal_err!("Scalar subquery missing planner-assigned index");
};
// `insert` returns false when the index was already present.
if !indexes.insert(index) {
return internal_err!("Duplicate scalar subquery index {index:?}");
}
// Build the physical plan for the subquery's own logical plan.
let physical_plan = self
.create_initial_plan(&sq.subquery, session_state)
.await?;
let index = SubqueryIndex::new(links.len());
links.push(ScalarSubqueryLink {
plan: physical_plan,
index,
});
index_map.insert(sq, index);
// Non-volatile subqueries are also keyed by their index-free form.
if !sq.is_volatile() {
index_map.insert(sq.without_scalar_subquery_index(), index);
}
}
Ok((links, index_map))
}
Expand Down Expand Up @@ -4505,6 +4614,100 @@ mod tests {
}
}

/// Extension node that passes through its input but does not support
/// optimizer-style reconstruction (`with_exprs_and_inputs` is
/// `unimplemented!`).
#[derive(Debug, PartialEq, Eq, Hash)]
struct PassthroughExtensionNode {
/// The single input plan; the only entry returned by `inputs()`.
input: LogicalPlan,
/// Schema returned by `schema()`.
schema: DFSchemaRef,
}

// Ordering is undefined for this node: `partial_cmp` returns `None` for
// every pair of values.
impl PartialOrd for PassthroughExtensionNode {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

// Logical-node behaviour for the passthrough test node: one input, no
// expressions, and no support for being rebuilt.
impl UserDefinedLogicalNodeCore for PassthroughExtensionNode {
    /// Fixed node name.
    fn name(&self) -> &str {
        "Passthrough"
    }

    /// The wrapped input plan is the node's only input.
    fn inputs(&self) -> Vec<&LogicalPlan> {
        Vec::from([&self.input])
    }

    /// Schema stored on the node.
    fn schema(&self) -> &DFSchemaRef {
        &self.schema
    }

    /// The node carries no expressions of its own.
    fn expressions(&self) -> Vec<Expr> {
        Vec::new()
    }

    /// Renders the node as the literal text `Passthrough`.
    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Passthrough")
    }

    /// Reconstruction is not supported.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        unimplemented!("Passthrough")
    }
}

/// Extension planner that handles `PassthroughExtensionNode` by returning
/// the node's first physical input as its physical plan.
struct PassthroughExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for PassthroughExtensionPlanner {
    /// Plan a `PassthroughExtensionNode` as its first physical input.
    ///
    /// Returns `Ok(None)` for any other node type. Indexing
    /// `physical_inputs[0]` panics if a passthrough node is planned with no
    /// physical inputs.
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let is_passthrough = node.as_any().is::<PassthroughExtensionNode>();
        if !is_passthrough {
            return Ok(None);
        }

        let passthrough = Arc::clone(&physical_inputs[0]);
        Ok(Some(passthrough))
    }
}

#[tokio::test]
async fn scalar_subquery_below_extension_plans() -> Result<()> {
let session_state = make_session_state();
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
PassthroughExtensionPlanner,
)]);

let subquery_plan = LogicalPlanBuilder::empty(true)
.project(vec![lit(1i64)])?
.build()?;
let input = LogicalPlanBuilder::empty(true)
.project(vec![
datafusion_expr::expr_fn::scalar_subquery(Arc::new(subquery_plan))
.alias("sq"),
])?
.build()?;
let logical_plan = LogicalPlan::Extension(Extension {
node: Arc::new(PassthroughExtensionNode {
schema: input.schema().clone(),
input,
}),
});

let plan = planner
.create_physical_plan(&logical_plan, &session_state)
.await?;
assert!(format!("{plan:?}").contains("ScalarSubqueryExec"));
Ok(())
}

// Produces an execution plan where the schema is mismatched from
// the logical plan node.
struct BadExtensionPlanner {}
Expand Down
Loading
Loading