diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java index 159b42764c..706a008814 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java @@ -118,6 +118,7 @@ static class AppendSinkWriterBuilder private final TablePath tablePath; private final Configuration flussConfig; private final RowType tableRowType; + private final RowType consumedRowType; private final int numBucket; private final List bucketKeys; private final List partitionKeys; @@ -129,6 +130,7 @@ public AppendSinkWriterBuilder( TablePath tablePath, Configuration flussConfig, RowType tableRowType, + RowType consumedRowType, int numBucket, List bucketKeys, List partitionKeys, @@ -138,6 +140,7 @@ public AppendSinkWriterBuilder( this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; + this.consumedRowType = consumedRowType; this.numBucket = numBucket; this.bucketKeys = bucketKeys; this.partitionKeys = partitionKeys; @@ -152,6 +155,7 @@ public AppendSinkWriter createWriter(MailboxExecutor mailboxExecutor) { tablePath, flussConfig, tableRowType, + consumedRowType, mailboxExecutor, flussSerializationSchema); } @@ -242,6 +246,7 @@ static class UpsertSinkWriterBuilder private final TablePath tablePath; private final Configuration flussConfig; private final RowType tableRowType; + private final RowType consumedRowType; private final @Nullable int[] targetColumnIndexes; private final int numBucket; private final List bucketKeys; @@ -268,6 +273,7 @@ static class UpsertSinkWriterBuilder TablePath tablePath, Configuration flussConfig, RowType tableRowType, + RowType consumedRowType, @Nullable int[] targetColumnIndexes, int numBucket, List bucketKeys, @@ -280,6 +286,7 @@ static class UpsertSinkWriterBuilder this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; + this.consumedRowType = consumedRowType; this.targetColumnIndexes = targetColumnIndexes; this.numBucket = numBucket; this.bucketKeys = bucketKeys; @@ -297,6 +304,7 @@ public UpsertSinkWriter createWriter(MailboxExecutor mailboxExecutor) { tablePath, flussConfig, tableRowType, + consumedRowType, targetColumnIndexes, mailboxExecutor, flussSerializationSchema, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java index fba1803237..820707cf33 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java @@ -87,6 +87,13 @@ public class FlinkTableSink private final @Nullable DataLakeFormat lakeFormat; @Nullable private final String producerId; + /** + * The row type of data actually consumed by the sink. This may differ from {@link + * #tableRowType} when the Flink planner sends computed columns (e.g., during UPDATE with tables + * that have computed columns like {@code ptime AS PROCTIME()}). + */ + private RowType consumedRowType; + private boolean appliedUpdates = false; @Nullable private GenericRow deleteRow; @@ -108,6 +115,7 @@ public FlinkTableSink( this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; + this.consumedRowType = tableRowType; this.primaryKeyIndexes = primaryKeyIndexes; this.partitionKeys = partitionKeys; this.streaming = streaming; @@ -216,6 +224,7 @@ private FlinkSink getFlinkSink(int[] targetColumnIndexes) { tablePath, flussConfig, tableRowType, + consumedRowType, targetColumnIndexes, numBucket, bucketKeys, @@ -229,6 +238,7 @@ private FlinkSink getFlinkSink(int[] targetColumnIndexes) { tablePath, flussConfig, tableRowType, + consumedRowType, numBucket, bucketKeys, partitionKeys, @@ -265,6 +275,7 @@ public DynamicTableSink copy() { bucketKeys, distributionMode, producerId); + sink.consumedRowType = consumedRowType; sink.appliedUpdates = appliedUpdates; sink.deleteRow = deleteRow; return sink; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java index c8808f12f8..44585c40c2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java @@ -229,6 +229,7 @@ public FlussSink build() { tablePath, flussConfig, tableRowType, + tableRowType, targetColumnIndexes, numBucket, bucketKeys, @@ -245,6 +246,7 @@ public FlussSink build() { tablePath, flussConfig, tableRowType, + tableRowType, numBucket, bucketKeys, partitionKeys, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java index ac80f938c4..90551f6f48 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java @@ -106,6 +106,16 @@ public class RowDataSerializationSchema implements FlussSerializationSchema targetFieldNames = context.getRowSchema().getFieldNames(); List inputFieldNames = context.getInputRowSchema().getFieldNames(); + this.expectedInputFieldCount = inputFieldNames.size(); this.outputPadding = new PaddingRow(inputFieldNames.size()); if (targetFieldNames.size() != inputFieldNames.size()) { // there is a schema evolution happens (e.g., ADD COLUMN), need to build index mapping @@ -166,6 +177,19 @@ public RowWithOp serialize(RowData value) throws Exception { "Converter not initialized. The open() method must be called before serializing records."); } InternalRow row = converter.replace(value); + // Handle extra computed columns (e.g., PROCTIME()) that Flink may add during + // UPDATE statements. These columns don't exist in the Fluss table schema and + // need to be projected out. Lazily build a projection on first encounter. + if (row.getFieldCount() > expectedInputFieldCount) { + if (inputProjection == null) { + int[] identityMapping = new int[expectedInputFieldCount]; + for (int i = 0; i < expectedInputFieldCount; i++) { + identityMapping[i] = i; + } + inputProjection = ProjectedRow.from(identityMapping); + } + row = inputProjection.replaceRow(row); + } // handling schema evolution for changes before job compilation if (row.getFieldCount() < outputPadding.getFieldCount()) { row = outputPadding.replaceRow(row); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/AppendSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/AppendSinkWriter.java index 3fdc5bdf7b..4bcde71d0a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/AppendSinkWriter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/AppendSinkWriter.java @@ -43,9 +43,17 @@ public AppendSinkWriter( TablePath tablePath, Configuration flussConfig, RowType tableRowType, + RowType consumedRowType, MailboxExecutor mailboxExecutor, FlussSerializationSchema serializationSchema) { - super(tablePath, flussConfig, tableRowType, mailboxExecutor, serializationSchema); + super( + tablePath, + flussConfig, + tableRowType, + consumedRowType, + null, + mailboxExecutor, + serializationSchema); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java index f942bc5adb..3d8c0bfc21 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java @@ -59,6 +59,7 @@ public abstract class FlinkSinkWriter implements SinkWriter { private final TablePath tablePath; private final Configuration flussConfig; protected final RowType tableRowType; + protected final RowType consumedRowType; protected final @Nullable int[] targetColumnIndexes; private final MailboxExecutor mailboxExecutor; private final FlussSerializationSchema serializationSchema; @@ -79,13 +80,21 @@ public FlinkSinkWriter( RowType tableRowType, MailboxExecutor mailboxExecutor, FlussSerializationSchema serializationSchema) { - this(tablePath, flussConfig, tableRowType, null, mailboxExecutor, serializationSchema); + this( + tablePath, + flussConfig, + tableRowType, + tableRowType, + null, + mailboxExecutor, + serializationSchema); } public FlinkSinkWriter( TablePath tablePath, Configuration flussConfig, RowType tableRowType, + RowType consumedRowType, @Nullable int[] targetColumns, MailboxExecutor mailboxExecutor, FlussSerializationSchema serializationSchema) { @@ -93,6 +102,7 @@ public FlinkSinkWriter( this.flussConfig = flussConfig; this.targetColumnIndexes = targetColumns; this.tableRowType = tableRowType; + this.consumedRowType = consumedRowType; this.mailboxExecutor = mailboxExecutor; this.serializationSchema = serializationSchema; } @@ -118,7 +128,7 @@ public void initialize(SinkWriterMetricGroup metricGroup) { try { this.serializationSchema.open( new SerializerInitContextImpl( - table.getTableInfo().getRowType(), tableRowType, false)); + table.getTableInfo().getRowType(), consumedRowType, false)); } catch (Exception e) { throw new FlussRuntimeException(e); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/UpsertSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/UpsertSinkWriter.java index 73ac142f04..a3e4f9de16 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/UpsertSinkWriter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/UpsertSinkWriter.java @@ -60,6 +60,7 @@ public class UpsertSinkWriter extends FlinkSinkWriter { * @param tablePath the path of the table to write to * @param flussConfig the Fluss configuration * @param tableRowType the row type of the table + * @param consumedRowType the row type of the consumed data (may include computed columns) * @param targetColumnIndexes optional column indexes for partial updates * @param mailboxExecutor the mailbox executor for async operations * @param flussSerializationSchema the serialization schema for input records @@ -69,6 +70,7 @@ public UpsertSinkWriter( TablePath tablePath, Configuration flussConfig, RowType tableRowType, + RowType consumedRowType, @Nullable int[] targetColumnIndexes, MailboxExecutor mailboxExecutor, FlussSerializationSchema flussSerializationSchema, @@ -77,6 +79,7 @@ public UpsertSinkWriter( tablePath, flussConfig, tableRowType, + consumedRowType, targetColumnIndexes, mailboxExecutor, flussSerializationSchema); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 08b6b6c9f2..cc3d133070 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -247,12 +247,13 @@ void testAppendLogWithBucketKey(DistributionMode distributionMode) throws Except } else { assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); } - // there shouldn't have REBALANCE shuffle strategy, this asserts operator parallelism + // there shouldn't have REBALANCE shuffle strategy, this asserts operator + // parallelism assertThat(insertPlan).doesNotContain("\"ship_strategy\" : \"REBALANCE\""); tEnv.executeSql(insertSql).await(); CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); - //noinspection ArraysAsListWithZeroOrOneArgument + // noinspection ArraysAsListWithZeroOrOneArgument List> expectedGroups = Arrays.asList( Arrays.asList( @@ -406,7 +407,7 @@ void testAppendLogPartitionTable(DistributionMode distributionMode) throws Excep tEnv.executeSql(insertSql).await(); CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); - //noinspection ArraysAsListWithZeroOrOneArgument + // noinspection ArraysAsListWithZeroOrOneArgument List> expectedGroups = Arrays.asList( Arrays.asList( @@ -571,7 +572,8 @@ void testPartialUpsert() throws Exception { "-U[1, 111, c1]", "+U[1, 333, c1]", "-U[1, 333, c1]", "+U[1, null, c1]"); assertResultsIgnoreOrder(rowIter, expectedRows, false); - // check the row 1 will be deleted finally since all the fields in the row are set to null + // check the row 1 will be deleted finally since all the fields in the row are + // set to null tEnv.executeSql("INSERT INTO sink_test(a, c) SELECT f0, f2 FROM changeLog").await(); expectedRows = Arrays.asList("-U[1, null, c1]", "+U[1, null, c11]", "-D[1, null, c11]"); assertResultsIgnoreOrder(rowIter, expectedRows, true); @@ -635,7 +637,8 @@ void testFirstRowMergeEngine() throws Exception { + " b string) with('table.merge-engine' = 'first_row')"); tEnv.executeSql("create table log_sink (a int, b string)"); - // insert the primary table with first_row merge engine into the log table to verify that + // insert the primary table with first_row merge engine into the log table to + // verify that // the first_row merge engine only generates append-only stream JobClient insertJobClient = tEnv.executeSql("insert into log_sink select * from first_row_source") @@ -789,7 +792,8 @@ void testWritePartitionedTable(boolean isPrimaryKeyTable, boolean isAutoPartitio .await(); // This test requires dynamically discovering newly created partitions, so - // 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 minute), + // 'scan.partition.discovery.interval' needs to be set to 2s (default is 1 + // minute), // otherwise the test may hang for 1 minute. CloseableIterator rowIter = tEnv.executeSql( @@ -1309,7 +1313,8 @@ void testVersionMergeEngineWithTypeTimestampLTZ9() throws Exception { @Test void testComprehensiveAggregationFunctions() throws Exception { - // Test all 11 aggregate functions (each function tested once with representative data type) + // Test all 11 aggregate functions (each function tested once with + // representative data type) tEnv.executeSql( "create table comprehensive_agg (" + "id int not null primary key not enforced, " @@ -1518,7 +1523,8 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception { "TableSourceScan(table=[[testcatalog, defaultdb, delete_behavior_ignore_table]], fields=[a, b], " + "changelogMode=[I,UB,UA,D])"); } else { - // For 'ignore' and 'disable', delete operations are not emitted in the changelog + // For 'ignore' and 'disable', delete operations are not emitted in the + // changelog assertThat(changelogModePlan) .contains( "TableSourceScan(table=[[testcatalog, defaultdb, delete_behavior_ignore_table]], fields=[a, b], " @@ -1540,7 +1546,8 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception { env.fromCollection( Arrays.asList( Row.ofKind(RowKind.INSERT, 4, "test4"), - Row.ofKind(RowKind.DELETE, 1, "test1"), // Should be ignored + Row.ofKind(RowKind.DELETE, 1, "test1"), // Should be + // ignored Row.ofKind(RowKind.UPDATE_AFTER, 2, "updated_test2")))); tEnv.createTemporaryView("changelog_source", changelogData); @@ -1559,13 +1566,15 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception { // 3. Verify the final table state based on delete behavior if (deleteBehavior.equals("disable")) { - // For 'disable', the delete operation is not supported, so we expect an exception + // For 'disable', the delete operation is not supported, so we expect an + // exception assertThatThrownBy(tableResult::await) .hasStackTraceContaining( "DeletionDisabledException: Delete operations are disabled for this table." + " The table.delete.behavior is set to 'disable'."); } else { - // For 'ignore', the delete operation is ignored, so we just wait for the insert and + // For 'ignore', the delete operation is ignored, so we just wait for the insert + // and // update to be applied tableResult.await(); CloseableIterator rowIter = @@ -1639,8 +1648,10 @@ void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception { // Explain the aggregation query to check for ChangelogNormalize String aggPlan = tEnv.explainSql(aggQuery); - // ChangelogNormalize should be present to normalize the changelog for aggregation - // In Flink, when the source produces changelog with primary key semantics (I, UA, D), + // ChangelogNormalize should be present to normalize the changelog for + // aggregation + // In Flink, when the source produces changelog with primary key semantics (I, + // UA, D), // a ChangelogNormalize operator is inserted before aggregation assertThat(aggPlan).contains("ChangelogNormalize"); @@ -1670,6 +1681,40 @@ void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception { assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults); } + @Test + void testRowUpdateWithComputedColumn() throws Exception { + String tableName = "fluss_order"; + tEnv.executeSql( + "CREATE TABLE " + + tableName + + " (" + + " order_key BIGINT," + + " cust_key BIGINT," + + " order_status VARCHAR," + + " total_price DOUBLE," + + " order_date DATE," + + " order_priority VARCHAR," + + " ptime AS PROCTIME()," + + " PRIMARY KEY (order_key) NOT ENFORCED" + + ") WITH (" + + " 'table.changelog.image' = 'wal'" + + ")"); + + tEnv.executeSql( + "INSERT INTO " + + tableName + + " VALUES " + + "(14404096, 1, 'o', 10.0, DATE '2026-01-01', 'low')") + .await(); + + tBatchEnv + .executeSql( + "UPDATE " + + tableName + + " SET order_priority = 'high' WHERE order_key = 14404096") + .await(); + } + @Test void testAutoIncrementWithTargetColumns() throws Exception { // use single parallelism to make result ordering stable @@ -1793,7 +1838,8 @@ void testWalModeWithAutoIncrement() throws Exception { // Use batch mode to update and delete records - // Upsert data, not support update/delete rows in table with auto-inc column for now. + // Upsert data, not support update/delete rows in table with auto-inc column for + // now. // TODO: Support Batch Update tEnv.executeSql( String.format( diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriterTest.java index 6eebba4197..0a1c662163 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriterTest.java @@ -269,6 +269,7 @@ private FlinkSinkWriter createSinkWriter( DEFAULT_SINK_TABLE_PATH, configuration, tableRowType, + tableRowType, mailboxExecutor, serializationSchema); }