From 768b3e90f261c7aea58bdb98dc698b90deeeae34 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Sun, 14 Dec 2025 16:24:01 +0400 Subject: [PATCH 1/8] impl map_from_entries --- native/core/src/execution/jni_api.rs | 2 + .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +++++++++++- .../comet/CometMapExpressionSuite.scala | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a24d993059..4f53cea3e6 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,6 +46,7 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; +use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..a99cf3824b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,7 +125,8 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[MapFromEntries] -> CometMapFromEntries) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 2e217f6af0..498aa3594c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,9 +19,12 @@ package org.apache.comet.serde +import scala.annotation.tailrec + import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, MapType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -89,3 +92,27 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } + +object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { + val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" + val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" + + private def containsBinary(dataType: DataType): Boolean = { + dataType match { + case BinaryType => true + case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) + case ArrayType(elementType, _) => containsBinary(elementType) + case _ => false + } + } + + override def getSupportLevel(expr: MapFromEntries): SupportLevel = { + if (containsBinary(expr.dataType.keyType)) { + return Incompatible(Some(keyUnsupportedReason)) + } + if (containsBinary(expr.dataType.valueType)) { + return Incompatible(Some(valueUnsupportedReason)) + } + Compatible(None) + } +} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 88c13391a6..01b9744ed6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.BinaryType +import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -125,4 +127,47 @@ class CometMapExpressionSuite extends CometTestBase { } } + test("map_from_entries") { + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + val filename = path.toString + val random = new Random(42) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val schemaGenOptions = + SchemaGenOptions( + generateArray = true, + generateStruct = true, + primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) + val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) + ParquetGenerator.makeParquetFile( + random, + spark, + filename, + 100, + schemaGenOptions, + dataGenOptions) + } + val df = spark.read.parquet(filename) + df.createOrReplaceTempView("t1") + for (field <- df.schema.fieldNames) { + checkSparkAnswerAndOperator( + spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) + } + } + } + + test("map_from_entries - fallback for binary type") { + val table = "t2" + withTable(table) { + sql( + s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), + CometMapFromEntries.keyUnsupportedReason) + checkSparkAnswerAndFallbackReason( + sql(s"select map_from_entries(array(struct(0, c1))) from $table"), + CometMapFromEntries.valueUnsupportedReason) + } + } + } From c68c3428676b5d991e7ba9e13464bf2ce1ec84e8 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 16 Dec 2025 16:10:43 +0400 Subject: [PATCH 2/8] Revert "impl map_from_entries" This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34. --- native/core/src/execution/jni_api.rs | 2 - .../apache/comet/serde/QueryPlanSerde.scala | 3 +- .../scala/org/apache/comet/serde/maps.scala | 29 +----------- .../comet/CometMapExpressionSuite.scala | 45 ------------------- 4 files changed, 2 insertions(+), 77 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 4f53cea3e6..a24d993059 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -46,7 +46,6 @@ use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; -use datafusion_spark::function::map::map_from_entries::MapFromEntries; use datafusion_spark::function::math::expm1::SparkExpm1; use datafusion_spark::function::string::char::CharFunc; use datafusion_spark::function::string::concat::SparkConcat; @@ -338,7 +337,6 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); - session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default())); } /// Prepares arrow arrays for output. diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a99cf3824b..54df2f1688 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -125,8 +125,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[MapKeys] -> CometMapKeys, classOf[MapEntries] -> CometMapEntries, classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays, - classOf[MapFromEntries] -> CometMapFromEntries) + classOf[MapFromArrays] -> CometMapFromArrays) private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[CreateNamedStruct] -> CometCreateNamedStruct, diff --git a/spark/src/main/scala/org/apache/comet/serde/maps.scala b/spark/src/main/scala/org/apache/comet/serde/maps.scala index 498aa3594c..2e217f6af0 100644 --- a/spark/src/main/scala/org/apache/comet/serde/maps.scala +++ b/spark/src/main/scala/org/apache/comet/serde/maps.scala @@ -19,12 +19,9 @@ package org.apache.comet.serde -import scala.annotation.tailrec - import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType} -import org.apache.comet.serde.CometArrayReverse.containsBinary import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto, scalarFunctionExprToProtoWithReturnType} object CometMapKeys extends CometExpressionSerde[MapKeys] { @@ -92,27 +89,3 @@ object CometMapFromArrays extends CometExpressionSerde[MapFromArrays] { optExprWithInfo(mapFromArraysExpr, expr, expr.children: _*) } } - -object CometMapFromEntries extends CometScalarFunction[MapFromEntries]("map_from_entries") { - val keyUnsupportedReason = "Using BinaryType as Map keys is not allowed in map_from_entries" - val valueUnsupportedReason = "Using BinaryType as Map values is not allowed in map_from_entries" - - private def containsBinary(dataType: DataType): Boolean = { - dataType match { - case BinaryType => true - case StructType(fields) => fields.exists(field => containsBinary(field.dataType)) - case ArrayType(elementType, _) => containsBinary(elementType) - case _ => false - } - } - - override def getSupportLevel(expr: MapFromEntries): SupportLevel = { - if (containsBinary(expr.dataType.keyType)) { - return Incompatible(Some(keyUnsupportedReason)) - } - if (containsBinary(expr.dataType.valueType)) { - return Incompatible(Some(valueUnsupportedReason)) - } - Compatible(None) - } -} diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala index 01b9744ed6..88c13391a6 100644 --- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala @@ -25,9 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.BinaryType -import org.apache.comet.serde.CometMapFromEntries import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} class CometMapExpressionSuite extends CometTestBase { @@ -127,47 +125,4 @@ class CometMapExpressionSuite extends CometTestBase { } } - test("map_from_entries") { - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - val filename = path.toString - val random = new Random(42) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - val schemaGenOptions = - SchemaGenOptions( - generateArray = true, - generateStruct = true, - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes.filterNot(_ == BinaryType)) - val dataGenOptions = DataGenOptions(allowNull = false, generateNegativeZero = false) - ParquetGenerator.makeParquetFile( - random, - spark, - filename, - 100, - schemaGenOptions, - dataGenOptions) - } - val df = spark.read.parquet(filename) - df.createOrReplaceTempView("t1") - for (field <- df.schema.fieldNames) { - checkSparkAnswerAndOperator( - spark.sql(s"SELECT map_from_entries(array(struct($field as a, $field as b))) FROM t1")) - } - } - } - - test("map_from_entries - fallback for binary type") { - val table = "t2" - withTable(table) { - sql( - s"create table $table using parquet as select cast(array() as array) as c1 from range(10)") - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(c1, 0))) from $table"), - CometMapFromEntries.keyUnsupportedReason) - checkSparkAnswerAndFallbackReason( - sql(s"select map_from_entries(array(struct(0, c1))) from $table"), - CometMapFromEntries.valueUnsupportedReason) - } - } - } From 21a577169f115d1dd8d751099bd60c52f2350cad Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 19 May 2026 10:06:28 +0400 Subject: [PATCH 3/8] WORK --- .../src/json_funcs/json_array_length.rs | 96 +++++++++++++++++++ native/spark-expr/src/json_funcs/mod.rs | 1 + 2 files changed, 97 insertions(+) create mode 100644 native/spark-expr/src/json_funcs/json_array_length.rs diff --git a/native/spark-expr/src/json_funcs/json_array_length.rs b/native/spark-expr/src/json_funcs/json_array_length.rs new file mode 100644 index 0000000000..8eb0e29c8d --- /dev/null +++ b/native/spark-expr/src/json_funcs/json_array_length.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use arrow::array::ArrayRef; +use arrow::datatypes::DataType; +use datafusion::common::{exec_err, Result, ScalarValue}; +use datafusion::common::cast::as_string_array; +use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility}; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct JsonArrayLength { + signature: Signature, +} + +impl Default for JsonArrayLength { + fn default() -> Self { + Self::new() + } +} + +impl JsonArrayLength { + pub fn new() -> Self { + Self { + signature: Signature::variadic(vec![DataType::Utf8], Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for JsonArrayLength { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "json_array_length" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + spark_json_array_length(&args.args) + } +} + +fn spark_json_array_length(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!("json_array_length function takes exactly one argument"); + } + match &args[0] { + ColumnarValue::Array(array) => { + let result = spark_json_array_length_array(array)?; + Ok(ColumnarValue::Array(result)) + } + ColumnarValue::Scalar(scalar) => { + let result = spark_json_array_length_scalar(scalar)?; + Ok(ColumnarValue::Scalar(result)) + } + } +} + +fn spark_json_array_length_array(array: &ArrayRef) -> Result { + match array.data_type() { + DataType::Utf8 => { + let array = as_string_array(array)?; + Int32Array; + } + other => { + exec_err!("Unsupported data type {other:?} for function `json_array_length`") + } + } +} + +fn spark_json_array_length_scalar(scalar: &ScalarValue) -> Result { + unimplemented!() +} diff --git a/native/spark-expr/src/json_funcs/mod.rs b/native/spark-expr/src/json_funcs/mod.rs index 9f025070d7..eaa29f9bab 100644 --- a/native/spark-expr/src/json_funcs/mod.rs +++ b/native/spark-expr/src/json_funcs/mod.rs @@ -17,6 +17,7 @@ mod from_json; mod to_json; +mod json_array_length; pub use from_json::FromJson; pub use to_json::ToJson; From 57076f4fc972315db0403a0dc6876ea605bbac2d Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 19 May 2026 14:56:52 +0400 Subject: [PATCH 4/8] feat: impl json_array_length --- native/spark-expr/src/comet_scalar_funcs.rs | 2 + .../src/json_funcs/json_array_length.rs | 53 ++++++++++++++-- native/spark-expr/src/json_funcs/mod.rs | 3 +- .../apache/comet/serde/QueryPlanSerde.scala | 5 +- .../org/apache/comet/serde/statics.scala | 5 +- .../expressions/json/json_array_length.sql | 61 +++++++++++++++++++ 6 files changed, 120 insertions(+), 9 deletions(-) create mode 100644 spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 9ecb11dc52..bddd439666 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -16,6 +16,7 @@ // under the License. use crate::hash_funcs::*; +use crate::json_funcs::JsonArrayLength; use crate::map_funcs::spark_map_sort; use crate::math_funcs::abs::abs; use crate::math_funcs::checked_arithmetic::{checked_add, checked_div, checked_mul, checked_sub}; @@ -216,6 +217,7 @@ fn all_scalar_functions() -> Vec> { Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())), Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())), Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())), + Arc::new(ScalarUDF::new_from_impl(JsonArrayLength::default())), ] } diff --git a/native/spark-expr/src/json_funcs/json_array_length.rs b/native/spark-expr/src/json_funcs/json_array_length.rs index 8eb0e29c8d..289918363d 100644 --- a/native/spark-expr/src/json_funcs/json_array_length.rs +++ b/native/spark-expr/src/json_funcs/json_array_length.rs @@ -15,12 +15,16 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; -use arrow::array::ArrayRef; +use arrow::array::{Array, ArrayRef, Int32Builder}; use arrow::datatypes::DataType; -use datafusion::common::{exec_err, Result, ScalarValue}; use datafusion::common::cast::as_string_array; -use datafusion::logical_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility}; +use datafusion::common::{exec_err, Result, ScalarValue}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; + +use std::any::Any; +use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Hash)] pub struct JsonArrayLength { @@ -83,7 +87,21 @@ fn spark_json_array_length_array(array: &ArrayRef) -> Result { match array.data_type() { DataType::Utf8 => { let array = as_string_array(array)?; - Int32Array; + let mut builder = Int32Builder::with_capacity(array.len()); + + for row_idx in 0..array.len() { + if array.is_null(row_idx) { + builder.append_null(); + } else { + let json_str = array.value(row_idx); + if let Some(json_array_length) = get_json_array_length(json_str) { + builder.append_value(json_array_length); + } else { + builder.append_null() + } + } + } + Ok(Arc::new(builder.finish())) } other => { exec_err!("Unsupported data type {other:?} for function `json_array_length`") @@ -92,5 +110,28 @@ fn spark_json_array_length_array(array: &ArrayRef) -> Result { } fn spark_json_array_length_scalar(scalar: &ScalarValue) -> Result { - unimplemented!() + match scalar { + ScalarValue::Utf8(value) => { + let length = value + .clone() + .and_then(|json_str| get_json_array_length(&json_str)); + Ok(ScalarValue::Int32(length)) + } + other => { + exec_err!("Unsupported data type {other:?} for function `json_array_length`") + } + } +} + +fn get_json_array_length(json_str: &str) -> Option { + match serde_json::from_str::(json_str) { + Ok(json_value) => { + if json_value.is_array() { + Some(json_value.as_array().unwrap().len() as i32) + } else { + None + } + } + Err(_) => None, + } } diff --git a/native/spark-expr/src/json_funcs/mod.rs b/native/spark-expr/src/json_funcs/mod.rs index eaa29f9bab..59e1e5dd58 100644 --- a/native/spark-expr/src/json_funcs/mod.rs +++ b/native/spark-expr/src/json_funcs/mod.rs @@ -16,8 +16,9 @@ // under the License. mod from_json; -mod to_json; mod json_array_length; +mod to_json; pub use from_json::FromJson; +pub use json_array_length::JsonArrayLength; pub use to_json::ToJson; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f9acad1d89..93f2250d77 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -247,6 +247,9 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( classOf[Cast] -> CometCast) + private val jsonExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[LengthOfJsonArray] -> CometScalarFunction("json_array_length")) + private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( // TODO PromotePrecision classOf[Alias] -> CometAlias, @@ -271,7 +274,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { mathExpressions ++ hashExpressions ++ stringExpressions ++ conditionalExpressions ++ mapExpressions ++ predicateExpressions ++ structExpressions ++ bitwiseExpressions ++ miscExpressions ++ arrayExpressions ++ - temporalExpressions ++ conversionExpressions + temporalExpressions ++ conversionExpressions ++ jsonExpressions /** * Mapping of Spark aggregate expression class to Comet expression handler. diff --git a/spark/src/main/scala/org/apache/comet/serde/statics.scala b/spark/src/main/scala/org/apache/comet/serde/statics.scala index bff64e753a..9aa315764f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/statics.scala +++ b/spark/src/main/scala/org/apache/comet/serde/statics.scala @@ -20,6 +20,7 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils, Literal, UrlCodec} +import org.apache.spark.sql.catalyst.expressions.json.JsonExpressionUtils import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils @@ -38,7 +39,9 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { "read_side_padding"), ("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometScalarFunction("luhn_check"), ("encode", UrlCodec.getClass) -> CometUrlEncodeStaticInvoke, - ("decode", UrlCodec.getClass) -> CometUrlDecodeStaticInvoke) + ("decode", UrlCodec.getClass) -> CometUrlDecodeStaticInvoke, + ("lengthOfJsonArray", classOf[JsonExpressionUtils]) -> CometScalarFunction( + "json_array_length")) override def convert( expr: StaticInvoke, diff --git a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql new file mode 100644 index 0000000000..5d2e82f2cd --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql @@ -0,0 +1,61 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +statement +CREATE TABLE test_json_array_length(j string) USING parquet + +statement +INSERT INTO test_json_array_length VALUES + ('[1,2,3,4]'), + ('[]'), + ('[1]'), + (NULL), + ('[1,2,3,{"f1":1,"f2":[5,6]},4]'), + ('[[1,2],[3,4],[5,6]]'), + ('[{"a":1},{"b":2},{"c":3}]'), + ('[1,2'), + ('[1,2,3,]'), + ('not a json'), + ('{"object": "not array"}'), + (''), + (' '), + ('[true, false, null]'), + ('["string1", "string2", "string3"]'), + ('[1, "mixed", true, null, {"key":"value"}]'), + ('[1,2,3,4,5,6,7,8,9,10]'), + ('["line1\nline2", "tab\tseparated", "quote\"here"]'), + ('{"outer": [1,2,3], "inner": [[1,2],[3,4]]}'), + ('{"arrays": {"first": [1,2], "second": [3,4,5]}}'), + ('[{"arr": [1,2,3]}, {"arr": [4,5]}]') + +query +SELECT json_array_length(j) FROM test_json_array_length + +query +SELECT json_array_length('[1,2,3,4]') + +query +SELECT json_array_length('not an array') + +query +SELECT json_array_length('{"key":"value"}') + +query +SELECT json_array_length(NULL) + +query +SELECT json_array_length('[]') From 678e41760cfacb716d3ecd4a82aed387e5d00322 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Mon, 25 May 2026 21:42:17 +0400 Subject: [PATCH 5/8] fix --- .../scala/org/apache/comet/serde/json.scala | 36 +++++++++++++++++++ .../org/apache/comet/serde/statics.scala | 5 +-- .../apache/comet/shims/CometExprShim.scala | 15 ++++++++ .../apache/comet/shims/CometExprShim.scala | 14 +++++++- .../apache/comet/shims/CometExprShim.scala | 12 +++++++ .../expressions/json/json_array_length.sql | 5 +++ .../comet/CometJsonExpressionSuite.scala | 11 ++++-- .../org/apache/spark/sql/CometTestBase.scala | 1 + 8 files changed, 92 insertions(+), 7 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/serde/json.scala diff --git a/spark/src/main/scala/org/apache/comet/serde/json.scala b/spark/src/main/scala/org/apache/comet/serde/json.scala new file mode 100644 index 0000000000..5f296599d6 --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/serde/json.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.serde + +import org.apache.spark.sql.catalyst.expressions.LengthOfJsonArray + +object CometLengthOfJsonArray + extends CometScalarFunction[LengthOfJsonArray]("json_array_length") { + + private val IncompatibleReason: String = + "Spark's lenient JSON parser allows single quotes, unescaped controls, " + + "and trailing content, " + + "while Comet's serde_json requires strict JSON." + + override def getIncompatibleReasons(): Seq[String] = Seq(IncompatibleReason) + + override def getSupportLevel(expr: LengthOfJsonArray): SupportLevel = Incompatible( + Some(IncompatibleReason)) +} diff --git a/spark/src/main/scala/org/apache/comet/serde/statics.scala b/spark/src/main/scala/org/apache/comet/serde/statics.scala index 9aa315764f..bff64e753a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/statics.scala +++ b/spark/src/main/scala/org/apache/comet/serde/statics.scala @@ -20,7 +20,6 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils, Literal, UrlCodec} -import org.apache.spark.sql.catalyst.expressions.json.JsonExpressionUtils import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils @@ -39,9 +38,7 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] { "read_side_padding"), ("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometScalarFunction("luhn_check"), ("encode", UrlCodec.getClass) -> CometUrlEncodeStaticInvoke, - ("decode", UrlCodec.getClass) -> CometUrlDecodeStaticInvoke, - ("lengthOfJsonArray", classOf[JsonExpressionUtils]) -> CometScalarFunction( - "json_array_length")) + ("decode", UrlCodec.getClass) -> CometUrlDecodeStaticInvoke) override def convert( expr: StaticInvoke, diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index c159bde0de..1248ced8ae 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -156,6 +156,21 @@ trait CometExprShim extends CommonStringExprs { case _ => None } + case s: StaticInvoke => + (s.staticObject, s.functionName, s.arguments) match { + case (cls, "lengthOfJsonArray", Seq(child)) if cls == classOf[JsonExpressionUtils] => + val lengthOfJsonArray = LengthOfJsonArray(child) + val exprProto = exprToProtoInternal(lengthOfJsonArray, inputs, binding) + lengthOfJsonArray + .getTagValue(CometExplainInfo.EXTENSION_INFO) + .foreach { reasons => + println(s"Reasons: $reasons") + s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons) + } + exprProto + case _ => None + } + case ms: MapSort => val keyType = ms.dataType.asInstanceOf[MapType].keyType if (!supportedScalarSortElementType(keyType)) { diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index 9d6092aa5c..cad9305b9c 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -21,7 +21,7 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Sum -import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator +import org.apache.spark.sql.catalyst.expressions.json.{JsonExpressionUtils, StructsToJsonEvaluator} import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -187,6 +187,18 @@ trait CometExprShim extends CommonStringExprs { case _ => None } + case s: StaticInvoke => + (s.staticObject, s.functionName, s.arguments) match { + case (cls, "lengthOfJsonArray", Seq(child)) if cls == classOf[JsonExpressionUtils] => + val lengthOfJsonArray = LengthOfJsonArray(child) + val exprProto = exprToProtoInternal(lengthOfJsonArray, inputs, binding) + lengthOfJsonArray + .getTagValue(CometExplainInfo.EXTENSION_INFO) + .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + exprProto + case _ => None + } + case ms: MapSort => val keyType = ms.dataType.asInstanceOf[MapType].keyType if (!supportedScalarSortElementType(keyType)) { diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index 9d6092aa5c..5a5a4bbeb9 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -187,6 +187,18 @@ trait CometExprShim extends CommonStringExprs { case _ => None } + case s: StaticInvoke => + (s.staticObject, s.functionName, s.arguments) match { + case (cls, "lengthOfJsonArray", Seq(child)) if cls == classOf[JsonExpressionUtils] => + val lengthOfJsonArray = LengthOfJsonArray(child) + val exprProto = exprToProtoInternal(lengthOfJsonArray, inputs, binding) + lengthOfJsonArray + .getTagValue(CometExplainInfo.EXTENSION_INFO) + .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + exprProto + case _ => None + } + case ms: MapSort => val keyType = ms.dataType.asInstanceOf[MapType].keyType if (!supportedScalarSortElementType(keyType)) { diff --git a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql index 5d2e82f2cd..d6fb9f69f0 100644 --- a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql +++ b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql @@ -15,6 +15,8 @@ -- specific language governing permissions and limitations -- under the License. +-- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=true + statement CREATE TABLE test_json_array_length(j string) USING parquet @@ -59,3 +61,6 @@ SELECT json_array_length(NULL) query SELECT json_array_length('[]') + +query expect_fallback(Using BinaryType as Map values is not allowed in map_from_entries) +SELECT json_array_length("[{'key':'value'}]") diff --git a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala index c2c1ce3663..ef02d8cc42 100644 --- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson} +import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, LengthOfJsonArray, StructsToJson} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ @@ -40,7 +40,8 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe super.test(testName, testTags: _*) { withSQLConf( CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true", - CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { + CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true", + CometConf.getExprAllowIncompatConfigKey(classOf[LengthOfJsonArray]) -> "false") { testFun } } @@ -194,4 +195,10 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe } } } + + test("test") { + withParquetTable(Seq((1, """"[{'key':'value'}]"""")), "t1", withDictionary = false) { + checkSparkAnswerAndFallbackReason(sql("SELECT json_array_length(_2) from t1"), "error") + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index dd4aab0e6b..2c90114d10 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -321,6 +321,7 @@ abstract class CometTestBase val (sparkPlan, cometPlan) = internalCheckSparkAnswer(df, assertCometNative = false) val explainInfo = new ExtendedExplainInfo() val actualFallbacks = explainInfo.getFallbackReasons(cometPlan) + println(s"BOOOOOOOM: $actualFallbacks") for (reason <- fallbackReasons) { if (!actualFallbacks.exists(_.contains(reason))) { if (actualFallbacks.isEmpty) { From 88b5d7118927533220631e7cfec51a26a5452116 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 26 May 2026 14:49:54 +0400 Subject: [PATCH 6/8] fix PR issues --- .../org/apache/comet/serde/QueryPlanSerde.scala | 5 ++++- .../org/apache/comet/shims/CometExprShim.scala | 11 +++++------ .../org/apache/comet/shims/CometExprShim.scala | 8 +++++--- .../org/apache/comet/shims/CometExprShim.scala | 8 +++++--- .../expressions/json/json_array_length.sql | 16 ++++++++-------- .../apache/comet/CometJsonExpressionSuite.scala | 11 ++--------- .../org/apache/spark/sql/CometTestBase.scala | 1 - 7 files changed, 29 insertions(+), 31 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c01f379cc7..9f54d1c397 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -257,7 +257,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { classOf[Cast] -> CometCast) private val jsonExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[LengthOfJsonArray] -> CometScalarFunction("json_array_length")) + classOf[LengthOfJsonArray] -> CometLengthOfJsonArray) private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( // TODO PromotePrecision @@ -708,6 +708,9 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { } handler.convert(expr, inputs, binding) } else { + // scalastyle:off println + println("BOOOOOOOOOOOM_1: " + expr) + // scalastyle:on println line=586 column=14 val optionalNotes = notes.map(str => s" ($str)").getOrElse("") withInfo( expr, diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 1248ced8ae..00c38f547a 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -161,12 +161,11 @@ trait CometExprShim extends CommonStringExprs { case (cls, "lengthOfJsonArray", Seq(child)) if cls == classOf[JsonExpressionUtils] => val lengthOfJsonArray = LengthOfJsonArray(child) val exprProto = exprToProtoInternal(lengthOfJsonArray, inputs, binding) - lengthOfJsonArray - .getTagValue(CometExplainInfo.EXTENSION_INFO) - .foreach { reasons => - println(s"Reasons: $reasons") - s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons) - } + if (exprProto.isEmpty) { + lengthOfJsonArray + .getTagValue(CometExplainInfo.EXTENSION_INFO) + .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + } exprProto case _ => None } diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala index cad9305b9c..3bfffdf23b 100644 --- a/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala @@ -192,9 +192,11 @@ trait CometExprShim extends CommonStringExprs { case (cls, "lengthOfJsonArray", Seq(child)) if cls == classOf[JsonExpressionUtils] => val lengthOfJsonArray = LengthOfJsonArray(child) val exprProto = exprToProtoInternal(lengthOfJsonArray, inputs, binding) - lengthOfJsonArray - .getTagValue(CometExplainInfo.EXTENSION_INFO) - .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + if (exprProto.isEmpty) { + lengthOfJsonArray + .getTagValue(CometExplainInfo.EXTENSION_INFO) + .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + } exprProto case _ => None } diff --git a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala index 5a5a4bbeb9..471956a4e6 100644 --- a/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala @@ -192,9 +192,11 @@ trait CometExprShim extends CommonStringExprs { case (cls, "lengthOfJsonArray", Seq(child)) if cls == classOf[JsonExpressionUtils] => val lengthOfJsonArray = LengthOfJsonArray(child) val exprProto = exprToProtoInternal(lengthOfJsonArray, inputs, binding) - lengthOfJsonArray - .getTagValue(CometExplainInfo.EXTENSION_INFO) - .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + if (exprProto.isEmpty) { + lengthOfJsonArray + .getTagValue(CometExplainInfo.EXTENSION_INFO) + .foreach(reasons => s.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons)) + } exprProto case _ => None } diff --git a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql index d6fb9f69f0..eb36d8bd56 100644 --- a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql +++ b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=true +-- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=false statement CREATE TABLE test_json_array_length(j string) USING parquet @@ -44,23 +44,23 @@ INSERT INTO test_json_array_length VALUES ('{"arrays": {"first": [1,2], "second": [3,4,5]}}'), ('[{"arr": [1,2,3]}, {"arr": [4,5]}]') -query +query spark_answer_only SELECT json_array_length(j) FROM test_json_array_length -query +query spark_answer_only SELECT json_array_length('[1,2,3,4]') -query +query spark_answer_only SELECT json_array_length('not an array') -query +query spark_answer_only SELECT json_array_length('{"key":"value"}') -query +query spark_answer_only SELECT json_array_length(NULL) -query +query spark_answer_only SELECT json_array_length('[]') -query expect_fallback(Using BinaryType as Map values is not allowed in map_from_entries) +query expect_fallback(Spark's lenient JSON parser allows single quotes, unescaped controls, and trailing content, while Comet's serde_json requires strict JSON.) SELECT json_array_length("[{'key':'value'}]") diff --git a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala index ef02d8cc42..c2c1ce3663 100644 --- a/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.Tag import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, LengthOfJsonArray, StructsToJson} +import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ @@ -40,8 +40,7 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe super.test(testName, testTags: _*) { withSQLConf( CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true", - CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true", - CometConf.getExprAllowIncompatConfigKey(classOf[LengthOfJsonArray]) -> "false") { + CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") { testFun } } @@ -195,10 +194,4 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe } } } - - test("test") { - withParquetTable(Seq((1, """"[{'key':'value'}]"""")), "t1", withDictionary = false) { - checkSparkAnswerAndFallbackReason(sql("SELECT json_array_length(_2) from t1"), "error") - } - } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index 2c90114d10..dd4aab0e6b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -321,7 +321,6 @@ abstract class CometTestBase val (sparkPlan, cometPlan) = internalCheckSparkAnswer(df, assertCometNative = false) val explainInfo = new ExtendedExplainInfo() val actualFallbacks = explainInfo.getFallbackReasons(cometPlan) - println(s"BOOOOOOOM: $actualFallbacks") for (reason <- fallbackReasons) { if (!actualFallbacks.exists(_.contains(reason))) { if (actualFallbacks.isEmpty) { From d4936a9edf90cc17a0ccb54c39fc2947a06c26e7 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 26 May 2026 14:51:42 +0400 Subject: [PATCH 7/8] fix PR issues --- .../src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9f54d1c397..965070c314 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -708,9 +708,6 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { } handler.convert(expr, inputs, binding) } else { - // scalastyle:off println - println("BOOOOOOOOOOOM_1: " + expr) - // scalastyle:on println line=586 column=14 val optionalNotes = notes.map(str => s" ($str)").getOrElse("") withInfo( expr, From 09dc0d433c1e4f393ec216c22a29aa7d078d4e1b Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Wed, 27 May 2026 22:19:01 +0400 Subject: [PATCH 8/8] fix PR issues --- native/Cargo.lock | 1 + native/spark-expr/Cargo.toml | 1 + .../src/json_funcs/json_array_length.rs | 90 ++++++++++++------- .../expressions/json/json_array_length.sql | 2 - .../CometLengthOfJsonArrayBenchmark.scala | 71 +++++++++++++++ 5 files changed, 129 insertions(+), 36 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometLengthOfJsonArrayBenchmark.scala diff --git a/native/Cargo.lock b/native/Cargo.lock index 11e9b1ccff..9cd5f15878 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -2140,6 +2140,7 @@ dependencies = [ "num", "rand 0.10.1", "regex", + "serde", "serde_json", "tokio", "twox-hash", diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml index 1b0359059c..800fe3ecb1 100644 --- a/native/spark-expr/Cargo.toml +++ b/native/spark-expr/Cargo.toml @@ -35,6 +35,7 @@ num = { workspace = true } regex = { workspace = true } # preserve_order: needed for get_json_object to match Spark's JSON key ordering serde_json = { version = "1.0", features = ["preserve_order"] } +serde = { version = "1.0", features = ["derive"] } datafusion-comet-common = { workspace = true } datafusion-comet-jni-bridge = { workspace = true } jni = "0.22.4" diff --git a/native/spark-expr/src/json_funcs/json_array_length.rs b/native/spark-expr/src/json_funcs/json_array_length.rs index 289918363d..eef879da3c 100644 --- a/native/spark-expr/src/json_funcs/json_array_length.rs +++ b/native/spark-expr/src/json_funcs/json_array_length.rs @@ -15,15 +15,19 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{Array, ArrayRef, Int32Builder}; +use arrow::array::{Array, ArrayRef, Int32Builder, OffsetSizeTrait}; use arrow::datatypes::DataType; -use datafusion::common::cast::as_string_array; +use datafusion::common::cast::as_generic_string_array; use datafusion::common::{exec_err, Result, ScalarValue}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use std::any::Any; + +use serde::de::{IgnoredAny, SeqAccess, Visitor}; +use serde::Deserializer; +use std::fmt; use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Hash)] @@ -40,7 +44,10 @@ impl Default for JsonArrayLength { impl JsonArrayLength { pub fn new() -> Self { Self { - signature: Signature::variadic(vec![DataType::Utf8], Volatility::Immutable), + signature: Signature::variadic( + vec![DataType::Utf8, DataType::LargeUtf8], + Volatility::Immutable, + ), } } } @@ -85,24 +92,8 @@ fn spark_json_array_length(args: &[ColumnarValue]) -> Result { fn spark_json_array_length_array(array: &ArrayRef) -> Result { match array.data_type() { - DataType::Utf8 => { - let array = as_string_array(array)?; - let mut builder = Int32Builder::with_capacity(array.len()); - - for row_idx in 0..array.len() { - if array.is_null(row_idx) { - builder.append_null(); - } else { - let json_str = array.value(row_idx); - if let Some(json_array_length) = get_json_array_length(json_str) { - builder.append_value(json_array_length); - } else { - builder.append_null() - } - } - } - Ok(Arc::new(builder.finish())) - } + DataType::Utf8 => spark_json_array_length_array_inner::(array), + DataType::LargeUtf8 => spark_json_array_length_array_inner::(array), other => { exec_err!("Unsupported data type {other:?} for function `json_array_length`") } @@ -111,27 +102,58 @@ fn spark_json_array_length_array(array: &ArrayRef) -> Result { fn spark_json_array_length_scalar(scalar: &ScalarValue) -> Result { match scalar { - ScalarValue::Utf8(value) => { - let length = value - .clone() - .and_then(|json_str| get_json_array_length(&json_str)); - Ok(ScalarValue::Int32(length)) - } + ScalarValue::Utf8(value) => spark_json_array_length_scalar_inner(value), + ScalarValue::LargeUtf8(value) => spark_json_array_length_scalar_inner(value), other => { exec_err!("Unsupported data type {other:?} for function `json_array_length`") } } } -fn get_json_array_length(json_str: &str) -> Option { - match serde_json::from_str::(json_str) { - Ok(json_value) => { - if json_value.is_array() { - Some(json_value.as_array().unwrap().len() as i32) +fn spark_json_array_length_scalar_inner(json_str: &Option) -> Result { + let array_length = json_str + .clone() + .and_then(|json_str| get_json_array_length(&json_str)); + Ok(ScalarValue::Int32(array_length)) +} + +fn spark_json_array_length_array_inner(array: &ArrayRef) -> Result { + let str_array = as_generic_string_array::(array)?; + let mut builder = Int32Builder::with_capacity(str_array.len()); + for row_idx in 0..str_array.len() { + if str_array.is_null(row_idx) { + builder.append_null(); + } else { + let json_str = str_array.value(row_idx); + if let Some(json_array_length) = get_json_array_length(json_str) { + builder.append_value(json_array_length); } else { - None + builder.append_null() } } - Err(_) => None, } + Ok(Arc::new(builder.finish())) +} + +struct ArrayItemCounter; + +impl<'de> Visitor<'de> for ArrayItemCounter { + type Value = i32; + + fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("a JSON array") + } + + fn visit_seq>(self, mut seq: A) -> Result { + let mut len = 0i32; + while seq.next_element::()?.is_some() { + len += 1; + } + Ok(len) + } +} + +fn get_json_array_length(json: &str) -> Option { + let mut deserializer = serde_json::Deserializer::from_str(json); + deserializer.deserialize_seq(ArrayItemCounter).ok() } diff --git a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql index eb36d8bd56..9b7b332e50 100644 --- a/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql +++ b/spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql @@ -15,8 +15,6 @@ -- specific language governing permissions and limitations -- under the License. --- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=false - statement CREATE TABLE test_json_array_length(j string) USING parquet diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometLengthOfJsonArrayBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometLengthOfJsonArrayBenchmark.scala new file mode 100644 index 0000000000..8c09ce01cf --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometLengthOfJsonArrayBenchmark.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.sql.catalyst.expressions.LengthOfJsonArray + +import org.apache.comet.CometConf + +/** + * Benchmark to measure performance of Comet json_array_length expression. To run this benchmark: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometLengthOfJsonArrayBenchmark + * }}} + * Results will be written to "spark/benchmarks/CometLengthOfJsonArray-**results.txt". + */ +object CometLengthOfJsonArrayBenchmark extends CometBenchmarkBase { + + override def runCometBenchmark(args: Array[String]): Unit = { + val numRows = 1024 * 1024 + runBenchmarkWithTable("json_array_length", numRows) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + import spark.implicits._ + prepareTable( + dir, + spark + .range(numRows) + .map { i => + val arrayLength = (i % 100).toInt + (0 until arrayLength) + .map(j => s""""item_${i}_$j"""") + .mkString("[", ",", "]") + } + .toDF("c1")) + + val extraConfigs = + Map(CometConf.getExprAllowIncompatConfigKey(classOf[LengthOfJsonArray]) -> "true") + + val benchmarks = List( + StringExprConfig( + "get json array length", + "select json_array_length(c1) from parquetV1Table", + extraConfigs)) + + benchmarks.foreach { config => + runBenchmark(config.name) { + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } + } +}