From 75db50d820845427eed5716e6ad89b65d6337c34 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 12:31:41 +0000 Subject: [PATCH 1/9] [SPARK-54022][SQL][TESTS] Add CACHE TABLE scenario 3, 4, 5 tests and reorganize cache tests Move existing cache-pinning tests to the bottom of DataSourceV2DataFrameSuite and add three new tests covering design doc Section [5] scenarios: - Scenario 3: external schema change is invisible to cached table - Scenario 4: session schema change invalidates and rebuilds cache - Scenario 5: external drop/recreate produces new table ID, query sees empty table Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 261 ++++++++++++------ 1 file changed, 173 insertions(+), 88 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index eadaafea81a53..a2306583b3508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -1759,94 +1759,6 @@ class DataSourceV2DataFrameSuite } } - test("cached DSv2 table DataFrame is refreshed and reused after insert") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id bigint, data string) USING foo") - val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data") - df1.write.insertInto(t) - - // cache DataFrame pointing to table - val readDF1 = spark.table(t) - readDF1.cache() - assertCached(readDF1) - checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b"))) - - // insert more data, invalidating and refreshing cache entry - val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data") - df2.write.insertInto(t) - - // verify underlying plan is recached and picks up new data - val readDF2 = spark.table(t) - assertCached(readDF2) - checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d"))) - } - } - - test("SPARK-54022: caching table via Dataset API should pin table state") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") - sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") - - // cache table - spark.table(t).cache() - - // verify caching works as expected - assertCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) - - // modify table directly to mimic external changes - val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table.asInstanceOf[TruncatableTable].truncateTable() - - // verify external changes have no impact on cached state - assertCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) - - // add more data within session that should invalidate cache - sql(s"INSERT INTO $t VALUES (10, 100, 'x')") - - // table should be re-cached correctly - assertCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(10, 100, "x"))) - } - } - - test("SPARK-54022: caching a query via Dataset API should not pin table state") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") - sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") - - // cache query on top of table - val df = spark.table(t).select("id") - df.cache() - - // verify query caching works as expected - assertCached(spark.table(t).select("id")) - checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3))) - - // verify table itself is not cached - assertNotCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) - - // modify table directly to mimic external changes - val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table.asInstanceOf[TruncatableTable].truncateTable() - - // verify cached DataFrame is unaffected by external changes - assertCached(df) - checkAnswer(df, Seq(Row(1), Row(2), Row(3))) - - // verify external changes are reflected correctly when table is queried - assertNotCached(spark.table(t)) - checkAnswer(spark.table(t), Seq.empty) - } - } - test("SPARK-54504: self-subquery refreshes both table references before execution") { val t = "testcat.ns1.ns2.tbl" withTable(t) { @@ -2178,4 +2090,177 @@ class DataSourceV2DataFrameSuite spark.listenerManager.unregister(listener) } } + + // Design doc Section [5]: CACHE TABLE impact on reads. + // Tests that CACHE TABLE pins table state against external changes, + // while session writes invalidate and re-cache. + + // Scenario 1 (external write) + Scenario 2 (session write): + // External truncation is invisible to cached table; session INSERT + // invalidates cache and rebuilds with fresh data. + test("SPARK-54022: caching table via Dataset API should pin table state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache table + spark.table(t).cache() + + // verify caching works as expected + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // modify table directly to mimic external changes + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // verify external changes have no impact on cached state + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // add more data within session that should invalidate cache + sql(s"INSERT INTO $t VALUES (10, 100, 'x')") + + // table should be re-cached correctly + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(10, 100, "x"))) + } + } + + test("SPARK-54022: caching a query via Dataset API should not pin table state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache query on top of table + val df = spark.table(t).select("id") + df.cache() + + // verify query caching works as expected + assertCached(spark.table(t).select("id")) + checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3))) + + // verify table itself is not cached + assertNotCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // modify table directly to mimic external changes + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // verify cached DataFrame is unaffected by external changes + assertCached(df) + checkAnswer(df, Seq(Row(1), Row(2), Row(3))) + + // verify external changes are reflected correctly when table is queried + assertNotCached(spark.table(t)) + checkAnswer(spark.table(t), Seq.empty) + } + } + + // Scenario 2 variant: session write invalidates and refreshes cache. + test("cached DSv2 table DataFrame is refreshed and reused after insert") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string) USING foo") + val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data") + df1.write.insertInto(t) + + // cache DataFrame pointing to table + val readDF1 = spark.table(t) + readDF1.cache() + assertCached(readDF1) + checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b"))) + + // insert more data, invalidating and refreshing cache entry + val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data") + df2.write.insertInto(t) + + // verify underlying plan is recached and picks up new data + val readDF2 = spark.table(t) + assertCached(readDF2) + checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d"))) + } + } + + // Scenario 3: external schema change after CACHE TABLE. + // Cache stays pinned at original 2-column schema; external ADD COLUMN + // is invisible. + test("SPARK-54022: cached table pinned against external schema change") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // simulate external schema change via direct catalog API + // (bypasses this session's CacheManager) + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + catalog("testcat").alterTable(ident, addCol) + + // cache stays pinned at original 2-column schema + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + } + } + + // Scenario 4: session schema change invalidates cache. + test("SPARK-54022: session schema change invalidates cache") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // session schema change: invalidates cache, rebuilds with new schema + sql(s"ALTER TABLE $t ADD COLUMN new_column INT") + + // after session ALTER, cache is rebuilt with 3-column schema + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null))) + } + } + + // Scenario 5: external drop and recreate with same schema. + // The new table has a different ID; the query detects the ID change + // and sees the new empty table. + test("SPARK-54022: cached table after external drop and recreate sees empty table") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // simulate external drop and recreate with same schema + catalog("testcat").dropTable(ident) + catalog("testcat").createTable( + ident, + Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType)), + Array.empty, + Collections.emptyMap[String, String]) + + // the new table has a different ID; query sees the new empty table + checkAnswer(spark.table(t), Seq.empty) + } + } } From 707d30e93961eb06f626d5f6eb952349e43ee958 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 12:35:40 +0000 Subject: [PATCH 2/9] Remove 'Design doc Section [5]:' from comment Co-authored-by: Isaac --- .../apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index a2306583b3508..921625f0dd6b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2091,7 +2091,7 @@ class DataSourceV2DataFrameSuite } } - // Design doc Section [5]: CACHE TABLE impact on reads. + // CACHE TABLE impact on reads. // Tests that CACHE TABLE pins table state against external changes, // while session writes invalidate and re-cache. From e3c6f4dc366ee2f7b18ae06d2e028ab6b327df26 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:21:31 +0000 Subject: [PATCH 3/9] Rewrite cache tests to match design doc scenarios exactly - Combine Scenarios 1+2 into single test following design doc flow - Remove separate "variant" test - Scenario 4: add external write after session schema change - Scenario 5: remove ID references from comments Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 115 ++++++------------ 1 file changed, 36 insertions(+), 79 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 921625f0dd6b8..04472a0bbf910 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2095,95 +2095,43 @@ class DataSourceV2DataFrameSuite // Tests that CACHE TABLE pins table state against external changes, // while session writes invalidate and re-cache. - // Scenario 1 (external write) + Scenario 2 (session write): - // External truncation is invisible to cached table; session INSERT - // invalidates cache and rebuilds with fresh data. - test("SPARK-54022: caching table via Dataset API should pin table state") { + // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). + // Scenario 2: session write invalidates cache; subsequent external write + // is again invisible. + test("SPARK-54022: CACHE TABLE pins state; session write invalidates, external does not") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { - sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") - sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + // create a table and insert initial data + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") - // cache table + // cache the table spark.table(t).cache() - - // verify caching works as expected assertCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + checkAnswer(spark.table(t), Seq(Row(1, 100))) - // modify table directly to mimic external changes + // Scenario 1: external writer truncates table via direct catalog API + // (bypasses this session's CacheManager) val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) table.asInstanceOf[TruncatableTable].truncateTable() - // verify external changes have no impact on cached state + // query the table again: cache is pinned, external change invisible assertCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) - - // add more data within session that should invalidate cache - sql(s"INSERT INTO $t VALUES (10, 100, 'x')") + checkAnswer(spark.table(t), Seq(Row(1, 100))) - // table should be re-cached correctly + // Scenario 2: session write invalidates the cache entry + sql(s"INSERT INTO $t VALUES (2, 200)") assertCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(10, 100, "x"))) - } - } - - test("SPARK-54022: caching a query via Dataset API should not pin table state") { - val t = "testcat.ns1.ns2.tbl" - val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - withTable(t) { - sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") - sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") - - // cache query on top of table - val df = spark.table(t).select("id") - df.cache() - - // verify query caching works as expected - assertCached(spark.table(t).select("id")) - checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3))) - - // verify table itself is not cached - assertNotCached(spark.table(t)) - checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) - // modify table directly to mimic external changes - val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table.asInstanceOf[TruncatableTable].truncateTable() - - // verify cached DataFrame is unaffected by external changes - assertCached(df) - checkAnswer(df, Seq(Row(1), Row(2), Row(3))) - - // verify external changes are reflected correctly when table is queried - assertNotCached(spark.table(t)) - checkAnswer(spark.table(t), Seq.empty) - } - } - - // Scenario 2 variant: session write invalidates and refreshes cache. - test("cached DSv2 table DataFrame is refreshed and reused after insert") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - sql(s"CREATE TABLE $t (id bigint, data string) USING foo") - val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data") - df1.write.insertInto(t) - - // cache DataFrame pointing to table - val readDF1 = spark.table(t) - readDF1.cache() - assertCached(readDF1) - checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b"))) + // external writer truncates again via direct catalog API + val table2 = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table2.asInstanceOf[TruncatableTable].truncateTable() - // insert more data, invalidating and refreshing cache entry - val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data") - df2.write.insertInto(t) - - // verify underlying plan is recached and picks up new data - val readDF2 = spark.table(t) - assertCached(readDF2) - checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d"))) + // query the table again: cache is re-pinned, external change invisible + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) } } @@ -2213,9 +2161,11 @@ class DataSourceV2DataFrameSuite } } - // Scenario 4: session schema change invalidates cache. - test("SPARK-54022: session schema change invalidates cache") { + // Scenario 4: session schema change invalidates cache; subsequent external + // write is invisible. + test("SPARK-54022: session schema change invalidates cache, external write invisible") { val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") sql(s"INSERT INTO $t VALUES (1, 100)") @@ -2231,12 +2181,19 @@ class DataSourceV2DataFrameSuite // after session ALTER, cache is rebuilt with 3-column schema assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100, null))) + + // simulate external writer adds (2, 200, -1) by truncating via + // direct catalog API (bypasses this session's CacheManager) + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // external change invisible: cache still shows (1, 100, null) + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null))) } } // Scenario 5: external drop and recreate with same schema. - // The new table has a different ID; the query detects the ID change - // and sees the new empty table. test("SPARK-54022: cached table after external drop and recreate sees empty table") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") @@ -2259,7 +2216,7 @@ class DataSourceV2DataFrameSuite Array.empty, Collections.emptyMap[String, String]) - // the new table has a different ID; query sees the new empty table + // query sees the new empty table checkAnswer(spark.table(t), Seq.empty) } } From 2e6851d7d667ad5e27debe1b7940db0875ea9799 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:25:36 +0000 Subject: [PATCH 4/9] Restore original 3 cache tests, use withData for external inserts - Restore all 3 original cache tests (moved to bottom) - New Scenario 1+2 test uses InMemoryBaseTable.withData() to simulate true external INSERT bypassing session CacheManager - Scenario 4 also uses withData() for external write simulation Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 124 +++++++++++++++--- 1 file changed, 109 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 04472a0bbf910..5f7a69ca6d344 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} +import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -40,7 +40,8 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String @@ -2095,6 +2096,94 @@ class DataSourceV2DataFrameSuite // Tests that CACHE TABLE pins table state against external changes, // while session writes invalidate and re-cache. + test("cached DSv2 table DataFrame is refreshed and reused after insert") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string) USING foo") + val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data") + df1.write.insertInto(t) + + // cache DataFrame pointing to table + val readDF1 = spark.table(t) + readDF1.cache() + assertCached(readDF1) + checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b"))) + + // insert more data, invalidating and refreshing cache entry + val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data") + df2.write.insertInto(t) + + // verify underlying plan is recached and picks up new data + val readDF2 = spark.table(t) + assertCached(readDF2) + checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d"))) + } + } + + test("SPARK-54022: caching table via Dataset API should pin table state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache table + spark.table(t).cache() + + // verify caching works as expected + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // modify table directly to mimic external changes + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // verify external changes have no impact on cached state + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // add more data within session that should invalidate cache + sql(s"INSERT INTO $t VALUES (10, 100, 'x')") + + // table should be re-cached correctly + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(10, 100, "x"))) + } + } + + test("SPARK-54022: caching a query via Dataset API should not pin table state") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache query on top of table + val df = spark.table(t).select("id") + df.cache() + + // verify query caching works as expected + assertCached(spark.table(t).select("id")) + checkAnswer(spark.table(t).select("id"), Seq(Row(1), Row(2), Row(3))) + + // verify table itself is not cached + assertNotCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // modify table directly to mimic external changes + val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // verify cached DataFrame is unaffected by external changes + assertCached(df) + checkAnswer(df, Seq(Row(1), Row(2), Row(3))) + + // verify external changes are reflected correctly when table is queried + assertNotCached(spark.table(t)) + checkAnswer(spark.table(t), Seq.empty) + } + } + // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). // Scenario 2: session write invalidates cache; subsequent external write // is again invisible. @@ -2111,12 +2200,14 @@ class DataSourceV2DataFrameSuite assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) - // Scenario 1: external writer truncates table via direct catalog API + // Scenario 1: external writer adds (2, 200) via direct catalog API // (bypasses this session's CacheManager) - val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table.asInstanceOf[TruncatableTable].truncateTable() + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) - // query the table again: cache is pinned, external change invisible + // query the table again: cache is pinned, external write invisible assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) @@ -2125,11 +2216,12 @@ class DataSourceV2DataFrameSuite assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) - // external writer truncates again via direct catalog API - val table2 = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table2.asInstanceOf[TruncatableTable].truncateTable() + // external writer adds (3, 300) via direct catalog API + val extTable2 = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable2.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) - // query the table again: cache is re-pinned, external change invisible + // query the table again: cache is re-pinned, external write invisible assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) } @@ -2182,12 +2274,14 @@ class DataSourceV2DataFrameSuite assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100, null))) - // simulate external writer adds (2, 200, -1) by truncating via - // direct catalog API (bypasses this session's CacheManager) - val table = catalog("testcat").loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table.asInstanceOf[TruncatableTable].truncateTable() + // external writer adds (2, 200, -1) via direct catalog API + // (bypasses this session's CacheManager) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) - // external change invisible: cache still shows (1, 100, null) + // external write invisible: cache still shows (1, 100, null) assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100, null))) } From e83008fc9c871b328057ffe19a134eb89b72699c Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:26:18 +0000 Subject: [PATCH 5/9] Use named arguments in createTable call Co-authored-by: Isaac --- .../spark/sql/connector/DataSourceV2DataFrameSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 5f7a69ca6d344..17965f31442a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2303,12 +2303,12 @@ class DataSourceV2DataFrameSuite // simulate external drop and recreate with same schema catalog("testcat").dropTable(ident) catalog("testcat").createTable( - ident, - Array( + ident = ident, + columns = Array( Column.create("id", IntegerType), Column.create("salary", IntegerType)), - Array.empty, - Collections.emptyMap[String, String]) + partitions = Array.empty, + properties = Collections.emptyMap[String, String]) // query sees the new empty table checkAnswer(spark.table(t), Seq.empty) From 51a796d847b6ea9b57444b615303f09fa9613cdb Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:29:02 +0000 Subject: [PATCH 6/9] Scenario 3: add external data write after schema change Co-authored-by: Isaac --- .../spark/sql/connector/DataSourceV2DataFrameSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 17965f31442a1..5194f98628d17 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2242,11 +2242,17 @@ class DataSourceV2DataFrameSuite assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) - // simulate external schema change via direct catalog API + // simulate external schema change + data write via direct catalog API // (bypasses this session's CacheManager) val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) catalog("testcat").alterTable(ident, addCol) + // external writer adds (2, 200, -1) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + // cache stays pinned at original 2-column schema assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) From 4f130bc57f729e48f533864e70d370062f42ca9e Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:30:16 +0000 Subject: [PATCH 7/9] Add REFRESH TABLE verification at the end of each cache scenario test Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 5194f98628d17..5f70c1506a3ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2224,6 +2224,11 @@ class DataSourceV2DataFrameSuite // query the table again: cache is re-pinned, external write invisible assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) + + // REFRESH TABLE picks up all external changes + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) } } @@ -2256,6 +2261,11 @@ class DataSourceV2DataFrameSuite // cache stays pinned at original 2-column schema assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // REFRESH TABLE picks up external schema change and data + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1))) } } @@ -2290,6 +2300,11 @@ class DataSourceV2DataFrameSuite // external write invisible: cache still shows (1, 100, null) assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100, null))) + + // REFRESH TABLE picks up external write + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100, null), Row(2, 200, -1))) } } @@ -2318,6 +2333,10 @@ class DataSourceV2DataFrameSuite // query sees the new empty table checkAnswer(spark.table(t), Seq.empty) + + // REFRESH TABLE on the new empty table + sql(s"REFRESH TABLE $t") + checkAnswer(spark.table(t), Seq.empty) } } } From 4ba967e2bd5314fa0f0deccf75c67f5d5c0a75bf Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:35:14 +0000 Subject: [PATCH 8/9] Fix scalastyle: move InternalRow import to correct alphabetical position Co-authored-by: Isaac --- .../apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 5f70c1506a3ed..47e4ac7187408 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -25,6 +25,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} @@ -40,7 +41,6 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String From 0c368a20df1a761900689b621d516419c5ff79e9 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:55:55 +0000 Subject: [PATCH 9/9] Fix external write simulation and split Scenarios 1 and 2 - Use loadTable with INSERT privilege to get original table (not copy) for withData calls, fixing copyOnLoad interaction - Split Scenarios 1+2 into separate tests to avoid data accumulation - All 8 cache tests pass locally Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 51 ++++++++++++++----- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 47e4ac7187408..6ea7099224179 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -2185,13 +2185,10 @@ class DataSourceV2DataFrameSuite } // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). - // Scenario 2: session write invalidates cache; subsequent external write - // is again invisible. - test("SPARK-54022: CACHE TABLE pins state; session write invalidates, external does not") { + test("SPARK-54022: cached table pinned against external data write") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { - // create a table and insert initial data sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") sql(s"INSERT INTO $t VALUES (1, 100)") @@ -2200,32 +2197,56 @@ class DataSourceV2DataFrameSuite assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) - // Scenario 1: external writer adds (2, 200) via direct catalog API + // external writer adds (2, 200) via direct catalog API // (bypasses this session's CacheManager) val schema = StructType.fromDDL("id INT, salary INT") - val extTable = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] extTable.withData(Array( new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) - // query the table again: cache is pinned, external write invisible + // cache is pinned, external write invisible assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100))) - // Scenario 2: session write invalidates the cache entry + // REFRESH TABLE picks up external write + sql(s"REFRESH TABLE $t") + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) + } + } + + // Scenario 2: session write invalidates cache; subsequent external write + // is again invisible. + test("SPARK-54022: session write invalidates cache, then external write invisible") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100)") + + // cache the table + spark.table(t).cache() + assertCached(spark.table(t)) + checkAnswer(spark.table(t), Seq(Row(1, 100))) + + // session write invalidates the cache entry sql(s"INSERT INTO $t VALUES (2, 200)") assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) // external writer adds (3, 300) via direct catalog API - val extTable2 = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] - extTable2.withData(Array( + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) - // query the table again: cache is re-pinned, external write invisible + // cache is re-pinned, external write invisible assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200))) - // REFRESH TABLE picks up all external changes + // REFRESH TABLE picks up external write sql(s"REFRESH TABLE $t") assertCached(spark.table(t)) checkAnswer(spark.table(t), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) @@ -2254,7 +2275,8 @@ class DataSourceV2DataFrameSuite // external writer adds (2, 200, -1) val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") - val extTable = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] extTable.withData(Array( new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) @@ -2293,7 +2315,8 @@ class DataSourceV2DataFrameSuite // external writer adds (2, 200, -1) via direct catalog API // (bypasses this session's CacheManager) val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") - val extTable = catalog("testcat").loadTable(ident).asInstanceOf[InMemoryBaseTable] + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] extTable.withData(Array( new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))