diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index fd2c0f6e9c2ec..db7f6b24e429d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -380,24 +380,52 @@ abstract class InMemoryBaseTable( def alterTableWithData( data: Array[BufferedRows], newSchema: StructType): InMemoryBaseTable = { + val newFieldNames = newSchema.fieldNames.toSet data.foreach { bufferedRow => val oldSchema = bufferedRow.schema + + // Identify which columns from the old schema still exist in the new schema. + // Each entry is (StructField, original index in old row) so we can extract values later. + val fieldsRetainedInOldSchema = oldSchema.fields.zipWithIndex.filter { + case (oldField, _) => newFieldNames.contains(oldField.name) + } + val areColumnsDropped = fieldsRetainedInOldSchema.length < oldSchema.length + + // Build a schema that only contains the retained columns. + // This becomes the write schema for the migrated rows. + val retainedSchemaAfterDroppedColumns = if (areColumnsDropped) { + StructType(fieldsRetainedInOldSchema.map(_._1)) + } else { + oldSchema + } + bufferedRow.rows.foreach { row => + // Physically remove dropped column values from the row so they do not + // survive through ALTER chains (e.g. DROP COLUMN then ADD COLUMN same name). + val retainedRowAfterDroppedColumns = if (areColumnsDropped) { + new GenericInternalRow(fieldsRetainedInOldSchema.map { + case (retainedField, idx) => row.get(idx, retainedField.dataType) + }) + } else { + row + } + // handle partition evolution by re-keying all data - val key = getKey(row, newSchema) + val key = getKey(retainedRowAfterDroppedColumns, newSchema) dataMap += dataMap.get(key) .map { splits => val newSplits = if ((splits.last.rows.size >= numRowsPerSplit) || - (splits.last.schema != oldSchema)) { - splits :+ new BufferedRows(key, oldSchema) + (splits.last.schema != retainedSchemaAfterDroppedColumns)) { + splits :+ new BufferedRows(key, retainedSchemaAfterDroppedColumns) } else { splits } - newSplits.last.withRow(row) + newSplits.last.withRow(retainedRowAfterDroppedColumns) key -> newSplits } .getOrElse(key -> Seq( - new BufferedRows(key, oldSchema).withRow(row))) + new BufferedRows(key, retainedSchemaAfterDroppedColumns) + .withRow(retainedRowAfterDroppedColumns))) addPartitionKey(key) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index ff7995ad6697e..ae1f8675d6d21 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -179,6 +179,20 @@ class BasicInMemoryTableCatalog extends TableCatalog { throw new IllegalArgumentException(s"Cannot drop all fields") } + // Compute the intermediate schema with only column deletions applied. + // This is used for data migration so that dropped column values are physically removed, + // even when a column with the same name is re-added in the same ALTER call. 
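+    // For example, for changes = [deleteColumn("salary"), addColumn("salary", StringType)]
+    // on a table with schema <id INT, salary INT>, the data is migrated with <id INT> only,
+    // so the old salary values cannot resurface in the re-added salary column
+    // (exercised by the 5.x and 6.x scenarios in DataSourceV2DataFrameSuite).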
+ val deleteOnlyChanges = changes.filter(_.isInstanceOf[TableChange.DeleteColumn]) + val schemaAfterDrops = if (deleteOnlyChanges.nonEmpty) { + CatalogV2Util.applySchemaChanges( + table.schema, + deleteOnlyChanges, + tableProvider = Some("in-memory"), + statementType = "ALTER TABLE") + } else { + schema + } + table.increaseVersion() val currentVersion = table.version() val newTable = table match { @@ -190,14 +204,14 @@ class BasicInMemoryTableCatalog extends TableCatalog { properties = properties, constraints = constraints, id = table.id) - .alterTableWithData(table.data, schema) + .alterTableWithData(table.data, schemaAfterDrops) case _: InMemoryTableWithV2Filter => new InMemoryTableWithV2Filter( name = table.name, columns = CatalogV2Util.structTypeToV2Columns(schema), partitioning = finalPartitioning, properties = properties) - .alterTableWithData(table.data, schema) + .alterTableWithData(table.data, schemaAfterDrops) case other => throw new UnsupportedOperationException( s"Unsupported InMemoryBaseTable subclass: ${other.getClass.getName}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index eadaafea81a53..b3e5052c40e1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} +import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -40,7 +41,7 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String @@ -1759,6 +1760,289 @@ class DataSourceV2DataFrameSuite } } + // Temp views with stored plans: scenarios from the DSv2 table refresh design doc. 
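+  // "Session" scenarios apply the change through SQL in the current session, while
+  // "external" scenarios apply it directly through the TableCatalog API
+  // (alterTable / dropTable / withData), simulating a change made by another writer.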
+ // Each test creates a DSv2 table with initial data, builds a temp view with a filter + // (to demonstrate that the stored plan is non-trivial), and then verifies the view + // behavior after various table modifications (session or external). + + // Scenario 1.1 (session write) + test("temp view with stored plan reflects session write") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + sql(s"INSERT INTO $t VALUES (2, 200)") + + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + // Scenario 1.2 (external write) + test("temp view with stored plan reflects external write") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external writer adds (2, 200) via direct catalog API + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + // Scenario 2.1 (session ADD COLUMN) + test("temp view with stored plan preserves schema after session ADD COLUMN") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + sql(s"ALTER TABLE $t ADD COLUMN new_column INT") + sql(s"INSERT INTO $t VALUES (2, 200, -1)") + + // view preserves original 2-column schema, filter still applied + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + // Scenario 2.2 (external ADD COLUMN) + test("temp view with stored plan preserves schema after external ADD COLUMN") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + catalog("testcat").alterTable(ident, addCol) + + // external writer adds data with new schema + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // view preserves original 2-column schema, filter still applied + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + // Scenario 3.2 (external column removal) + test("temp view with stored plan detects external column removal") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") 
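+    // `ident` is used below to drop the column through the catalog API directly,
+    // i.e. as an external change that bypasses the session's ALTER TABLE path.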
+ withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + catalog("testcat").alterTable(ident, dropCol) + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + + // Scenario 4.2 (external drop and recreate table) + test("temp view with stored plan resolves to externally recreated table") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + val originalTableId = catalog("testcat").loadTable(ident).id + + // external drop and recreate via catalog API + catalog("testcat").dropTable(ident) + catalog("testcat").createTable( + ident, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + val newTableId = catalog("testcat").loadTable(ident).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkAnswer(spark.table("v"), Seq.empty) + + // insert new data and verify the view picks it up + sql(s"INSERT INTO $t VALUES (2, 200)") + checkAnswer(spark.table("v"), Seq(Row(2, 200))) + } + } + + // Scenario 5.1 (session drop and re-add column with same type) + test("temp view with stored plan after session drop and re-add column same type") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + spark.table(t).createOrReplaceTempView("v_no_filter") + spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) + checkAnswer(spark.table("v_filter_is_null"), Seq.empty) + + // drop and re-add column with same name and type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary INT") + + // salary values are now null, so the filtered view returns nothing + checkAnswer(spark.table("v"), Seq.empty) + // unfiltered view returns rows with null salary + checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) + // IS NULL filter now matches all rows + checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) + } + } + + // Scenario 5.2 (external drop and re-add column with same type) + test("temp view with stored plan after external drop and re-add column same type") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 
999").createOrReplaceTempView("v") + spark.table(t).createOrReplaceTempView("v_no_filter") + spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000))) + checkAnswer(spark.table("v_filter_is_null"), Seq.empty) + + // external drop and re-add column via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + catalog("testcat").alterTable(ident, dropCol, addCol) + + // salary values are now null, so the filtered view returns nothing + checkAnswer(spark.table("v"), Seq.empty) + // unfiltered view returns rows with null salary + checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null))) + // IS NULL filter now matches all rows + checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null))) + } + } + + // Scenario 6.1 (session drop and re-add column with different type) + test("temp view with stored plan detects session column type change") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // drop and re-add column with same name but different type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary STRING") + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + // Scenario 6.2 (external drop and re-add column with different type) + test("temp view with stored plan detects external column type change") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external drop and re-add column with different type via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + catalog("testcat").alterTable(ident, dropCol, addCol) + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + // Scenario 7 (type widening from INT to BIGINT) + test("temp view with stored plan detects type widening") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // widen salary type from INT to BIGINT via catalog API + val updateType = 
TableChange.updateColumnType(Array("salary"), LongType) + catalog("testcat").alterTable(ident, updateType) + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + test("cached DSv2 table DataFrame is refreshed and reused after insert") { val t = "testcat.ns1.ns2.tbl" withTable(t) {