[FLINK-39438][MaxCompute] Add support for sink operation types in MaxCompute options #4377
Conversation
Pull request overview
Adds an explicit MaxCompute sink option to control whether the sink operates in append or upsert mode, decoupling some behavior from MaxCompute table type.
Changes:
- Introduce a `sink.operation` configuration (append/upsert) wired through `MaxComputeDataSinkFactory` into `MaxComputeOptions`.
- Adjust writer selection and table-creation behavior to honor the configured sink operation.
- Add an emulator-based E2E test to verify table creation in APPEND mode does not create transactional / PK metadata.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| .../utils/SchemaEvolutionUtilsTest.java | Adds E2E test covering append-mode table creation behavior. |
| .../EmulatorTestBase.java | Adds appendOptions test config using the new sink operation option. |
| .../writer/MaxComputeWriter.java | Uses sinkOperation to decide between upsert vs append writer. |
| .../writer/BatchAppendWriter.java | Avoids constructing PartitionSpec when partition is null/empty. |
| .../utils/SchemaEvolutionUtils.java | Creates transactional table with PKs only when in UPSERT mode. |
| .../options/MaxComputeOptions.java | Adds SinkOperation option to MaxComputeOptions (+ builder + enum). |
| .../MaxComputeDataSinkOptions.java | Defines the new sink.operation ConfigOption. |
| .../MaxComputeDataSinkFactory.java | Reads sink.operation from config and passes it into options. |
Comments suppressed due to low confidence (2)
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java:94
- When `sinkOperation` is `UPSERT` but the provided schema has no primary keys, the table will be created as non-transactional and the sink will effectively behave as append. Consider validating this combination and throwing a clear exception (or otherwise enforcing that UPSERT requires primary keys) to avoid silent misconfiguration.
if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())
&& options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
tableCreator
.transactionTable()
.withBucketNum(options.getBucketsNum())
.withPrimaryKeys(schema.primaryKeys());
}
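A minimal sketch of the validation this comment suggests, reusing the helpers already visible above; the exception type, the message wording, and the assumption that `tableId` is in scope are mine, not part of the PR:

// Hypothetical guard: refuse UPSERT mode when the schema defines no primary keys.
if (options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT
        && CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
    throw new IllegalArgumentException(
            "sink.operation is UPSERT but table " + tableId + " has no primary keys; "
                    + "declare primary keys in the schema or set sink.operation to APPEND.");
}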
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/writer/MaxComputeWriter.java:43
- In UPSERT mode, this falls back to `BatchAppendWriter` when the target table is non-transactional, which silently ignores update/delete semantics and doesn't truly "respect the configured option". Consider failing fast when `sinkOperation` is `UPSERT` but `isTransactionalTable(...)` is false (with an actionable error telling users to create a transactional table or switch to APPEND).
if (MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)
&& options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
return new BatchUpsertWriter(options, writeOptions, sessionIdentifier);
} else {
return new BatchAppendWriter(options, writeOptions, sessionIdentifier);
}
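A hedged sketch of the fail-fast behavior described here; the exception type and message text are assumptions, and only the calls already shown above are reused:

if (options.getSinkOperation() == MaxComputeOptions.SinkOperation.UPSERT) {
    if (!MaxComputeUtils.isTransactionalTable(options, sessionIdentifier)) {
        // Hypothetical fast failure instead of silently downgrading to append semantics.
        throw new IllegalStateException(
                "sink.operation is UPSERT but the target table is not transactional; "
                        + "recreate it as a transactional table or switch sink.operation to APPEND.");
    }
    return new BatchUpsertWriter(options, writeOptions, sessionIdentifier);
}
return new BatchAppendWriter(options, writeOptions, sessionIdentifier);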
void testCreateTableInAppendMode() {
    try {
        String appendTable = "SCHEMA_EVOLUTION_APPEND_TABLE";
        SchemaEvolutionUtils.createTable(
                appendOptions,
                TableId.tableId(appendTable),
                Schema.newBuilder()
                        .physicalColumn("PK", DataTypes.BIGINT())
                        .physicalColumn("ID1", DataTypes.BIGINT())
                        .primaryKey("PK")
                        .build());
        // In APPEND mode the table should NOT be created as a transactional table,
        // so primary key metadata should be absent even though the schema defines one.
        assertThat(odpsInstance.tables().get(appendTable).getPrimaryKey()).isEmpty();
        odpsInstance.tables().delete(appendTable, true);
        .defaultValue(4)
        .withDescription("The number of concurrent with flush bucket data.");

public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
Please add documentation for the newly introduced sink.operation option. This should explain:
- upsert (default): Requires primary keys in schema. Creates a transactional table and supports update/delete semantics.
- append: Creates a regular table regardless of primary keys. Only supports insert operations, suitable for append-only scenarios.
This helps users understand the configuration impact and choose the appropriate mode for their use case.
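One possible shape for that documentation, sketched with the ConfigOptions builder pattern; the exact key, default value, and description wording are assumptions based on this thread, not the merged code:

public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
        ConfigOptions.key("sink.operation")
                .enumType(MaxComputeOptions.SinkOperation.class)
                .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
                .withDescription(
                        "How the sink writes data. UPSERT (default) requires primary keys, creates a "
                                + "transactional table and supports update/delete semantics. APPEND creates "
                                + "a regular table regardless of primary keys and only supports inserts.");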
    return value;
}

public static SinkOperation fromValue(String value) {
This method is unused and could be removed.
    this.value = value;
}

public String getValue() {
This method is unused and could be removed.
Problem
Expected Behavior
Proposal
Example YAML Configuration:
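A hedged example of where the new option would appear in a pipeline definition; all source and connection values are placeholders, and only `sink.operation: append` comes from this PR:

source:
  type: mysql
  hostname: localhost          # placeholder
  port: 3306
  username: app_user           # placeholder
  password: app_password       # placeholder
  tables: app_db.\.*

sink:
  type: maxcompute
  accessId: ${ACCESS_ID}       # placeholder credential
  accessKey: ${ACCESS_KEY}     # placeholder credential
  endpoint: ${ENDPOINT}        # placeholder endpoint
  project: example_project     # placeholder project
  sink.operation: append       # new option: append or upsert (default)

pipeline:
  name: MySQL to MaxCompute (append mode)
  parallelism: 1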