@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
@@ -40,7 +41,7 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan
import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, TimestampType}
import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.unsafe.types.UTF8String

@@ -1759,94 +1760,6 @@ class DataSourceV2DataFrameSuite
}
}

test("cached DSv2 table DataFrame is refreshed and reused after insert") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
df1.write.insertInto(t)

// cache DataFrame pointing to table
val readDF1 = spark.table(t)
readDF1.cache()
assertCached(readDF1)
checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))

// insert more data, invalidating and refreshing cache entry
val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
df2.write.insertInto(t)

// verify underlying plan is recached and picks up new data
val readDF2 = spark.table(t)
assertCached(readDF2)
checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d")))
}
}

test("SPARK-54022: caching table via Dataset API should pin table state") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")

// cache table
spark.table(t).cache()

// verify caching works as expected
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// modify table directly to mimic external changes
val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE))
table.asInstanceOf[TruncatableTable].truncateTable()

// verify external changes have no impact on cached state
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// add more data within session that should invalidate cache
sql(s"INSERT INTO $t VALUES (10, 100, 'x')")

// table should be re-cached; the rebuilt entry reflects the actual table
// state (externally truncated, then one row inserted in this session)
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(10, 100, "x")))
}
}

test("SPARK-54022: caching a query via Dataset API should not pin table state") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")

// cache query on top of table
val df = spark.table(t).select("id")
df.cache()

// verify query caching works as expected
assertCached(spark.table(t).select("id"))
checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))

// verify table itself is not cached
assertNotCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// modify table directly to mimic external changes
val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE))
table.asInstanceOf[TruncatableTable].truncateTable()

// verify cached DataFrame is unaffected by external changes
assertCached(df)
checkAnswer(df, Seq(Row(1), Row(2), Row(3)))

// verify external changes are reflected correctly when table is queried
assertNotCached(spark.table(t))
checkAnswer(spark.table(t), Seq.empty)
}
}

test("SPARK-54504: self-subquery refreshes both table references before execution") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
@@ -2178,4 +2091,275 @@ class DataSourceV2DataFrameSuite
spark.listenerManager.unregister(listener)
}
}

// Table caching impact on reads.
// Tests that caching a table pins its state against external changes,
// while session writes invalidate and re-cache it.
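// Mechanism, as exercised below: session writes run through V2 write commands
// that notify this session's CacheManager, which invalidates and rebuilds the
// affected cache entries, while writes issued directly through the catalog API
// bypass the CacheManager entirely, so a cached plan keeps serving its pinned
// state until REFRESH TABLE or another session write forces a rebuild.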

test("cached DSv2 table DataFrame is refreshed and reused after insert") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
df1.write.insertInto(t)

// cache DataFrame pointing to table
val readDF1 = spark.table(t)
readDF1.cache()
assertCached(readDF1)
checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))

// insert more data, invalidating and refreshing cache entry
val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
df2.write.insertInto(t)

// verify underlying plan is recached and picks up new data
val readDF2 = spark.table(t)
assertCached(readDF2)
checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d")))
}
}

test("SPARK-54022: caching table via Dataset API should pin table state") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")

// cache table
spark.table(t).cache()

// verify caching works as expected
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// modify table directly to mimic external changes
val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE))
table.asInstanceOf[TruncatableTable].truncateTable()

// verify external changes have no impact on cached state
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// add more data within session that should invalidate cache
sql(s"INSERT INTO $t VALUES (10, 100, 'x')")

// table should be re-cached; the rebuilt entry reflects the actual table
// state (externally truncated, then one row inserted in this session)
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(10, 100, "x")))
}
}

test("SPARK-54022: caching a query via Dataset API should not pin table state") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")

// cache query on top of table
val df = spark.table(t).select("id")
df.cache()

// verify query caching works as expected
assertCached(spark.table(t).select("id"))
checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))

// verify table itself is not cached
assertNotCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))

// modify table directly to mimic external changes
val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE))
table.asInstanceOf[TruncatableTable].truncateTable()

// verify cached DataFrame is unaffected by external changes
assertCached(df)
checkAnswer(df, Seq(Row(1), Row(2), Row(3)))

// verify external changes are reflected correctly when table is queried
assertNotCached(spark.table(t))
checkAnswer(spark.table(t), Seq.empty)
}
}
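
// The external-write scenarios below all repeat one pattern: load the table
// directly through the catalog API (bypassing this session's CacheManager) and
// append buffered rows. A helper along these lines could factor that out;
// externalInsert is a hypothetical name, not part of this change, and it
// assumes the same BufferedRows(buffer, schema) constructor used inline below.
private def externalInsert(ident: Identifier, ddl: String, rows: InternalRow*): Unit = {
val schema = StructType.fromDDL(ddl)
val extTable = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
// append all rows as a single buffered batch, exactly as the scenarios do
extTable.withData(Array(
rows.foldLeft(new BufferedRows(Seq.empty, schema))(_.withRow(_))))
}
// e.g. externalInsert(ident, "id INT, salary INT", InternalRow(2, 200))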

// Scenario 1: an external write after the table is cached is invisible (cache pinned).
test("SPARK-54022: cached table pinned against external data write") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
sql(s"INSERT INTO $t VALUES (1, 100)")

// cache the table
spark.table(t).cache()
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// external writer adds (2, 200) via direct catalog API
// (bypasses this session's CacheManager)
val schema = StructType.fromDDL("id INT, salary INT")
val extTable = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
extTable.withData(Array(
new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))

// cache is pinned, external write invisible
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// REFRESH TABLE picks up external write
sql(s"REFRESH TABLE $t")
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))
}
}

// Scenario 2: session write invalidates cache; subsequent external write
// is again invisible.
test("SPARK-54022: session write invalidates cache, then external write invisible") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
sql(s"INSERT INTO $t VALUES (1, 100)")

// cache the table
spark.table(t).cache()
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// session write invalidates the cache entry
sql(s"INSERT INTO $t VALUES (2, 200)")
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))

// external writer adds (3, 300) via direct catalog API
val schema = StructType.fromDDL("id INT, salary INT")
val extTable = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
extTable.withData(Array(
new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300))))

// cache is re-pinned, external write invisible
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200)))

// REFRESH TABLE picks up external write
sql(s"REFRESH TABLE $t")
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200), Row(3, 300)))
}
}

// Scenario 3: external schema change after the table is cached.
// The cache stays pinned at the original 2-column schema; the external
// ADD COLUMN is invisible.
test("SPARK-54022: cached table pinned against external schema change") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
sql(s"INSERT INTO $t VALUES (1, 100)")

// cache table
spark.table(t).cache()
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// simulate external schema change + data write via direct catalog API
// (bypasses this session's CacheManager)
val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
catalog("testcat").alterTable(ident, addCol)

// external writer adds (2, 200, -1)
val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
val extTable = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
extTable.withData(Array(
new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))

// cache stays pinned at original 2-column schema
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// REFRESH TABLE picks up external schema change and data
sql(s"REFRESH TABLE $t")
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1)))
}
}

// Scenario 4: session schema change invalidates cache; subsequent external
// write is invisible.
test("SPARK-54022: session schema change invalidates cache, external write invisible") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
sql(s"INSERT INTO $t VALUES (1, 100)")

// cache table
spark.table(t).cache()
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// session schema change: invalidates cache, rebuilds with new schema
sql(s"ALTER TABLE $t ADD COLUMN new_column INT")

// after session ALTER, cache is rebuilt with 3-column schema
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100, null)))

// external writer adds (2, 200, -1) via direct catalog API
// (bypasses this session's CacheManager)
val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
val extTable = catalog("testcat").loadTable(ident,
util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
extTable.withData(Array(
new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))

// external write invisible: cache still shows (1, 100, null)
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100, null)))

// REFRESH TABLE picks up external write
sql(s"REFRESH TABLE $t")
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1)))
}
}

// Scenario 5: external drop and recreate with same schema.
test("SPARK-54022: cached table after external drop and recreate sees empty table") {
val t = "testcat.ns1.ns2.tbl"
val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
withTable(t) {
sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
sql(s"INSERT INTO $t VALUES (1, 100)")

// cache table
spark.table(t).cache()
assertCached(spark.table(t))
checkAnswer(spark.table(t), Seq(Row(1, 100)))

// simulate external drop and recreate with same schema
catalog("testcat").dropTable(ident)
catalog("testcat").createTable(
ident = ident,
columns = Array(
Column.create("id", IntegerType),
Column.create("salary", IntegerType)),
partitions = Array.empty,
properties = Collections.emptyMap[String, String])
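
// The recreated table is a brand-new Table instance, so the cached plan pinned
// to the old instance presumably no longer matches the freshly loaded relation,
// and the scan falls through to the new, empty table.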

// query sees the new empty table
checkAnswer(spark.table(t), Seq.empty)

// REFRESH TABLE on the new empty table
sql(s"REFRESH TABLE $t")
checkAnswer(spark.table(t), Seq.empty)
}
}
}