From d8f6e623fd2b4ee8899ea913e2c67223ba1e9ca6 Mon Sep 17 00:00:00 2001
From: Gustavo de Morais
Date: Fri, 24 Apr 2026 18:03:05 +0200
Subject: [PATCH] [FLINK-39539][table] Document changelog processing in
 AGENTS.md

Adds Changelog Processing sections to the root, planner, and runtime
AGENTS.md covering: RowKind / ChangelogMode model, three-pass trait
inference, primary vs unique vs upsert keys (incl. nullability),
ChangelogNormalize and SinkUpsertMaterializer (V1, V2,
WatermarkCompactingSinkMaterializer) lifecycle, the two-timestamps model
(StreamRecord envelope vs row event-time column), FLIP-558 and FLIP-564
framing, and SQL best practices.
---
 AGENTS.md                                 |  21 +++++
 flink-table/flink-table-planner/AGENTS.md |  72 +++++++++++++++
 flink-table/flink-table-runtime/AGENTS.md | 101 +++++++++++++++++++++-
 3 files changed, 193 insertions(+), 1 deletion(-)

diff --git a/AGENTS.md b/AGENTS.md
index daa34497a6628..7c005710cdbc4 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -250,6 +250,27 @@ This section maps common types of Flink changes to the modules they touch and th
 6. Add to release notes
 7. Verify: ArchUnit tests pass, no new architecture violations
+## Changelog Processing
+
+Flink SQL's data model is a continuously evolving changelog: every row carries a `RowKind` (`+I`, `-U`, `+U`, `-D`) and every stream a `ChangelogMode`. Most planner and runtime mistakes in the table layer trace back to mishandling this. Recent FLIPs reshape the area:
+
+- **FLIP-558** - `SinkUpsertMaterializerV2` (FLINK-38461) fixes changelog disorder via watermark-based compaction; new `ON CONFLICT` syntax (in progress) lets users opt out of the expensive history-tracking "rollback" SUM behavior.
+- **FLIP-564** - `FROM_CHANGELOG` (FLINK-39261) and `TO_CHANGELOG` (FLINK-39419) PTFs expose stream/table conversion in SQL, the equivalent of DataStream's `fromChangelogStream`/`toChangelogStream`.
+
+For depth see [flink-table-planner/AGENTS.md](flink-table/flink-table-planner/AGENTS.md#changelog-processing) (trait inference, upsert keys, when the planner inserts `ChangelogNormalize` / `SinkUpsertMaterializer` / `DropUpdateBefore`) and [flink-table-runtime/AGENTS.md](flink-table/flink-table-runtime/AGENTS.md#changelog-processing) (`SinkUpsertMaterializer` V1 vs V2 internals, FLIP-564 PTFs).
+
+## SQL Best Practices
+
+Guidance for writing SQL (and for AI agents generating it) that minimizes changelog-related cost and surprises.
+
+1. **Keep the upstream upsert key equal to the sink primary key.** When they differ the planner must insert `SinkUpsertMaterializer`, which (in V1) tracks per-key update history. Add a `GROUP BY` on the PK columns or use `LAST_VALUE` instead of relying on sink-side deduplication (see the sketch after this list).
+2. **Prefer append-only sources when semantics allow.** Changelog sources force `ChangelogNormalize` and downstream retraction state.
+3. **Define watermarks on changelog sources.** Disorder-tolerant operators (V2 SUM, FLIP-564 `FROM_CHANGELOG ORDER BY`) need watermark progress to compact and emit. Without watermarks, output stalls or buffered state grows unbounded.
+4. **Use `FROM_CHANGELOG` for custom CDC formats** (FLIP-564) instead of writing a connector when the source is an append CDC stream.
+5. **Use `TO_CHANGELOG` to materialize a CDC append stream** out of a Flink table for archival, audit, or non-CDC-aware sinks.
+6. **Match sink `ChangelogMode` to what the sink can actually do.** Don't declare upsert capability for a sink that cannot upsert; use retract and let the planner negotiate.
+7. **Prefer retract sinks when the downstream system handles full deletes** - retract sinks bypass `SinkUpsertMaterializer` entirely (FLINK-38201).
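+
+A minimal sketch of practice 1, assuming a hypothetical append-only `orders` stream and an upsert sink `latest_status` keyed by `customer_id`:
+
+```sql
+-- Grouping on the sink PK makes the query's upsert key {customer_id}
+-- equal to the sink PK, so in AUTO mode the planner does not need to
+-- insert SinkUpsertMaterializer.
+INSERT INTO latest_status
+SELECT customer_id, LAST_VALUE(status)
+FROM orders
+GROUP BY customer_id;
+```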
+
 
 ## Coding Standards
 
 - **Format Java files with Spotless immediately after editing:** `./mvnw spotless:apply`. Uses google-java-format with AOSP style.
diff --git a/flink-table/flink-table-planner/AGENTS.md b/flink-table/flink-table-planner/AGENTS.md
index 969c5b366569d..94bea46f68d8d 100644
--- a/flink-table/flink-table-planner/AGENTS.md
+++ b/flink-table/flink-table-planner/AGENTS.md
@@ -106,6 +106,78 @@ When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `v
 New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes).
 
+## Changelog Processing
+
+Every stream in Flink SQL carries a `ChangelogMode` - the set of `RowKind`s (`+I`, `-U`, `+U`, `-D`) it can produce or consume. The planner tracks this internally as traits (`ModifyKindSet`, `UpdateKind`, `DeleteKind`) and propagates them across the plan: most subtle table-layer bugs trace back to mishandling these.
+
+### Core types
+
+- `ChangelogMode` ([flink-table-common](../flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java)) - the public contract for source/sink connectors. The three canonical modes: append `{+I}`, upsert `{+I, +U, -D}`, retract `{+I, -U, +U, -D}`.
+- `RowKind` ([flink-core](../../flink-core/src/main/java/org/apache/flink/types/RowKind.java)) - the per-record tag.
+- `ModifyKindSet` / `ModifyKindSetTrait` and `UpdateKind` / `UpdateKindTrait` (`plan/trait/`) - the planner-internal split. `ModifyKindSet` says *which kinds flow*; `UpdateKind` says whether updates are `BEFORE_AND_AFTER`, `ONLY_UPDATE_AFTER`, or `NONE`. Splitting them lets the planner request the cheaper `ONLY_UPDATE_AFTER` form when downstream tolerates it. A third pair, `DeleteKind` / `DeleteKindTrait`, distinguishes delete-by-key from full deletes.
+
+### Three-pass inference
+
+[`FlinkChangelogModeInferenceProgram.scala`](src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala) runs three nested visitors:
+
+1. **`SatisfyModifyKindSetTraitVisitor`** (bottom-up) - asks each node which `RowKind`s it *can* produce given its inputs.
+2. **`SatisfyUpdateKindTraitVisitor`** (top-down) - pushes downstream's `UpdateKind` requirement up the tree, so producers don't emit `-U` when nobody needs it.
+3. **`SatisfyDeleteKindTraitVisitor`** (top-down) - decides between delete-by-key and full-delete on inputs that produce `-D`.
+
+Each visitor is a big `case` match over `StreamPhysicalRel` subclasses. A new physical node that isn't covered by those matches falls through to default handling and may silently get wrong changelog metadata - add it to all three visitors when introducing one.
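+
+To inspect what the visitors inferred for a given query, `EXPLAIN CHANGELOG_MODE` annotates every physical node in the optimized plan (a sketch, assuming a hypothetical table `t` with primary key `id`):
+
+```sql
+-- Each node in the printed plan carries its inferred changelog mode,
+-- e.g. changelogMode=[I,UB,UA,D] for a full retract stream.
+EXPLAIN CHANGELOG_MODE
+SELECT id, COUNT(*) AS cnt FROM t GROUP BY id;
+```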
+
+### Primary keys, unique keys, upsert keys
+
+Three related but distinct concepts:
+
+- **Primary key (PK)** - user contract, declared in `CREATE TABLE ... PRIMARY KEY NOT ENFORCED`. Always NOT NULL (Flink enforces this at table creation). At most one per table. Used by sinks to know what to upsert against, and by sources to mark a row identifier. The PK is what shows up in `ResolvedSchema.getPrimaryKey()`.
+- **Unique key** - planner-derived set(s) of columns guaranteed to identify a row uniquely in a given relational node's output. Multiple unique keys may coexist (e.g. PK plus a `GROUP BY` introducing another). Tracked by Calcite metadata in [`FlinkRelMdUniqueKeys`](src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala). Carries an `ignoreNulls` flag - with `ignoreNulls=false` (strict), a column that may contain NULLs is not considered unique because of SQL's `NULL != NULL` rule.
+- **Upsert key** - the subset of unique keys that the *streaming pipeline* can use to route and apply updates. Derived in [`FlinkRelMdUpsertKeys.scala`](src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala) starting from unique keys (with strict `ignoreNulls=false`). Two helpers shape the result: `enrichWithImmutableColumns` adds extra key variants by unioning each base key with the input's immutable columns (e.g. `{k1}` plus `{k1, immutable_col}`), giving downstream more candidates to match against; `filterKeys` then keeps only variants whose columns subsume the hash distribution key (so all updates for that key land at the same task). The "smallest" upsert key chosen for an operator comes from [`UpsertKeyUtil.getSmallestKey`](src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java).
+
+**Why nullability matters.** A nullable upsert key column would let two distinct logical rows both carry `NULL` and be conflated by the upsert routing - SQL's `NULL != NULL` rule means uniqueness was never guaranteed for such rows, breaking the "this update applies to exactly that row" invariant. PKs are NOT NULL by definition so PK-derived upsert keys are always safe; non-PK unique keys only become upsert keys if their columns are strictly unique under `ignoreNulls=false`.
+
+**PK vs upsert key at the sink.** When the upstream upsert key equals (or is contained in) the sink's PK, updates can be routed straight through. When they differ, the planner inserts `SinkUpsertMaterializer` to reconcile - see the operators section below. This mismatch is the single most common source of expensive plans.
+
+**Preservation through expressions.** Casts and projections preserve upsert keys *only when injective* - non-injective casts (e.g. `BIGINT -> INT` on a key column) collapse distinct keys and must be treated as key-destroying. See FLINK-39088 for the recent injective-cast handling. When extending key derivation through a new `RexCall`, add an explicit injectivity check; do not assume preservation.
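+
+A SQL sketch of the sink mismatch, with hypothetical tables (`products` is an updating source with PK `product_id`; the sink `by_name` is keyed by `name`):
+
+```sql
+-- The projection drops product_id, so no upsert key matching the sink
+-- PK {name} survives - AUTO mode inserts SinkUpsertMaterializer:
+INSERT INTO by_name SELECT name, quantity FROM products;
+
+-- Aggregating on the sink PK makes the upsert key {name} equal to the
+-- sink PK - no SinkUpsertMaterializer needed:
+INSERT INTO by_name
+SELECT name, SUM(quantity) FROM products GROUP BY name;
+```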
+
+### Operators the planner inserts
+
+The planner injects these to bridge changelog-mode mismatches. For what each operator actually does at runtime (the per-key dedupe/expand/normalize logic of ChangelogNormalize, the rollback algorithm of SUM, V1 vs V2 vs `WatermarkCompactingSinkMaterializer`) and for worked examples, see [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md#changelog-processing). The planner-side concern is *when* and *why* each gets inserted:
+
+**`StreamPhysicalChangelogNormalize`** ([file](src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala)) - inserted as a speculative placeholder during physical conversion, then often pruned once trait inference knows what downstream actually consumes:
+
+1. **Speculative insertion during physical conversion.** [`StreamPhysicalTableSourceScanRule`](src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala) calls [`DynamicSourceUtils.changelogNormalizeEnabled`](src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java); if true, a `StreamPhysicalChangelogNormalize` is placed on top of the scan as a placeholder. Conditions: the source is an **upsert source** (`+U` present, `-U` absent, PK declared) or a **CDC source with possible duplicates** (non-insert-only + PK + `TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE=true`); and `eventTimeSnapshotRequired` is false. At this point the planner has no view of what downstream actually consumes, so it inserts the node defensively.
+2. **Trait inference decides what survives.** Once `FlinkChangelogModeInferenceProgram` runs, it knows the full downstream demand:
+   - If downstream is happy with `ONLY_UPDATE_AFTER` and no other consumer needs the normalize's work (no filter pushed in, no CDC dedup requested, no metadata columns accessed), [`ChangelogNormalizeRequirementResolver.isRequired`](src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java) returns false and the node is removed entirely - the upsert stream flows through untouched.
+   - Otherwise the node is kept, with `generateUpdateBefore` toggled based on demand (`BEFORE_AND_AFTER` vs `ONLY_UPDATE_AFTER`) by `SatisfyUpdateKindTraitVisitor`, and the input delete trait picked by `SatisfyDeleteKindTraitVisitor` (prefers delete-by-key, falls back to full).
+3. **Filter pushdown** is prepared by [`FlinkMarkChangelogNormalizeProgram`](src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java) + `PushCalcPastChangelogNormalizeRule`, which can push a downstream filter into a surviving normalize to shrink its state.
+
+The takeaway: an upsert source does not force a `ChangelogNormalize` in the final plan - it just opens the door for one, and the inference pass is the real arbiter.
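+
+A minimal DDL sketch of a source satisfying the conditions in step 1 (hypothetical table; `upsert-kafka` declares `{+I, +U, -D}` with no `-U` and requires a PK):
+
+```sql
+-- A speculative ChangelogNormalize is placed on top of this scan;
+-- trait inference later keeps or prunes it based on downstream demand.
+CREATE TABLE user_profiles (
+  user_id BIGINT,
+  region  STRING,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'profiles',
+  'properties.bootstrap.servers' = '...',
+  'key.format' = 'json',
+  'value.format' = 'json'
+);
+```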
+
+**`StreamPhysicalDropUpdateBefore`** ([file](src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDropUpdateBefore.scala)) - inserted by trait inference in `FlinkChangelogModeInferenceProgram` when an upstream produces `BEFORE_AND_AFTER` but the requirement above is `ONLY_UPDATE_AFTER` - drops `-U` records to save shuffle cost.
+
+**`StreamPhysicalSinkUpsertMaterializer` (SUM)** - decision lives in [`FlinkChangelogModeInferenceProgram.analyzeUpsertMaterializeStrategy`](src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala), gated by `TABLE_EXEC_SINK_UPSERT_MATERIALIZE`:
+
+- `FORCE` - inserted whenever the sink has a non-empty PK and is not a retract sink.
+- `NONE` - never inserted.
+- `AUTO` (default) - inserted only if **all** of: sink has PK, sink is upsert (not append, not retract), input has updates, and **sink PK does not contain the input upsert key**. Skipped (FLINK-38201) for retract sinks; also skipped for an append input combined with `ON CONFLICT DO DEDUPLICATE` (the dedupe is implicit).
+
+The runtime variant (V1 / V2 / `WatermarkCompactingSinkMaterializer`) is chosen in [`StreamExecSink`](src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java) based on `SinkUpsertMaterializeStrategy` and the `ON CONFLICT` clause. `ON CONFLICT DO ERROR` / `DO NOTHING` additionally requires source watermarks (`validateSourcesHaveWatermarks`).
+
+### Sink contract
+
+[`DynamicTableSink#getChangelogMode(ChangelogMode requestedMode)`](../flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java) is a *negotiation*, not a getter. Return only modes the sink can actually handle: the planner uses the answer to decide which of the operators above to insert. Silently accepting `RETRACT` when you only handle `INSERT` produces wrong results at runtime.
+
+### Common pitfalls
+
+- A new `StreamPhysicalRel` declares `INSERT_ONLY` even though its output can contain updates - inference picks the wrong mode and codegen emits without `-U`.
+- Adding a `RexCall` to upsert-key derivation without an injectivity check (FLINK-39088 territory).
+- Forcing `BEFORE_AND_AFTER` when `ONLY_UPDATE_AFTER` would suffice - inserts an unnecessary `ChangelogNormalize` and bloats state.
+- Changing trait inference and not regenerating plan-test XML golden files.
+- Assuming SUM V1 layout in tests when V2 is now active by default.
+
 ## Testing Patterns
 
 Choose test types based on what you're changing:
 
diff --git a/flink-table/flink-table-runtime/AGENTS.md b/flink-table/flink-table-runtime/AGENTS.md
index cd61620f20cd1..82354786a4da3 100644
--- a/flink-table/flink-table-runtime/AGENTS.md
+++ b/flink-table/flink-table-runtime/AGENTS.md
@@ -68,7 +68,106 @@ Some functions also require custom code generators in the planner (e.g., `JsonCa
 - When modifying state serializers, create a `TypeSerializerSnapshot` with version bumping
 - Migration test resources follow naming: `migration-flink----snapshot`
-- Rescaling tests verify state redistribution across parallelism changes (see `SinkUpsertMaterializerMigrationTest`, `SinkUpsertMaterializerRescalingTest`)
+- Rescaling tests verify state redistribution across parallelism changes (see `SinkUpsertMaterializerMigrationTest`, `SinkUpsertMaterializerRescalingTest`)
+
+## Changelog Processing
+
+Runtime operators carry the `RowKind` contract that the planner's trait inference assumed. Get it wrong and state diverges from intent: the operator may double-count, leak rows, or emit inconsistent updates. See [flink-table-planner AGENTS.md](../flink-table-planner/AGENTS.md#changelog-processing) for the planner-side picture.
+
+### `RowKind` semantics
+
+`+I` (INSERT), `-U` (UPDATE_BEFORE), `+U` (UPDATE_AFTER), `-D` (DELETE). Stateful operators that buffer rows must apply `-U`/`-D` as retractions; emitting `-U`/`+U` pairs vs. only `+U` depends on the operator's declared `ChangelogMode` (negotiated by the planner). Defined in [`RowKind.java`](../../flink-core/src/main/java/org/apache/flink/types/RowKind.java).
+
+### Time signals: row column vs `StreamRecord.timestamp`
+
+Two distinct time concepts coexist in the streaming runtime, and they behave very differently in the SQL/Table layer than in the DataStream API:
+
+- **Row event-time column** - a regular column inside the `RowData` payload, declared via `WATERMARK FOR <column> AS <expression>`. This is the **canonical event-time signal in SQL**. It is data, propagates with the row through every operator, and is what built-in time-aware operators (window aggregates, temporal joins, FLIP-558 V2 SUM compaction, etc.) actually read. For a CDC `-U`/`-D`, this column carries the *prior version's* logical time - not when the deletion happened.
+- **`StreamRecord.timestamp`** - the optional `long` (millis since epoch) attached to the value envelope ([`StreamRecord.java`](../../flink-runtime/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java); semantic contract on [`TimestampAssigner`](../../flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java): "event time, used by all functions that operate on event time"). Sentinel `Long.MIN_VALUE` means "no timestamp set". This is the DataStream API's native event-time channel.
+ +**The envelope timestamp is not reliably propagated through the SQL pipeline.** It is set at the source (e.g. Kafka emits with `consumerRecord.timestamp()` in [`KafkaRecordEmitter.java:46-58`](https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L46-L58); a pushed `WatermarkStrategy.withTimestampAssigner(...)` via `SupportsWatermarkPushDown` may overwrite it from a row column). After that, most table operators **don't preserve it**: + +- [`TimestampedCollector`](../../flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java) - the wrapper used by many table operators - reuses a `StreamRecord` whose timestamp starts unset and is only attached if the operator explicitly calls `setTimestamp(...)` / `setAbsoluteTimestamp(...)`. `SinkUpsertMaterializer` (V1 and V2) creates a `TimestampedCollector` and never sets a timestamp, so emitted records leave with `hasTimestamp=false`. `WindowAggOperator` explicitly calls `eraseTimestamp()` before assigning the window's emit time. Outcome: relying on the envelope timestamp mid-SQL-pipeline is unsafe. +- [`WatermarkAssignerOperator`](src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java) reads the row's rowtime column to emit `Watermark` events but does **not** set `StreamRecord.timestamp`. + +The only place the SQL runtime deliberately writes the envelope from a row column is [`StreamRecordTimestampInserter`](src/main/java/org/apache/flink/table/runtime/operators/sink/StreamRecordTimestampInserter.java) - inserted right before sinks so connectors (e.g. Kafka producer) can stamp the outgoing record with the row's logical event-time. + +**Practical rule for SQL/Table code:** + +- Treat the row's event-time column as the source of truth. +- Treat `Watermark` events (separate from records) as the time-progress signal - they are propagated by the framework regardless of envelope timestamps. +- Don't read `element.getTimestamp()` in a new table operator unless you know the upstream chain set it (window operators, source, FLIP-558 V2 input where the operator does use `element.getTimestamp()` internally for ordering). Use a row column index instead. +- DataStream API users see the envelope timestamp directly and can rely on it; the table runtime cannot. + +**`-U` and `-D` carry the prior version's row-column timestamp.** Conceptually a `-U`/`-D` is "the row that was previously visible is gone", so it carries the row being removed - including its event-time column. For Debezium-style CDC, the `before` image's `update_time` is exactly the time the deleted version was last valid. For partial-delete tombstones expanded by `ChangelogNormalize`, the emitted `-D` payload comes from state (the prior `+I`/`+U` row contents), so its row column carries the prior version's time. + +Confusing the row event-time column with the envelope timestamp is the source of most "watermark stuck on deletes" / "temporal join sees stale dim after delete" / "SUM state grows unbounded" bugs. See FLIP-558 motivation for the broader framing. 
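+
+A minimal DDL sketch of the canonical signal (hypothetical table and column names):
+
+```sql
+-- `ts` is a regular column in every RowData - the event-time signal
+-- that SQL time-aware operators actually read; the WATERMARK clause
+-- drives the separate Watermark events that carry time progress.
+CREATE TABLE orders_cdc (
+  order_id BIGINT,
+  amount   DECIMAL(10, 2),
+  ts       TIMESTAMP(3),
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) WITH ('connector' = '...');
+```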
+
+### `ChangelogNormalize` - what it does
+
+The runtime side is [`ProcTimeDeduplicateKeepLastRowFunction`](src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java) (or its mini-batch sibling), called from `processLastRowOnChangelog` in [`DeduplicateFunctionHelper`](src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java). It keeps the latest row per unique key in `ValueState` and performs four jobs in one operator:
+
+1. **Deduplicate by key** - if a `+U`/`+I` arrives for an already-stored row with identical content (and TTL is disabled), the duplicate is swallowed.
+2. **Synthesize `-U` (UPDATE_BEFORE)** - when `generateUpdateBefore=true`, an incoming `+U` is paired with a `-U` built from the previous state value. This converts a partial-image upsert stream `{+I, +U, -D}` into a full retract stream `{+I, -U, +U, -D}` for downstream operators that need it.
+3. **Expand partial deletes into full deletes** - upsert sources (e.g. Kafka tombstones) often deliver `-D` with only key columns. ChangelogNormalize emits the *stored* full row as `-D` instead, so downstream operators see the complete row content. See the comment at [`DeduplicateFunctionHelper.java:140-148`](src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java#L140-L148).
+4. **Normalize `RowKind`** - rewrites the in-state row to `+I` and emits `+I` (first occurrence) or `+U` (subsequent updates), so the downstream sees a clean retract changelog regardless of upstream sloppiness.
+
+Optionally pushes a filter into the same operator (`filterCondition`) and supports mini-batching. **Note:** ChangelogNormalize does *not* fix changelog disorder across keys - it operates per key and assumes ordering inside a key. Cross-key disorder is `SinkUpsertMaterializerV2`'s job (FLIP-558).
+
+Concrete example for an upsert source `{+I, +U, -D}` with `generateUpdateBefore=true`:
+
+```
+input: +I(k=1, v=A)   +U(k=1, v=B)                 -D(k=1, [tombstone])
+state: -              +I(k=1, v=A)                 +I(k=1, v=B)
+emit:  +I(k=1, v=A)   -U(k=1, v=A) +U(k=1, v=B)    -D(k=1, v=B)
+```
+
+### `SinkUpsertMaterializer` (SUM) - what it does
+
+SUM resolves PK conflicts when the *upstream upsert key differs from the sink's primary key*. Imagine two source rows with different upsert keys mapping to the same sink PK (e.g. a join produces two rows with `name='Football'` but different `product_id`s into a sink keyed by `name`). SUM keeps a per-PK history of "which source-key-versions are still alive" so it knows the right value to emit at each step.
+
+[`SinkUpsertMaterializer`](src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java) (V1) uses `ValueState<List<RowData>>` keyed by sink PK. Three responsibilities:
+
+1. **Track all live versions per PK** - on `+I`/`+U`, replace the entry with matching upsert key (or append if new). Emit `+I` if the list was empty, otherwise `+U`.
+2. **Apply retractions to the right entry** - on `-U`/`-D`, find and remove the first matching entry by upsert key.
+3. **"Rollback" emission rule** - if the removed entry was the *last* in the list, emit `+U` for the new last row (rolling back to the prior live value); if the list becomes empty, emit `-D`; if a middle entry was removed, emit nothing.
+
+This per-PK history is the source of the well-known SUM state-size problem (FLIP-558 motivation). TTL via `StateTtlConfig` is the only mitigation in V1.
+
+Example for two source upsert keys `(p=1, name=Football, q=5)` and `(p=2, name=Football, q=6)` both targeting sink PK `name`:
+
+```
+input: +U(p=1,name=F,q=5)   +U(p=2,name=F,q=6)       -U(p=2,name=F,q=6)
+state: [(p=1,q=5)]          [(p=1,q=5),(p=2,q=6)]    [(p=1,q=5)]
+emit:  +I(name=F,q=5)       +U(name=F,q=6)           +U(name=F,q=5)  // rollback
+```
+
+### `SinkUpsertMaterializerV2` (FLIP-558, FLINK-38461)
+
+[`SinkUpsertMaterializerV2`](src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java) keeps the same DEDUPLICATE/rollback contract as V1 but replaces the per-key `List<RowData>` with a `SequencedMultiSetState` that tracks insertion order via the `StreamRecord` timestamp. Watermark-driven compaction inside the multiset addresses changelog disorder (out-of-order `-U`/`+U` arriving via different upstream paths). Removal returns a `StateChangeInfo` enum:
+
+- `REMOVAL_ALL` -> emit `-D`
+- `REMOVAL_LAST_ADDED` -> emit `+U` for the new last row (rollback)
+- `REMOVAL_OTHER` -> no-op
+- `REMOVAL_NOT_FOUND` -> warn, no-op
+
+Sibling operator `WatermarkCompactingSinkMaterializer` is a different code path used for the new `ON CONFLICT DO ERROR` / `DO NOTHING` strategies (FLIP-558) - it compacts on combined-watermark progression and either fails or drops on duplicate PKs instead of running the rollback algorithm. Selection between SUM V1, V2, and the watermark-compacting variant happens in [`StreamExecSink`](../flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java) based on `TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY` and the `ON CONFLICT` clause.
+
+### `FROM_CHANGELOG` / `TO_CHANGELOG` PTFs (FLIP-564)
+
+Two built-in process table functions that expose the stream/table duality in SQL:
+
+- **`FROM_CHANGELOG`** (FLINK-39261) - parses an append-only CDC stream into a Flink changelog. Current parameters in [`BuiltInFunctionDefinitions.java:815-837`](../flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java#L815-L837): `input` (table, row semantics), `op` (descriptor pointing at the operation-code column), `op_mapping` (`MAP` mapping source codes to `RowKind` names, e.g. `'u' -> 'UPDATE_BEFORE, UPDATE_AFTER'`). Output changelog mode is `ChangelogMode.all()`. Unmapped or NULL op codes throw `TableRuntimeException` (FLINK-39495 - prior to that fix they were silently dropped). Note: the FLIP-564 design proposes additional parameters (`before`/`after` image descriptors, `state_ttl`, partition/order syntax for reordering, `invalid_op_handling`) that are not in the current implementation.
+- **`TO_CHANGELOG`** (FLINK-39419, FLINK-39392) - emits a CDC append stream from a dynamic table. Current parameters in [`BuiltInFunctionDefinitions.java:783-813`](../flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java#L783-L813): `input` (table), `op`, `op_mapping`. Declared with `ROW_SEMANTIC_TABLE`, `SUPPORT_UPDATES`, `REQUIRE_UPDATE_BEFORE`, and `REQUIRE_FULL_DELETE` traits, so the planner ensures the input arrives as a full retract changelog with materialized `-U` and full-row `-D`. Useful for archival/audit pipelines or sending to non-CDC-aware sinks.
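+
+A hypothetical invocation sketch matching the current parameters (exact call syntax may differ as FLIP-564 evolves; `raw_events` and its `op` column are made-up names). `TO_CHANGELOG` takes the same parameter names in the opposite direction:
+
+```sql
+-- Parse an append-only stream of Debezium-style op codes into a
+-- changelog; the multi-kind 'u' mapping follows the example above.
+SELECT *
+FROM FROM_CHANGELOG(
+  input      => TABLE raw_events,
+  op         => DESCRIPTOR(op),
+  op_mapping => MAP['c', 'INSERT',
+                    'u', 'UPDATE_BEFORE, UPDATE_AFTER',
+                    'd', 'DELETE']);
+```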
+
+Type strategies in flink-table-common: `FromChangelogTypeStrategy.java`, `ToChangelogTypeStrategy.java`.
+
+### Common pitfalls
+
+- New sink/aggregator silently ignoring `-U`/`-D` because it was written assuming insert-only input.
+- Tests asserting V1 state layout against an operator now running V2.
+- New PTFs that don't honor `state_ttl` and grow state forever.
+- Skipping `op_mapping` edge cases in PTF tests (multi-code mappings, NULL op codes, unmapped op codes - the latter two now throw).
+
 ## Testing Patterns