diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java index d1d663d1b9..c06f18b644 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java @@ -108,8 +108,7 @@ void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throw .cause() .isInstanceOf(InvalidAlterTableException.class) .hasMessageContaining( - "The option 'table.datalake.enabled' cannot be altered for tables that were" - + " created before the Fluss cluster enabled datalake."); + "The following options cannot be altered for tables that were created before the Fluss cluster enabled datalake: table.datalake.enabled."); } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index af1e2eaaa3..ead9c8438a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -249,45 +249,39 @@ void testAlterTableConfig() throws Exception { // alter table set an unsupported modification option should throw exception String unSupportedDml1 = - "alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true')"; + "alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true', 'table.kv.format' = 'indexed')"; assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml1)) .rootCause() .isInstanceOf(InvalidAlterTableException.class) - .hasMessage( - "The option 'table.auto-partition.enabled' is not supported to alter yet."); + .hasMessageContaining("The following options are not supported to alter yet:") + .hasMessageContaining("table.kv.format") + .hasMessageContaining(" table.auto-partition.enabled"); String unSupportedDml2 = - "alter table test_alter_table_append_only set ('k1' = 'v1', 'table.kv.format' = 'indexed')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2)) - .rootCause() - .isInstanceOf(InvalidAlterTableException.class) - .hasMessage("The option 'table.kv.format' is not supported to alter yet."); - - String unSupportedDml3 = "alter table test_alter_table_append_only set ('bucket.num' = '1000')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml3)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'bucket.num' is not supported to alter yet."); - String unSupportedDml4 = + String unSupportedDml3 = "alter table test_alter_table_append_only set ('bucket.key' = 'a')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml4)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml3)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'bucket.key' is not supported to alter yet."); - String unSupportedDml5 = + String unSupportedDml4 = "alter table test_alter_table_append_only reset ('bootstrap.servers')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml5)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml4)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'bootstrap.servers' is not supported to alter yet."); - String unSupportedDml6 = + String unSupportedDml5 = "alter table test_alter_table_append_only set ('auto-increment.fields' = 'b')"; - assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml6)) + assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml5)) .rootCause() .isInstanceOf(CatalogException.class) .hasMessage("The option 'auto-increment.fields' is not supported to alter yet."); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index d3bd94cd76..f749fc559d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -123,22 +123,31 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int public static void validateAlterTableProperties( TableInfo currentTable, Set tableKeysToChange) { TableConfig currentConfig = currentTable.getTableConfig(); - tableKeysToChange.forEach( - k -> { - if (isTableStorageConfig(k) && !isAlterableTableOption(k)) { - throw new InvalidAlterTableException( - "The option '" + k + "' is not supported to alter yet."); - } - if (!currentConfig.getDataLakeFormat().isPresent() - && ConfigOptions.TABLE_DATALAKE_ENABLED.key().equals(k)) { - throw new InvalidAlterTableException( - String.format( - "The option '%s' cannot be altered for tables that were" - + " created before the Fluss cluster enabled datalake.", - ConfigOptions.TABLE_DATALAKE_ENABLED.key())); - } - }); + List unsupportedKeys = + tableKeysToChange.stream() + .filter(k -> isTableStorageConfig(k) && !isAlterableTableOption(k)) + .collect(Collectors.toList()); + if (!unsupportedKeys.isEmpty()) { + throw new InvalidAlterTableException( + String.format( + "The following options are not supported to alter yet: %s.", + String.join(", ", unsupportedKeys))); + } + + if (!currentConfig.getDataLakeFormat().isPresent()) { + List datalakeKeys = + tableKeysToChange.stream() + .filter(k -> k.startsWith("table.datalake.")) + .collect(Collectors.toList()); + if (!datalakeKeys.isEmpty()) { + throw new InvalidAlterTableException( + String.format( + "The following options cannot be altered for tables that were" + + " created before the Fluss cluster enabled datalake: %s.", + String.join(", ", datalakeKeys))); + } + } } private static void checkSystemColumns(RowType schema) {