From 13fec979691e157f46ef576151bbe5166b5c4c08 Mon Sep 17 00:00:00 2001 From: Ganesha S Date: Thu, 18 Dec 2025 10:27:32 +0530 Subject: [PATCH] [SPARK-54831][SQL] Add COMMENT ON COLUMN support to set and remove column comments --- .../resources/error/error-conditions.json | 6 + .../sql/catalyst/parser/SqlBaseParser.g4 | 1 + .../spark/sql/errors/QueryParsingErrors.scala | 13 ++ .../apache/spark/sql/types/StructField.scala | 13 +- .../sql/catalyst/parser/AstBuilder.scala | 19 ++ .../plans/logical/v2AlterTableCommands.scala | 22 +++ .../sql/catalyst/parser/DDLParserSuite.scala | 41 +++++ .../sql/connector/DataSourceV2SQLSuite.scala | 171 ++++++++++++++++++ 8 files changed, 281 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 9ac7ea2c205d..35465a0bd969 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -806,6 +806,12 @@ ], "sqlState" : "22003" }, + "COMMENT_ON_COLUMN_REQUIRES_TABLE_NAME" : { + "message" : [ + "COMMENT ON COLUMN requires at least table name and column name (e.g., table.column, database.table.column, or catalog.database.table.column). Found: ." + ], + "sqlState" : "42601" + }, "COMPARATOR_RETURNS_NULL" : { "message" : [ "The comparator has returned a NULL for a comparison between and .", diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index c29452cab180..e8da1b4ed728 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -379,6 +379,7 @@ statement | COMMENT ON namespace identifierReference IS comment #commentNamespace | COMMENT ON TABLE identifierReference IS comment #commentTable + | COMMENT ON COLUMN multipartIdentifier IS comment #commentColumn | REFRESH TABLE identifierReference #refreshTable | REFRESH FUNCTION identifierReference #refreshFunction | REFRESH (stringLit | .*?) #refreshResource diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index cd302ef6fcca..b2f8b181422c 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -830,6 +830,19 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { ctx) } + /** + * Indicates that COMMENT ON COLUMN must specify a table name along with column name. + * + * @throws ParseException + * Always throws this exception + */ + def commentOnColumnRequiresTableNameError(ctx: ParserRuleContext): Nothing = { + throw new ParseException( + errorClass = "COMMENT_ON_COLUMN_REQUIRES_TABLE_NAME", + messageParameters = Map("columnRef" -> ctx.getText), + ctx = ctx) + } + /** * Throws an internal error for unexpected parameter markers found during AST building. This * should be unreachable in normal operation due to grammar-level blocking. diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala index eb3d30051880..d6f0b3725c50 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/StructField.scala @@ -149,13 +149,16 @@ case class StructField( } /** - * Updates the StructField with a new comment value. + * Updates the StructField with a new comment value. If the comment is null, the comment key is + * removed from metadata. */ def withComment(comment: String): StructField = { - val newMetadata = new MetadataBuilder() - .withMetadata(metadata) - .putString("comment", comment) - .build() + val builder = new MetadataBuilder().withMetadata(metadata) + val newMetadata = if (comment == null) { + builder.remove("comment").build() + } else { + builder.putString("comment", comment).build() + } copy(metadata = newMetadata) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c3571ad70962..330c88f365a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -6397,6 +6397,25 @@ class AstBuilder extends DataTypeAstBuilder CommentOnTable(createUnresolvedTable(ctx.identifierReference, "COMMENT ON TABLE"), comment) } + override def visitCommentColumn(ctx: CommentColumnContext): LogicalPlan = withOrigin(ctx) { + val comment = visitComment(ctx.comment) + val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) + if (nameParts.length < 2) { + throw QueryParsingErrors.commentOnColumnRequiresTableNameError(ctx) + } + // The last part is the column name, everything before that is the table name + // This allows: + // - table.column (uses current catalog and database) + // - database.table.column (uses current catalog) + // - catalog.database.table.column (fully qualified) + val tableName = nameParts.init + val columnName = Seq(nameParts.last) + CommentOnColumn( + UnresolvedTable(tableName, "COMMENT ON COLUMN"), + UnresolvedFieldName(columnName), + comment) + } + override def visitComment (ctx: CommentContext): String = { Option(ctx.stringLit()).map(s => string(visitStringLit(s))).getOrElse("") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index 843ce22061d8..676e04751619 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -58,6 +58,28 @@ case class CommentOnTable(table: LogicalPlan, comment: String) extends AlterTabl copy(table = newChild) } +/** + * The logical plan that defines or changes the comment of a COLUMN for v2 catalogs. + * + * {{{ + * COMMENT ON COLUMN catalog.namespace.table.column IS ('text' | NULL) + * }}} + * + * where the `text` is the new comment written as a string literal; or `NULL` to drop the comment. + */ +case class CommentOnColumn( + table: LogicalPlan, + column: FieldName, + comment: String) extends AlterTableCommand { + override def changes: Seq[TableChange] = { + require(column.resolved, "FieldName should be resolved before it's converted to TableChange.") + Seq(TableChange.updateColumnComment(column.name.toArray, comment)) + } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(table = newChild) +} + + /** * The logical plan of the ALTER TABLE ... SET LOCATION command. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 0cb6f53f273a..543c2d74b908 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2626,6 +2626,46 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("COMMENT ON TABLE a.b.c IS 'xYz'"), CommentOnTable(UnresolvedTable(Seq("a", "b", "c"), "COMMENT ON TABLE"), "xYz")) + + // Test COMMENT ON COLUMN with fully qualified name (catalog.database.table.column) + comparePlans( + parsePlan("COMMENT ON COLUMN my_catalog.my_db.my_table.my_column IS NULL"), + CommentOnColumn( + UnresolvedTable(Seq("my_catalog", "my_db", "my_table"), "COMMENT ON COLUMN"), + UnresolvedFieldName(Seq("my_column")), + "")) + + // Test COMMENT ON COLUMN with 3-part name (database.table.column) + comparePlans( + parsePlan("COMMENT ON COLUMN a.b.c.d IS 'column comment'"), + CommentOnColumn( + UnresolvedTable(Seq("a", "b", "c"), "COMMENT ON COLUMN"), + UnresolvedFieldName(Seq("d")), + "column comment")) + + // Test COMMENT ON COLUMN with 2-part name (table.column) - uses current catalog and database + comparePlans( + parsePlan("COMMENT ON COLUMN my_table.my_column IS 'simple comment'"), + CommentOnColumn( + UnresolvedTable(Seq("my_table"), "COMMENT ON COLUMN"), + UnresolvedFieldName(Seq("my_column")), + "simple comment")) + + // Test COMMENT ON COLUMN with 'NULL' string literal + comparePlans( + parsePlan("COMMENT ON COLUMN a.b.c.d IS 'NULL'"), + CommentOnColumn( + UnresolvedTable(Seq("a", "b", "c"), "COMMENT ON COLUMN"), + UnresolvedFieldName(Seq("d")), + "NULL")) + + // Test COMMENT ON COLUMN with empty string + comparePlans( + parsePlan("COMMENT ON COLUMN a.b.c.d IS ''"), + CommentOnColumn( + UnresolvedTable(Seq("a", "b", "c"), "COMMENT ON COLUMN"), + UnresolvedFieldName(Seq("d")), + "")) } test("create table - without using") { @@ -2775,6 +2815,7 @@ class DDLParserSuite extends AnalysisTest { None, None, dropDefault = true)))) + // Make sure that the parser returns an exception when the feature is disabled. withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { val sql = "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 5a9494cf8173..504cfaf8aec1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2830,6 +2830,177 @@ class DataSourceV2SQLSuiteV1Filter } } + test("COMMENT ON COLUMN") { + // use the default v2 session catalog. + spark.conf.set(V2_SESSION_CATALOG_IMPLEMENTATION, "builtin") + // Session catalog is used. + withTable("t") { + sql("CREATE TABLE t(k int COMMENT 'original comment', v string) USING json") + + // Verify original comment + val originalComment = sql("DESCRIBE t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(originalComment === "original comment", + "Expected original comment to be set") + + // Set a new comment + checkColumnComment("t", "k", "new comment") + + // Verify comment is updated + val newComment = sql("DESCRIBE t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(newComment === "new comment", + "Expected comment to be updated to 'new comment'") + + // Remove comment by setting to NULL + checkColumnComment("t", "k", null) + + // Verify comment is removed + val removedComment = sql("DESCRIBE t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(removedComment.isEmpty, + "Expected comment to be removed") + + // Set comment to literal "NULL" string + checkColumnComment("t", "k", "NULL") + val nullStringComment = sql("DESCRIBE t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(nullStringComment === "NULL", + "Expected comment to be set to 'NULL' string") + + // Set comment to empty string + checkColumnComment("t", "k", "") + val emptyComment = sql("DESCRIBE t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(emptyComment === "", + "Expected comment to be set to empty string (not removed)") + + // Add comment to column that didn't have one + checkColumnComment("t", "v", "new column comment") + val vComment = sql("DESCRIBE t").filter("col_name = 'v'") + .select("comment").first().getString(0) + assert(vComment === "new column comment", + "Expected comment to be set on column v") + + // Remove comment from column v using COMMENT ON COLUMN IS NULL + checkColumnComment("t", "v", null) + val removedVComment = sql("DESCRIBE t").filter("col_name = 'v'") + .select("comment").first().getString(0) + assert(removedVComment.isEmpty, + "Expected comment to be removed using COMMENT ON COLUMN IS NULL") + } + + // V2 non-session catalog is used. + withTable("testcat.ns1.ns2.t") { + sql("CREATE TABLE testcat.ns1.ns2.t(k int COMMENT 'original', v string) USING foo") + + // Verify original comment + val originalComment = sql("DESCRIBE testcat.ns1.ns2.t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(originalComment === "original", + "Expected original comment for testcat table") + + // Set a comment + checkColumnComment("testcat.ns1.ns2.t", "k", "updated comment") + + // Verify comment is set + val updatedComment = sql("DESCRIBE testcat.ns1.ns2.t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(updatedComment === "updated comment", + "Expected comment to be updated for testcat table") + + // Remove comment by setting to NULL + checkColumnComment("testcat.ns1.ns2.t", "k", null) + + // Verify comment is removed + val removedComment = sql("DESCRIBE testcat.ns1.ns2.t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(removedComment === null || removedComment.isEmpty, + "Expected comment to be removed from testcat table") + + // Set comment to empty string + checkColumnComment("testcat.ns1.ns2.t", "k", "") + val emptyComment = sql("DESCRIBE testcat.ns1.ns2.t").filter("col_name = 'k'") + .select("comment").first().getString(0) + assert(emptyComment === "", + "Expected comment to be set to empty string for testcat table") + + // Set a comment on column v and then remove it using COMMENT ON COLUMN IS NULL + sql("COMMENT ON COLUMN testcat.ns1.ns2.t.v IS 'temp comment'") + val tempComment = sql("DESCRIBE testcat.ns1.ns2.t").filter("col_name = 'v'") + .select("comment").first().getString(0) + assert(tempComment === "temp comment", + "Expected temp comment to be set") + + checkColumnComment("testcat.ns1.ns2.t", "v", null) + val droppedVComment = sql("DESCRIBE testcat.ns1.ns2.t").filter("col_name = 'v'") + .select("comment").first().getString(0) + assert(droppedVComment === null || droppedVComment.isEmpty, + "Expected comment to be removed using COMMENT ON COLUMN IS NULL in testcat") + } + + // Test error case: column not found + withTable("t2") { + sql("CREATE TABLE t2(k int) USING json") + val sql1 = "COMMENT ON COLUMN t2.nonexistent IS 'test'" + checkError( + exception = analysisException(sql1), + condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map( + "objectName" -> "`nonexistent`", + "proposal" -> "`k`"), + context = ExpectedContext( + fragment = sql1, + start = 0, + stop = 41)) + } + + // Test with different qualification levels + withDatabase("test_db") { + sql("CREATE DATABASE test_db") + sql("USE test_db") + withTable("test_db.qualified_test") { + sql("CREATE TABLE qualified_test(x int, y string) USING json") + + // Test with just table.column (uses current database) + sql("COMMENT ON COLUMN qualified_test.x IS 'test with table.column'") + val comment1 = sql("DESCRIBE qualified_test").filter("col_name = 'x'") + .select("comment").first().getString(0) + assert(comment1 === "test with table.column", + "Expected comment to be set using table.column format") + + // Test with database.table.column + sql("COMMENT ON COLUMN test_db.qualified_test.y IS 'test with database.table.column'") + val comment2 = sql("DESCRIBE qualified_test").filter("col_name = 'y'") + .select("comment").first().getString(0) + assert(comment2 === "test with database.table.column", + "Expected comment to be set using database.table.column format") + + // Test removing comment with just table.column + sql("COMMENT ON COLUMN qualified_test.x IS NULL") + val removedComment = sql("DESCRIBE qualified_test").filter("col_name = 'x'") + .select("comment").first().getString(0) + assert(removedComment === null || removedComment.isEmpty, + "Expected comment to be removed using table.column format") + } + } + } + + private def checkColumnComment(tableName: String, columnName: String, comment: String): Unit = { + sql(s"COMMENT ON COLUMN $tableName.$columnName IS " + + Option(comment).map("'" + _ + "'").getOrElse("NULL")) + val commentValue = sql(s"DESCRIBE $tableName").filter(s"col_name = '$columnName'") + .select("comment").first().getString(0) + if (comment == null) { + // When comment is NULL, the comment should be removed + assert(commentValue.isEmpty, + s"Expected comment to be removed for column $columnName") + } else { + assert(commentValue === comment, + s"Expected comment to be '$comment' for column $columnName, but got '$commentValue'") + } + } + private def checkTableComment(tableName: String, comment: String): Unit = { sql(s"COMMENT ON TABLE $tableName IS " + Option(comment).map("'" + _ + "'").getOrElse("NULL")) if (comment == null) {