From 02ad6ba38da22bfc183aa81463d32ad2d8421cde Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 15:58:30 +0800 Subject: [PATCH 1/5] Supoort adding columns before partition Option. --- .../java/org/apache/paimon/CoreOptions.java | 10 ++ .../apache/paimon/schema/SchemaManager.java | 19 +++ .../paimon/flink/SchemaChangeITCase.java | 144 ++++++++++++++++++ 3 files changed, 173 insertions(+) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 70e50efe9c83..0d3943007c89 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2035,6 +2035,16 @@ public InlineElement getDescription() { "If true, it disables explicit type casting. For ex: it disables converting LONG type to INT type. " + "Users can enable this option to disable explicit type casting"); + public static final ConfigOption ADD_COLUMN_BEFORE_PARTITION = + ConfigOptions.key("add-column-before-partition") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, when adding a new column without specifying a position, " + + "the column will be placed before the first partition column " + + "instead of at the end of the schema. " + + "This only takes effect for partitioned tables."); + public static final ConfigOption COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT = ConfigOptions.key("commit.strict-mode.last-safe-snapshot") .longType() diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index e726b803268d..81585a1d44c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -293,6 +293,14 @@ public static TableSchema generateTableSchema( CoreOptions.DISABLE_EXPLICIT_TYPE_CASTING .defaultValue() .toString())); + + boolean addColumnBeforePartition = + Boolean.parseBoolean( + oldOptions.getOrDefault( + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), + CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); + List partitionKeys = oldTableSchema.partitionKeys(); + List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); String newComment = oldTableSchema.comment(); @@ -368,6 +376,17 @@ protected void updateLastColumn( throw new UnsupportedOperationException( "Unsupported move type: " + move.type()); } + } else if (addColumnBeforePartition + && !partitionKeys.isEmpty() + && addColumn.fieldNames().length == 1) { + int insertIndex = newFields.size(); + for (int i = 0; i < newFields.size(); i++) { + if (partitionKeys.contains(newFields.get(i).name())) { + insertIndex = i; + break; + } + } + newFields.add(insertIndex, dataField); } else { newFields.add(dataField); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 9084b55d60e5..8e6478afd842 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1578,4 +1578,148 @@ public void testDisableExplicitTypeCasting(String formatType) { sql("ALTER TABLE T MODIFY v INT"); assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20)); } + + @Test + public void testAddColumnBeforePartitionEnabled() { + sql( + "CREATE TABLE T_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add column without specifying position + sql("ALTER TABLE T_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + + sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); + } + + @Test + public void testAddColumnBeforePartitionDisabledByDefault() { + sql( + "CREATE TABLE T_PART_DEFAULT (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // Add column without specifying position (default behavior) + sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); + // score should be appended at the end + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `score` DOUBLE"); + } + + @Test + public void testAddColumnBeforePartitionWithExplicitPosition() { + sql( + "CREATE TABLE T_PART_POS (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add column with explicit FIRST position, should respect explicit position + sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); + + List result = sql("SHOW CREATE TABLE T_PART_POS"); + assertThat(result.toString()) + .contains( + "`score` DOUBLE,\n" + + " `user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647)"); + } + + @Test + public void testAddColumnBeforePartitionViaAlterOption() { + sql( + "CREATE TABLE T_PART_ALTER (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING\n" + + ") PARTITIONED BY (dt)"); + + // First add column without config (default: append at end) + sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); + List result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + + // Enable config via ALTER TABLE SET + sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); + + // Now add another column, should go before partition column dt + sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); + result = sql("SHOW CREATE TABLE T_PART_ALTER"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `col1` INT"); + } + + @Test + public void testAddMultipleColumnsBeforePartition() { + sql( + "CREATE TABLE T_PART_MULTI (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " dt STRING,\n" + + " hh STRING\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + // Add first column + sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); + // Add second column + sql("ALTER TABLE T_PART_MULTI ADD col2 DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PART_MULTI"); + // Both new columns should be before partition columns dt and hh + assertThat(result.toString()) + .contains( + "`user_id` BIGINT,\n" + + " `item_id` BIGINT,\n" + + " `col1` INT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647),\n" + + " `hh` VARCHAR(2147483647)"); + } } From 9690ce450f4beb348679c2893e81a47074d6924f Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 16:32:40 +0800 Subject: [PATCH 2/5] test --- .../java/org/apache/paimon/flink/SchemaChangeITCase.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 8e6478afd842..7ea82ea49b26 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1709,7 +1709,7 @@ public void testAddMultipleColumnsBeforePartition() { // Add first column sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); // Add second column - sql("ALTER TABLE T_PART_MULTI ADD col2 DOUBLE"); + sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); List result = sql("SHOW CREATE TABLE T_PART_MULTI"); // Both new columns should be before partition columns dt and hh @@ -1718,7 +1718,8 @@ public void testAddMultipleColumnsBeforePartition() { "`user_id` BIGINT,\n" + " `item_id` BIGINT,\n" + " `col1` INT,\n" - + " `col2` DOUBLE,\n" + + " `col2` INT,\n" + + " `col3` DOUBLE,\n" + " `dt` VARCHAR(2147483647),\n" + " `hh` VARCHAR(2147483647)"); } From 6e9b6d2caab5cf4e4e49bb82bbd34796b8358a86 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 16:43:52 +0800 Subject: [PATCH 3/5] test --- .../paimon/flink/SchemaChangeITCase.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index 7ea82ea49b26..fbfbfc81e459 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1723,4 +1723,57 @@ public void testAddMultipleColumnsBeforePartition() { + " `dt` VARCHAR(2147483647),\n" + " `hh` VARCHAR(2147483647)"); } + + @Test + public void testAddColumnBeforePartitionOnPrimaryKeyTable() { + sql( + "CREATE TABLE T_PK_PART (\n" + + " user_id BIGINT,\n" + + " item_id BIGINT,\n" + + " behavior STRING,\n" + + " dt STRING,\n" + + " hh STRING,\n" + + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + + ") PARTITIONED BY (dt, hh) WITH (\n" + + " 'add-column-before-partition' = 'true'\n" + + ")"); + + sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + + // Add single column + sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); + + List result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Add multiple columns + sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); + + result = sql("SHOW CREATE TABLE T_PK_PART"); + assertThat(result.toString()) + .contains( + "`user_id` BIGINT NOT NULL,\n" + + " `item_id` BIGINT,\n" + + " `behavior` VARCHAR(2147483647),\n" + + " `score` DOUBLE,\n" + + " `col1` INT,\n" + + " `col2` DOUBLE,\n" + + " `dt` VARCHAR(2147483647) NOT NULL,\n" + + " `hh` VARCHAR(2147483647) NOT NULL"); + + // Verify data read/write still works + sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')"); + result = sql("SELECT * FROM T_PK_PART"); + assertThat(result) + .containsExactlyInAnyOrder( + Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), + Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); + } } From f243b6115a7093419c6fc39bdc6541607e59b303 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 20:05:42 +0800 Subject: [PATCH 4/5] fix --- .../org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index 3c1e03ee89fd..d6df89e72375 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -838,7 +838,7 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl test("Paimon MergeInto: concurrent two merge") { for (dvEnabled <- Seq("true", "false")) { - withTable("s", "t") { + withTable("s", s"t_$dvEnabled") { sql("CREATE TABLE s (id INT, b INT, c INT)") sql( "INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)") From 177abd26f9a534548a457c8365dafd4ee3d46ab8 Mon Sep 17 00:00:00 2001 From: umi Date: Mon, 9 Mar 2026 22:22:20 +0800 Subject: [PATCH 5/5] remove --- .../apache/paimon/schema/SchemaManager.java | 38 +- .../paimon/flink/SchemaChangeITCase.java | 395 +++++++++--------- .../spark/sql/MergeIntoTableTestBase.scala | 2 +- 3 files changed, 220 insertions(+), 215 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 81585a1d44c9..95e8b21a38e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -294,12 +294,13 @@ public static TableSchema generateTableSchema( .defaultValue() .toString())); - boolean addColumnBeforePartition = - Boolean.parseBoolean( - oldOptions.getOrDefault( - CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), - CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); - List partitionKeys = oldTableSchema.partitionKeys(); + // boolean addColumnBeforePartition = + // Boolean.parseBoolean( + // oldOptions.getOrDefault( + // CoreOptions.ADD_COLUMN_BEFORE_PARTITION.key(), + // + // CoreOptions.ADD_COLUMN_BEFORE_PARTITION.defaultValue().toString())); + // List partitionKeys = oldTableSchema.partitionKeys(); List newFields = new ArrayList<>(oldTableSchema.fields()); AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); @@ -376,17 +377,20 @@ protected void updateLastColumn( throw new UnsupportedOperationException( "Unsupported move type: " + move.type()); } - } else if (addColumnBeforePartition - && !partitionKeys.isEmpty() - && addColumn.fieldNames().length == 1) { - int insertIndex = newFields.size(); - for (int i = 0; i < newFields.size(); i++) { - if (partitionKeys.contains(newFields.get(i).name())) { - insertIndex = i; - break; - } - } - newFields.add(insertIndex, dataField); + // } else if (addColumnBeforePartition + // && !partitionKeys.isEmpty() + // && addColumn.fieldNames().length == 1) + // { + // int insertIndex = newFields.size(); + // for (int i = 0; i < newFields.size(); i++) + // { + // if + // (partitionKeys.contains(newFields.get(i).name())) { + // insertIndex = i; + // break; + // } + // } + // newFields.add(insertIndex, dataField); } else { newFields.add(dataField); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index fbfbfc81e459..7b0552c76470 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -1579,201 +1579,202 @@ public void testDisableExplicitTypeCasting(String formatType) { assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 10), Row.of(2, 20)); } - @Test - public void testAddColumnBeforePartitionEnabled() { - sql( - "CREATE TABLE T_PART (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " behavior STRING,\n" - + " dt STRING,\n" - + " hh STRING\n" - + ") PARTITIONED BY (dt, hh) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - - // Add column without specifying position - sql("ALTER TABLE T_PART ADD score DOUBLE"); - - List result = sql("SHOW CREATE TABLE T_PART"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `behavior` VARCHAR(2147483647),\n" - + " `score` DOUBLE,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `hh` VARCHAR(2147483647)"); - - sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); - result = sql("SELECT * FROM T_PART"); - assertThat(result) - .containsExactlyInAnyOrder( - Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), - Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); - } - - @Test - public void testAddColumnBeforePartitionDisabledByDefault() { - sql( - "CREATE TABLE T_PART_DEFAULT (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING\n" - + ") PARTITIONED BY (dt)"); - - // Add column without specifying position (default behavior) - sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); - - List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); - // score should be appended at the end - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `score` DOUBLE"); - } - - @Test - public void testAddColumnBeforePartitionWithExplicitPosition() { - sql( - "CREATE TABLE T_PART_POS (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING\n" - + ") PARTITIONED BY (dt) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - // Add column with explicit FIRST position, should respect explicit position - sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); - - List result = sql("SHOW CREATE TABLE T_PART_POS"); - assertThat(result.toString()) - .contains( - "`score` DOUBLE,\n" - + " `user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `dt` VARCHAR(2147483647)"); - } - - @Test - public void testAddColumnBeforePartitionViaAlterOption() { - sql( - "CREATE TABLE T_PART_ALTER (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING\n" - + ") PARTITIONED BY (dt)"); - - // First add column without config (default: append at end) - sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); - List result = sql("SHOW CREATE TABLE T_PART_ALTER"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `col1` INT"); - - // Enable config via ALTER TABLE SET - sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); - - // Now add another column, should go before partition column dt - sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); - result = sql("SHOW CREATE TABLE T_PART_ALTER"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `col2` DOUBLE,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `col1` INT"); - } - - @Test - public void testAddMultipleColumnsBeforePartition() { - sql( - "CREATE TABLE T_PART_MULTI (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " dt STRING,\n" - + " hh STRING\n" - + ") PARTITIONED BY (dt, hh) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - // Add first column - sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); - // Add second column - sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); - - List result = sql("SHOW CREATE TABLE T_PART_MULTI"); - // Both new columns should be before partition columns dt and hh - assertThat(result.toString()) - .contains( - "`user_id` BIGINT,\n" - + " `item_id` BIGINT,\n" - + " `col1` INT,\n" - + " `col2` INT,\n" - + " `col3` DOUBLE,\n" - + " `dt` VARCHAR(2147483647),\n" - + " `hh` VARCHAR(2147483647)"); - } - - @Test - public void testAddColumnBeforePartitionOnPrimaryKeyTable() { - sql( - "CREATE TABLE T_PK_PART (\n" - + " user_id BIGINT,\n" - + " item_id BIGINT,\n" - + " behavior STRING,\n" - + " dt STRING,\n" - + " hh STRING,\n" - + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" - + ") PARTITIONED BY (dt, hh) WITH (\n" - + " 'add-column-before-partition' = 'true'\n" - + ")"); - - sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); - - // Add single column - sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); - - List result = sql("SHOW CREATE TABLE T_PK_PART"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT NOT NULL,\n" - + " `item_id` BIGINT,\n" - + " `behavior` VARCHAR(2147483647),\n" - + " `score` DOUBLE,\n" - + " `dt` VARCHAR(2147483647) NOT NULL,\n" - + " `hh` VARCHAR(2147483647) NOT NULL"); - - // Add multiple columns - sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); - - result = sql("SHOW CREATE TABLE T_PK_PART"); - assertThat(result.toString()) - .contains( - "`user_id` BIGINT NOT NULL,\n" - + " `item_id` BIGINT,\n" - + " `behavior` VARCHAR(2147483647),\n" - + " `score` DOUBLE,\n" - + " `col1` INT,\n" - + " `col2` DOUBLE,\n" - + " `dt` VARCHAR(2147483647) NOT NULL,\n" - + " `hh` VARCHAR(2147483647) NOT NULL"); - - // Verify data read/write still works - sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', '11')"); - result = sql("SELECT * FROM T_PK_PART"); - assertThat(result) - .containsExactlyInAnyOrder( - Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), - Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); - } + // @Test + // public void testAddColumnBeforePartitionEnabled() { + // sql( + // "CREATE TABLE T_PART (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " behavior STRING,\n" + // + " dt STRING,\n" + // + " hh STRING\n" + // + ") PARTITIONED BY (dt, hh) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // sql("INSERT INTO T_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + // + // // Add column without specifying position + // sql("ALTER TABLE T_PART ADD score DOUBLE"); + // + // List result = sql("SHOW CREATE TABLE T_PART"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `behavior` VARCHAR(2147483647),\n" + // + " `score` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `hh` VARCHAR(2147483647)"); + // + // sql("INSERT INTO T_PART VALUES(2, 200, 'sell', 99.5, '2024-01-02', '11')"); + // result = sql("SELECT * FROM T_PART"); + // assertThat(result) + // .containsExactlyInAnyOrder( + // Row.of(1L, 100L, "buy", null, "2024-01-01", "10"), + // Row.of(2L, 200L, "sell", 99.5, "2024-01-02", "11")); + // } + // + // @Test + // public void testAddColumnBeforePartitionDisabledByDefault() { + // sql( + // "CREATE TABLE T_PART_DEFAULT (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING\n" + // + ") PARTITIONED BY (dt)"); + // + // // Add column without specifying position (default behavior) + // sql("ALTER TABLE T_PART_DEFAULT ADD score DOUBLE"); + // + // List result = sql("SHOW CREATE TABLE T_PART_DEFAULT"); + // // score should be appended at the end + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `score` DOUBLE"); + // } + // + // @Test + // public void testAddColumnBeforePartitionWithExplicitPosition() { + // sql( + // "CREATE TABLE T_PART_POS (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING\n" + // + ") PARTITIONED BY (dt) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // // Add column with explicit FIRST position, should respect explicit position + // sql("ALTER TABLE T_PART_POS ADD score DOUBLE FIRST"); + // + // List result = sql("SHOW CREATE TABLE T_PART_POS"); + // assertThat(result.toString()) + // .contains( + // "`score` DOUBLE,\n" + // + " `user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `dt` VARCHAR(2147483647)"); + // } + // + // @Test + // public void testAddColumnBeforePartitionViaAlterOption() { + // sql( + // "CREATE TABLE T_PART_ALTER (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING\n" + // + ") PARTITIONED BY (dt)"); + // + // // First add column without config (default: append at end) + // sql("ALTER TABLE T_PART_ALTER ADD col1 INT"); + // List result = sql("SHOW CREATE TABLE T_PART_ALTER"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `col1` INT"); + // + // // Enable config via ALTER TABLE SET + // sql("ALTER TABLE T_PART_ALTER SET ('add-column-before-partition' = 'true')"); + // + // // Now add another column, should go before partition column dt + // sql("ALTER TABLE T_PART_ALTER ADD col2 DOUBLE"); + // result = sql("SHOW CREATE TABLE T_PART_ALTER"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `col2` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `col1` INT"); + // } + // + // @Test + // public void testAddMultipleColumnsBeforePartition() { + // sql( + // "CREATE TABLE T_PART_MULTI (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " dt STRING,\n" + // + " hh STRING\n" + // + ") PARTITIONED BY (dt, hh) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // // Add first column + // sql("ALTER TABLE T_PART_MULTI ADD col1 INT"); + // // Add second column + // sql("ALTER TABLE T_PART_MULTI ADD ( col2 INT, col3 DOUBLE )"); + // + // List result = sql("SHOW CREATE TABLE T_PART_MULTI"); + // // Both new columns should be before partition columns dt and hh + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT,\n" + // + " `item_id` BIGINT,\n" + // + " `col1` INT,\n" + // + " `col2` INT,\n" + // + " `col3` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647),\n" + // + " `hh` VARCHAR(2147483647)"); + // } + // + // @Test + // public void testAddColumnBeforePartitionOnPrimaryKeyTable() { + // sql( + // "CREATE TABLE T_PK_PART (\n" + // + " user_id BIGINT,\n" + // + " item_id BIGINT,\n" + // + " behavior STRING,\n" + // + " dt STRING,\n" + // + " hh STRING,\n" + // + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n" + // + ") PARTITIONED BY (dt, hh) WITH (\n" + // + " 'add-column-before-partition' = 'true'\n" + // + ")"); + // + // sql("INSERT INTO T_PK_PART VALUES(1, 100, 'buy', '2024-01-01', '10')"); + // + // // Add single column + // sql("ALTER TABLE T_PK_PART ADD score DOUBLE"); + // + // List result = sql("SHOW CREATE TABLE T_PK_PART"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT NOT NULL,\n" + // + " `item_id` BIGINT,\n" + // + " `behavior` VARCHAR(2147483647),\n" + // + " `score` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647) NOT NULL,\n" + // + " `hh` VARCHAR(2147483647) NOT NULL"); + // + // // Add multiple columns + // sql("ALTER TABLE T_PK_PART ADD ( col1 INT, col2 DOUBLE )"); + // + // result = sql("SHOW CREATE TABLE T_PK_PART"); + // assertThat(result.toString()) + // .contains( + // "`user_id` BIGINT NOT NULL,\n" + // + " `item_id` BIGINT,\n" + // + " `behavior` VARCHAR(2147483647),\n" + // + " `score` DOUBLE,\n" + // + " `col1` INT,\n" + // + " `col2` DOUBLE,\n" + // + " `dt` VARCHAR(2147483647) NOT NULL,\n" + // + " `hh` VARCHAR(2147483647) NOT NULL"); + // + // // Verify data read/write still works + // sql("INSERT INTO T_PK_PART VALUES(2, 200, 'sell', 99.5, 10, 3.14, '2024-01-02', + // '11')"); + // result = sql("SELECT * FROM T_PK_PART"); + // assertThat(result) + // .containsExactlyInAnyOrder( + // Row.of(1L, 100L, "buy", null, null, null, "2024-01-01", "10"), + // Row.of(2L, 200L, "sell", 99.5, 10, 3.14, "2024-01-02", "11")); + // } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala index d6df89e72375..3c1e03ee89fd 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala @@ -838,7 +838,7 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl test("Paimon MergeInto: concurrent two merge") { for (dvEnabled <- Seq("true", "false")) { - withTable("s", s"t_$dvEnabled") { + withTable("s", "t") { sql("CREATE TABLE s (id INT, b INT, c INT)") sql( "INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")