
Commit f5d462f

aokolnychyi authored and huangxiaopingRD committed
[SPARK-54341][SQL] Remember TimeTravelSpec for tables loaded via TableProvider
### What changes were proposed in this pull request?

This PR adds logic to remember `TimeTravelSpec` for tables loaded via `TableProvider`.

### Why are the changes needed?

These changes are needed to align `TableProvider` with other places that remember time travel specs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#53048 from aokolnychyi/spark-54341.

Authored-by: Anton Okolnychyi <aokolnychyi@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 309277a · commit f5d462f
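To make the effect concrete, here is a minimal sketch of what "remembering" the spec means for a caller. The format name `my.datasource` and the option keys `name` / `versionAsOf` are hypothetical placeholders for a source that implements `SupportsCatalogOptions`; only `DataSourceV2Relation.timeTravelSpec` and `AsOfVersion` come from this patch and its tests.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.AsOfVersion
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Load a table through the TableProvider / SupportsCatalogOptions path.
// The format name and option keys below are illustrative placeholders.
val df = spark.read
  .format("my.datasource")
  .option("name", "t")
  .option("versionAsOf", "2345678910")
  .load()

// Before this change the spec was dropped once the table was resolved;
// now the analyzed relation records how the table version was pinned.
val spec = df.queryExecution.analyzed.collectFirst {
  case r: DataSourceV2Relation => r.timeTravelSpec
}.flatten
assert(spec.contains(AsOfVersion("2345678910")))
```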

2 files changed (+37, −16)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala

Lines changed: 5 additions & 4 deletions
```diff
@@ -115,7 +115,7 @@ private[sql] object DataSourceV2Utils extends Logging {
     val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
       optionsWithPath.originalMap
     val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
-    val (table, catalog, ident) = provider match {
+    val (table, catalog, ident, timeTravelSpec) = provider match {
       case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
         throw new IllegalArgumentException(
           s"$source does not support user specified schema. Please don't specify the schema.")
@@ -141,16 +141,17 @@
         }
         val timeTravel = TimeTravelSpec.create(
           timeTravelTimestamp, timeTravelVersion, conf.sessionLocalTimeZone)
-        (CatalogV2Util.getTable(catalog, ident, timeTravel), Some(catalog), Some(ident))
+        val tbl = CatalogV2Util.getTable(catalog, ident, timeTravel)
+        (tbl, Some(catalog), Some(ident), timeTravel)
       case _ =>
         // TODO: Non-catalog paths for DSV2 are currently not well defined.
         val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
-        (tbl, None, None)
+        (tbl, None, None, None)
     }
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     table match {
       case _: SupportsRead if table.supports(BATCH_READ) =>
-        Option(DataSourceV2Relation.create(table, catalog, ident, dsOptions))
+        Option(DataSourceV2Relation.create(table, catalog, ident, dsOptions, timeTravelSpec))
       case _ => None
     }
   }
```
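For context on the `timeTravel` value threaded through the tuple above: judging from the hunk, `TimeTravelSpec.create` takes the optional timestamp and version strings plus the session time zone and returns an `Option[TimeTravelSpec]`, which is why the non-catalog branch can fall through with a plain `None`. A rough sketch of that contract (the `Option[String]` argument types and the empty-input behavior are inferred from the hunk and the test assertions below, not shown directly in this diff):

```scala
import org.apache.spark.sql.catalyst.analysis.{AsOfVersion, TimeTravelSpec}

// A version string pins the table to a named snapshot.
val byVersion: Option[TimeTravelSpec] =
  TimeTravelSpec.create(None, Some("2345678910"), "America/Los_Angeles")
assert(byVersion.contains(AsOfVersion("2345678910")))

// With neither a timestamp nor a version there is no spec to remember,
// matching the (tbl, None, None, None) fall-through above.
assert(TimeTravelSpec.create(None, None, "America/Los_Angeles").isEmpty)
```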

sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala

Lines changed: 32 additions & 12 deletions
```diff
@@ -26,8 +26,8 @@ import scala.util.Try
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NoSuchTableException, TableAlreadyExistsException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, InMemoryTableCatalog, SupportsCatalogOptions, TableCatalog}
@@ -305,8 +305,13 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
       .mode(SaveMode.Overwrite).save()
 
     // load with version
-    checkAnswer(load("t", Some(catalogName), version = Some("Snapshot123456789")), df1.toDF())
-    checkAnswer(load("t", Some(catalogName), version = Some("2345678910")), df2.toDF())
+    val readDF1 = load("t", Some(catalogName), version = Some("Snapshot123456789"))
+    checkAnswer(readDF1, df1.toDF())
+    checkTimeTravel(readDF1, expectedTimeTravelSpec = AsOfVersion("Snapshot123456789"))
+
+    val readDF2 = load("t", Some(catalogName), version = Some("2345678910"))
+    checkAnswer(readDF2, df2.toDF())
+    checkTimeTravel(readDF2, expectedTimeTravelSpec = AsOfVersion("2345678910"))
   }
 
   val ts1 = DateTimeUtils.stringToTimestampAnsi(
@@ -330,16 +335,26 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
       .mode(SaveMode.Overwrite).save()
 
     // load with timestamp
-    checkAnswer(load("t", Some(catalogName), version = None,
-      timestamp = Some("2019-01-29 00:37:58")), df3.toDF())
-    checkAnswer(load("t", Some(catalogName), version = None,
-      timestamp = Some("2021-01-29 00:37:58")), df4.toDF())
+    val readDF3 = load("t", Some(catalogName), version = None,
+      timestamp = Some("2019-01-29 00:37:58"))
+    checkAnswer(readDF3, df3.toDF())
+    checkTimeTravel(readDF3, expectedTimeTravelSpec = AsOfTimestamp(ts1))
+
+    val readDF4 = load("t", Some(catalogName), version = None,
+      timestamp = Some("2021-01-29 00:37:58"))
+    checkAnswer(readDF4, df4.toDF())
+    checkTimeTravel(readDF4, expectedTimeTravelSpec = AsOfTimestamp(ts2))
 
     // load with timestamp in number format
-    checkAnswer(load("t", Some(catalogName), version = None,
-      timestamp = Some(MICROSECONDS.toSeconds(ts1).toString)), df3.toDF())
-    checkAnswer(load("t", Some(catalogName), version = None,
-      timestamp = Some(MICROSECONDS.toSeconds(ts2).toString)), df4.toDF())
+    val readDF5 = load("t", Some(catalogName), version = None,
+      timestamp = Some(MICROSECONDS.toSeconds(ts1).toString))
+    checkAnswer(readDF5, df3.toDF())
+    checkTimeTravel(readDF5, expectedTimeTravelSpec = AsOfTimestamp(ts1))
+
+    val readDF6 = load("t", Some(catalogName), version = None,
+      timestamp = Some(MICROSECONDS.toSeconds(ts2).toString))
+    checkAnswer(readDF6, df4.toDF())
+    checkTimeTravel(readDF6, expectedTimeTravelSpec = AsOfTimestamp(ts2))
   }
 
   val e = intercept[AnalysisException] {
@@ -360,6 +375,11 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
     assert(v2.catalog.exists(_ == catalogPlugin))
   }
 
+  private def checkTimeTravel(ds: Dataset[_], expectedTimeTravelSpec: TimeTravelSpec): Unit = {
+    val relation = ds.logicalPlan.asInstanceOf[DataSourceV2Relation]
+    assert(relation.timeTravelSpec.contains(expectedTimeTravelSpec))
+  }
+
   private def load(
       name: String,
       catalogOpt: Option[String],
```
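A note on the new `checkTimeTravel` helper: it casts `ds.logicalPlan` straight to `DataSourceV2Relation`, which works here because `load(...)` produces a bare scan. If the relation could sit below other operators, a more defensive variant (illustrative only, not part of this patch) would collect specs instead of casting:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.analysis.TimeTravelSpec
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Gather the time travel spec from every DSv2 relation in the plan,
// regardless of what sits above it.
def collectTimeTravelSpecs(ds: Dataset[_]): Seq[TimeTravelSpec] =
  ds.queryExecution.analyzed.collect {
    case r: DataSourceV2Relation => r.timeTravelSpec
  }.flatten
```

With that shape, the assertion survives adding a projection or filter on top of the loaded DataFrame before checking the spec.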
