Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,6 @@ void testAlterTableColumn() throws Exception {
TablePath tablePath = TablePath.of("test_db", "alter_table_1");
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();

assertThatThrownBy(
() ->
admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.dropColumn("id")),
false)
.get())
.hasMessageContaining("Not support drop column now.");
assertThatThrownBy(
() ->
admin.alterTable(
Expand Down Expand Up @@ -565,6 +556,61 @@ void testAlterTableColumn() throws Exception {
.hasMessageContaining("Column nested_row already exists");
}

@Test
void testAlterTableDropColumn() throws Exception {
    TablePath tablePath = TablePath.of("test_db", "alter_table_drop");

    // Primary-key table keyed on "id" with "seq" declared as an auto-increment column.
    Schema schema =
            Schema.newBuilder()
                    .primaryKey("id")
                    .column("id", DataTypes.INT())
                    .column("name", DataTypes.STRING())
                    .column("age", DataTypes.INT())
                    .column("seq", DataTypes.BIGINT())
                    .enableAutoIncrement("seq")
                    .build();
    TableDescriptor descriptor =
            TableDescriptor.builder().schema(schema).distributedBy(3, "id").build();
    admin.createTable(tablePath, descriptor, false).get();

    // Rejected: primary key column, unknown column, auto-increment column (in that order).
    assertDropColumnRejected(
            tablePath, "id", "Cannot drop column 'id' because it is part of the primary key.");
    assertDropColumnRejected(
            tablePath, "non-existent", "Column non-existent does not exist.");
    assertDropColumnRejected(tablePath, "seq", "Cannot drop auto-increment column 'seq'.");

    // Accepted: a plain column; it must be gone from the schema reported afterwards.
    admin.alterTable(tablePath, Collections.singletonList(TableChange.dropColumn("age")), false)
            .get();
    assertThat(admin.getTableSchema(tablePath).get().getSchema().getColumnNames())
            .doesNotContain("age");
}

/**
 * Asserts that dropping {@code columnName} from {@code tablePath} fails with an error whose
 * message contains {@code expectedMessage}.
 */
private void assertDropColumnRejected(
        TablePath tablePath, String columnName, String expectedMessage) {
    assertThatThrownBy(
                    () ->
                            admin.alterTable(
                                            tablePath,
                                            Collections.singletonList(
                                                    TableChange.dropColumn(columnName)),
                                            false)
                                    .get())
            .hasMessageContaining(expectedMessage);
}

@Test
void testCreateInvalidDatabaseAndTable() throws Exception {
assertThatThrownBy(
Expand Down
31 changes: 31 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,27 @@ public Builder column(String columnName, DataType dataType, AggFunction aggFunct
return this;
}

/**
 * Removes a previously declared column from this schema builder.
 *
 * <p>Only the column list is modified here; primary key and auto-increment declarations are
 * not touched by this method, so callers are expected to validate those constraints before
 * dropping a column.
 *
 * @param columnName the name of the column to drop
 * @return the builder instance
 * @throws IllegalArgumentException if no declared column has the given name
 */
public Builder dropColumn(String columnName) {
    checkNotNull(columnName, "Column name must not be null.");

    // Locate the first declared column carrying the requested name.
    Column target = null;
    for (Column candidate : columns) {
        if (candidate.getName().equals(columnName)) {
            target = candidate;
            break;
        }
    }
    if (target == null) {
        throw new IllegalArgumentException("Column " + columnName + " does not exist.");
    }

    columns.remove(target);
    return this;
}

/** Apply comment to the previous column. */
public Builder withComment(@Nullable String comment) {
if (!columns.isEmpty()) {
Expand Down Expand Up @@ -565,6 +586,16 @@ public Optional<Column> getColumn(String columnName) {
.findFirst();
}

/**
 * Returns the primary key declared on this builder.
 *
 * @return the primary key, or an empty {@link Optional} when none has been set
 */
public Optional<PrimaryKey> getPrimaryKey() {
    if (primaryKey == null) {
        return Optional.empty();
    }
    return Optional.of(primaryKey);
}

/**
 * Returns the names of the columns declared as auto-increment.
 *
 * @return an unmodifiable view of the auto-increment column names
 */
public List<String> getAutoIncrementColumnNames() {
    final List<String> readOnlyNames = Collections.unmodifiableList(autoIncrementColumnNames);
    return readOnlyNames;
}

/** Returns an instance of an {@link Schema}. */
public Schema build() {
return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1733,6 +1733,56 @@ void testAlterTableInvalidStatisticsColumnsOnLogTableShouldFail() {
"Column 'content' of type 'BYTES' is not supported for statistics collection.");
}

/**
 * Verifies that ALTER TABLE DROP COLUMN works for a column that is neither a primary key nor
 * an auto-increment column, and that a subsequent scan returns rows without the dropped column.
 */
@Test
void testAlterTableDropColumn() throws Exception {
    final int rowCount = 5;
    tEnv.executeSql("create table drop_col_test (id int, name varchar, age int)");
    TablePath tablePath = TablePath.of(DEFAULT_DB, "drop_col_test");

    // Build the rows to write together with the projection expected once "age" is dropped.
    List<InternalRow> rows = new ArrayList<>();
    List<String> expected = new ArrayList<>();
    for (int i = 0; i < rowCount; i++) {
        rows.add(row(i, "name-" + i, i * 10));
        expected.add(String.format("+I[%d, name-%d]", i, i));
    }
    writeRows(conn, tablePath, rows, true);

    // Drop the column after the data has been written with the original schema.
    tEnv.executeSql("alter table drop_col_test drop age");

    try (CloseableIterator<Row> iter =
            tEnv.executeSql("select * from drop_col_test").collect()) {
        assertResultsIgnoreOrder(iter, expected, true);
    }
}

/**
 * Verifies that ALTER TABLE DROP COLUMN is rejected for a primary key column and for an
 * auto-increment column.
 */
@Test
void testAlterTableDropColumnShouldFail() {
    String createTableSql =
            "create table drop_col_reject_test "
                    + "(id int not null, seq bigint, name varchar, primary key (id) not enforced) "
                    + "with ('auto-increment.fields' = 'seq')";
    tEnv.executeSql(createTableSql);

    // "id" is the primary key column.
    String dropPrimaryKeySql = "alter table drop_col_reject_test drop id";
    assertThatThrownBy(() -> tEnv.executeSql(dropPrimaryKeySql))
            .rootCause()
            .hasMessageContaining(
                    "Cannot drop column 'id' because it is part of the primary key.");

    // "seq" is configured through 'auto-increment.fields'.
    String dropAutoIncrementSql = "alter table drop_col_reject_test drop seq";
    assertThatThrownBy(() -> tEnv.executeSql(dropAutoIncrementSql))
            .rootCause()
            .hasMessageContaining("Cannot drop auto-increment column 'seq'.");
}

private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<String> partitions)
throws Exception {
List<InternalRow> rows = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,13 @@ public void alterTableSchema(

// validate the table column changes
if (!schemaChanges.isEmpty()) {
// TODO: remove this guard once lake catalogs support dropping columns.
if (isDataLakeEnabled(tableDescriptor)
&& schemaChanges.stream()
.anyMatch(change -> change instanceof TableChange.DropColumn)) {
throw new InvalidAlterTableException(
"Drop column is not yet supported for lake-tiered tables.");
}
Schema newSchema =
SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges);
LakeCatalog.Context lakeCatalogContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,23 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
}

/**
 * Applies a drop-column change to the schema under construction.
 *
 * <p>The change is validated before the builder is touched: the column must exist, must not
 * be part of the primary key, and must not be an auto-increment column.
 *
 * @param dropColumn the drop-column change carrying the name of the column to remove
 * @return this {@code SchemaUpdate}, to allow chaining
 * @throws InvalidAlterTableException if the column does not exist, belongs to the primary
 *     key, or is an auto-increment column
 */
private SchemaUpdate dropColumn(TableChange.DropColumn dropColumn) {
    // The former unconditional "Not support drop column now." throw is removed: it made all
    // of the validation below unreachable.
    String name = dropColumn.getName();

    if (!builder.getColumn(name).isPresent()) {
        throw new InvalidAlterTableException("Column " + name + " does not exist.");
    }
    if (builder.getPrimaryKey().map(pk -> pk.getColumnNames().contains(name)).orElse(false)) {
        throw new InvalidAlterTableException(
                "Cannot drop column '" + name + "' because it is part of the primary key.");
    }
    if (builder.getAutoIncrementColumnNames().contains(name)) {
        throw new InvalidAlterTableException(
                "Cannot drop auto-increment column '" + name + "'.");
    }

    builder.dropColumn(name);
    return this;
}

private SchemaUpdate modifiedColumn(TableChange.ModifyColumn modifyColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@
import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
import static org.apache.fluss.testutils.LogRecordsAssert.assertThatLogRecords;

/**
 * Tests for {@link KvTablet} partial update/delete after schema evolution (ADD COLUMN, DROP
 * COLUMN).
 */
class KvTabletSchemaEvolutionTest {

private static final short SCHEMA_ID_V0 = 0;
private static final short SCHEMA_ID_V1 = 1;
private static final short SCHEMA_ID_V2 = 2;

// Schema v0: {a INT PK, b STRING, c STRING}
private static final Schema SCHEMA_V0 =
Expand All @@ -82,8 +86,14 @@ class KvTabletSchemaEvolutionTest {
private static final Schema SCHEMA_V1 =
Schema.newBuilder().fromSchema(SCHEMA_V0).column("d", DataTypes.STRING()).build();

// Schema v2: DROP COLUMN c
// {a INT PK, b STRING}
private static final Schema SCHEMA_V2 =
Schema.newBuilder().fromSchema(SCHEMA_V0).dropColumn("c").build();

private static final RowType ROW_TYPE_V0 = SCHEMA_V0.getRowType();
private static final RowType ROW_TYPE_V1 = SCHEMA_V1.getRowType();
private static final RowType ROW_TYPE_V2 = SCHEMA_V2.getRowType();

private final Configuration conf = new Configuration();

Expand Down Expand Up @@ -246,6 +256,48 @@ void testPartialDeleteAfterAddColumn() throws Exception {
.isEqualTo(expectedLogs);
}

@Test
void testPartialUpdateAfterDropColumn() throws Exception {
    // Both the initial insert and the later partial update are encoded with schema v0.
    KvRecordTestUtils.KvRecordFactory v0Records =
            KvRecordTestUtils.KvRecordFactory.of(ROW_TYPE_V0);
    KvRecordTestUtils.KvRecordBatchFactory v0Batches =
            KvRecordTestUtils.KvRecordBatchFactory.of(SCHEMA_ID_V0);

    // Step 1: insert {a=1, b="b_val", c="c_val"} under schema v0.
    KvRecordBatch initialInsert =
            v0Batches.ofRecords(
                    v0Records.ofRecord("k1".getBytes(), new Object[] {1, "b_val", "c_val"}));
    kvTablet.putAsLeader(initialInsert, null);

    // Step 2: make schema v2 (column c dropped) the latest schema.
    schemaGetter.updateLatestSchemaInfo(new SchemaInfo(SCHEMA_V2, SCHEMA_ID_V2));

    // Only log records produced from here on are compared.
    long offsetBeforeUpdate = logTablet.localLogEndOffset();

    // Step 3: partial update of columns [a, b], still written with the old schema id.
    KvRecordBatch partialUpdate =
            v0Batches.ofRecords(
                    v0Records.ofRecord("k1".getBytes(), new Object[] {1, "new_b", null}));
    kvTablet.putAsLeader(partialUpdate, new int[] {0, 1});

    // Step 4: the emitted change log must carry only {a, b} in the v2 layout.
    LogRecords actual = readLogRecords(offsetBeforeUpdate);
    MemoryLogRecords expected =
            logRecords(
                    ROW_TYPE_V2,
                    SCHEMA_ID_V2,
                    offsetBeforeUpdate,
                    Arrays.asList(ChangeType.UPDATE_BEFORE, ChangeType.UPDATE_AFTER),
                    Arrays.asList(new Object[] {1, "b_val"}, new Object[] {1, "new_b"}));

    assertThatLogRecords(actual)
            .withSchema(ROW_TYPE_V2)
            .assertCheckSum(true)
            .isEqualTo(expected);
}

private LogRecords readLogRecords(long startOffset) throws Exception {
return logTablet
.read(startOffset, Integer.MAX_VALUE, FetchIsolation.LOG_END, false, null)
Expand Down