Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ void testCannotEnableDatalakeForTableCreatedBeforeClusterEnabledDatalake() throw
.cause()
.isInstanceOf(InvalidAlterTableException.class)
.hasMessageContaining(
"The option 'table.datalake.enabled' cannot be altered for tables that were"
+ " created before the Fluss cluster enabled datalake.");
"The following options cannot be altered for tables that were created before the Fluss cluster enabled datalake: table.datalake.enabled.");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,45 +249,39 @@ void testAlterTableConfig() throws Exception {

// alter table set an unsupported modification option should throw exception
String unSupportedDml1 =
"alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true')";
"alter table test_alter_table_append_only set ('table.auto-partition.enabled' = 'true', 'table.kv.format' = 'indexed')";

assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml1))
.rootCause()
.isInstanceOf(InvalidAlterTableException.class)
.hasMessage(
"The option 'table.auto-partition.enabled' is not supported to alter yet.");
.hasMessageContaining("The following options are not supported to alter yet:")
.hasMessageContaining("table.kv.format")
.hasMessageContaining(" table.auto-partition.enabled");

String unSupportedDml2 =
"alter table test_alter_table_append_only set ('k1' = 'v1', 'table.kv.format' = 'indexed')";
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2))
.rootCause()
.isInstanceOf(InvalidAlterTableException.class)
.hasMessage("The option 'table.kv.format' is not supported to alter yet.");

String unSupportedDml3 =
"alter table test_alter_table_append_only set ('bucket.num' = '1000')";
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml3))
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2))
.rootCause()
.isInstanceOf(CatalogException.class)
.hasMessage("The option 'bucket.num' is not supported to alter yet.");

String unSupportedDml4 =
String unSupportedDml3 =
"alter table test_alter_table_append_only set ('bucket.key' = 'a')";
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml4))
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml3))
.rootCause()
.isInstanceOf(CatalogException.class)
.hasMessage("The option 'bucket.key' is not supported to alter yet.");

String unSupportedDml5 =
String unSupportedDml4 =
"alter table test_alter_table_append_only reset ('bootstrap.servers')";
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml5))
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml4))
.rootCause()
.isInstanceOf(CatalogException.class)
.hasMessage("The option 'bootstrap.servers' is not supported to alter yet.");

String unSupportedDml6 =
String unSupportedDml5 =
"alter table test_alter_table_append_only set ('auto-increment.fields' = 'b')";
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml6))
assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml5))
.rootCause()
.isInstanceOf(CatalogException.class)
.hasMessage("The option 'auto-increment.fields' is not supported to alter yet.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,31 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
public static void validateAlterTableProperties(
TableInfo currentTable, Set<String> tableKeysToChange) {
TableConfig currentConfig = currentTable.getTableConfig();
tableKeysToChange.forEach(
k -> {
if (isTableStorageConfig(k) && !isAlterableTableOption(k)) {
throw new InvalidAlterTableException(
"The option '" + k + "' is not supported to alter yet.");
}

if (!currentConfig.getDataLakeFormat().isPresent()
&& ConfigOptions.TABLE_DATALAKE_ENABLED.key().equals(k)) {
throw new InvalidAlterTableException(
String.format(
"The option '%s' cannot be altered for tables that were"
+ " created before the Fluss cluster enabled datalake.",
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
}
});
List<String> unsupportedKeys =
tableKeysToChange.stream()
.filter(k -> isTableStorageConfig(k) && !isAlterableTableOption(k))
.collect(Collectors.toList());
if (!unsupportedKeys.isEmpty()) {
throw new InvalidAlterTableException(
String.format(
"The following options are not supported to alter yet: %s.",
String.join(", ", unsupportedKeys)));
}

if (!currentConfig.getDataLakeFormat().isPresent()) {
List<String> datalakeKeys =
tableKeysToChange.stream()
.filter(k -> k.startsWith("table.datalake."))
.collect(Collectors.toList());
if (!datalakeKeys.isEmpty()) {
throw new InvalidAlterTableException(
String.format(
"The following options cannot be altered for tables that were"
+ " created before the Fluss cluster enabled datalake: %s.",
String.join(", ", datalakeKeys)));
}
}
}

private static void checkSystemColumns(RowType schema) {
Expand Down