From 88e28a4c0c2523bb61d5f95b75fd9ac7bc1252a8 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Fri, 22 May 2026 18:58:10 +0200 Subject: [PATCH 1/9] Port NotExpr proto hooks --- .../physical-expr/src/expressions/not.rs | 40 +++++++++++++++++++ .../proto/src/physical_plan/from_proto.rs | 8 +--- .../proto/src/physical_plan/to_proto.rs | 15 +------ 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index b63effdbb9c88..76833d1114563 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -181,6 +181,46 @@ impl PhysicalExpr for NotExpr { write!(f, "NOT ")?; self.arg.fmt_sql(f) } + + #[cfg(feature = "proto")] + fn try_to_proto( + &self, + ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + Ok(Some(protobuf::PhysicalExprNode { + expr_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( + protobuf::PhysicalNot { + expr: Some(Box::new(ctx.encode_child(&self.arg)?)), + }, + ))), + })) + } +} + +#[cfg(feature = "proto")] +impl NotExpr { + /// Reconstruct a [`NotExpr`] from its protobuf representation. + pub fn try_from_proto( + node: &datafusion_proto_models::protobuf::PhysicalExprNode, + ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, + ) -> Result> { + use datafusion_proto_models::protobuf; + + let protobuf::PhysicalNot { expr } = match &node.expr_type { + Some(protobuf::physical_expr_node::ExprType::NotExpr(e)) => e.as_ref(), + _ => return internal_err!("PhysicalExprNode is not a NotExpr"), + }; + let expr = expr.as_deref().ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "NotExpr is missing required field 'expr'".to_string(), + ) + })?; + + Ok(Arc::new(NotExpr::new(ctx.decode(expr)?))) + } } /// Creates a unary expression NOT diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 96144b11e9d3a..cb2f20a23a816 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -312,13 +312,7 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?)) } - ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( - e.expr.as_deref(), - ctx, - "expr", - input_schema, - proto_converter, - )?)), + ExprType::NotExpr(_) => NotExpr::try_from_proto(proto, &decode_ctx)?, ExprType::Negative(e) => { Arc::new(NegativeExpr::new(parse_required_physical_expr( e.expr.as_deref(), diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5dd643c84ba21..3c7dbd1dd835f 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, - NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr, + LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -379,17 +379,6 @@ pub fn serialize_physical_expr_with_converter( ), ), }) - } else if let Some(expr) = expr.downcast_ref::() { - Ok(protobuf::PhysicalExprNode { - expr_id, - expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( - protobuf::PhysicalNot { - expr: Some(Box::new( - proto_converter.physical_expr_to_proto(expr.arg(), codec)?, - )), - }, - ))), - }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id, From 15800c7f03db7c7ed5fad8acc8c8bfe96580ec52 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Sun, 24 May 2026 11:03:10 +0200 Subject: [PATCH 2/9] Add NotExpr proto missing child test --- .../physical-expr/src/expressions/not.rs | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 76833d1114563..64cb7733087bc 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -238,6 +238,45 @@ mod tests { use arrow::{array::BooleanArray, datatypes::*}; use datafusion_physical_expr_common::physical_expr::fmt_sql; + #[cfg(feature = "proto")] + #[test] + fn test_from_proto_missing_child() { + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::{ + PhysicalExprDecode, PhysicalExprDecodeCtx, + }; + use datafusion_proto_models::protobuf::{ + PhysicalExprNode, PhysicalNot, physical_expr_node, + }; + + struct NoopDecoder; + + impl PhysicalExprDecode for NoopDecoder { + fn decode( + &self, + _node: &PhysicalExprNode, + _schema: &Schema, + ) -> Result> { + unreachable!("missing child should be rejected before decoding") + } + } + + let node = PhysicalExprNode { + expr_id: None, + expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( + PhysicalNot { expr: None }, + ))), + }; + let schema = Schema::empty(); + let decoder = NoopDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!( + matches!(err, DataFusionError::Internal(msg) if msg == "NotExpr is missing required field 'expr'") + ); + } + #[test] fn neg_op() -> Result<()> { let schema = schema(); From 812b9e05655329d31c75f51a57f43aee9fac628a Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Mon, 25 May 2026 16:21:24 +0200 Subject: [PATCH 3/9] Improve NotExpr proto decode error Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- .../physical-expr/src/expressions/not.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 64cb7733087bc..16061103ef589 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -25,7 +25,9 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, FieldRef, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{Result, ScalarValue, cast::as_boolean_array, internal_err}; +use datafusion_common::{ + Result, ScalarValue, cast::as_boolean_array, internal_datafusion_err, internal_err, +}; use datafusion_expr::ColumnarValue; use datafusion_expr::interval_arithmetic::Interval; #[expect(deprecated)] @@ -214,8 +216,9 @@ impl NotExpr { _ => return internal_err!("PhysicalExprNode is not a NotExpr"), }; let expr = expr.as_deref().ok_or_else(|| { - datafusion_common::DataFusionError::Internal( - "NotExpr is missing required field 'expr'".to_string(), + internal_datafusion_err!( + "NotExpr is missing required field 'expr' (expr_id: {:?}, expr_type: NotExpr)", + node.expr_id ) })?; @@ -262,7 +265,7 @@ mod tests { } let node = PhysicalExprNode { - expr_id: None, + expr_id: Some(42), expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( PhysicalNot { expr: None }, ))), @@ -272,9 +275,10 @@ mod tests { let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); - assert!( - matches!(err, DataFusionError::Internal(msg) if msg == "NotExpr is missing required field 'expr'") - ); + assert!(matches!(err, DataFusionError::Internal(msg) + if msg.contains("NotExpr is missing required field 'expr'") + && msg.contains("expr_id: Some(42)") + && msg.contains("expr_type: NotExpr"))); } #[test] From 33a90db6f256d9683940b87eacfbf8cc00c7e35b Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Mon, 25 May 2026 16:23:59 +0200 Subject: [PATCH 4/9] Remove stale proto serializer imports Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- datafusion/proto/src/physical_plan/to_proto.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 3c7dbd1dd835f..6fc25fc0233c2 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + CaseExpr, CastExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, + NegativeExpr, TryCastExpr, UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; From 14ddac9890b314908127247546c73386c3f01f81 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Wed, 27 May 2026 14:24:10 +0200 Subject: [PATCH 5/9] Address NotExpr proto review --- .../physical-expr/src/expressions/not.rs | 87 +++++++++++-------- 1 file changed, 51 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 16061103ef589..409b318a085aa 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -215,14 +215,22 @@ impl NotExpr { Some(protobuf::physical_expr_node::ExprType::NotExpr(e)) => e.as_ref(), _ => return internal_err!("PhysicalExprNode is not a NotExpr"), }; - let expr = expr.as_deref().ok_or_else(|| { - internal_datafusion_err!( - "NotExpr is missing required field 'expr' (expr_id: {:?}, expr_type: NotExpr)", - node.expr_id - ) - })?; - - Ok(Arc::new(NotExpr::new(ctx.decode(expr)?))) + let expr = ctx + .decode_required_expression(expr.as_deref(), "NotExpr", "expr") + .map_err(|err| match err { + datafusion_common::DataFusionError::Internal(msg) + if msg == "NotExpr is missing required field 'expr'" => + { + internal_datafusion_err!( + "{msg} (expr_id: {:?}, expr_type: {:?})", + node.expr_id, + &node.expr_type + ) + } + other => other, + })?; + + Ok(Arc::new(NotExpr::new(expr))) } } @@ -231,39 +239,35 @@ pub fn not(arg: Arc) -> Result> { Ok(Arc::new(NotExpr::new(arg))) } -#[cfg(test)] -mod tests { - use std::sync::LazyLock; +#[cfg(all(test, feature = "proto"))] +mod proto_tests { + use std::sync::Arc; use super::*; - use crate::expressions::{Column, col}; - use arrow::{array::BooleanArray, datatypes::*}; - use datafusion_physical_expr_common::physical_expr::fmt_sql; + use arrow::datatypes::Schema; + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::{ + PhysicalExprDecode, PhysicalExprDecodeCtx, + }; + use datafusion_proto_models::protobuf::{ + PhysicalExprNode, PhysicalNot, physical_expr_node, + }; + + struct NoopDecoder; + + impl PhysicalExprDecode for NoopDecoder { + fn decode( + &self, + _node: &PhysicalExprNode, + _schema: &Schema, + ) -> Result> { + unreachable!("missing child should be rejected before decoding") + } + } - #[cfg(feature = "proto")] #[test] fn test_from_proto_missing_child() { - use datafusion_common::DataFusionError; - use datafusion_physical_expr_common::physical_expr::proto_decode::{ - PhysicalExprDecode, PhysicalExprDecodeCtx, - }; - use datafusion_proto_models::protobuf::{ - PhysicalExprNode, PhysicalNot, physical_expr_node, - }; - - struct NoopDecoder; - - impl PhysicalExprDecode for NoopDecoder { - fn decode( - &self, - _node: &PhysicalExprNode, - _schema: &Schema, - ) -> Result> { - unreachable!("missing child should be rejected before decoding") - } - } - let node = PhysicalExprNode { expr_id: Some(42), expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( @@ -278,8 +282,19 @@ mod tests { assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("NotExpr is missing required field 'expr'") && msg.contains("expr_id: Some(42)") - && msg.contains("expr_type: NotExpr"))); + && msg.contains("expr_type: Some(NotExpr"))); } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use super::*; + use crate::expressions::{Column, col}; + + use arrow::{array::BooleanArray, datatypes::*}; + use datafusion_physical_expr_common::physical_expr::fmt_sql; #[test] fn neg_op() -> Result<()> { From 17b267cc49127c485eec272476bf34d9c9874fd2 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Thu, 28 May 2026 19:57:17 +0200 Subject: [PATCH 6/9] Handle NotExpr proto backtrace errors Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- datafusion/physical-expr/src/expressions/not.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 409b318a085aa..a86cd86ab56e7 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -219,10 +219,10 @@ impl NotExpr { .decode_required_expression(expr.as_deref(), "NotExpr", "expr") .map_err(|err| match err { datafusion_common::DataFusionError::Internal(msg) - if msg == "NotExpr is missing required field 'expr'" => + if msg.starts_with("NotExpr is missing required field 'expr'") => { internal_datafusion_err!( - "{msg} (expr_id: {:?}, expr_type: {:?})", + "NotExpr is missing required field 'expr' (expr_id: {:?}, expr_type: {:?})", node.expr_id, &node.expr_type ) From 794438fa52c1ecf97ea0dd32f1f00ce91b81ab8e Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Thu, 28 May 2026 20:56:40 +0200 Subject: [PATCH 7/9] Use proto variant helper for NotExpr Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- datafusion/physical-expr/src/expressions/not.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index a86cd86ab56e7..57ec6e9b27079 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -209,14 +209,16 @@ impl NotExpr { node: &datafusion_proto_models::protobuf::PhysicalExprNode, ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>, ) -> Result> { + use datafusion_physical_expr_common::expect_expr_variant; use datafusion_proto_models::protobuf; - let protobuf::PhysicalNot { expr } = match &node.expr_type { - Some(protobuf::physical_expr_node::ExprType::NotExpr(e)) => e.as_ref(), - _ => return internal_err!("PhysicalExprNode is not a NotExpr"), - }; + let not_expr = expect_expr_variant!( + node, + protobuf::physical_expr_node::ExprType::NotExpr, + "NotExpr", + ); let expr = ctx - .decode_required_expression(expr.as_deref(), "NotExpr", "expr") + .decode_required_expression(not_expr.expr.as_deref(), "NotExpr", "expr") .map_err(|err| match err { datafusion_common::DataFusionError::Internal(msg) if msg.starts_with("NotExpr is missing required field 'expr'") => From 215e8926829f5ec4b834529101d6c304fd97a6f3 Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Thu, 28 May 2026 21:19:55 +0200 Subject: [PATCH 8/9] Address NotExpr proto review feedback Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- .../physical-expr/src/expressions/not.rs | 176 +++++++++++------- 1 file changed, 112 insertions(+), 64 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index 57ec6e9b27079..f856dd568a8da 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -25,9 +25,7 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, FieldRef, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{ - Result, ScalarValue, cast::as_boolean_array, internal_datafusion_err, internal_err, -}; +use datafusion_common::{Result, ScalarValue, cast::as_boolean_array, internal_err}; use datafusion_expr::ColumnarValue; use datafusion_expr::interval_arithmetic::Interval; #[expect(deprecated)] @@ -217,20 +215,8 @@ impl NotExpr { protobuf::physical_expr_node::ExprType::NotExpr, "NotExpr", ); - let expr = ctx - .decode_required_expression(not_expr.expr.as_deref(), "NotExpr", "expr") - .map_err(|err| match err { - datafusion_common::DataFusionError::Internal(msg) - if msg.starts_with("NotExpr is missing required field 'expr'") => - { - internal_datafusion_err!( - "NotExpr is missing required field 'expr' (expr_id: {:?}, expr_type: {:?})", - node.expr_id, - &node.expr_type - ) - } - other => other, - })?; + let expr = + ctx.decode_required_expression(not_expr.expr.as_deref(), "NotExpr", "expr")?; Ok(Arc::new(NotExpr::new(expr))) } @@ -241,53 +227,6 @@ pub fn not(arg: Arc) -> Result> { Ok(Arc::new(NotExpr::new(arg))) } -#[cfg(all(test, feature = "proto"))] -mod proto_tests { - use std::sync::Arc; - - use super::*; - - use arrow::datatypes::Schema; - use datafusion_common::DataFusionError; - use datafusion_physical_expr_common::physical_expr::proto_decode::{ - PhysicalExprDecode, PhysicalExprDecodeCtx, - }; - use datafusion_proto_models::protobuf::{ - PhysicalExprNode, PhysicalNot, physical_expr_node, - }; - - struct NoopDecoder; - - impl PhysicalExprDecode for NoopDecoder { - fn decode( - &self, - _node: &PhysicalExprNode, - _schema: &Schema, - ) -> Result> { - unreachable!("missing child should be rejected before decoding") - } - } - - #[test] - fn test_from_proto_missing_child() { - let node = PhysicalExprNode { - expr_id: Some(42), - expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( - PhysicalNot { expr: None }, - ))), - }; - let schema = Schema::empty(); - let decoder = NoopDecoder; - let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); - - let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); - assert!(matches!(err, DataFusionError::Internal(msg) - if msg.contains("NotExpr is missing required field 'expr'") - && msg.contains("expr_id: Some(42)") - && msg.contains("expr_type: Some(NotExpr"))); - } -} - #[cfg(test)] mod tests { use std::sync::LazyLock; @@ -457,3 +396,112 @@ mod tests { Arc::clone(&SCHEMA) } } + +/// Tests for the `try_to_proto` / `try_from_proto` hooks. +#[cfg(all(test, feature = "proto"))] +mod proto_tests { + use super::*; + use crate::expressions::{Column, col}; + use crate::proto_test_util::{ + StubDecoder, StubEncoder, UnreachableDecoder, column_node, + }; + use arrow::datatypes::Field; + use datafusion_common::DataFusionError; + use datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx; + use datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx; + use datafusion_proto_models::protobuf::{ + PhysicalExprNode, PhysicalNot, physical_expr_node, + }; + + /// Build a `NotExpr` proto node with the given child. + fn not_node(expr: Option>) -> PhysicalExprNode { + PhysicalExprNode { + expr_id: None, + expr_type: Some(physical_expr_node::ExprType::NotExpr(Box::new( + PhysicalNot { expr }, + ))), + } + } + + /// A `NotExpr` over a boolean column. + fn not_fixture() -> NotExpr { + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + NotExpr::new(col("a", &schema).unwrap()) + } + + #[test] + fn try_to_proto_encodes_not_expr() { + let not = not_fixture(); + let encoder = StubEncoder::ok(); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + + let node = not + .try_to_proto(&ctx) + .unwrap() + .expect("NotExpr should encode to Some(node)"); + + assert!(node.expr_id.is_none()); + let not_node = match node.expr_type { + Some(physical_expr_node::ExprType::NotExpr(boxed)) => *boxed, + other => panic!("expected a NotExpr node, got {other:?}"), + }; + assert!(not_node.expr.is_some()); + } + + #[test] + fn try_to_proto_propagates_expr_encode_error() { + let not = not_fixture(); + let encoder = StubEncoder::failing_on(1); + let ctx = PhysicalExprEncodeCtx::new(&encoder); + let err = not.try_to_proto(&ctx).unwrap_err(); + assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 1"))); + } + + #[test] + fn try_from_proto_decodes_not_expr() { + let node = not_node(Some(Box::new(column_node("a")))); + let schema = Schema::empty(); + let decoder = StubDecoder::ok(); + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + + let decoded = NotExpr::try_from_proto(&node, &ctx).unwrap(); + let not = decoded + .downcast_ref::() + .expect("decoded expr should be a NotExpr"); + assert!(not.arg().downcast_ref::().is_some()); + } + + #[test] + fn try_from_proto_rejects_non_not_node() { + let node = column_node("a"); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!( + matches!(err, DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a NotExpr")) + ); + } + + #[test] + fn try_from_proto_rejects_missing_expr() { + let node = not_node(None); + let schema = Schema::empty(); + let decoder = UnreachableDecoder; + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!( + matches!(err, DataFusionError::Internal(msg) if msg.contains("NotExpr is missing required field 'expr'")) + ); + } + + #[test] + fn try_from_proto_propagates_expr_decode_error() { + let node = not_node(Some(Box::new(column_node("a")))); + let schema = Schema::empty(); + let decoder = StubDecoder::failing_on(1); + let ctx = PhysicalExprDecodeCtx::new(&schema, &decoder); + let err = NotExpr::try_from_proto(&node, &ctx).unwrap_err(); + assert!(matches!(err, DataFusionError::Internal(msg) if msg.contains("call 1"))); + } +} From abc1cc82d2bdc710c7a73c2eff092f712606062b Mon Sep 17 00:00:00 2001 From: Herrtian <70463940+Herrtian@users.noreply.github.com> Date: Thu, 28 May 2026 21:58:38 +0200 Subject: [PATCH 9/9] Remove unused NotExpr import Signed-off-by: Herrtian <70463940+Herrtian@users.noreply.github.com> --- datafusion/proto/src/physical_plan/to_proto.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index cfd9555a3bdd6..6febf15835f4c 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, NotExpr, - TryCastExpr, UnKnownColumn, + CaseExpr, DynamicFilterPhysicalExpr, IsNotNullExpr, IsNullExpr, Literal, TryCastExpr, + UnKnownColumn, }; use datafusion_physical_plan::udaf::AggregateFunctionExpr; use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};