Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
DataTypes.IntegerType | DataTypes.LongType | DataTypes.BooleanType |
DataTypes.TimestampType =>
Compatible()
case _ => Unsupported(Some(s"Cast from DecimalType to $toType is not supported"))
case _ => Unsupported(s"Cast from DecimalType to $toType is not supported")
}

private def canCastFromDate(toType: DataType, evalMode: CometEvalMode.Value): SupportLevel =
Expand All @@ -394,10 +394,10 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
DataTypes.DoubleType | _: DecimalType if evalMode == CometEvalMode.LEGACY =>
Compatible()
case _ => Unsupported(Some(s"Cast from DateType to $toType is not supported"))
case _ => Unsupported(s"Cast from DateType to $toType is not supported")
}

private def unsupported(fromType: DataType, toType: DataType): Unsupported = {
Unsupported(Some(s"Cast from $fromType to $toType is not supported"))
Unsupported(s"Cast from $fromType to $toType is not supported")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
if (handler.enabledConfig.forall(_.get(op.conf))) {
handler.getSupportLevel(op) match {
case Unsupported(notes) =>
withInfo(op, notes.getOrElse(""))
withInfo(op, notes)
false
case Incompatible(notes) =>
val allowIncompat = CometConf.isOperatorAllowIncompat(opName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
aggHandler.getSupportLevel(fn) match {
case Unsupported(notes) =>
withInfo(fn, notes.getOrElse(""))
withInfo(fn, notes)
None
case Incompatible(notes) =>
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
Expand Down Expand Up @@ -609,7 +609,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
handler.getSupportLevel(expr) match {
case Unsupported(notes) =>
withInfo(expr, notes.getOrElse(""))
withInfo(expr, notes)
None
case Incompatible(notes) =>
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ case class Compatible(notes: Option[String] = None) extends SupportLevel
case class Incompatible(notes: Option[String] = None) extends SupportLevel

/** Comet does not support this feature */
case class Unsupported(notes: Option[String] = None) extends SupportLevel
case class Unsupported(notes: String) extends SupportLevel
7 changes: 4 additions & 3 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,8 @@ object CometArrayFilter extends CometExpressionSerde[ArrayFilter] {
override def getSupportLevel(expr: ArrayFilter): SupportLevel = {
expr.function.children.headOption match {
case Some(_: IsNotNull) => Compatible()
case _ => Unsupported()
case _ =>
Unsupported("Only array_compact (ArrayFilter with IsNotNull) is supported")
Comment on lines +625 to +626
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was previously not reported

}
}

Expand All @@ -639,10 +640,10 @@ object CometSize extends CometExpressionSerde[Size] {
override def getSupportLevel(expr: Size): SupportLevel = {
expr.child.dataType match {
case _: ArrayType => Compatible()
case _: MapType => Unsupported(Some("size does not support map inputs"))
case _: MapType => Unsupported("size does not support map inputs")
case other =>
// this should be unreachable because Spark only supports map and array inputs
Unsupported(Some(s"Unsupported child data type: $other"))
Unsupported(s"Unsupported child data type: $other")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ object CometKnownFloatingPointNormalized
case _: NormalizeNaNAndZero => Compatible()
case _ =>
Unsupported(
Some(
"KnownFloatingPointNormalized only supports NormalizeNaNAndZero child expressions"))
"KnownFloatingPointNormalized only supports NormalizeNaNAndZero child expressions")
}
}

Expand Down
13 changes: 6 additions & 7 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
Compatible()
} else {
val inputType = expr.children.head.dataType
Unsupported(Some(s"unix_timestamp does not support input type: $inputType"))
Unsupported(s"unix_timestamp does not support input type: $inputType")
}
}

Expand Down Expand Up @@ -394,7 +394,7 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] {
if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) {
Compatible()
} else {
Unsupported(Some(s"Format $fmt is not supported"))
Unsupported(s"Format $fmt is not supported")
}
case _ =>
Incompatible(
Expand Down Expand Up @@ -454,7 +454,7 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] {
" (https://github.com/apache/datafusion-comet/issues/2649)"))
}
} else {
Unsupported(Some(s"Format $fmt is not supported"))
Unsupported(s"Format $fmt is not supported")
}
case _ =>
Incompatible(
Expand Down Expand Up @@ -550,12 +550,11 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
}
} else {
Unsupported(
Some(
s"Format '$format' is not supported. Supported formats: " +
supportedFormats.keys.mkString(", ")))
s"Format '$format' is not supported. Supported formats: " +
supportedFormats.keys.mkString(", "))
}
case _ =>
Unsupported(Some("Only literal format strings are supported"))
Unsupported("Only literal format strings are supported")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object CometMakeDecimal extends CometExpressionSerde[MakeDecimal] {
override def getSupportLevel(expr: MakeDecimal): SupportLevel = {
expr.child.dataType match {
case LongType => Compatible()
case other => Unsupported(Some(s"Unsupported input data type: $other"))
case other => Unsupported(s"Unsupported input data type: $other")
}
}

Expand Down
2 changes: 1 addition & 1 deletion spark/src/main/scala/org/apache/comet/serde/literals.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
.isInstanceOf[ArrayType])))) {
Compatible(None)
} else {
Unsupported(Some(s"Unsupported data type ${expr.dataType}"))
Unsupported(s"Unsupported data type ${expr.dataType}")
}
}

