From 9186e25435e483ec320f6bc29a0b99fc475aa91d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 2 Apr 2026 13:59:08 -0600 Subject: [PATCH] fix: make Unsupported require a reason string Change Unsupported case class from Option[String] to String to prevent empty fallback reasons in explain output. This ensures every unsupported feature has a descriptive message explaining why it is not supported. --- .../org/apache/comet/expressions/CometCast.scala | 6 +++--- .../org/apache/comet/rules/CometExecRule.scala | 2 +- .../org/apache/comet/serde/QueryPlanSerde.scala | 4 ++-- .../org/apache/comet/serde/SupportLevel.scala | 2 +- .../scala/org/apache/comet/serde/arrays.scala | 7 ++++--- .../comet/serde/contraintExpressions.scala | 3 +-- .../scala/org/apache/comet/serde/datetime.scala | 13 ++++++------- .../apache/comet/serde/decimalExpressions.scala | 2 +- .../scala/org/apache/comet/serde/literals.scala | 2 +- .../main/scala/org/apache/comet/serde/math.scala | 4 ++-- .../serde/operator/CometDataWritingCommand.scala | 12 ++++++------ .../scala/org/apache/comet/serde/strings.scala | 16 ++++++++-------- .../scala/org/apache/comet/serde/structs.scala | 3 +-- .../spark/sql/comet/CometCollectLimitExec.scala | 2 +- .../comet/CometTakeOrderedAndProjectExec.scala | 8 ++++---- .../shuffle/CometShuffleExchangeExec.scala | 2 +- .../org/apache/spark/sql/comet/operators.scala | 14 +++++++------- 17 files changed, 50 insertions(+), 52 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 2188f8e9af..1821556494 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -383,7 +383,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { DataTypes.IntegerType | DataTypes.LongType | DataTypes.BooleanType | DataTypes.TimestampType => Compatible() - case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported")) + case _ => Unsupported(s"Cast from DecimalType to $toType is not supported") } private def canCastFromDate(toType: DataType, evalMode: CometEvalMode.Value): SupportLevel = @@ -394,10 +394,10 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | DataTypes.DoubleType | _: DecimalType if evalMode == CometEvalMode.LEGACY => Compatible() - case _ => Unsupported(Some(s"Cast from DateType to $toType is not supported")) + case _ => Unsupported(s"Cast from DateType to $toType is not supported") } private def unsupported(fromType: DataType, toType: DataType): Unsupported = { - Unsupported(Some(s"Cast from $fromType to $toType is not supported")) + Unsupported(s"Cast from $fromType to $toType is not supported") } } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 70983b0599..7cbdda1935 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -520,7 +520,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { if (handler.enabledConfig.forall(_.get(op.conf))) { handler.getSupportLevel(op) match { case Unsupported(notes) => - withInfo(op, notes.getOrElse("")) + withInfo(op, notes) false case Incompatible(notes) => val allowIncompat = CometConf.isOperatorAllowIncompat(opName) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 59fb0f9819..88557ca161 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -482,7 +482,7 @@ object QueryPlanSerde extends Logging with CometExprShim { } aggHandler.getSupportLevel(fn) match { case Unsupported(notes) => - withInfo(fn, notes.getOrElse("")) + withInfo(fn, notes) None case Incompatible(notes) => val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName) @@ -609,7 +609,7 @@ object QueryPlanSerde extends Logging with CometExprShim { } handler.getSupportLevel(expr) match { case Unsupported(notes) => - withInfo(expr, notes.getOrElse("")) + withInfo(expr, notes) None case Incompatible(notes) => val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName) diff --git a/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala b/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala index d5a524077d..9414529e55 100644 --- a/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala +++ b/spark/src/main/scala/org/apache/comet/serde/SupportLevel.scala @@ -39,4 +39,4 @@ case class Compatible(notes: Option[String] = None) extends SupportLevel case class Incompatible(notes: Option[String] = None) extends SupportLevel /** Comet does not support this feature */ -case class Unsupported(notes: Option[String] = None) extends SupportLevel +case class Unsupported(notes: String) extends SupportLevel diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 47a6e91421..54d8a5f8b8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -622,7 +622,8 @@ object CometArrayFilter extends CometExpressionSerde[ArrayFilter] { override def getSupportLevel(expr: ArrayFilter): SupportLevel = { expr.function.children.headOption match { case Some(_: IsNotNull) => Compatible() - case _ => Unsupported() + case _ => + Unsupported("Only array_compact (ArrayFilter with IsNotNull) is supported") } } @@ -639,10 +640,10 @@ object CometSize extends CometExpressionSerde[Size] { override def getSupportLevel(expr: Size): SupportLevel = { expr.child.dataType match { case _: ArrayType => Compatible() - case _: MapType => Unsupported(Some("size does not support map inputs")) + case _: MapType => Unsupported("size does not support map inputs") case other => // this should be unreachable because Spark only supports map and array inputs - Unsupported(Some(s"Unsupported child data type: $other")) + Unsupported(s"Unsupported child data type: $other") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/contraintExpressions.scala b/spark/src/main/scala/org/apache/comet/serde/contraintExpressions.scala index 62643d6da6..a92d1947e3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/contraintExpressions.scala +++ b/spark/src/main/scala/org/apache/comet/serde/contraintExpressions.scala @@ -33,8 +33,7 @@ object CometKnownFloatingPointNormalized case _: NormalizeNaNAndZero => Compatible() case _ => Unsupported( - Some( - "KnownFloatingPointNormalized only supports NormalizeNaNAndZero child expressions")) + "KnownFloatingPointNormalized only supports NormalizeNaNAndZero child expressions") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 8f3894c1ac..6c8d6aadfc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -309,7 +309,7 @@ object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] { Compatible() } else { val inputType = expr.children.head.dataType - Unsupported(Some(s"unix_timestamp does not support input type: $inputType")) + Unsupported(s"unix_timestamp does not support input type: $inputType") } } @@ -394,7 +394,7 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] { if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) { Compatible() } else { - Unsupported(Some(s"Format $fmt is not supported")) + Unsupported(s"Format $fmt is not supported") } case _ => Incompatible( @@ -454,7 +454,7 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] { " (https://github.com/apache/datafusion-comet/issues/2649)")) } } else { - Unsupported(Some(s"Format $fmt is not supported")) + Unsupported(s"Format $fmt is not supported") } case _ => Incompatible( @@ -550,12 +550,11 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } } else { Unsupported( - Some( - s"Format '$format' is not supported. Supported formats: " + - supportedFormats.keys.mkString(", "))) + s"Format '$format' is not supported. Supported formats: " + + supportedFormats.keys.mkString(", ")) } case _ => - Unsupported(Some("Only literal format strings are supported")) + Unsupported("Only literal format strings are supported") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/decimalExpressions.scala b/spark/src/main/scala/org/apache/comet/serde/decimalExpressions.scala index 880f01742b..ea4563c428 100644 --- a/spark/src/main/scala/org/apache/comet/serde/decimalExpressions.scala +++ b/spark/src/main/scala/org/apache/comet/serde/decimalExpressions.scala @@ -42,7 +42,7 @@ object CometMakeDecimal extends CometExpressionSerde[MakeDecimal] { override def getSupportLevel(expr: MakeDecimal): SupportLevel = { expr.child.dataType match { case LongType => Compatible() - case other => Unsupported(Some(s"Unsupported input data type: $other")) + case other => Unsupported(s"Unsupported input data type: $other") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/literals.scala b/spark/src/main/scala/org/apache/comet/serde/literals.scala index e24b55449c..d95714f163 100644 --- a/spark/src/main/scala/org/apache/comet/serde/literals.scala +++ b/spark/src/main/scala/org/apache/comet/serde/literals.scala @@ -53,7 +53,7 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging { .isInstanceOf[ArrayType])))) { Compatible(None) } else { - Unsupported(Some(s"Unsupported data type ${expr.dataType}")) + Unsupported(s"Unsupported data type ${expr.dataType}") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/math.scala b/spark/src/main/scala/org/apache/comet/serde/math.scala index 45c60b8226..4e2729597b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/math.scala +++ b/spark/src/main/scala/org/apache/comet/serde/math.scala @@ -191,7 +191,7 @@ object CometAbs extends CometExpressionSerde[Abs] with MathExprBase { Compatible() case _ => // Spark supports NumericType, DayTimeIntervalType, and YearMonthIntervalType - Unsupported(Some("Only integral, floating-point, and decimal types are supported")) + Unsupported("Only integral, floating-point, and decimal types are supported") } } @@ -244,7 +244,7 @@ object CometCheckOverflow extends CometExpressionSerde[CheckOverflow] { if (expr.dataType.isInstanceOf[DecimalType]) { Compatible() } else { - Unsupported(Some("dataType must be DecimalType")) + Unsupported("dataType must be DecimalType") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 69b9bd5f85..ead1311018 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -60,28 +60,28 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec case _: ParquetFileFormat => if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString .startsWith("hdfs:")) { - return Unsupported(Some("Supported output filesystems: local, HDFS")) + return Unsupported("Supported output filesystems: local, HDFS") } if (cmd.bucketSpec.isDefined) { - return Unsupported(Some("Bucketed writes are not supported")) + return Unsupported("Bucketed writes are not supported") } if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { - return Unsupported(Some("Partitioned writes are not supported")) + return Unsupported("Partitioned writes are not supported") } val codec = parseCompressionCodec(cmd) if (!supportedCompressionCodes.contains(codec)) { - return Unsupported(Some(s"Unsupported compression codec: $codec")) + return Unsupported(s"Unsupported compression codec: $codec") } Incompatible(Some("Parquet write support is highly experimental")) case _ => - Unsupported(Some("Only Parquet writes are supported")) + Unsupported("Only Parquet writes are supported") } case other => - Unsupported(Some(s"Unsupported write command: ${other.getClass}")) + Unsupported(s"Unsupported write command: ${other.getClass}") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 64ba644048..54800fd24e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -69,7 +69,7 @@ object CometLower extends CometCaseConversionBase[Lower]("lower") object CometLength extends CometScalarFunction[Length]("length") { override def getSupportLevel(expr: Length): SupportLevel = expr.child.dataType match { - case _: BinaryType => Unsupported(Some("Length on BinaryType is not supported")) + case _: BinaryType => Unsupported("Length on BinaryType is not supported") case _ => Compatible() } } @@ -139,7 +139,7 @@ object CometLeft extends CometExpressionSerde[Left] { override def getSupportLevel(expr: Left): SupportLevel = { expr.str.dataType match { case _: BinaryType | _: StringType => Compatible() - case _ => Unsupported(Some(s"LEFT does not support ${expr.str.dataType}")) + case _ => Unsupported(s"LEFT does not support ${expr.str.dataType}") } } } @@ -182,7 +182,7 @@ object CometRight extends CometExpressionSerde[Right] { override def getSupportLevel(expr: Right): SupportLevel = { expr.str.dataType match { case _: StringType => Compatible() - case _ => Unsupported(Some(s"RIGHT does not support ${expr.str.dataType}")) + case _ => Unsupported(s"RIGHT does not support ${expr.str.dataType}") } } } @@ -271,10 +271,10 @@ object CometStringRPad extends CometExpressionSerde[StringRPad] { override def getSupportLevel(expr: StringRPad): SupportLevel = { if (expr.str.isInstanceOf[Literal]) { - return Unsupported(Some("Scalar values are not supported for the str argument")) + return Unsupported("Scalar values are not supported for the str argument") } if (!expr.pad.isInstanceOf[Literal]) { - return Unsupported(Some("Only scalar values are supported for the pad argument")) + return Unsupported("Only scalar values are supported for the pad argument") } Compatible() } @@ -296,10 +296,10 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] { override def getSupportLevel(expr: StringLPad): SupportLevel = { if (expr.str.isInstanceOf[Literal]) { - return Unsupported(Some("Scalar values are not supported for the str argument")) + return Unsupported("Scalar values are not supported for the str argument") } if (!expr.pad.isInstanceOf[Literal]) { - return Unsupported(Some("Only scalar values are supported for the pad argument")) + return Unsupported("Only scalar values are supported for the pad argument") } Compatible() } @@ -330,7 +330,7 @@ object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] { expr.pos match { case Literal(value, DataTypes.IntegerType) if value == 1 => Compatible() case _ => - Unsupported(Some("Comet only supports regexp_replace with an offset of 1 (no offset).")) + Unsupported("Comet only supports regexp_replace with an offset of 1 (no offset).") } } diff --git a/spark/src/main/scala/org/apache/comet/serde/structs.scala b/spark/src/main/scala/org/apache/comet/serde/structs.scala index 449d0fc5b9..6e761e9e4e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/structs.scala +++ b/spark/src/main/scala/org/apache/comet/serde/structs.scala @@ -248,8 +248,7 @@ object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] { val containsComplexType = dataTypes.exists(DataTypeSupport.isComplexType) if (containsComplexType) { return Unsupported( - Some( - s"The schema ${expr.inputSchema} is not supported because it includes a complex type")) + s"The schema ${expr.inputSchema} is not supported because it includes a complex type") } val containsIncompatibleDataTypes = dataTypes.exists(incompatibleDataTypes.contains) if (containsIncompatibleDataTypes) { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala index 6d43fe156b..afbd043bb0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala @@ -43,7 +43,7 @@ object CometCollectLimitExec extends CometSink[CollectLimitExec] { override def getSupportLevel(op: CollectLimitExec): SupportLevel = { if (!isCometShuffleEnabled(op.conf)) { - return Unsupported(Some("Comet shuffle is not enabled")) + return Unsupported("Comet shuffle is not enabled") } Compatible() } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala index 2abe783172..ce244969c9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala @@ -42,24 +42,24 @@ object CometTakeOrderedAndProjectExec extends CometSink[TakeOrderedAndProjectExe override def getSupportLevel(op: TakeOrderedAndProjectExec): SupportLevel = { if (!isCometShuffleEnabled(op.conf)) { - return Unsupported(Some("TakeOrderedAndProject requires shuffle to be enabled")) + return Unsupported("TakeOrderedAndProject requires shuffle to be enabled") } op.projectList.foreach { p => val o = exprToProto(p, op.child.output) if (o.isEmpty) { - return Unsupported(Some(s"unsupported projection: $p")) + return Unsupported(s"unsupported projection: $p") } o } op.sortOrder.foreach { s => val o = exprToProto(s, op.child.output) if (o.isEmpty) { - return Unsupported(Some(s"unsupported sort order: $s")) + return Unsupported(s"unsupported sort order: $s") } o } if (!supportedSortType(op, op.sortOrder)) { - return Unsupported(Some("unsupported data type in sort order")) + return Unsupported("unsupported data type in sort order") } Compatible() } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index df2dca0331..1ac6f6361d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -225,7 +225,7 @@ object CometShuffleExchangeExec if (nativeShuffleSupported(op) || columnarShuffleSupported(op)) { Compatible() } else { - Unsupported() + Unsupported("Comet shuffle is not enabled or the partitioning is not supported") } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 1f5e7b6677..cc42f323a8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -1173,13 +1173,13 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { override def getSupportLevel(op: GenerateExec): SupportLevel = { if (!op.generator.deterministic) { - return Unsupported(Some("Only deterministic generators are supported")) + return Unsupported("Only deterministic generators are supported") } if (op.generator.children.length != 1) { - return Unsupported(Some("generators with multiple inputs are not supported")) + return Unsupported("generators with multiple inputs are not supported") } if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") { - return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}")) + return Unsupported(s"Unsupported generator: ${op.generator.nodeName}") } if (op.outer) { // DataFusion UnnestExec has different semantics to Spark for this case @@ -1192,9 +1192,9 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] { case _: MapType => // TODO add support for map types // https://github.com/apache/datafusion-comet/issues/2837 - Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps")) + Unsupported("Comet only supports explode/explode_outer for arrays, not maps") case other => - Unsupported(Some(s"Unsupported data type: $other")) + Unsupported(s"Unsupported data type: $other") } } @@ -1509,11 +1509,11 @@ object CometHashAggregateExec // CometExecRule does not allow mixed Spark/Comet aggregates if (!CometConf.COMET_ENABLE_PARTIAL_HASH_AGGREGATE.get(op.conf) && op.aggregateExpressions.exists(expr => expr.mode == Partial || expr.mode == PartialMerge)) { - return Unsupported(Some("Partial aggregates disabled via test config")) + return Unsupported("Partial aggregates disabled via test config") } if (!CometConf.COMET_ENABLE_FINAL_HASH_AGGREGATE.get(op.conf) && op.aggregateExpressions.exists(_.mode == Final)) { - return Unsupported(Some("Final aggregates disabled via test config")) + return Unsupported("Final aggregates disabled via test config") } Compatible() }