Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ static class AppendSinkWriterBuilder<InputT>
private final TablePath tablePath;
private final Configuration flussConfig;
private final RowType tableRowType;
private final RowType consumedRowType;
private final int numBucket;
private final List<String> bucketKeys;
private final List<String> partitionKeys;
Expand All @@ -129,6 +130,7 @@ public AppendSinkWriterBuilder(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
RowType consumedRowType,
int numBucket,
List<String> bucketKeys,
List<String> partitionKeys,
Expand All @@ -138,6 +140,7 @@ public AppendSinkWriterBuilder(
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.consumedRowType = consumedRowType;
this.numBucket = numBucket;
this.bucketKeys = bucketKeys;
this.partitionKeys = partitionKeys;
Expand All @@ -152,6 +155,7 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
tablePath,
flussConfig,
tableRowType,
consumedRowType,
mailboxExecutor,
flussSerializationSchema);
}
Expand Down Expand Up @@ -242,6 +246,7 @@ static class UpsertSinkWriterBuilder<InputT>
private final TablePath tablePath;
private final Configuration flussConfig;
private final RowType tableRowType;
private final RowType consumedRowType;
private final @Nullable int[] targetColumnIndexes;
private final int numBucket;
private final List<String> bucketKeys;
Expand All @@ -268,6 +273,7 @@ static class UpsertSinkWriterBuilder<InputT>
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
RowType consumedRowType,
@Nullable int[] targetColumnIndexes,
int numBucket,
List<String> bucketKeys,
Expand All @@ -280,6 +286,7 @@ static class UpsertSinkWriterBuilder<InputT>
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.consumedRowType = consumedRowType;
this.targetColumnIndexes = targetColumnIndexes;
this.numBucket = numBucket;
this.bucketKeys = bucketKeys;
Expand All @@ -297,6 +304,7 @@ public UpsertSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
tablePath,
flussConfig,
tableRowType,
consumedRowType,
targetColumnIndexes,
mailboxExecutor,
flussSerializationSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public class FlinkTableSink
private final @Nullable DataLakeFormat lakeFormat;
@Nullable private final String producerId;

/**
* The row type of data actually consumed by the sink. This may differ from {@link
* #tableRowType} when the Flink planner sends computed columns (e.g., during UPDATE with tables
* that have computed columns like {@code ptime AS PROCTIME()}).
*/
private RowType consumedRowType;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;

