Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,27 @@ This section maps common types of Flink changes to the modules they touch and th
6. Add to release notes
7. Verify: ArchUnit tests pass, no new architecture violations

## Changelog Processing

Flink SQL's data model is a continuously evolving changelog: every row carries a `RowKind` (`+I`, `-U`, `+U`, `-D`) and every stream a `ChangelogMode`. Most planner and runtime mistakes in the table layer trace back to mishandling this. Recent FLIPs reshape the area:

- **FLIP-558** - `SinkUpsertMaterializerV2` (FLINK-38461) fixes changelog disorder via watermark-based compaction; new `ON CONFLICT` syntax (in progress) lets users opt out of the expensive history-tracking "rollback" SUM behavior.
- **FLIP-564** - `FROM_CHANGELOG` (FLINK-39261) and `TO_CHANGELOG` (FLINK-39419) PTFs expose stream/table conversion in SQL, the equivalent of DataStream's `fromChangelogStream`/`toChangelogStream`.

For depth see [flink-table-planner/AGENTS.md](flink-table/flink-table-planner/AGENTS.md#changelog-processing) (trait inference, upsert keys, when the planner inserts `ChangelogNormalize` / `SinkUpsertMaterializer` / `DropUpdateBefore`) and [flink-table-runtime/AGENTS.md](flink-table/flink-table-runtime/AGENTS.md#changelog-processing) (`SinkUpsertMaterializer` V1 vs V2 internals, FLIP-564 PTFs).

## SQL Best Practices

Guidance for writing SQL (and for AI generating it) that minimizes changelog-related cost and surprises.

1. **Keep the upstream upsert key equal to the sink primary key.** When they differ the planner must insert `SinkUpsertMaterializer`, which (in V1) tracks per-key update history. Add a `GROUP BY` on the PK columns or use `LAST_VALUE` instead of relying on sink-side deduplication.
2. **Prefer append-only sources when semantics allow.** Changelog sources force `ChangelogNormalize` and downstream retraction state.
3. **Define watermarks on changelog sources.** Disorder-tolerant operators (V2 SUM, FLIP-564 `FROM_CHANGELOG ORDER BY`) need watermark progress to compact and emit. No watermarks = stalled output or buffered state growth.
4. **Use `FROM_CHANGELOG` for custom CDC formats** (FLIP-564) instead of writing a connector when the source is an append CDC stream.
5. **Use `TO_CHANGELOG` to materialize a CDC append stream** out of a Flink table for archival, audit, or non-CDC-aware sinks.
6. **Match sink `ChangelogMode` to what the sink can actually do.** Don't declare upsert capability for a sink that cannot upsert; use retract and let the planner negotiate.
7. **Prefer retract sinks when the downstream system handles full deletes** - retract sinks bypass `SinkUpsertMaterializer` entirely (FLINK-38201).

## Coding Standards

- **Format Java files with Spotless immediately after editing:** `./mvnw spotless:apply`. Uses google-java-format with AOSP style.
Expand Down
72 changes: 72 additions & 0 deletions flink-table/flink-table-planner/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,78 @@ When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `v

New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes).

## Changelog Processing

Every stream in Flink SQL carries a `ChangelogMode` - the set of `RowKind`s (`+I`, `-U`, `+U`, `-D`) it can produce or consume. The planner tracks this as two traits and propagates them across the plan: most subtle table-layer bugs trace back to mishandling these.

### Core types

- `ChangelogMode` ([flink-table-common](../flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java)) - the public contract for source/sink connectors. The three canonical modes: append `{+I}`, upsert `{+I, +U, -D}`, retract `{+I, -U, +U, -D}`.
- `RowKind` ([flink-core](../../flink-core/src/main/java/org/apache/flink/types/RowKind.java)) - the per-record tag.
- `ModifyKindSet` / `ModifyKindSetTrait` and `UpdateKind` / `UpdateKindTrait` (`plan/trait/`) - the planner-internal split. `ModifyKindSet` says *which kinds flow*; `UpdateKind` says whether updates are `BEFORE_AND_AFTER`, `ONLY_UPDATE_AFTER`, or `NONE`. Splitting them lets the planner request the cheaper `ONLY_UPDATE_AFTER` form when downstream tolerates it.

### Two-pass inference

[`FlinkChangelogModeInferenceProgram.scala`](src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala) runs three nested visitors:

1. **`SatisfyModifyKindSetTraitVisitor`** (bottom-up) - asks each node which `RowKind`s it *can* produce given its inputs.
2. **`SatisfyUpdateKindTraitVisitor`** (top-down) - pushes downstream's `UpdateKind` requirement up the tree, so producers don't emit `-U` when nobody needs it.
3. **`SatisfyDeleteKindTraitVisitor`** (top-down) - decides between delete-by-key and full-delete on inputs that produce `-D`.

Each visitor is a big `case` match over `StreamPhysicalRel` subclasses. A new physical node that isn't covered by those matches falls through to default handling and may silently get wrong changelog metadata - add it to all three visitors when introducing one.

### Primary keys, unique keys, upsert keys

Three related but distinct concepts:

- **Primary key (PK)** - user contract, declared in `CREATE TABLE ... PRIMARY KEY NOT ENFORCED`. Always NOT NULL (Flink enforces this at table creation). At most one per table. Used by sinks to know what to upsert against, and by sources to mark a row identifier. The PK is what shows up in `ResolvedSchema.getPrimaryKey()`.
- **Unique key** - planner-derived set(s) of columns guaranteed to identify a row uniquely in a given relational node's output. Multiple unique keys may coexist (e.g. PK plus a `GROUP BY` introducing another). Tracked by Calcite metadata in [`FlinkRelMdUniqueKeys`](src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala). Carries an `ignoreNulls` flag - with `ignoreNulls=false` (strict), a column that may contain NULLs is not considered unique because of SQL's `NULL != NULL` rule.
- **Upsert key** - the subset of unique keys that the *streaming pipeline* can use to route and apply updates. Derived in [`FlinkRelMdUpsertKeys.scala`](src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala) starting from unique keys (with strict `ignoreNulls=false`). Two helpers shape the result: `enrichWithImmutableColumns` adds extra key variants by unioning each base key with the input's immutable columns (e.g. `{k1}` plus `{k1, immutable_col}`), giving downstream more candidates to match against; `filterKeys` then keeps only variants whose columns subsume the hash distribution key (so all updates for that key land at the same task). The "smallest" upsert key chosen for an operator comes from [`UpsertKeyUtil.getSmallestKey`](src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java).

**Why nullability matters.** A nullable upsert key column would let two distinct logical rows both have `NULL` and be conflated by the upsert routing - SQL's `NULL != NULL` breaks the "this update applies to exactly that row" invariant. PKs are NOT NULL by definition so PK-derived upsert keys are always safe; non-PK unique keys only become upsert keys if their columns are strictly unique under `ignoreNulls=false`.

**PK vs upsert key at the sink.** When the upstream upsert key equals (or is contained in) the sink's PK, updates can be routed straight through. When they differ, the planner inserts `SinkUpsertMaterializer` to reconcile - see the operators section below. This mismatch is the single most common source of expensive plans.

**Preservation through expressions.** Casts and projections preserve upsert keys *only when injective* - non-injective casts (e.g. `BIGINT -> INT` on a key column) collapse distinct keys and must be treated as key-destroying. See FLINK-39088 for the recent injective-cast handling. When extending key derivation through a new `RexCall`, add an explicit injectivity check; do not assume preservation.

### Operators the planner inserts

The planner injects these to bridge changelog-mode mismatches. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md#changelog-processing) for runtime internals and worked examples.

For what each operator actually does at runtime (the per-key dedupe/expand/normalize logic of ChangelogNormalize, the rollback algorithm of SUM, V1 vs V2 vs `WatermarkCompactingSinkMaterializer`), see [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md#changelog-processing). The planner-side concern is *when* and *why* each gets inserted:

**`StreamPhysicalChangelogNormalize`** ([file](src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalChangelogNormalize.scala)) - inserted as a speculative placeholder during physical conversion, then often pruned once trait inference knows what downstream actually consumes:

1. **Speculative insertion during physical conversion.** [`StreamPhysicalTableSourceScanRule`](src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTableSourceScanRule.scala) calls [`DynamicSourceUtils.changelogNormalizeEnabled`](src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java); if true, a `StreamPhysicalChangelogNormalize` is placed on top of the scan as a placeholder. Conditions: the source is an **upsert source** (`+U` present, `-U` absent, PK declared) or a **CDC source with possible duplicates** (non-insert-only + PK + `TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE=true`); and `eventTimeSnapshotRequired` is false. At this point the planner has no view of what downstream actually consumes, so it inserts the node defensively.
2. **Trait inference decides what survives.** Once `FlinkChangelogModeInferenceProgram` runs, it knows the full downstream demand:
- If downstream is happy with `ONLY_UPDATE_AFTER` and no other consumer needs the normalize's work (no filter pushed in, no CDC dedup requested, no metadata columns accessed), [`ChangelogNormalizeRequirementResolver.isRequired`](src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java) returns false and the node is removed entirely - the upsert stream flows through untouched.
- Otherwise the node is kept, with `generateUpdateBefore` toggled based on demand (`BEFORE_AND_AFTER` vs `ONLY_UPDATE_AFTER`) by `SatisfyUpdateKindTraitVisitor`, and the input delete trait picked by `SatisfyDeleteKindTraitVisitor` (prefers delete-by-key, falls back to full).
3. **Filter pushdown** is prepared by [`FlinkMarkChangelogNormalizeProgram`](src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java) + `PushCalcPastChangelogNormalizeRule`, which can push a downstream filter into a surviving normalize to shrink its state.

The takeaway: an upsert source does not force a `ChangelogNormalize` in the final plan - it just opens the door for one, and the inference pass is the real arbiter.

**`StreamPhysicalDropUpdateBefore`** ([file](src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDropUpdateBefore.scala)) - inserted by trait inference in `FlinkChangelogModeInferenceProgram` when an upstream produces `BEFORE_AND_AFTER` but the requirement above is `ONLY_UPDATE_AFTER` - drops `-U` records to save shuffle cost.

**`StreamPhysicalSinkUpsertMaterializer` (SUM)** - decision lives in [`FlinkChangelogModeInferenceProgram.analyzeUpsertMaterializeStrategy`](src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala), gated by `TABLE_EXEC_SINK_UPSERT_MATERIALIZE`:

- `FORCE` - inserted whenever the sink has a non-empty PK and is not a retract sink.
- `NONE` - never inserted.
- `AUTO` (default) - inserted only if **all** of: sink has PK, sink is upsert (not append, not retract), input has updates, and **sink PK does not contain the input upsert key**. Skipped (FLINK-38201) for retract sinks; also skipped for an append input combined with `ON CONFLICT DO DEDUPLICATE` (the dedupe is implicit).

The runtime variant (V1 / V2 / `WatermarkCompactingSinkMaterializer`) is chosen in [`StreamExecSink`](src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java) based on `SinkUpsertMaterializeStrategy` and the `ON CONFLICT` clause. `ON CONFLICT DO ERROR` / `DO NOTHING` additionally requires source watermarks (`validateSourcesHaveWatermarks`).

### Sink contract

[`DynamicTableSink#getChangelogMode(ChangelogMode requestedMode)`](../flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java) is a *negotiation*, not a getter. Return only modes the sink can actually handle: the planner uses the answer to decide which of the operators above to insert. Silently accepting `RETRACT` when you only handle `INSERT` produces wrong results at runtime.

### Common pitfalls

- New `StreamPhysicalRel` declares `INSERT_ONLY` but downstream of it produces updates - inference picks the wrong mode and codegen emits without `-U`.
- Adding a `RexCall` to upsert-key derivation without an injectivity check (FLINK-39088 territory).
- Forcing `BEFORE_AND_AFTER` when `ONLY_UPDATE_AFTER` would suffice - inserts an unnecessary `ChangelogNormalize` and bloats state.
- Changing trait inference and not regenerating plan-test XML golden files.
- Assuming SUM V1 layout in tests when V2 is now active by default.

## Testing Patterns

Choose test types based on what you're changing:
Expand Down
Loading