Expand Down
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/math.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ object CometAbs extends CometExpressionSerde[Abs] with MathExprBase {
Compatible()
case _ =>
// Spark supports NumericType, DayTimeIntervalType, and YearMonthIntervalType
Unsupported(Some("Only integral, floating-point, and decimal types are supported"))
Unsupported("Only integral, floating-point, and decimal types are supported")
}
}

Expand Down Expand Up @@ -244,7 +244,7 @@ object CometCheckOverflow extends CometExpressionSerde[CheckOverflow] {
if (expr.dataType.isInstanceOf[DecimalType]) {
Compatible()
} else {
Unsupported(Some("dataType must be DecimalType"))
Unsupported("dataType must be DecimalType")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,28 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
case _: ParquetFileFormat =>
if (!cmd.outputPath.toString.startsWith("file:") && !cmd.outputPath.toString
.startsWith("hdfs:")) {
return Unsupported(Some("Supported output filesystems: local, HDFS"))
return Unsupported("Supported output filesystems: local, HDFS")
}

if (cmd.bucketSpec.isDefined) {
return Unsupported(Some("Bucketed writes are not supported"))
return Unsupported("Bucketed writes are not supported")
}

if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
return Unsupported(Some("Partitioned writes are not supported"))
return Unsupported("Partitioned writes are not supported")
}

val codec = parseCompressionCodec(cmd)
if (!supportedCompressionCodes.contains(codec)) {
return Unsupported(Some(s"Unsupported compression codec: $codec"))
return Unsupported(s"Unsupported compression codec: $codec")
}

Incompatible(Some("Parquet write support is highly experimental"))
case _ =>
Unsupported(Some("Only Parquet writes are supported"))
Unsupported("Only Parquet writes are supported")
}
case other =>
Unsupported(Some(s"Unsupported write command: ${other.getClass}"))
Unsupported(s"Unsupported write command: ${other.getClass}")
}
}

Expand Down
16 changes: 8 additions & 8 deletions spark/src/main/scala/org/apache/comet/serde/strings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object CometLower extends CometCaseConversionBase[Lower]("lower")

