Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,31 +222,28 @@ object CometConf extends ShimCometConf {

// When enabled, rows produced by Spark's (non-native) Parquet scans are converted
// to Arrow batches so downstream Comet native operators can consume them.
// Promoted from CATEGORY_TESTING to CATEGORY_EXEC: the non-UTC-timezone issues
// previously called out in the doc have been resolved (Arrow schema timezone is
// now fixed to UTC to match the native side), so the config is no longer
// considered experimental. Disabled by default.
val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.convert.parquet.enabled")
    .category(CATEGORY_EXEC)
    .doc(
      "When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " +
        "Arrow format.")
    .booleanConf
    .createWithDefault(false)

// When enabled, rows produced by Spark's (non-native) JSON scans are converted
// to Arrow batches for Comet native execution. Promoted from CATEGORY_TESTING to
// CATEGORY_EXEC and the "experimental / non-UTC timezone" caveat dropped from the
// doc, consistent with the other spark.comet.convert.* configs. Disabled by default.
val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.convert.json.enabled")
    .category(CATEGORY_EXEC)
    .doc(
      "When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " +
        "Arrow format.")
    .booleanConf
    .createWithDefault(false)

// When enabled, rows produced by Spark's (non-native) CSV scans are converted
// to Arrow batches for Comet native execution. Promoted from CATEGORY_TESTING to
// CATEGORY_EXEC and the "experimental / non-UTC timezone" caveat dropped from the
// doc, consistent with the other spark.comet.convert.* configs. Disabled by default.
val COMET_CONVERT_FROM_CSV_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.convert.csv.enabled")
    .category(CATEGORY_EXEC)
    .doc(
      "When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to " +
        "Arrow format.")
    .booleanConf
    .createWithDefault(false)

Expand Down Expand Up @@ -743,17 +740,17 @@ object CometConf extends ShimCometConf {

// Master switch for wrapping Spark row-based operators with CometSparkToColumnarExec,
// which converts their output to Arrow batches. The set of wrapped operators is
// controlled by COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST. Promoted from
// CATEGORY_TESTING to CATEGORY_EXEC; the former "experimental, non-UTC timezone"
// warning is removed from the doc now that the conversion pins the Arrow schema
// timezone to UTC. Disabled by default.
val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
  conf("spark.comet.sparkToColumnar.enabled")
    .category(CATEGORY_EXEC)
    .doc("Whether to enable Spark to Arrow columnar conversion. When this is turned on, " +
      "Comet will convert operators in " +
      "`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " +
      "processing.")
    .booleanConf
    .createWithDefault(false)

val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
conf("spark.comet.sparkToColumnar.supportedOperatorList")
.category(CATEGORY_TESTING)
.category(CATEGORY_EXEC)
.doc("A comma-separated list of operators that will be converted to Arrow columnar " +
s"format when `${COMET_SPARK_TO_ARROW_ENABLED.key}` is true.")
.stringConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ case class CometLocalTableScanExec(
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numOutputRows")
val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
val timeZoneId = conf.sessionLocalTimeZone
// Use UTC to match native side expectations. See CometSparkToColumnarExec.
val timeZoneId = "UTC"
rdd.mapPartitionsInternal { sparkBatches =>
val context = TaskContext.get()
val batches = CometArrowConverters.rowToArrowBatchIter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ case class CometSparkToColumnarExec(child: SparkPlan)
val numOutputBatches = longMetric("numOutputBatches")
val conversionTime = longMetric("conversionTime")
val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
val timeZoneId = conf.sessionLocalTimeZone
// Use UTC for Arrow schema timezone to match the native side, which always
// deserializes Timestamp as Timestamp(Microsecond, Some("UTC")). Spark's internal
// timestamp representation is always UTC microseconds, so the timezone here is
// purely schema metadata. Using session timezone would cause Arrow RowConverter
// schema mismatch errors in non-UTC sessions. See COMET-2720.
val timeZoneId = "UTC"
val schema = child.schema

if (child.supportsColumnar) {
Expand Down
36 changes: 36 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,42 @@ class CometExecSuite extends CometTestBase {
}
}

test("LocalTableScanExec with timestamps in non-UTC timezone") {
  // Regression test: with a non-UTC session timezone, CometLocalTableScanExec must
  // still return results identical to Spark. The operator fixes the Arrow schema
  // timezone to UTC (Spark timestamps are internally UTC microseconds, so the
  // timezone is schema metadata only); previously the session timezone leaked into
  // the Arrow schema and mismatched the native side.
  // checkSparkAnswerAndOperator also verifies the Comet operator actually ran.
  // NOTE(review): per PR feedback, confirm whether COMET_SPARK_TO_ARROW_ENABLED
  // must also be set to "true" for this path to be exercised.
  withSQLConf(
    CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
    SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
    val df = Seq(
      (1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")),
      (2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")),
      (3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00")))
      .toDF("id", "ts")
      .orderBy("ts")
    checkSparkAnswerAndOperator(df)
  }
}

test("sort on timestamps with non-UTC timezone via LocalTableScan") {
  // With a non-UTC session timezone, CometLocalTableScanExec and
  // CometSparkToColumnarExec must emit UTC as the Arrow schema timezone so the
  // native ScanExec sees a matching schema and skips a redundant cast. The cast
  // would currently be a no-op (Arrow timestamps with a timezone are always UTC
  // microseconds), but pinning UTC avoids the overhead and keeps schemas
  // consistent through the native plan. repartition(2) forces a shuffle so the
  // sort runs over multiple partitions.
  withSQLConf(
    CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true",
    SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
    val rows = Seq(
      1 -> java.sql.Timestamp.valueOf("2024-01-15 10:30:00"),
      2 -> java.sql.Timestamp.valueOf("2024-06-15 14:00:00"),
      3 -> java.sql.Timestamp.valueOf("2024-12-25 08:00:00"))
    val sortedByTs = rows.toDF("id", "ts").repartition(2).orderBy("ts")
    checkSparkAnswer(sortedByTs)
  }
}

}

case class BucketedTableTestSpec(
Expand Down
Loading