diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index da737dd07c..102db189c4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -428,15 +428,6 @@ void testAlterTableColumn() throws Exception { TablePath tablePath = TablePath.of("test_db", "alter_table_1"); admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get(); - assertThatThrownBy( - () -> - admin.alterTable( - tablePath, - Collections.singletonList( - TableChange.dropColumn("id")), - false) - .get()) - .hasMessageContaining("Not support drop column now."); assertThatThrownBy( () -> admin.alterTable( @@ -565,6 +556,61 @@ void testAlterTableColumn() throws Exception { .hasMessageContaining("Column nested_row already exists"); } + @Test + void testAlterTableDropColumn() throws Exception { + TablePath tablePath = TablePath.of("test_db", "alter_table_drop"); + Schema schema = + Schema.newBuilder() + .primaryKey("id") + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .column("seq", DataTypes.BIGINT()) + .enableAutoIncrement("seq") + .build(); + admin.createTable( + tablePath, + TableDescriptor.builder().schema(schema).distributedBy(3, "id").build(), + false) + .get(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.dropColumn("id")), + false) + .get()) + .hasMessageContaining( + "Cannot drop column 'id' because it is part of the primary key."); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.dropColumn("non-existent")), + false) + .get()) + .hasMessageContaining("Column non-existent does not exist."); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.dropColumn("seq")), + false) + .get()) + .hasMessageContaining("Cannot drop auto-increment column 'seq'."); + + admin.alterTable(tablePath, Collections.singletonList(TableChange.dropColumn("age")), false) + .get(); + SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get(); + assertThat(schemaInfo.getSchema().getColumnNames()).doesNotContain("age"); + } + @Test void testCreateInvalidDatabaseAndTable() throws Exception { assertThatThrownBy( diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 92b675c428..72faa6cc83 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -475,6 +475,27 @@ public Builder column(String columnName, DataType dataType, AggFunction aggFunct return this; } + /** + * Declares a column that is dropped to this schema. + * + * @param columnName the name of the column + * @return the builder instance + */ + public Builder dropColumn(String columnName) { + checkNotNull(columnName, "Column name must not be null."); + + Column existingColumn = + columns.stream() + .filter(column -> column.getName().equals(columnName)) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException( + "Column " + columnName + " does not exist.")); + columns.remove(existingColumn); + return this; + } + /** Apply comment to the previous column. */ public Builder withComment(@Nullable String comment) { if (!columns.isEmpty()) { @@ -565,6 +586,16 @@ public Optional getColumn(String columnName) { .findFirst(); } + /** Returns the primary key, if set. */ + public Optional getPrimaryKey() { + return Optional.ofNullable(primaryKey); + } + + /** Returns the auto-increment column names. */ + public List getAutoIncrementColumnNames() { + return Collections.unmodifiableList(autoIncrementColumnNames); + } + /** Returns an instance of an {@link Schema}. */ public Schema build() { return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 6cebccb9bb..70fcfc1ef1 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -1733,6 +1733,56 @@ void testAlterTableInvalidStatisticsColumnsOnLogTableShouldFail() { "Column 'content' of type 'BYTES' is not supported for statistics collection."); } + /** + * Tests that ALTER TABLE DROP COLUMN succeeds for a non-PK, non-autoincrement column and the + * resulting scan no longer exposes the dropped column. + */ + @Test + void testAlterTableDropColumn() throws Exception { + tEnv.executeSql("create table drop_col_test " + "(id int, name varchar, age int)"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "drop_col_test"); + + // Write data + List rows = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + rows.add(row(i, "name-" + i, i * 10)); + } + writeRows(conn, tablePath, rows, true); + + // Drop a column + tEnv.executeSql("alter table drop_col_test drop age"); + + List expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + expected.add(String.format("+I[%d, name-%d]", i, i)); + } + try (CloseableIterator iter = + tEnv.executeSql("select * from drop_col_test").collect()) { + assertResultsIgnoreOrder(iter, expected, true); + } + } + + /** + * Tests that ALTER TABLE DROP COLUMN rejects primary key columns and auto-increment columns. + */ + @Test + void testAlterTableDropColumnShouldFail() { + tEnv.executeSql( + "create table drop_col_reject_test " + + "(id int not null, seq bigint, name varchar, " + + "primary key (id) not enforced) " + + "with ('auto-increment.fields' = 'seq')"); + + assertThatThrownBy(() -> tEnv.executeSql("alter table drop_col_reject_test drop id")) + .rootCause() + .hasMessageContaining( + "Cannot drop column 'id' because it is part of the primary key."); + + assertThatThrownBy(() -> tEnv.executeSql("alter table drop_col_reject_test drop seq")) + .rootCause() + .hasMessageContaining("Cannot drop auto-increment column 'seq'."); + } + private List writeRowsToTwoPartition(TablePath tablePath, Collection partitions) throws Exception { List rows = new ArrayList<>(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index e1d8179507..8035760047 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -432,6 +432,13 @@ public void alterTableSchema( // validate the table column changes if (!schemaChanges.isEmpty()) { + // TODO: remove this guard once lake catalogs support dropping columns. + if (isDataLakeEnabled(tableDescriptor) + && schemaChanges.stream() + .anyMatch(change -> change instanceof TableChange.DropColumn)) { + throw new InvalidAlterTableException( + "Drop column is not yet supported for lake-tiered tables."); + } Schema newSchema = SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges); LakeCatalog.Context lakeCatalogContext = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index d1ad0c16c2..f15f13f188 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -94,7 +94,23 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { } private SchemaUpdate dropColumn(TableChange.DropColumn dropColumn) { - throw new SchemaChangeException("Not support drop column now."); + String name = dropColumn.getName(); + Schema.Column existingColumn = builder.getColumn(name).orElse(null); + + if (existingColumn == null) { + throw new InvalidAlterTableException("Column " + name + " does not exist."); + } + if (builder.getPrimaryKey().map(pk -> pk.getColumnNames().contains(name)).orElse(false)) { + throw new InvalidAlterTableException( + "Cannot drop column '" + name + "' because it is part of the primary key."); + } + if (builder.getAutoIncrementColumnNames().contains(name)) { + throw new InvalidAlterTableException( + "Cannot drop auto-increment column '" + name + "'."); + } + builder.dropColumn(name); + + return this; } private SchemaUpdate modifiedColumn(TableChange.ModifyColumn modifyColumn) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java index 1303521708..eaa69f2638 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java @@ -62,11 +62,15 @@ import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords; -/** Tests for {@link KvTablet} partial update/delete after schema evolution (ADD COLUMN). */ +/** + * Tests for {@link KvTablet} partial update/delete after schema evolution (ADD COLUMN, DROP + * COLUMN). + */ class KvTabletSchemaEvolutionTest { private static final short SCHEMA_ID_V0 = 0; private static final short SCHEMA_ID_V1 = 1; + private static final short SCHEMA_ID_V2 = 2; // Schema v0: {a INT PK, b STRING, c STRING} private static final Schema SCHEMA_V0 = @@ -82,8 +86,14 @@ class KvTabletSchemaEvolutionTest { private static final Schema SCHEMA_V1 = Schema.newBuilder().fromSchema(SCHEMA_V0).column("d", DataTypes.STRING()).build(); + // Schema v2: DROP COLUMN c + // {a INT PK, b STRING} + private static final Schema SCHEMA_V2 = + Schema.newBuilder().fromSchema(SCHEMA_V0).dropColumn("c").build(); + private static final RowType ROW_TYPE_V0 = SCHEMA_V0.getRowType(); private static final RowType ROW_TYPE_V1 = SCHEMA_V1.getRowType(); + private static final RowType ROW_TYPE_V2 = SCHEMA_V2.getRowType(); private final Configuration conf = new Configuration(); @@ -246,6 +256,48 @@ void testPartialDeleteAfterAddColumn() throws Exception { .isEqualTo(expectedLogs); } + @Test + void testPartialUpdateAfterDropColumn() throws Exception { + KvRecordTestUtils.KvRecordFactory recordFactoryV0 = + KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0); + KvRecordTestUtils.KvRecordBatchFactory batchFactoryV0 = + KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0); + + // Insert {a=1, b="b_val", c="c_val"} with schema v0. + KvRecordBatch insertBatch = + batchFactoryV0.ofRecords( + recordFactoryV0.ofRecord( + "k1".getBytes(), new Object[] {1, "b_val", "c_val"})); + kvTablet.putAsLeader(insertBatch, null); + + // Evolve schema to v2 (DROP COLUMN c). + schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V2, SCHEMA_ID_V2)); + + long beforePartialUpdate = logTablet.localLogEndOffset(); + + // Partial update targeting columns [a, b] with old schemaId. + int[] targetColumns = {0, 1}; + KvRecordBatch partialBatch = + batchFactoryV0.ofRecords( + recordFactoryV0.ofRecord("k1".getBytes(), new Object[] {1, "new_b", null})); + kvTablet.putAsLeader(partialBatch, targetColumns); + + // After DROP COLUMN c, the read result must expose only {a, b}. + LogRecords actualLogRecords = readLogRecords(beforePartialUpdate); + MemoryLogRecords expectedLogs = + logRecords( + ROW_TYPE_V2, + SCHEMA_ID_V2, + beforePartialUpdate, + Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER), + Arrays.asList(new Object[] {1, "b_val"}, new Object[] {1, "new_b"})); + + assertThatLogRecords(actualLogRecords) + .withSchema(ROW_TYPE_V2) + .assertCheckSum(true) + .isEqualTo(expectedLogs); + } + private LogRecords readLogRecords(long startOffset) throws Exception { return logTablet .read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null)