diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 046ccf0b1c..32522bf51f 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -222,31 +222,28 @@ object CometConf extends ShimCometConf { val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.convert.parquet.enabled") - .category(CATEGORY_TESTING) + .category(CATEGORY_EXEC) .doc( "When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " + - "Arrow format. This is an experimental feature and has known issues with " + - "non-UTC timezones.") + "Arrow format.") .booleanConf .createWithDefault(false) val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.convert.json.enabled") - .category(CATEGORY_TESTING) + .category(CATEGORY_EXEC) .doc( "When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " + - "Arrow format. This is an experimental feature and has known issues with " + - "non-UTC timezones.") + "Arrow format.") .booleanConf .createWithDefault(false) val COMET_CONVERT_FROM_CSV_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.convert.csv.enabled") - .category(CATEGORY_TESTING) + .category(CATEGORY_EXEC) .doc( "When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to " + - "Arrow format. This is an experimental feature and has known issues with " + - "non-UTC timezones.") + "Arrow format.") .booleanConf .createWithDefault(false) @@ -743,17 +740,17 @@ object CometConf extends ShimCometConf { val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.sparkToColumnar.enabled") - .category(CATEGORY_TESTING) + .category(CATEGORY_EXEC) .doc("Whether to enable Spark to Arrow columnar conversion. When this is turned on, " + "Comet will convert operators in " + "`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " + - "processing. This is an experimental feature and has known issues with non-UTC timezones.") + "processing.") .booleanConf .createWithDefault(false) val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] = conf("spark.comet.sparkToColumnar.supportedOperatorList") - .category(CATEGORY_TESTING) + .category(CATEGORY_EXEC) .doc("A comma-separated list of operators that will be converted to Arrow columnar " + s"format when `${COMET_SPARK_TO_ARROW_ENABLED.key}` is true.") .stringConf diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 68a2ebf8ec..622168bcc9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -66,7 +66,8 @@ case class CometLocalTableScanExec( override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numInputRows = longMetric("numOutputRows") val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf) - val timeZoneId = conf.sessionLocalTimeZone + // Use UTC to match native side expectations. See CometSparkToColumnarExec. + val timeZoneId = "UTC" rdd.mapPartitionsInternal { sparkBatches => val context = TaskContext.get() val batches = CometArrowConverters.rowToArrowBatchIter( diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala index 8447c9d044..efe6a97d40 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala @@ -97,7 +97,12 @@ case class CometSparkToColumnarExec(child: SparkPlan) val numOutputBatches = longMetric("numOutputBatches") val conversionTime = longMetric("conversionTime") val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf) - val timeZoneId = conf.sessionLocalTimeZone + // Use UTC for Arrow schema timezone to match the native side, which always + // deserializes Timestamp as Timestamp(Microsecond, Some("UTC")). Spark's internal + // timestamp representation is always UTC microseconds, so the timezone here is + // purely schema metadata. Using session timezone would cause Arrow RowConverter + // schema mismatch errors in non-UTC sessions. See COMET-2720. + val timeZoneId = "UTC" val schema = child.schema if (child.supportsColumnar) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index aff1816265..9cb89a7a11 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2173,6 +2173,42 @@ class CometExecSuite extends CometTestBase { } } + test("LocalTableScanExec with timestamps in non-UTC timezone") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val df = Seq( + (1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")), + (2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")), + (3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00"))) + .toDF("id", "ts") + .orderBy("ts") + checkSparkAnswerAndOperator(df) + } + } + + test("sort on timestamps with non-UTC timezone via LocalTableScan") { + // When session timezone is non-UTC, CometLocalTableScanExec and + // CometSparkToColumnarExec must use UTC for the Arrow schema timezone + // to match the native side's expectations. Without this, the native + // ScanExec sees a timezone mismatch and performs an unnecessary cast. + // The cast is currently a no-op (Arrow timestamps with timezone are + // always UTC microseconds), but using UTC avoids the overhead and + // keeps schemas consistent throughout the native plan. + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val df = Seq( + (1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")), + (2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")), + (3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00"))) + .toDF("id", "ts") + .repartition(2) + .orderBy("ts") + checkSparkAnswer(df) + } + } + } case class BucketedTableTestSpec(