171 changes: 142 additions & 29 deletions flink-table/flink-table-planner/AGENTS.md
@@ -38,7 +38,7 @@ After the first full build, drop `-am` for faster rebuilds when you're only chan
- `plan/rules/physical/stream/` and `plan/rules/physical/batch/` — Physical planner rules
- `plan/rules/logical/` — Logical optimization rules
- `plan/nodes/exec/stream/` and `plan/nodes/exec/batch/` — ExecNodes (bridge between planner and runtime)
- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, WindowSpec, etc.)
- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, OverSpec, etc.)
- `plan/nodes/physical/stream/` and `plan/nodes/physical/batch/` — Intermediate physical nodes (Calcite-based)
- `plan/nodes/logical/` — Logical nodes (Calcite-based)
- `codegen/` — Code generation
@@ -52,57 +52,170 @@ After the first full build, drop `-am` for faster rebuilds when you're only chan
- **ExecNode**: Bridge between planner and runtime. Annotated with `@ExecNodeMetadata(name, version, minPlanVersion, minStateVersion)` for versioning and backwards compatibility. Extends `ExecNodeBase<T>` and implements either `StreamExecNode<T>` (streaming) or `BatchExecNode<T>` (batch); `T` is typically `RowData`.
- **Physical rules**: Extend `RelRule`, use Immutables `@Value.Immutable` for config. Transform logical nodes to physical nodes. Registered in `FlinkStreamRuleSets` and/or `FlinkBatchRuleSets`.
- **Logical optimization rules**: Also extend `RelRule`, often use `RexShuttle` for expression rewriting. Registered in rule sets.
- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, WindowSpec, etc.) that carry operator configuration.
- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, OverSpec, etc.) that carry operator configuration.

## Common Change Patterns
## SQL to Operator lifecycle

### Adding a new table operator
Every table operator traverses the same seven stages. Read the diagram top-to-bottom for the shape, then jump into the stage you need.

Components involved (can be developed top-down or bottom-up):
```
SQL string
│ parse
SqlNode (AST)
│ validate ← names resolve, types infer
typed SqlNode
│ SqlToRel
RelNode + RexNode ← logical algebra
│ logical phases
FlinkLogical*Node
│ physical phase
FlinkPhysical*Node
│ translateToExecNode
ExecNode (checkpoint-stable serializable plan)
│ translateToPlanInternal + codegen
StreamOperator (runs on TaskManager)
```

1. **Runtime operator** in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`, implement `OneInputStreamOperator` or `TwoInputStreamOperator`). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md).
2. **ExecNode** in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase<T>`; implement `StreamExecNode<T>` for streaming or `BatchExecNode<T>` for batch; annotate with `@ExecNodeMetadata`; `T` is typically `RowData`)
3. **Physical Node + Physical Rules** in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (physical rules usually extend `ConverterRule` via `Config.INSTANCE.withConversion(...)`; same-convention rewrites extend `RelRule` with an `@Value.Immutable` config)
4. **Logical Node + Planner rule**
5. Tests: semantic tests, plan tests, restore tests (if stateful)
**Five types to know:**

Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both.
> - **`SqlNode`** - a node in the SQL syntax tree (what you typed).
> - **`RelNode`** - a node in the relational algebra tree (scan, project, join, ...).
> - **`RexNode`** - an expression inside a `RelNode` (column ref, literal, function call).
> - **`ExecNode`** - the planner-runtime bridge, JSON-serializable so a compiled plan can be saved and restored.
> - **`StreamOperator`** - the runtime class that processes records on a TaskManager.

### Adding a planner optimization rule
**Two planners to know** (both used in stages 4-5, see `FlinkStreamProgram` for the chained phases):

Pick the base class by what the rule does:
- Converts a node from one calling convention to another (for example, logical → stream physical): extend `ConverterRule`.
Call `ConverterRule.Config.INSTANCE.withConversion(...)` in the constructor, do not define your own config.
- Rewrites nodes within the same convention (logical → logical, physical → physical): extend `RelRule` with an `@Value.Immutable` config.
Some existing rules still use Calcite's older `RelOptRule`; prefer `RelRule` for new code.
> - **HepPlanner** - deterministic, fires rules in a fixed order until no more match. Used for rewrites that don't need cost estimation: subquery elimination, decorrelation, predicate pushdown, projection cleanup, etc. Rules typically extend `RelRule` (or the older `RelOptRule`) and rewrite within the same calling convention (logical → logical, physical → physical).
> - **VolcanoPlanner** - cost-based, explores alternative plans and picks the cheapest using a cost model. Used for convention conversion (Calcite logical → Flink logical in the `LOGICAL` phase, Flink logical → Flink physical in the `PHYSICAL` phase) and for picking among physical strategies (e.g., interval join vs regular join). Rules typically extend `ConverterRule`, change calling convention via `Config.INSTANCE.withConversion(...)`, and participate in the cost-based search.

Then:
1. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala`
2. Plan tests with XML golden files — when the test fails, copy the framework's generated log file over the reference `.xml` (cases are ordered alphabetically by method name)
3. A same-convention rewrite needs no runtime changes. A `ConverterRule` that produces a new physical node also needs the physical node, ExecNode, and runtime operator — see "Adding a new table operator" above.
Each stage below names the framework piece, then shows where a binary join and a PTF (process table function) land in it.

### 1. Parsing (SQL text → SqlNode tree)
The SQL string is tokenized and parsed into a `SqlNode` AST by the generated parser (grammar in `flink-sql-parser/parserImpls.ftl`, driver in `planner/parse/CalciteParser`).

Examples:
- Binary join: `a JOIN b ON a.x = b.y` becomes a `SqlJoin` node.
- PTF: `TABLE(my_ptf(TABLE t1 PARTITION BY a))` becomes a `SqlBasicCall` holding the function call with table args.

### 2. Validation (resolve names, infer types)
`FlinkPlannerImpl.validate` runs `FlinkCalciteSqlValidator` over the AST: identifiers (tables, functions, columns) get resolved, types are inferred, arg counts/types are checked. Function calls are matched to a `SqlOperator` via the `SqlOperatorTable`. Extension points: register a function in the operator table; provide a `TypeInference` for typing.

Examples:
- Binary join: the `SqlJoin` operator is validated; equality predicates resolve to standard `SqlBinaryOperator`s.
- PTF: `FunctionCatalogOperatorTable.lookupOperatorOverloads` resolves the function name; `BridgingSqlFunction` wraps the user's `TypeInference` via `SystemTypeInference` (system args, output column derivation).
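The stage-2 extension points can be sketched with a hypothetical scalar function (`MyShift` is illustrative, not a real Flink function) that supplies its own `TypeInference`; the validator resolves the call through the `SqlOperatorTable` and uses this inference for argument and return types:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;

// Hypothetical function, for illustration only.
public class MyShift extends ScalarFunction {

    public int eval(int value, int offset) {
        return value + offset;
    }

    // Stage-2 hook: the validator uses this to type-check calls to MY_SHIFT.
    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .typedArguments(DataTypes.INT(), DataTypes.INT())
                .outputTypeStrategy(TypeStrategies.explicit(DataTypes.INT()))
                .build();
    }
}
```

Register it with `tableEnv.createTemporarySystemFunction("MY_SHIFT", MyShift.class)` so the operator-table lookup in stage 2 can resolve the name.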

### 3. SqlToRel (SqlNode → RelNode + RexNode)
`FlinkPlannerImpl.rel` invokes Calcite's `SqlToRelConverter` to produce the initial `RelNode` tree with `RexNode` expressions. Customizable via convertlets registered in `FlinkConvertletTable`.

Examples:
- Binary join: produces a `LogicalJoin` with the ON condition as a `RexNode`.
- PTF: `FlinkConvertletTable.convertTableArgs` rewrites table arguments into `RexTableArgCall` operands.

### 4. Logical plan (rewrite to a cheaper equivalent tree)
`FlinkStreamProgram` chains the phases that run during this stage. HepPlanner drives the rewrite phases (`SUBQUERY_REWRITE`, `TEMPORAL_JOIN_REWRITE`, `DECORRELATE`, `DEFAULT_REWRITE`, `PREDICATE_PUSHDOWN`, `JOIN_REORDER`, `MULTI_JOIN`, `PROJECT_REWRITE`, `LOGICAL_REWRITE`); VolcanoPlanner drives the `LOGICAL` phase that converts Calcite logical nodes to Flink logical nodes (required output trait `FlinkConventions.LOGICAL`); `TIME_INDICATOR` runs the dedicated `FlinkRelTimeIndicatorProgram`. All rule sets live in `FlinkStreamRuleSets`.

Examples:
- Binary join: `FlinkLogicalJoinConverter` produces `FlinkLogicalJoin`.
- PTF: `FlinkLogicalTableFunctionScanConverter` produces `FlinkLogicalTableFunctionScan`, then calls `BridgingSqlFunction.resolveCallTraits(call)` to bake conditional traits into the operator's static args.

### 5. Physical plan (pick a concrete execution strategy)
VolcanoPlanner runs the `PHYSICAL` phase, cost-picking a physical strategy via `ConverterRule`s. The `PHYSICAL_REWRITE` phase runs post-passes such as `FlinkChangelogModeInferenceProgram`.

Examples:
- Binary join: depending on predicate shape, `StreamPhysicalJoinRule`, `StreamPhysicalIntervalJoinRule`, or `StreamPhysicalTemporalJoinRule` fires, producing `StreamPhysicalJoin`, `StreamPhysicalIntervalJoin`, or `StreamPhysicalTemporalJoin` respectively.
- PTF: `StreamPhysicalProcessTableFunctionRule` produces `StreamPhysicalProcessTableFunction`.

### 6. Exec node & compiled plan (serializable handoff to the runtime)
`physicalNode.translateToExecNode()` produces an `@ExecNodeMetadata`-annotated `ExecNode` (the bridge to runtime). JSON serde via `ExecNodeGraphJsonSerializer/Deserializer` (and `RexNodeJsonSerializer/Deserializer` for expressions) supports compiled-plan write and restore. Each `ExecNode` is rebuilt via its `@JsonCreator` constructor on restore.

Examples:
- Binary join: `StreamExecJoin`.
- PTF: `StreamExecProcessTableFunction`. Compiled-plan restore re-runs `BridgingSqlFunction.resolveCallTraits` from the `StreamExecProcessTableFunction` `@JsonCreator` constructor so the runtime still sees the resolved signature.
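As a shape reference, a stream ExecNode skeleton looks roughly like this (`StreamExecMyOp` is hypothetical and constructor details are elided; see `StreamExecJoin` for a complete, real example):

```java
// Skeleton only: real ExecNodes pass id, context, persisted config,
// input properties, output type, and description up to ExecNodeBase.
@ExecNodeMetadata(
        name = "stream-exec-my-op",          // stable name written into compiled plans
        version = 1,
        minPlanVersion = FlinkVersion.v2_0,  // oldest plan version that may contain this node
        minStateVersion = FlinkVersion.v2_0, // oldest state layout this version can restore
        producedTransformations = "my-op")
public class StreamExecMyOp extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData> {

    @JsonCreator // invoked on compiled-plan restore
    public StreamExecMyOp(/* @JsonProperty-annotated fields */) {
        // super(...)
    }
}
```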

### 7. Runtime (codegen and build the operator)
`ExecNode.translateToPlanInternal` drives codegen, wraps the generated runner in an operator factory, and emits a `Transformation`. At task startup the factory instantiates the actual operator and `open()` wires state, timers, collectors, then opens the runner; `processElement` feeds records into the runner. Runtime classes live in `flink-table-runtime` (see [its AGENTS.md](../flink-table-runtime/AGENTS.md)).

Examples:
- Binary join (streaming): `StreamExecJoin` wires a `StreamingJoinOperator` (or `MiniBatchStreamingJoinOperator` / `AsyncStateStreamingJoinOperator` depending on config). Batch counterparts (`BatchExecHashJoin`, `BatchExecSortMergeJoin`) wire `HashJoinOperator` (codegen via `LongHashJoinGenerator`) or `SortMergeJoinOperator` instead.
- PTF: `ProcessTableRunnerGenerator` produces a `GeneratedProcessTableRunner` wrapped in `ProcessTableOperatorFactory`; the factory creates either `ProcessSetTableOperator` (set semantics, keyed) or `ProcessRowTableOperator` (row semantics or no table args), both extending `AbstractProcessTableOperator`.

### Worked examples (side-by-side)

| Stage | Binary join: `SELECT * FROM a JOIN b ON a.x = b.y` | PTF: `SELECT * FROM TABLE(my_ptf(TABLE t1 PARTITION BY a))` |
| ------------- | -------------------------------------------------- | ----------------------------------------------------------- |
| Parsing | `SqlJoin` | `SqlBasicCall` |
| Validation | `SqlJoin` operator | `BridgingSqlFunction` |
| SqlToRel | `LogicalJoin` with ON `RexNode` | `RexCall` with `RexTableArgCall` operand |
| Logical | `FlinkLogicalJoin` | `FlinkLogicalTableFunctionScan` |
| Physical | `StreamPhysicalJoin` | `StreamPhysicalProcessTableFunction` |
| Exec / plan | `StreamExecJoin` | `StreamExecProcessTableFunction` |
| Runtime       | `StreamingJoinOperator`                            | `ProcessTableOperatorFactory` → `ProcessSetTableOperator`    |

## Common workflows

### Extending SQL syntax
Recipes are organized by the lifecycle stage they target. Stage numbering matches the lifecycle above; stages without recipes today are omitted. The final "Cross-stage" subsection groups workflows that touch more than one stage.

### 1. Parsing

#### 1.1 Extending SQL syntax

1. Modify parser grammar in `flink-sql-parser` (`parserImpls.ftl`)
2. Add operation conversion logic in `SqlNodeToOperationConversion.java`
3. Test with parser tests and SQL gateway integration tests (`.q` files)

### Code generation changes
### 6. Exec node & compiled plan

#### 6.1 Plan serialization changes

- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility.
- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced.
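Both conventions can be sketched in a minimal spec (class and field are hypothetical):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical spec carried by an ExecNode and serialized into the compiled plan.
// ignoreUnknown = true lets a newer plan (with extra fields) restore on this version.
@JsonIgnoreProperties(ignoreUnknown = true)
public class MyLookupSpec {

    public static final String FIELD_NAME_CACHE_SIZE = "cacheSize";

    @JsonProperty(FIELD_NAME_CACHE_SIZE)
    private final int cacheSize;

    @JsonCreator // invoked when the compiled plan is restored
    public MyLookupSpec(@JsonProperty(FIELD_NAME_CACHE_SIZE) int cacheSize) {
        this.cacheSize = cacheSize;
    }

    public int getCacheSize() {
        return cacheSize;
    }
}
```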

#### 6.2 ExecNode versioning

When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version.

### 7. Runtime

#### 7.1 Code generation changes

- Cast rules live in `functions/casting/`. Each extends `AbstractExpressionCodeGeneratorCastRule` or similar.
- Custom call generators for functions live in `codegen/calls/` (e.g., `JsonCallGen.scala`). Simple scalar functions typically don't need these; the planner handles them uniformly through the function definition.
- Immutables library is used for rule configs (`@Value.Immutable`, `@Value.Enclosing`). See [README.md](README.md).

### Plan serialization changes
### Cross-stage

- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility.
- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced.
#### Adding a new table operator

### ExecNode versioning
Components involved (can be developed top-down or bottom-up):

When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version.
1. **(Runtime)** Runtime operator in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`; implement `OneInputStreamOperator`, `TwoInputStreamOperator`, or `MultipleInputStreamOperator` for N-ary inputs). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md).
2. **(Exec node & compiled plan)** ExecNode in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase<T>`; implement `StreamExecNode<T>` for streaming or `BatchExecNode<T>` for batch; annotate with `@ExecNodeMetadata`; `T` is typically `RowData`)
3. **(Physical plan)** Physical Node + Physical Rules in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (physical rules usually extend `ConverterRule` via `Config.INSTANCE.withConversion(...)`; same-convention rewrites extend `RelRule` with an `@Value.Immutable` config)
4. **(Logical plan)** Logical Node + Planner rule
5. Tests: semantic tests, plan tests, restore tests (if stateful)

Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both.
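Step 1's runtime half can be sketched as a minimal pass-through operator (`MyPassThroughOperator` is hypothetical; real operators also manage state, timers, and cleanup):

```java
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;

// Hypothetical skeleton: forwards every row unchanged.
public class MyPassThroughOperator extends TableStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData> {

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        // inspect or transform element.getValue() here, then emit downstream
        output.collect(element);
    }
}
```

Harness tests (`OneInputStreamOperatorTestHarness`) drive `processElement` directly without a cluster.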

#### Adding a planner optimization rule

Pick the base class by what the rule does:
- Converts a node from one calling convention to another (for example, logical → stream physical, **Physical plan**): extend `ConverterRule`. Call `ConverterRule.Config.INSTANCE.withConversion(...)` in the constructor; do not define your own config.
- Rewrites nodes within the same convention (logical → logical at **Logical plan**, physical → physical at **Physical plan**): extend `RelRule` with an `@Value.Immutable` config. Some existing rules still use Calcite's older `RelOptRule`; prefer `RelRule` for new code.

Then:
1. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala`
2. Plan tests with XML golden files — when the test fails, copy the framework's generated log file over the reference `.xml` (cases are ordered alphabetically by method name)
3. A same-convention rewrite needs no runtime changes. A `ConverterRule` that produces a new physical node also needs the physical node, ExecNode, and runtime operator — see "Adding a new table operator" above.
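The `ConverterRule` branch can be sketched like this (the node classes `FlinkLogicalMyOp` / `StreamPhysicalMyOp` are hypothetical; the conventions and config pattern are the real ones):

```java
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;

// Hypothetical converter: FlinkLogicalMyOp (LOGICAL) to StreamPhysicalMyOp (STREAM_PHYSICAL).
public class StreamPhysicalMyOpRule extends ConverterRule {

    public static final StreamPhysicalMyOpRule INSTANCE = new StreamPhysicalMyOpRule();

    private StreamPhysicalMyOpRule() {
        // Reuse Config.INSTANCE; converter rules do not define a custom config.
        super(Config.INSTANCE.withConversion(
                FlinkLogicalMyOp.class,
                FlinkConventions.LOGICAL(),
                FlinkConventions.STREAM_PHYSICAL(),
                "StreamPhysicalMyOpRule"));
    }

    @Override
    public RelNode convert(RelNode rel) {
        FlinkLogicalMyOp logical = (FlinkLogicalMyOp) rel;
        RelTraitSet traits = rel.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
        return new StreamPhysicalMyOp(rel.getCluster(), traits, logical.getInput());
    }
}
```

Register `INSTANCE` in `FlinkStreamRuleSets` (step 1 above) so the `PHYSICAL` phase can fire it.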

### Configuration options
## Configuration options

New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes).
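A typical entry looks like this (the key name and holder class are hypothetical; the builder API is the real `ConfigOptions` one):

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class MyOpOptions {

    // Hypothetical option: cache size for the my-op operator.
    public static final ConfigOption<Integer> TABLE_EXEC_MY_OP_CACHE_SIZE =
            ConfigOptions.key("table.exec.my-op.cache-size")
                    .intType()
                    .defaultValue(10_000)
                    .withDescription(
                            "Maximum number of entries the my-op operator keeps in its cache.");
}
```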
