diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index eadaafea81a53..6ea7099224179 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
+import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
@@ -40,7 +41,7 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan
 import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec}
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, TimestampType}
+import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.util.QueryExecutionListener
 import org.apache.spark.unsafe.types.UTF8String
@@ -1759,94 +1760,6 @@ class DataSourceV2DataFrameSuite
     }
   }
 
-  test("cached DSv2 table DataFrame is refreshed and reused after insert") {
-    val t = "testcat.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
-      val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
-      df1.write.insertInto(t)
-
-      // cache DataFrame pointing to table
-      val readDF1 = spark.table(t)
-      readDF1.cache()
-      assertCached(readDF1)
-      checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))
-
-      // insert more data, invalidating and refreshing cache entry
-      val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
-      df2.write.insertInto(t)
-
-      // verify underlying plan is recached and picks up new data
-      val readDF2 = spark.table(t)
-      assertCached(readDF2)
-      checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d")))
-    }
-  }
-
-  test("SPARK-54022: caching table via Dataset API should pin table state") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
-
-      // cache table
-      spark.table(t).cache()
-
-      // verify caching works as expected
-      assertCached(spark.table(t))
-      checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))
-
-      // modify table directly to mimic external changes
-      val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE))
-      table.asInstanceOf[TruncatableTable].truncateTable()
-
-      // verify external changes have no impact on cached state
-      assertCached(spark.table(t))
-      checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))
-
-      // add more data within session that should invalidate cache
-      sql(s"INSERT INTO $t VALUES (10, 100, 'x')")
-
-      // table should be re-cached correctly
-      assertCached(spark.table(t))
-      checkAnswer(spark.table(t), Seq(Row(10, 100, "x")))
-    }
-  }
-
-  test("SPARK-54022: caching a query via Dataset API should not pin table state") {
-    val t = "testcat.ns1.ns2.tbl"
-    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
-    withTable(t) {
-      sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
-      sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
-
-      // cache query on top of table
-      val df = spark.table(t).select("id")
-      df.cache()
-
-      // verify query caching works as expected
-      assertCached(spark.table(t).select("id"))
-      checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3)))
-
-      // verify table itself is not cached
-      assertNotCached(spark.table(t))
-      checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))
-
-      // modify table directly to mimic external changes
-      val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE))
-      table.asInstanceOf[TruncatableTable].truncateTable()
-
-      // verify cached DataFrame is unaffected by external changes
-      assertCached(df)
-      checkAnswer(df, Seq(Row(1), Row(2), Row(3)))
-
-      // verify external changes are reflected correctly when table is queried
-      assertNotCached(spark.table(t))
-      checkAnswer(spark.table(t), Seq.empty)
-    }
-  }
-
   test("SPARK-54504: self-subquery refreshes both table references before execution") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
@@ -2178,4 +2091,275 @@ class DataSourceV2DataFrameSuite
       spark.listenerManager.unregister(listener)
     }
   }
+
+  // CACHE TABLE impact on reads.
+  // Tests that CACHE TABLE pins table state against external changes,
+  // while session writes invalidate and re-cache.
+ + test("cached DSv2 table DataFrame is refreshed and reused after insert") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string) USING foo") + val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data") + df1.write.insertInto(t) + + // cache DataFrame pointing to table + val readDF1 = spark.table(t) + readDF1.cache() + assertCached(readDF1) + checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b"))) + + // insert more data, invalidating and refreshing cache entry + val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data") + df2.write.insertInto(t) + + // verify underlying plan is recached and picks up new data + val readDF2 = spark.table(t) + assertCached(readDF2) + checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d"))) + } + } + + test("SPARK-54022: caching table via Dataset API should pin table state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache table + spark.table(t).cache() + + // verify caching works as expected + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // modify table directly to mimic external changes + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // verify external changes have no impact on cached state + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // add more data within session that should invalidate cache + sql(s"INSERT INTO $t VALUES (10, 100, 'x')") + + // table should be re-cached correctly + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(10, 100, "x"))) + } + } + + test("SPARK-54022: caching a query via Dataset API should not pin table state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache query on top of table + val df = spark.table(t).select("id") + df.cache() + + // verify query caching works as expected + assertCached(spark.table(t).select("id")) + checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3))) + + // verify table itself is not cached + assertNotCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // modify table directly to mimic external changes + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // verify cached DataFrame is unaffected by external changes + assertCached(df) + checkAnswer(df, Seq(Row(1), Row(2), Row(3))) + + // verify external changes are reflected correctly when table is queried + assertNotCached(spark.table(t)) + checkAnswer(spark.table(t), Seq.empty) + } + } + + // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). 
+ test("SPARK-54022: cached table pinned against external data write") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache the table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // external writer adds (2, 200) via direct catalog API + // (bypasses this session's CacheManager) + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + + // cache is pinned, external write invisible + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // REFRESH TABLE picks up external write + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) + } + } + + // Scenario 2: session write invalidates cache; subsequent external write + // is again invisible. + test("SPARK-54022: session write invalidates cache, then external write invisible") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache the table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // session write invalidates the cache entry + sql(s"INSERT INTO $t VALUES (2, 200)") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) + + // external writer adds (3, 300) via direct catalog API + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) + + // cache is re-pinned, external write invisible + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) + + // REFRESH TABLE picks up external write + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + } + } + + // Scenario 3: external schema change after CACHE TABLE. + // Cache stays pinned at original 2-column schema; external ADD COLUMN + // is invisible. 
+ test("SPARK-54022: cached table pinned against external schema change") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // simulate external schema change + data write via direct catalog API + // (bypasses this session's CacheManager) + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + catalog("testcat").alterTable(ident, addCol) + + // external writer adds (2, 200, -1) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // cache stays pinned at original 2-column schema + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // REFRESH TABLE picks up external schema change and data + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + + // Scenario 4: session schema change invalidates cache; subsequent external + // write is invisible. + test("SPARK-54022: session schema change invalidates cache, external write invisible") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // session schema change: invalidates cache, rebuilds with new schema + sql(s"ALTER TABLE $t ADD COLUMN new_column INT") + + // after session ALTER, cache is rebuilt with 3-column schema + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null))) + + // external writer adds (2, 200, -1) via direct catalog API + // (bypasses this session's CacheManager) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // external write invisible: cache still shows (1, 100, null) + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null))) + + // REFRESH TABLE picks up external write + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + + // Scenario 5: external drop and recreate with same schema. 
+ test("SPARK-54022: cached table after external drop and recreate sees empty table") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // simulate external drop and recreate with same schema + catalog("testcat").dropTable(ident) + catalog("testcat").createTable( + ident = ident, + columns = Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType)), + partitions = Array.empty, + properties = Collections.emptyMap[String, String]) + + // query sees the new empty table + checkAnswer(spark.table(t), Seq.empty) + + // REFRESH TABLE on the new empty table + sql(s"REFRESH TABLE $t") + checkAnswer(spark.table(t), Seq.empty) + } + } }