From 168ed58c78b08c80067725888fba361134c3214c Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 11:34:18 -0700 Subject: [PATCH 1/4] chore: enable arrays_overlap --- .../source/user-guide/latest/compatibility.md | 3 - docs/source/user-guide/latest/expressions.md | 2 +- .../scala/org/apache/comet/serde/arrays.scala | 8 --- .../expressions/array/arrays_overlap.sql | 4 +- .../comet/CometArrayExpressionSuite.scala | 66 ++++++++++++++----- 5 files changed, 52 insertions(+), 31 deletions(-) diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index a6cced4e12..94dab0dba3 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -64,9 +64,6 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on [#3346](https://github.com/apache/datafusion-comet/issues/3346) - **ArrayRemove**: Returns null when the element to remove is null, instead of removing null elements from the array. [#3173](https://github.com/apache/datafusion-comet/issues/3173) -- **ArraysOverlap**: Inconsistent behavior when arrays contain NULL values. [#3645](https://github.com/apache/datafusion-comet/issues/3645), [#2036](https://github.com/apache/datafusion-comet/issues/2036) - **ArrayUnion**: Sorts input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. 
[#3644](https://github.com/apache/datafusion-comet/issues/3644) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 136e1e454f..6db05f4321 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -246,7 +246,7 @@ Comet supports using the following aggregate functions within window contexts wi | ArrayRemove | No | Returns null when element is null instead of removing null elements ([#3173](https://github.com/apache/datafusion-comet/issues/3173)) | | ArrayRepeat | No | | | ArrayUnion | No | Behaves differently than spark. Comet sorts the input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. | -| ArraysOverlap | No | | +| ArraysOverlap | Yes | | | CreateArray | Yes | | | ElementAt | Yes | Input must be an array. Map inputs are not supported. | | Flatten | Yes | | diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 47a6e91421..2c3aba2d66 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -246,14 +246,6 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] { } object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { - - override def getSupportLevel(expr: ArraysOverlap): SupportLevel = - Incompatible( - Some( - "Inconsistent behavior with NULL values" + - " (https://github.com/apache/datafusion-comet/issues/3645)" + - " (https://github.com/apache/datafusion-comet/issues/2036)")) - override def convert( expr: ArraysOverlap, inputs: Seq[Attribute], diff --git a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql index 27d28a7402..4b83377457 100644 --- 
a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql @@ -24,11 +24,11 @@ CREATE TABLE test_arrays_overlap(a array, b array) USING parquet statement INSERT INTO test_arrays_overlap VALUES (array(1, 2, 3), array(3, 4, 5)), (array(1, 2), array(3, 4)), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2)) -query ignore(https://github.com/apache/datafusion-comet/issues/3645) +query SELECT arrays_overlap(a, b) FROM test_arrays_overlap -- column + literal -query ignore(https://github.com/apache/datafusion-comet/issues/3645) +query SELECT arrays_overlap(a, array(3, 4, 5)) FROM test_arrays_overlap -- literal + column diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index fb5531a573..95df0b38eb 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArrayUnion} import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ @@ -545,27 +545,59 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap") { - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArraysOverlap]) -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { 
dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(spark.sql( - "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); - } + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); } } } } + test("arrays_overlap - null handling behavior verification") { + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 100) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + + // Test case 1: Common 
element exists - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t1 limit 1")) + + // Test case 2: No common elements, no nulls - should return false + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t1 limit 1")) + + // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t1 limit 1")) + + // Test case 4: Common element exists even with null - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null, 3), array(1, 4)) from t1 limit 1")) + + // Test case 5: Both arrays have null but no common non-null elements + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t1 limit 1")) + + // Test case 6: Empty arrays + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(), array(1, 2)) from t1 limit 1")) + } + } + } + test("array_compact") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) From b617476e4624632e365d2f3e14ec7c95eb2f42a0 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 12:48:47 -0700 Subject: [PATCH 2/4] fix: workaround arrays_overlap --- .../scala/org/apache/comet/serde/arrays.scala | 9 ++-- .../expressions/array/arrays_overlap.sql | 1 - .../comet/CometArrayExpressionSuite.scala | 48 +++++++++---------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 2c3aba2d66..16d38bd520 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, 
ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, If, IsNotNull, IsNull, Literal, Reverse, Size} import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -250,8 +250,11 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { expr: ArraysOverlap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val leftArrayExprProto = exprToProto(expr.children.head, inputs, binding) - val rightArrayExprProto = exprToProto(expr.children(1), inputs, binding) + val left = If(IsNull(expr.left), Literal.create(Array(null), expr.left.dataType), expr.left) + val right = + If(IsNull(expr.right), Literal.create(Array(null), expr.right.dataType), expr.right) + val leftArrayExprProto = exprToProto(left, inputs, binding) + val rightArrayExprProto = exprToProto(right, inputs, binding) val arraysOverlapScalarExpr = scalarFunctionExprToProtoWithReturnType( "array_has_any", diff --git a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql index 4b83377457..88cf38d211 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql @@ -15,7 +15,6 @@ -- specific language governing permissions and limitations -- under the License. 
--- Config: spark.comet.expression.ArraysOverlap.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 95df0b38eb..02be47a16a 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -565,36 +565,36 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap - null handling behavior verification") { - withTempDir { dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 100) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") + withTable("t") { + sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") - // Test case 1: Common element exists - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t1 limit 1")) + // Test case 1: Common element exists - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t")) - // Test case 2: No common elements, no nulls - should return false - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t1 limit 1")) + // Test case 2: No common elements, no nulls - should return false + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t")) - // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t1 limit 1")) + // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) + checkSparkAnswerAndOperator( 
+ sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t")) - // Test case 4: Common element exists even with null - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), array(1, 4)) from t1 limit 1")) + // Test case 4: Common element exists even with null - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null, 3), array(1, 4)) from t")) - // Test case 5: Both arrays have null but no common non-null elements - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t1 limit 1")) + // Test case 5: Both arrays have null but no common non-null elements + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t")) - // Test case 6: Empty arrays - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(), array(1, 2)) from t1 limit 1")) - } + // Test case 6: Empty arrays + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(), array(1, 2)) from t")) + + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(null), array(null)) from t")) + + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, array(1, 2)) from t")) + + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, a1) from t")) } } From 6a68c52374960ccaeb6548c848e0d892d2fb1ff6 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 15:04:16 -0700 Subject: [PATCH 3/4] fix: workaround arrays_overlap --- .../scala/org/apache/comet/serde/arrays.scala | 44 ++++++++++++++++--- .../comet/CometArrayExpressionSuite.scala | 44 +++++++------------ 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 16d38bd520..d9b1369d02 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -21,7 +21,7 @@ 
package org.apache.comet.serde import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, If, IsNotNull, IsNull, Literal, Reverse, Size} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size} import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -250,11 +250,8 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { expr: ArraysOverlap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val left = If(IsNull(expr.left), Literal.create(Array(null), expr.left.dataType), expr.left) - val right = - If(IsNull(expr.right), Literal.create(Array(null), expr.right.dataType), expr.right) - val leftArrayExprProto = exprToProto(left, inputs, binding) - val rightArrayExprProto = exprToProto(right, inputs, binding) + val leftArrayExprProto = exprToProto(expr.left, inputs, binding) + val rightArrayExprProto = exprToProto(expr.right, inputs, binding) val arraysOverlapScalarExpr = scalarFunctionExprToProtoWithReturnType( "array_has_any", @@ -262,7 +259,40 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { false, leftArrayExprProto, rightArrayExprProto) - optExprWithInfo(arraysOverlapScalarExpr, expr, expr.children: _*) + + val leftIsNull = createUnaryExpr( + expr, + expr.left, + inputs, + binding, + (builder, unaryExpr) => builder.setIsNull(unaryExpr)) + val rightIsNull = 
createUnaryExpr( + expr, + expr.right, + inputs, + binding, + (builder, unaryExpr) => builder.setIsNull(unaryExpr)) + + val nullLiteralProto = exprToProto(Literal(null, BooleanType), inputs) + + if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && rightIsNull.isDefined && nullLiteralProto.isDefined) { + val caseWhenExpr = ExprOuterClass.CaseWhen + .newBuilder() + .addWhen(leftIsNull.get) + .addThen(nullLiteralProto.get) + .addWhen(rightIsNull.get) + .addThen(nullLiteralProto.get) + .setElseExpr(arraysOverlapScalarExpr.get) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setCaseWhen(caseWhenExpr) + .build()) + } else { + withInfo(expr, expr.children: _*) + None + } } } diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 02be47a16a..4e28ffda3c 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -567,34 +567,22 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp test("arrays_overlap - null handling behavior verification") { withTable("t") { sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") - - // Test case 1: Common element exists - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t")) - - // Test case 2: No common elements, no nulls - should return false - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t")) - - // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t")) - - // Test case 4: Common element exists even with null - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), 
array(1, 4)) from t")) - - // Test case 5: Both arrays have null but no common non-null elements - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t")) - - // Test case 6: Empty arrays - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(), array(1, 2)) from t")) - - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(null), array(null)) from t")) - - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, array(1, 2)) from t")) - - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, a1) from t")) + val data = Seq( + "array(1, 2, 3)", + "array(3, 4, 5)", + "array(1, 2)", + "array(3, 4)", + "array(1, null, 3)", + "array(4, 5)", + "array(1, 4)", + "array(1, null)", + "array(2, null)", + "array()", + "array(null)", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + } } } From d297ec4ef02327eaedf0650ea284424d54728472 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 15:46:05 -0700 Subject: [PATCH 4/4] fix: workaround arrays_overlap --- spark/src/main/scala/org/apache/comet/serde/arrays.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index d9b1369d02..0c121e02ca 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -275,7 +275,8 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { val nullLiteralProto = exprToProto(Literal(null, BooleanType), inputs) - if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && rightIsNull.isDefined && nullLiteralProto.isDefined) { + if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && + rightIsNull.isDefined && nullLiteralProto.isDefined) { val caseWhenExpr = ExprOuterClass.CaseWhen .newBuilder() 
.addWhen(leftIsNull.get)