From cdc22b90d762dcaa358a0ab8baad442120be33b8 Mon Sep 17 00:00:00 2001 From: Denis Semenov Date: Thu, 5 Feb 2026 11:35:30 +0300 Subject: [PATCH] feat: add ValuesSource datasource This commit introduces `ValuesSource`, a new data source specifically designed to handle constant values that may contain placeholders. It replaces the previous `MemorySourceConfig::try_new_as_values` implementation, providing the necessary infrastructure to resolve physical placeholders during execution while maintaining performance for constant-only data by falling back to the standard memory source. --- datafusion/core/src/physical_planner.rs | 4 +- datafusion/datasource/src/memory.rs | 107 +--- datafusion/datasource/src/mod.rs | 1 + datafusion/datasource/src/values.rs | 557 ++++++++++++++++++ datafusion/proto/proto/datafusion.proto | 6 + datafusion/proto/src/generated/pbjson.rs | 122 ++++ datafusion/proto/src/generated/prost.rs | 11 +- datafusion/proto/src/physical_plan/mod.rs | 72 +++ .../tests/cases/roundtrip_physical_plan.rs | 11 + .../sqllogictest/test_files/placeholders.slt | 14 + 10 files changed, 807 insertions(+), 98 deletions(-) create mode 100644 datafusion/datasource/src/values.rs diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c3c82087cbddd..54c5867c1804a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -77,6 +77,7 @@ use datafusion_common::{ }; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::memory::MemorySourceConfig; +use datafusion_datasource::values::ValuesSource; use datafusion_expr::dml::{CopyTo, InsertOp}; use datafusion_expr::expr::{ AggregateFunction, AggregateFunctionParams, Alias, GroupingSet, NullTreatment, @@ -486,8 +487,7 @@ impl DefaultPhysicalPlanner { .collect::>>>() }) .collect::>>()?; - MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)? 
- as _ + ValuesSource::try_new_exec(Arc::clone(schema.inner()), exprs)? } LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..ee60f022c1e50 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -27,11 +27,9 @@ use std::sync::Arc; use crate::sink::DataSink; use crate::source::{DataSource, DataSourceExec}; -use arrow::array::{RecordBatch, RecordBatchOptions}; -use arrow::datatypes::{Schema, SchemaRef}; -use datafusion_common::{ - Result, ScalarValue, assert_or_internal_err, plan_err, project_schema, -}; +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion_common::{Result, assert_or_internal_err, plan_err, project_schema}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::project_orderings; use datafusion_physical_expr::projection::ProjectionExprs; @@ -42,8 +40,8 @@ use datafusion_physical_plan::projection::{ all_alias_free_columns, new_projections_for_columns, }; use datafusion_physical_plan::{ - ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr, - SendableRecordBatchStream, Statistics, common, + DisplayAs, DisplayFormatType, Partitioning, SendableRecordBatchStream, Statistics, + common, }; use async_trait::async_trait; @@ -285,61 +283,6 @@ impl MemorySourceConfig { Ok(DataSourceExec::from_data_source(source)) } - /// Create a new execution plan from a list of constant values (`ValuesExec`) - #[expect(clippy::needless_pass_by_value)] - pub fn try_new_as_values( - schema: SchemaRef, - data: Vec>>, - ) -> Result> { - if data.is_empty() { - return plan_err!("Values list cannot be empty"); - } - - let n_row = data.len(); - let n_col = schema.fields().len(); - - // We have this single row batch as a placeholder to satisfy evaluation argument - // and generate a single output row - let placeholder_schema = 
Arc::new(Schema::empty()); - let placeholder_batch = RecordBatch::try_new_with_options( - Arc::clone(&placeholder_schema), - vec![], - &RecordBatchOptions::new().with_row_count(Some(1)), - )?; - - // Evaluate each column - let arrays = (0..n_col) - .map(|j| { - (0..n_row) - .map(|i| { - let expr = &data[i][j]; - let result = expr.evaluate(&placeholder_batch)?; - - match result { - ColumnarValue::Scalar(scalar) => Ok(scalar), - ColumnarValue::Array(array) if array.len() == 1 => { - ScalarValue::try_from_array(&array, 0) - } - ColumnarValue::Array(_) => { - plan_err!("Cannot have array values in a values list") - } - } - }) - .collect::>>() - .and_then(ScalarValue::iter_to_array) - }) - .collect::>>()?; - - let batch = RecordBatch::try_new_with_options( - Arc::clone(&schema), - arrays, - &RecordBatchOptions::new().with_row_count(Some(n_row)), - )?; - - let partitions = vec![batch]; - Self::try_new_from_batches(Arc::clone(&schema), partitions) - } - /// Create a new plan using the provided schema and batches. 
/// /// Errors if any of the batches don't match the provided schema, or if no @@ -845,12 +788,13 @@ mod memory_source_tests { mod tests { use super::*; use crate::test_util::col; - use crate::tests::{aggr_test_schema, make_partition}; + use crate::tests::make_partition; + use crate::values::ValuesSource; use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray}; - use arrow::datatypes::{DataType, Field}; - use datafusion_common::assert_batches_eq; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::stats::{ColumnStatistics, Precision}; + use datafusion_common::{ScalarValue, assert_batches_eq}; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::lit; @@ -883,14 +827,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn values_empty_case() -> Result<()> { - let schema = aggr_test_schema(); - let empty = MemorySourceConfig::try_new_as_values(schema, vec![]); - assert!(empty.is_err()); - Ok(()) - } - #[test] fn new_exec_with_batches() { let batch = make_partition(7); @@ -919,27 +855,6 @@ mod tests { .unwrap_err(); } - // Test issue: https://github.com/apache/datafusion/issues/8763 - #[test] - fn new_exec_with_non_nullable_schema() { - let schema = Arc::new(Schema::new(vec![Field::new( - "col0", - DataType::UInt32, - false, - )])); - let _ = MemorySourceConfig::try_new_as_values( - Arc::clone(&schema), - vec![vec![lit(1u32)]], - ) - .unwrap(); - // Test that a null value is rejected - let _ = MemorySourceConfig::try_new_as_values( - schema, - vec![vec![lit(ScalarValue::UInt32(None))]], - ) - .unwrap_err(); - } - #[test] fn values_stats_with_nulls_only() -> Result<()> { let data = vec![ @@ -950,7 +865,9 @@ mod tests { let rows = data.len(); let schema = Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])); - let values = MemorySourceConfig::try_new_as_values(schema, data)?; + + let values = ValuesSource::try_new_exec(schema, data)?; + assert!(values.data_source().as_any().is::()); 
assert_eq!( values.partition_statistics(None)?, diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index f80c9cb0b0daa..8c22531b7c7c0 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -44,6 +44,7 @@ pub mod sink; pub mod source; mod statistics; pub mod table_schema; +pub mod values; #[cfg(test)] pub mod test_util; diff --git a/datafusion/datasource/src/values.rs b/datafusion/datasource/src/values.rs new file mode 100644 index 0000000000000..bea1f871d76a2 --- /dev/null +++ b/datafusion/datasource/src/values.rs @@ -0,0 +1,557 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Values source for reading constant values or expressions with placeholders. +//! +//! This module provides the [`ValuesSource`] struct, which can be used to read values that may +//! contain placeholders. 
+ +use std::{ + any::Any, + fmt::Formatter, + sync::{Arc, LazyLock}, +}; + +use arrow::{ + array::{RecordBatch, RecordBatchOptions}, + datatypes::{Schema, SchemaRef}, +}; +use datafusion_common::{ + Result, ScalarValue, Statistics, assert_eq_or_internal_err, exec_err, plan_err, +}; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr::{ + EquivalenceProperties, Partitioning, PhysicalExpr, + expressions::{has_placeholders, lit}, +}; +use datafusion_physical_plan::{ + DisplayFormatType, common::compute_record_batch_statistics, coop::cooperative, + execution_plan::ReplacePhysicalExpr, memory::MemoryStream, +}; + +use crate::{ + memory::MemorySourceConfig, + source::{DataSource, DataSourceExec}, +}; + +/// Information about a single value (cell) whose expression contains placeholders. +#[derive(Clone, Debug)] +struct RecordWithPlaceholders { + /// The physical expression. + pub expr: Arc, + /// The row index. + pub row: usize, + /// The column index. + pub column: usize, +} + +/// A data source for reading values that may contain placeholders. +/// +/// This source is used when the values contain placeholders that need to be resolved at execution +/// time. If all values are constant, [`MemorySourceConfig`] is typically used instead. +#[derive(Clone, Debug)] +pub struct ValuesSource { + /// The schema of the values. + schema: SchemaRef, + /// The record batch containing the values. + batch: RecordBatch, + /// Cells (row and column positions) whose expressions contain placeholders. + records_with_placeholders: Vec, +} + +impl ValuesSource { + /// Create a new [`ValuesSource`] from the provided schema and data. 
+ #[expect(clippy::needless_pass_by_value)] + fn try_new(schema: SchemaRef, data: Vec>>) -> Result { + if data.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + let n_row = data.len(); + let n_col = schema.fields().len(); + + let mut records_with_placeholders = Vec::new(); + + // Evaluate each column + let arrays = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let expr = &data[i][j]; + if has_placeholders(expr) { + let record = RecordWithPlaceholders { + expr: Arc::clone(expr), + row: i, + column: j, + }; + + records_with_placeholders.push(record); + + let data_type = schema.field(j).data_type(); + return ScalarValue::new_default(data_type); + } + + evaluate_to_scalar(expr.as_ref()) + }) + .collect::>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::>>()?; + + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + arrays, + &RecordBatchOptions::new().with_row_count(Some(n_row)), + )?; + + Ok(Self { + batch, + records_with_placeholders, + schema, + }) + } + + /// Create a new execution plan from a list of values. + /// + /// If the values contain placeholders, a [`ValuesSource`] is used which will resolve them at + /// execution time. Otherwise, [`MemorySourceConfig`] will be used as it is more efficient for + /// constant values. + pub fn try_new_exec( + schema: SchemaRef, + data: Vec>>, + ) -> Result> { + let source = Self::try_new(schema, data)?; + if source.records_with_placeholders.is_empty() { + MemorySourceConfig::try_new_from_batches(source.schema, vec![source.batch]) + } else { + Ok(DataSourceExec::from_data_source(source)) + } + } + + /// Returns the schema. + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Returns the physical expressions for each value in the source. + /// + /// For values that do not contain placeholders, the expression will be a literal. + /// For values that contain placeholders, the original expression is returned. 
+ pub fn expressions(&self) -> Vec>> { + let columns = self.batch.columns(); + let columns_len = columns.len(); + let rows = columns.first().map(|c| c.len()).unwrap_or(0); + let mut exprs = Vec::with_capacity(rows); + + for row in 0..rows { + let mut column_exprs = Vec::with_capacity(columns_len); + for column in columns { + let scalar = ScalarValue::try_from_array(&column, row) + .expect("should build scalar"); + + column_exprs.push(lit(scalar)); + } + exprs.push(column_exprs); + } + + for placeholder in self.records_with_placeholders.iter() { + exprs[placeholder.row][placeholder.column] = Arc::clone(&placeholder.expr); + } + + exprs + } +} + +impl DataSource for ValuesSource { + fn open( + &self, + partition: usize, + _context: Arc, + ) -> Result { + if partition > 0 { + return exec_err!("ValuesSource only supports a single partition"); + } + + if let Some(record) = self.records_with_placeholders.first() { + // Return an error, if placeholders are not resolved. + evaluate_to_scalar(record.expr.as_ref())?; + } + + Ok(Box::pin(cooperative(MemoryStream::try_new( + vec![self.batch.clone()], + Arc::clone(&self.schema), + None, + )?))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "placeholders={}", self.records_with_placeholders.len()) + } + DisplayFormatType::TreeRender => Ok(()), + } + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn eq_properties(&self) -> EquivalenceProperties { + EquivalenceProperties::new(Arc::clone(&self.schema)) + } + + fn partition_statistics(&self, _partition: Option) -> Result { + Ok(compute_record_batch_statistics( + &[vec![self.batch.clone()]], + &self.schema, + None, + )) + } + + fn with_fetch(&self, _limit: Option) -> Option> { + None + } + + fn fetch(&self) -> Option { + None + } + + fn 
try_swapping_with_projection( + &self, + _projection: &datafusion_physical_expr::projection::ProjectionExprs, + ) -> Result>> { + Ok(None) + } + + fn physical_expressions<'a>( + &'a self, + ) -> Option> + 'a>> { + Some(Box::new( + self.records_with_placeholders + .iter() + .map(|r| Arc::clone(&r.expr)), + )) + } + + fn with_physical_expressions( + &self, + params: ReplacePhysicalExpr, + ) -> Result>> { + let expected_count = self.records_with_placeholders.len(); + let exprs_count = params.exprs.len(); + + assert_eq_or_internal_err!( + expected_count, + exprs_count, + "Inconsistent number of physical expressions for ValuesSource", + ); + + let mut records_with_placeholders = Vec::new(); + let mut column_updates = vec![vec![]; self.schema.fields().len()]; + for (record, expr) in self.records_with_placeholders.iter().zip(params.exprs) { + if has_placeholders(&expr) { + records_with_placeholders.push(record.clone()); + continue; + } + + let scalar = evaluate_to_scalar(expr.as_ref())?; + column_updates[record.column].push((record.row, scalar)); + } + + let mut columns = self.batch.columns().to_vec(); + for (col_idx, updates) in column_updates.into_iter().enumerate() { + if updates.is_empty() { + continue; + } + + let column = &columns[col_idx]; + let mut scalars = (0..column.len()) + .map(|i| ScalarValue::try_from_array(column, i)) + .collect::>>()?; + + for (row_idx, scalar) in updates { + scalars[row_idx] = scalar; + } + + columns[col_idx] = ScalarValue::iter_to_array(scalars)?; + } + + let batch = RecordBatch::try_new(Arc::clone(&self.schema), columns)?; + let data_source = Arc::new(ValuesSource { + schema: Arc::clone(&self.schema), + batch, + records_with_placeholders, + }); + + Ok(Some(data_source)) + } +} + +/// Evaluates a physical expression to a scalar value. 
+fn evaluate_to_scalar(expr: &dyn PhysicalExpr) -> Result { + static PLACEHOLDER_BATCH: LazyLock = LazyLock::new(|| { + let placeholder_schema = Arc::new(Schema::empty()); + RecordBatch::try_new_with_options( + placeholder_schema, + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + ) + .expect("Failed to create placeholder batch") + }); + + let result = expr.evaluate(&PLACEHOLDER_BATCH)?; + match result { + ColumnarValue::Scalar(scalar) => Ok(scalar), + ColumnarValue::Array(array) if array.len() == 1 => { + ScalarValue::try_from_array(&array, 0) + } + _ => plan_err!("Cannot have array values in a values list"), + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{ + ColumnStatistics, ParamValues, assert_batches_eq, stats::Precision, + }; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{BinaryExpr, lit, placeholder}; + use datafusion_physical_plan::{ + ExecutionPlan, collect, + plan_transformer::{ResolvePlaceholdersRule, TransformPlanExec}, + }; + + #[test] + fn test_values_source_no_placeholders() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + let data = vec![vec![lit(1i32), lit("foo")], vec![lit(2i32), lit("bar")]]; + + let exec = ValuesSource::try_new_exec(Arc::clone(&schema), data)?; + + // Should be MemorySourceConfig because no placeholders. 
+ assert!(exec.data_source().as_any().is::()); + + Ok(()) + } + + #[test] + fn test_values_stats_with_nulls_and_placeholders() -> Result<()> { + let schema = + Arc::new(Schema::new(vec![Field::new("col0", DataType::Int32, true)])); + + let data = vec![ + vec![lit(ScalarValue::Int32(None))], + vec![lit(ScalarValue::Int32(None))], + vec![placeholder("$1", DataType::Int32)], + ]; + let rows = data.len(); + let nulls = rows - 1; + + let values = ValuesSource::try_new_exec(schema, data)?; + assert!(values.data_source().as_any().is::()); + + assert_eq!( + values.partition_statistics(None)?, + Statistics { + num_rows: Precision::Exact(rows), + total_byte_size: Precision::Exact(176), // not important + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(nulls), // there are only nulls and placeholders + distinct_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + byte_size: Precision::Absent, + },], + } + ); + + Ok(()) + } + + // Test issue: https://github.com/apache/datafusion/issues/8763 + #[test] + fn test_values_with_non_nullable_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let _ = + ValuesSource::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap(); + // Test that a null value is rejected + let _ = ValuesSource::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) + .unwrap_err(); + } + + #[tokio::test] + async fn test_values_source_with_placeholders() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + + // $1 is in the second column (`b`) of the second row. 
+ let data = vec![ + vec![lit("foo"), lit(1i32)], + vec![ + lit("bar"), + Arc::new(BinaryExpr::new( + lit(30i32), + Operator::Plus, + placeholder("$1", DataType::Int32), + )), + ], + ]; + + let values_exec = ValuesSource::try_new_exec(Arc::clone(&schema), data)?; + + // Should be ValuesSource because of placeholder. + assert!(values_exec.data_source().as_any().is::()); + + let rules = vec![Box::new(ResolvePlaceholdersRule::new()) as Box<_>]; + let exec = Arc::new(TransformPlanExec::try_new(values_exec, rules)?); + let task_ctx = Arc::new(TaskContext::default().with_param_values( + ParamValues::List(vec![ScalarValue::Int32(Some(10)).into()]), + )); + + let batch = collect(exec, task_ctx).await?; + let expected = [ + "+-----+----+", + "| a | b |", + "+-----+----+", + "| foo | 1 |", + "| bar | 40 |", + "+-----+----+", + ]; + assert_batches_eq!(expected, &batch); + + Ok(()) + } + + #[tokio::test] + async fn test_values_source_multiple_placeholders() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let data: Vec>> = vec![vec![ + placeholder("$1", DataType::Int32), + placeholder("$2", DataType::Int32), + ]]; + + let values_exec = ValuesSource::try_new_exec(Arc::clone(&schema), data)?; + let rules = vec![Box::new(ResolvePlaceholdersRule::new()) as Box<_>]; + let exec = Arc::new(TransformPlanExec::try_new(values_exec, rules)?) 
as Arc<_>; + + let task_ctx = Arc::new(TaskContext::default().with_param_values( + ParamValues::List(vec![ + ScalarValue::Int32(Some(10)).into(), + ScalarValue::Int32(Some(20)).into(), + ]), + )); + + let batch = collect(Arc::clone(&exec), task_ctx).await?; + let expected = [ + "+----+----+", + "| a | b |", + "+----+----+", + "| 10 | 20 |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batch); + + let task_ctx = Arc::new(TaskContext::default().with_param_values( + ParamValues::List(vec![ + ScalarValue::Int32(Some(30)).into(), + ScalarValue::Int32(Some(40)).into(), + ]), + )); + + let batch = collect(exec, task_ctx).await?; + let expected = [ + "+----+----+", + "| a | b |", + "+----+----+", + "| 30 | 40 |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batch); + + Ok(()) + } + + #[test] + fn test_values_source_empty_data() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let result = ValuesSource::try_new_exec(schema, vec![]); + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_fails_if_not_all_placeholders_resolved() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let data: Vec>> = + vec![vec![lit(10), placeholder("$foo", DataType::Int32)]]; + + let values_exec = ValuesSource::try_new_exec(Arc::clone(&schema), data)?; + let rules = vec![Box::new(ResolvePlaceholdersRule::new()) as Box<_>]; + let exec = Arc::new(TransformPlanExec::try_new(values_exec, rules)?) 
as Arc<_>; + + let task_ctx = Arc::new(TaskContext::default()); + let result = collect(Arc::clone(&exec), task_ctx).await; + assert!(result.is_err()); + + let task_ctx = Arc::new(TaskContext::default().with_param_values( + ParamValues::Map(HashMap::from_iter([( + "foo".to_string(), + ScalarValue::Int32(Some(20)).into(), + )])), + )); + + let batch = collect(Arc::clone(&exec), task_ctx).await?; + let expected = [ + "+----+----+", + "| a | b |", + "+----+----+", + "| 10 | 20 |", + "+----+----+", + ]; + assert_batches_eq!(expected, &batch); + + Ok(()) + } +} diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 81ddeb25ffb45..8fe68a264c3cc 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -752,6 +752,7 @@ message PhysicalPlanNode { AsyncFuncExecNode async_func = 36; BufferExecNode buffer = 37; TransformPlanExecNode transform_plan = 38; + ValuesExecNode values_scan = 39; } } @@ -1123,6 +1124,11 @@ message MemoryScanExecNode { optional uint32 fetch = 6; } +message ValuesExecNode { + datafusion_common.Schema schema = 1; + repeated PhysicalExprNode exprs = 2; +} + message CooperativeExecNode { PhysicalPlanNode input = 1; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b2abee35b617d..f579478f976e3 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -17908,6 +17908,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::TransformPlan(v) => { struct_ser.serialize_field("transformPlan", v)?; } + physical_plan_node::PhysicalPlanType::ValuesScan(v) => { + struct_ser.serialize_field("valuesScan", v)?; + } } } struct_ser.end() @@ -17978,6 +17981,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "buffer", "transform_plan", "transformPlan", + "values_scan", + "valuesScan", ]; #[allow(clippy::enum_variant_names)] @@ -18019,6 +18024,7 
@@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { AsyncFunc, Buffer, TransformPlan, + ValuesScan, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -18077,6 +18083,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "asyncFunc" | "async_func" => Ok(GeneratedField::AsyncFunc), "buffer" => Ok(GeneratedField::Buffer), "transformPlan" | "transform_plan" => Ok(GeneratedField::TransformPlan), + "valuesScan" | "values_scan" => Ok(GeneratedField::ValuesScan), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -18356,6 +18363,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return Err(serde::de::Error::duplicate_field("transformPlan")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::TransformPlan) +; + } + GeneratedField::ValuesScan => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("valuesScan")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::ValuesScan) ; } } @@ -24295,6 +24309,114 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { deserializer.deserialize_struct("datafusion.UnnestOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ValuesExecNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.schema.is_some() { + len += 1; + } + if !self.exprs.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ValuesExecNode", len)?; + if let Some(v) = self.schema.as_ref() { + struct_ser.serialize_field("schema", v)?; + } + if !self.exprs.is_empty() { + struct_ser.serialize_field("exprs", &self.exprs)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ValuesExecNode { + 
#[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "schema", + "exprs", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Schema, + Exprs, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "schema" => Ok(GeneratedField::Schema), + "exprs" => Ok(GeneratedField::Exprs), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ValuesExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ValuesExecNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut schema__ = None; + let mut exprs__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::Schema => { + if schema__.is_some() { + return Err(serde::de::Error::duplicate_field("schema")); + } + schema__ = map_.next_value()?; + } + GeneratedField::Exprs => { + if exprs__.is_some() { + return Err(serde::de::Error::duplicate_field("exprs")); + } + exprs__ = Some(map_.next_value()?); + } + } + } + Ok(ValuesExecNode { + schema: schema__, + exprs: exprs__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.ValuesExecNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for ValuesNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 000be28848ac3..755d04c68111f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1079,7 +1079,7 @@ pub mod table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39" )] pub physical_plan_type: ::core::option::Option, } @@ -1163,6 +1163,8 @@ pub mod physical_plan_node { Buffer(::prost::alloc::boxed::Box), #[prost(message, tag = "38")] TransformPlan(::prost::alloc::boxed::Box), + #[prost(message, tag = "39")] + ValuesScan(super::ValuesExecNode), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1695,6 +1697,13 @@ pub struct MemoryScanExecNode { pub fetch: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ValuesExecNode { + #[prost(message, optional, tag = "1")] + pub schema: ::core::option::Option, + #[prost(message, repeated, tag = "2")] + pub exprs: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, 
::prost::Message)] pub struct CooperativeExecNode { #[prost(message, optional, boxed, tag = "1")] pub input: ::core::option::Option<::prost::alloc::boxed::Box>, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 5aa9f57be61dd..4d0b3645d5025 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -34,6 +34,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::sink::DataSinkExec; use datafusion_datasource::source::{DataSource, DataSourceExec}; +use datafusion_datasource::values::ValuesSource; #[cfg(feature = "avro")] use datafusion_datasource_avro::source::AvroSource; use datafusion_datasource_csv::file_format::CsvSink; @@ -323,6 +324,9 @@ impl protobuf::PhysicalPlanNode { codec, proto_converter, ), + PhysicalPlanType::ValuesScan(scan) => { + self.try_into_values_scan_exec(scan, ctx, codec, proto_converter) + } } } @@ -2248,6 +2252,34 @@ impl protobuf::PhysicalPlanNode { Ok(Arc::new(transformer)) } + fn try_into_values_scan_exec( + &self, + scan: &protobuf::ValuesExecNode, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result> { + let proto_schema = scan.schema.as_ref().ok_or_else(|| { + internal_datafusion_err!("schema in ValuesExecNode is missing.") + })?; + let schema = SchemaRef::new(proto_schema.try_into()?); + let linear_exprs = scan + .exprs + .iter() + .map(|expr| proto_converter.proto_to_physical_expr(expr, ctx, &schema, codec)) + .collect::>>()?; + + let columns = schema.fields.len(); + let rows = linear_exprs.len().checked_div(columns).unwrap_or(0); + let mut exprs = Vec::with_capacity(rows); + + for chunk in linear_exprs.chunks(columns) { + exprs.push(chunk.to_vec()); + } + + ValuesSource::try_new_exec(schema, exprs).map(|plan| plan as Arc<_>) + 
} + fn try_from_explain_exec( exec: &ExplainExec, _codec: &dyn PhysicalExtensionCodec, @@ -3006,6 +3038,15 @@ impl protobuf::PhysicalPlanNode { })); } + if let Some(source) = data_source.as_any().downcast_ref::() { + let node = protobuf::PhysicalPlanNode::try_from_values_source( + source, + codec, + proto_converter, + )?; + return Ok(Some(node)); + } + Ok(None) } @@ -3392,6 +3433,14 @@ impl protobuf::PhysicalPlanNode { })); } + if let Some(source) = exec.sink().as_any().downcast_ref::() { + return Ok(Some(protobuf::PhysicalPlanNode::try_from_values_source( + source, + codec, + proto_converter, + )?)); + } + // If unknown DataSink then let extension handle it Ok(None) } @@ -3653,6 +3702,29 @@ impl protobuf::PhysicalPlanNode { ))), }) } + + fn try_from_values_source( + source: &ValuesSource, + extension_codec: &dyn PhysicalExtensionCodec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result { + let schema = source.schema(); + let proto_exprs = source + .expressions() + .iter() + .flatten() + .map(|expr| proto_converter.physical_expr_to_proto(expr, extension_codec)) + .collect::>>()?; + + Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::ValuesScan( + protobuf::ValuesExecNode { + schema: Some(schema.as_ref().try_into()?), + exprs: proto_exprs, + }, + )), + }) + } } pub trait AsExecutionPlan: Debug + Send + Sync + Clone { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 820089a907d3b..2684ff56a2398 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2379,6 +2379,17 @@ async fn roundtrip_resolve_placeholders_exec() -> Result<()> { roundtrip_test(plan) } +#[tokio::test] +async fn roundtrip_values_source() -> Result<()> { + let ctx = SessionContext::new(); + let plan = ctx + .sql("SELECT * FROM VALUES (10, $1), (20, 30), (40, $2), ($3, $3), ($4, $5), (50, 60)") + 
.await? + .create_physical_plan() + .await?; + roundtrip_test(plan) +} + /// Test that HashTableLookupExpr serializes to lit(true) /// /// HashTableLookupExpr contains a runtime hash table that cannot be serialized. diff --git a/datafusion/sqllogictest/test_files/placeholders.slt b/datafusion/sqllogictest/test_files/placeholders.slt index 24d43527052c7..3dfa7eb572dbc 100644 --- a/datafusion/sqllogictest/test_files/placeholders.slt +++ b/datafusion/sqllogictest/test_files/placeholders.slt @@ -37,6 +37,20 @@ CREATE TABLE t1( (2, 'Bob'), (3, 'Alice') +# INSERT with placeholders +query TT +EXPLAIN INSERT INTO t1 VALUES ($1, 'Samanta'), (5, $2) +---- +logical_plan +01)Dml: op=[Insert Into] table=[t1] +02)--Projection: column1 AS id, column2 AS name +03)----Values: ($1, Utf8View("Samanta") AS Utf8("Samanta")), (Int32(5) AS Int64(5), $2) +physical_plan +01)TransformPlanExec: rules=[ResolvePlaceholders: plans_to_modify=1] +02)--DataSinkExec: sink=MemoryTable (partitions=1) +03)----ProjectionExec: expr=[column1@0 as id, column2@1 as name] +04)------DataSourceExec: placeholders=2 + # Filter with multiple placeholders query TT EXPLAIN SELECT id FROM t1 WHERE name = $1 OR id = $2