Expand All @@ -108,6 +115,7 @@ public FlinkTableSink(
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.consumedRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.partitionKeys = partitionKeys;
this.streaming = streaming;
Expand Down Expand Up @@ -216,6 +224,7 @@ private FlinkSink<RowData> getFlinkSink(int[] targetColumnIndexes) {
tablePath,
flussConfig,
tableRowType,
consumedRowType,
targetColumnIndexes,
numBucket,
bucketKeys,
Expand All @@ -229,6 +238,7 @@ private FlinkSink<RowData> getFlinkSink(int[] targetColumnIndexes) {
tablePath,
flussConfig,
tableRowType,
consumedRowType,
numBucket,
bucketKeys,
partitionKeys,
Expand Down Expand Up @@ -265,6 +275,7 @@ public DynamicTableSink copy() {
bucketKeys,
distributionMode,
producerId);
sink.consumedRowType = consumedRowType;
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public FlussSink<InputT> build() {
tablePath,
flussConfig,
tableRowType,
tableRowType,
targetColumnIndexes,
numBucket,
bucketKeys,
Expand All @@ -245,6 +246,7 @@ public FlussSink<InputT> build() {
tablePath,
flussConfig,
tableRowType,
tableRowType,
numBucket,
bucketKeys,
partitionKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD
*/
@Nullable private transient ProjectedRow outputProjection;

/**
* Projection to handle input rows with extra fields (e.g., computed columns like PROCTIME()
* that Flink adds during UPDATE). Lazily initialized in {@link #serialize(RowData)} when the
* actual RowData arity exceeds the expected input field count.
*/
@Nullable private transient ProjectedRow inputProjection;

/** The expected field count of the input, used to detect extra computed columns. */
private transient int expectedInputFieldCount;

/**
* Estimator for calculating the size of RowData instances. Maybe null if there is no need to
* calculate size.
Expand Down Expand Up @@ -134,6 +144,7 @@ public void open(InitializationContext context) throws Exception {
this.converter = new FlinkAsFlussRow();
List<String> targetFieldNames = context.getRowSchema().getFieldNames();
List<String> inputFieldNames = context.getInputRowSchema().getFieldNames();
this.expectedInputFieldCount = inputFieldNames.size();
this.outputPadding = new PaddingRow(inputFieldNames.size());
if (targetFieldNames.size() != inputFieldNames.size()) {
// there is a schema evolution happens (e.g., ADD COLUMN), need to build index mapping
Expand Down Expand Up @@ -166,6 +177,19 @@ public RowWithOp serialize(RowData value) throws Exception {
"Converter not initialized. The open() method must be called before serializing records.");
}
InternalRow row = converter.replace(value);
// Handle extra computed columns (e.g., PROCTIME()) that Flink may add during
// UPDATE statements. These columns don't exist in the Fluss table schema and
// need to be projected out. Lazily build a projection on first encounter.
if (row.getFieldCount() > expectedInputFieldCount) {
if (inputProjection == null) {
int[] identityMapping = new int[expectedInputFieldCount];
for (int i = 0; i < expectedInputFieldCount; i++) {
identityMapping[i] = i;
}
inputProjection = ProjectedRow.from(identityMapping);
}
row = inputProjection.replaceRow(row);
}
// handling schema evolution for changes before job compilation
if (row.getFieldCount() < outputPadding.getFieldCount()) {
row = outputPadding.replaceRow(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,17 @@ public AppendSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
RowType consumedRowType,
MailboxExecutor mailboxExecutor,
FlussSerializationSchema<InputT> serializationSchema) {
super(tablePath, flussConfig, tableRowType, mailboxExecutor, serializationSchema);
super(
tablePath,
flussConfig,
tableRowType,
consumedRowType,
null,
mailboxExecutor,
serializationSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> {
private final TablePath tablePath;
private final Configuration flussConfig;
protected final RowType tableRowType;
protected final RowType consumedRowType;
protected final @Nullable int[] targetColumnIndexes;
private final MailboxExecutor mailboxExecutor;
private final FlussSerializationSchema<InputT> serializationSchema;
Expand All @@ -79,20 +80,29 @@ public FlinkSinkWriter(
RowType tableRowType,
MailboxExecutor mailboxExecutor,
FlussSerializationSchema<InputT> serializationSchema) {
this(tablePath, flussConfig, tableRowType, null, mailboxExecutor, serializationSchema);
this(
tablePath,
flussConfig,
tableRowType,
tableRowType,
null,
mailboxExecutor,
serializationSchema);
}

/**
 * Creates a sink writer for the given Fluss table.
 *
 * <p>All arguments are stored as-is; no validation or I/O happens here.
 *
 * @param tablePath path of the Fluss table this writer writes to
 * @param flussConfig Fluss client configuration used by the writer
 * @param tableRowType row type of the target table schema
 * @param consumedRowType row type of the rows actually consumed by the sink; may differ from
 *     {@code tableRowType} (e.g. presumably when the planner appends computed columns — see the
 *     delegating constructor, which passes {@code tableRowType} for both)
 * @param targetColumns indexes of the columns targeted by a partial update, or {@code null}
 *     when all columns are written
 * @param mailboxExecutor Flink mailbox executor used for asynchronous operations
 * @param serializationSchema schema converting input records into Fluss rows
 */
public FlinkSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
RowType consumedRowType,
@Nullable int[] targetColumns,
MailboxExecutor mailboxExecutor,
FlussSerializationSchema<InputT> serializationSchema) {
// Plain field assignments; note targetColumns is stored under the
// field name targetColumnIndexes.
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.targetColumnIndexes = targetColumns;
this.tableRowType = tableRowType;
this.consumedRowType = consumedRowType;
this.mailboxExecutor = mailboxExecutor;
this.serializationSchema = serializationSchema;
}
Expand All @@ -118,7 +128,7 @@ public void initialize(SinkWriterMetricGroup metricGroup) {
try {
this.serializationSchema.open(
new SerializerInitContextImpl(
table.getTableInfo().getRowType(), tableRowType, false));
table.getTableInfo().getRowType(), consumedRowType, false));
} catch (Exception e) {
throw new FlussRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class UpsertSinkWriter<InputT> extends FlinkSinkWriter<InputT> {
* @param tablePath the path of the table to write to
* @param flussConfig the Fluss configuration
* @param tableRowType the row type of the table
* @param consumedRowType the row type of the consumed data (may include computed columns)
* @param targetColumnIndexes optional column indexes for partial updates
* @param mailboxExecutor the mailbox executor for async operations
* @param flussSerializationSchema the serialization schema for input records
Expand All @@ -69,6 +70,7 @@ public UpsertSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
RowType consumedRowType,
@Nullable int[] targetColumnIndexes,
MailboxExecutor mailboxExecutor,
FlussSerializationSchema<InputT> flussSerializationSchema,
Expand All @@ -77,6 +79,7 @@ public UpsertSinkWriter(
tablePath,
flussConfig,
tableRowType,
consumedRowType,
targetColumnIndexes,
mailboxExecutor,
flussSerializationSchema);
Expand Down
Loading