object CometLength extends CometScalarFunction[Length]("length") {
override def getSupportLevel(expr: Length): SupportLevel = expr.child.dataType match {
case _: BinaryType => Unsupported(Some("Length on BinaryType is not supported"))
case _: BinaryType => Unsupported("Length on BinaryType is not supported")
case _ => Compatible()
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ object CometLeft extends CometExpressionSerde[Left] {
override def getSupportLevel(expr: Left): SupportLevel = {
expr.str.dataType match {
case _: BinaryType | _: StringType => Compatible()
case _ => Unsupported(Some(s"LEFT does not support ${expr.str.dataType}"))
case _ => Unsupported(s"LEFT does not support ${expr.str.dataType}")
}
}
}
Expand Down Expand Up @@ -182,7 +182,7 @@ object CometRight extends CometExpressionSerde[Right] {
override def getSupportLevel(expr: Right): SupportLevel = {
expr.str.dataType match {
case _: StringType => Compatible()
case _ => Unsupported(Some(s"RIGHT does not support ${expr.str.dataType}"))
case _ => Unsupported(s"RIGHT does not support ${expr.str.dataType}")
}
}
}
Expand Down Expand Up @@ -271,10 +271,10 @@ object CometStringRPad extends CometExpressionSerde[StringRPad] {

override def getSupportLevel(expr: StringRPad): SupportLevel = {
if (expr.str.isInstanceOf[Literal]) {
return Unsupported(Some("Scalar values are not supported for the str argument"))
return Unsupported("Scalar values are not supported for the str argument")
}
if (!expr.pad.isInstanceOf[Literal]) {
return Unsupported(Some("Only scalar values are supported for the pad argument"))
return Unsupported("Only scalar values are supported for the pad argument")
}
Compatible()
}
Expand All @@ -296,10 +296,10 @@ object CometStringLPad extends CometExpressionSerde[StringLPad] {

override def getSupportLevel(expr: StringLPad): SupportLevel = {
if (expr.str.isInstanceOf[Literal]) {
return Unsupported(Some("Scalar values are not supported for the str argument"))
return Unsupported("Scalar values are not supported for the str argument")
}
if (!expr.pad.isInstanceOf[Literal]) {
return Unsupported(Some("Only scalar values are supported for the pad argument"))
return Unsupported("Only scalar values are supported for the pad argument")
}
Compatible()
}
Expand Down Expand Up @@ -330,7 +330,7 @@ object CometRegExpReplace extends CometExpressionSerde[RegExpReplace] {
expr.pos match {
case Literal(value, DataTypes.IntegerType) if value == 1 => Compatible()
case _ =>
Unsupported(Some("Comet only supports regexp_replace with an offset of 1 (no offset)."))
Unsupported("Comet only supports regexp_replace with an offset of 1 (no offset).")
}
}

Expand Down
3 changes: 1 addition & 2 deletions spark/src/main/scala/org/apache/comet/serde/structs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ object CometStructsToCsv extends CometExpressionSerde[StructsToCsv] {
val containsComplexType = dataTypes.exists(DataTypeSupport.isComplexType)
if (containsComplexType) {
return Unsupported(
Some(
s"The schema ${expr.inputSchema} is not supported because it includes a complex type"))
s"The schema ${expr.inputSchema} is not supported because it includes a complex type")
}
val containsIncompatibleDataTypes = dataTypes.exists(incompatibleDataTypes.contains)
if (containsIncompatibleDataTypes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object CometCollectLimitExec extends CometSink[CollectLimitExec] {

override def getSupportLevel(op: CollectLimitExec): SupportLevel = {
if (!isCometShuffleEnabled(op.conf)) {
return Unsupported(Some("Comet shuffle is not enabled"))
return Unsupported("Comet shuffle is not enabled")
}
Compatible()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,24 @@ object CometTakeOrderedAndProjectExec extends CometSink[TakeOrderedAndProjectExe

override def getSupportLevel(op: TakeOrderedAndProjectExec): SupportLevel = {
if (!isCometShuffleEnabled(op.conf)) {
return Unsupported(Some("TakeOrderedAndProject requires shuffle to be enabled"))
return Unsupported("TakeOrderedAndProject requires shuffle to be enabled")
}
op.projectList.foreach { p =>
val o = exprToProto(p, op.child.output)
if (o.isEmpty) {
return Unsupported(Some(s"unsupported projection: $p"))
return Unsupported(s"unsupported projection: $p")
}
o
}
op.sortOrder.foreach { s =>
val o = exprToProto(s, op.child.output)
if (o.isEmpty) {
return Unsupported(Some(s"unsupported sort order: $s"))
return Unsupported(s"unsupported sort order: $s")
}
o
}
if (!supportedSortType(op, op.sortOrder)) {
return Unsupported(Some("unsupported data type in sort order"))
return Unsupported("unsupported data type in sort order")
}
Compatible()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ object CometShuffleExchangeExec
if (nativeShuffleSupported(op) || columnarShuffleSupported(op)) {
Compatible()
} else {
Unsupported()
Unsupported("Comet shuffle is not enabled or the partitioning is not supported")
}
}

Expand Down
14 changes: 7 additions & 7 deletions spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1173,13 +1173,13 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] {

override def getSupportLevel(op: GenerateExec): SupportLevel = {
if (!op.generator.deterministic) {
return Unsupported(Some("Only deterministic generators are supported"))
return Unsupported("Only deterministic generators are supported")
}
if (op.generator.children.length != 1) {
return Unsupported(Some("generators with multiple inputs are not supported"))
return Unsupported("generators with multiple inputs are not supported")
}
if (op.generator.nodeName.toLowerCase(Locale.ROOT) != "explode") {
return Unsupported(Some(s"Unsupported generator: ${op.generator.nodeName}"))
return Unsupported(s"Unsupported generator: ${op.generator.nodeName}")
}
if (op.outer) {
// DataFusion UnnestExec has different semantics to Spark for this case
Expand All @@ -1192,9 +1192,9 @@ object CometExplodeExec extends CometOperatorSerde[GenerateExec] {
case _: MapType =>
// TODO add support for map types
// https://github.com/apache/datafusion-comet/issues/2837
Unsupported(Some("Comet only supports explode/explode_outer for arrays, not maps"))
Unsupported("Comet only supports explode/explode_outer for arrays, not maps")
case other =>
Unsupported(Some(s"Unsupported data type: $other"))
Unsupported(s"Unsupported data type: $other")
}
}

Expand Down Expand Up @@ -1509,11 +1509,11 @@ object CometHashAggregateExec
// CometExecRule does not allow mixed Spark/Comet aggregates
if (!CometConf.COMET_ENABLE_PARTIAL_HASH_AGGREGATE.get(op.conf) &&
op.aggregateExpressions.exists(expr => expr.mode == Partial || expr.mode == PartialMerge)) {
return Unsupported(Some("Partial aggregates disabled via test config"))
return Unsupported("Partial aggregates disabled via test config")
}
if (!CometConf.COMET_ENABLE_FINAL_HASH_AGGREGATE.get(op.conf) &&
op.aggregateExpressions.exists(_.mode == Final)) {
return Unsupported(Some("Final aggregates disabled via test config"))
return Unsupported("Final aggregates disabled via test config")
}
Compatible()
}
Expand Down
Loading