diff --git a/flink-table/flink-table-planner/AGENTS.md b/flink-table/flink-table-planner/AGENTS.md index 969c5b366569d..4c82b34e23a7f 100644 --- a/flink-table/flink-table-planner/AGENTS.md +++ b/flink-table/flink-table-planner/AGENTS.md @@ -38,7 +38,7 @@ After the first full build, drop `-am` for faster rebuilds when you're only chan - `plan/rules/physical/stream/` and `plan/rules/physical/batch/` — Physical planner rules - `plan/rules/logical/` — Logical optimization rules - `plan/nodes/exec/stream/` and `plan/nodes/exec/batch/` — ExecNodes (bridge between planner and runtime) -- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, WindowSpec, etc.) +- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, OverSpec, etc.) - `plan/nodes/physical/stream/` and `plan/nodes/physical/batch/` — Intermediate physical nodes (Calcite-based) - `plan/nodes/logical/` — Logical nodes (Calcite-based) - `codegen/` — Code generation @@ -52,57 +52,170 @@ After the first full build, drop `-am` for faster rebuilds when you're only chan - **ExecNode**: Bridge between planner and runtime. Annotated with `@ExecNodeMetadata(name, version, minPlanVersion, minStateVersion)` for versioning and backwards compatibility. Extends `ExecNodeBase` and implements either `StreamExecNode` (streaming) or `BatchExecNode` (batch); `T` is typically `RowData`. - **Physical rules**: Extend `RelRule`, use Immutables `@Value.Immutable` for config. Transform logical nodes to physical nodes. Registered in `FlinkStreamRuleSets` and/or `FlinkBatchRuleSets`. - **Logical optimization rules**: Also extend `RelRule`, often use `RexShuttle` for expression rewriting. Registered in rule sets. -- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, WindowSpec, etc.) that carry operator configuration. +- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, OverSpec, etc.) that carry operator configuration. 
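The `@ExecNodeMetadata` contract above can be sketched in code. This is a non-compiling sketch that assumes the flink-table-planner classes (`ExecNodeMetadata`, `ExecNodeBase`, `StreamExecNode`, `FlinkVersion`) are on the classpath; the node name and class are hypothetical:

```java
// Sketch only — hypothetical operator. Shows how an ExecNode declares the
// @ExecNodeMetadata fields listed above (name, version, minPlanVersion, minStateVersion).
@ExecNodeMetadata(
        name = "stream-exec-my-operator",     // hypothetical node name
        version = 1,
        minPlanVersion = FlinkVersion.v1_19,  // assumption: whichever release you target
        minStateVersion = FlinkVersion.v1_19)
public class StreamExecMyOperator extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData> {
    // a @JsonCreator constructor rebuilds the node when a compiled plan is restored
}
```

Bumping `version` while keeping the old annotation values in restore-test snapshots is what lets old compiled plans keep working.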
-## Common Change Patterns +## SQL to Operator lifecycle -### Adding a new table operator +Every table operator traverses the same seven stages. Read the diagram top-to-bottom for the shape, then jump into the stage you need. -Components involved (can be developed top-down or bottom-up): +``` +SQL string + │ parse + ▼ +SqlNode (AST) + │ validate ← names resolve, types infer + ▼ +typed SqlNode + │ SqlToRel + ▼ +RelNode + RexNode ← logical algebra + │ logical phases + ▼ +FlinkLogical*Node + │ physical phase + ▼ +FlinkPhysical*Node + │ translateToExecNode + ▼ +ExecNode (checkpoint-stable serializable plan) + │ translateToPlanInternal + codegen + ▼ +StreamOperator (runs on TaskManager) +``` -1. **Runtime operator** in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`, implement `OneInputStreamOperator` or `TwoInputStreamOperator`). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md). -2. **ExecNode** in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase`; implement `StreamExecNode` for streaming or `BatchExecNode` for batch; annotate with `@ExecNodeMetadata`; `T` is typically `RowData`) -3. **Physical Node + Physical Rules** in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (physical rules usually extend `ConverterRule` via `Config.INSTANCE.withConversion(...)`; same-convention rewrites extend `RelRule` with an `@Value.Immutable` config) -4. **Logical Node + Planner rule** -5. Tests: semantic tests, plan tests, restore tests (if stateful) +**Five types to know:** -Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both. +> - **`SqlNode`** - a node in the SQL syntax tree (what you typed). +> - **`RelNode`** - a node in the relational algebra tree (scan, project, join, ...). +> - **`RexNode`** - an expression inside a `RelNode` (column ref, literal, function call). 
+> - **`ExecNode`** - the planner-runtime bridge, JSON-serializable so a compiled plan can be saved and restored. +> - **`StreamOperator`** - the runtime class that processes records on a TaskManager. -### Adding a planner optimization rule +**Two planners to know** (both used in stages 4-5, see `FlinkStreamProgram` for the chained phases): -Pick the base class by what the rule does: -- Converts a node from one calling convention to another (for example, logical → stream physical): extend `ConverterRule`. -Call `ConverterRule.Config.INSTANCE.withConversion(...)` in the constructor, do not define your own config. -- Rewrites nodes within the same convention (logical → logical, physical → physical): extend `RelRule` with an `@Value.Immutable` config. -Some existing rules still use Calcite's older `RelOptRule`; prefer `RelRule` for new code. +> - **HepPlanner** - deterministic, fires rules in a fixed order until no more match. Used for rewrites that don't need cost estimation: subquery elimination, decorrelation, predicate pushdown, projection cleanup, etc. Rules typically extend `RelRule` (or the older `RelOptRule`) and rewrite within the same calling convention (logical → logical, physical → physical). +> - **VolcanoPlanner** - cost-based, explores alternative plans and picks the cheapest using a cost model. Used for convention conversion (Calcite logical → Flink logical in the `LOGICAL` phase, Flink logical → Flink physical in the `PHYSICAL` phase) and for picking among physical strategies (e.g., interval join vs regular join). Rules typically extend `ConverterRule`, change calling convention via `Config.INSTANCE.withConversion(...)`, and participate in the cost-based search. -Then: -1. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala` -2. Plan tests with XML golden files — when the test fails, copy the framework's generated log file over the reference `.xml` (cases are ordered alphabetically by method name) -3. 
A same-convention rewrite needs no runtime changes. A `ConverterRule` that produces a new physical node also needs the physical node, ExecNode, and runtime operator — see "Adding a new table operator" above. +Each stage below names the framework piece, then shows where binary join and PTF land in it. + +### 1. Parsing (SQL text → SqlNode tree) +SQL string is tokenized and parsed into a `SqlNode` AST by the generated parser (grammar in `flink-sql-parser/parserImpls.ftl`, driver in `planner/parse/CalciteParser`). + + Examples: + - Binary join: `a JOIN b ON a.x = b.y` becomes a `SqlJoin` node. + - PTF: `TABLE(my_ptf(TABLE t1 PARTITION BY a))` becomes a `SqlBasicCall` holding the function call with table args. + +### 2. Validation (resolve names, infer types) +`FlinkPlannerImpl.validate` runs `FlinkCalciteSqlValidator` over the AST: identifiers (tables, functions, columns) get resolved, types are inferred, arg counts/types are checked. Function calls are matched to a `SqlOperator` via the `SqlOperatorTable`. Extension points: register a function in the operator table; provide a `TypeInference` for typing. + + Examples: + - Binary join: the `SqlJoin` operator is validated; equality predicates resolve to standard `SqlBinaryOperator`s. + - PTF: `FunctionCatalogOperatorTable.lookupOperatorOverloads` resolves the function name; `BridgingSqlFunction` wraps the user's `TypeInference` via `SystemTypeInference` (system args, output column derivation). + +### 3. SqlToRel (SqlNode → RelNode + RexNode) +`FlinkPlannerImpl.rel` invokes Calcite's `SqlToRelConverter` to produce the initial `RelNode` tree with `RexNode` expressions. Customizable via convertlets registered in `FlinkConvertletTable`. + + Examples: + - Binary join: produces a `LogicalJoin` with the ON condition as a `RexNode`. + - PTF: `FlinkConvertletTable.convertTableArgs` rewrites table arguments into `RexTableArgCall` operands. + +### 4. 
Logical plan (rewrite to a cheaper equivalent tree) +`FlinkStreamProgram` chains the phases that run during this stage. HepPlanner drives the rewrite phases (`SUBQUERY_REWRITE`, `TEMPORAL_JOIN_REWRITE`, `DECORRELATE`, `DEFAULT_REWRITE`, `PREDICATE_PUSHDOWN`, `JOIN_REORDER`, `MULTI_JOIN`, `PROJECT_REWRITE`, `LOGICAL_REWRITE`); VolcanoPlanner drives the `LOGICAL` phase that converts Calcite logical nodes to Flink logical nodes (required output trait `FlinkConventions.LOGICAL`); `TIME_INDICATOR` runs the dedicated `FlinkRelTimeIndicatorProgram`. All rule sets live in `FlinkStreamRuleSets`. + + Examples: + - Binary join: `FlinkLogicalJoinConverter` produces `FlinkLogicalJoin`. + - PTF: `FlinkLogicalTableFunctionScanConverter` produces `FlinkLogicalTableFunctionScan`, then calls `BridgingSqlFunction.resolveCallTraits(call)` to bake conditional traits into the operator's static args. + +### 5. Physical plan (pick a concrete execution strategy) +VolcanoPlanner runs the `PHYSICAL` phase, cost-picking a physical strategy via `ConverterRule`s. The `PHYSICAL_REWRITE` phase runs post-passes such as `FlinkChangelogModeInferenceProgram`. + + Examples: + - Binary join: depending on predicate shape, `StreamPhysicalJoinRule` / `StreamPhysicalIntervalJoinRule` / `StreamPhysicalTemporalJoinRule` produces `StreamPhysicalJoin`, `StreamPhysicalIntervalJoin`, or `StreamPhysicalTemporalJoin`. + - PTF: `StreamPhysicalProcessTableFunctionRule` produces `StreamPhysicalProcessTableFunction`. + +### 6. Exec node & compiled plan (serializable handoff to the runtime) +`physicalNode.translateToExecNode()` produces an `@ExecNodeMetadata`-annotated `ExecNode` (the bridge to runtime). JSON serde via `ExecNodeGraphJsonSerializer/Deserializer` (and `RexNodeJsonSerializer/Deserializer` for expressions) supports compiled-plan write and restore. Each `ExecNode` is rebuilt via its `@JsonCreator` constructor on restore. + + Examples: + - Binary join: `StreamExecJoin`. 
+ - PTF: `StreamExecProcessTableFunction`. Compiled-plan restore re-runs `BridgingSqlFunction.resolveCallTraits` from the `StreamExecProcessTableFunction` `@JsonCreator` constructor so the runtime still sees the resolved signature. + +### 7. Runtime (codegen and build the operator) +`ExecNode.translateToPlanInternal` drives codegen, wraps the generated runner in an operator factory, and emits a `Transformation`. At task startup the factory instantiates the actual operator and `open()` wires state, timers, collectors, then opens the runner; `processElement` feeds records into the runner. Runtime classes live in `flink-table-runtime` (see [its AGENTS.md](../flink-table-runtime/AGENTS.md)). + + Examples: + - Binary join (streaming): `StreamExecJoin` wires a `StreamingJoinOperator` (or `MiniBatchStreamingJoinOperator` / `AsyncStateStreamingJoinOperator` depending on config). Batch counterparts (`BatchExecHashJoin`, `BatchExecSortMergeJoin`) wire `HashJoinOperator` (codegen via `LongHashJoinGenerator`) or `SortMergeJoinOperator` instead. + - PTF: `ProcessTableRunnerGenerator` produces a `GeneratedProcessTableRunner` wrapped in `ProcessTableOperatorFactory`; the factory creates either `ProcessSetTableOperator` (set semantics, keyed) or `ProcessRowTableOperator` (row semantics or no table args), both extending `AbstractProcessTableOperator`. 
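The stage-7 handoff has a recurring shape across ExecNodes. A hedged sketch, not a real node — `createOperatorFactory` and the `"my-operator"` transformation name are hypothetical, and real nodes also handle state metadata and parallelism overrides:

```java
// Sketch only: the common shape of translateToPlanInternal in a one-input
// streaming ExecNode. The input is translated first, codegen builds the
// operator factory, and the result is a Transformation the runtime turns into tasks.
@Override
protected Transformation<RowData> translateToPlanInternal(
        PlannerBase planner, ExecNodeConfig config) {
    Transformation<RowData> inputTransform =
            (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
    StreamOperatorFactory<RowData> factory =
            createOperatorFactory(config);  // hypothetical codegen helper
    return ExecNodeUtil.createOneInputTransformation(
            inputTransform,
            createTransformationMeta("my-operator", config),  // hypothetical name
            factory,
            InternalTypeInfo.of(getOutputType()),
            inputTransform.getParallelism());
}
```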
+ +### Worked examples (side-by-side) + +| Stage | Binary join: `SELECT * FROM a JOIN b ON a.x = b.y` | PTF: `SELECT * FROM TABLE(my_ptf(TABLE t1 PARTITION BY a))` | +| ------------- | -------------------------------------------------- | ----------------------------------------------------------- | +| Parsing | `SqlJoin` | `SqlBasicCall` | +| Validation | `SqlJoin` operator | `BridgingSqlFunction` | +| SqlToRel | `LogicalJoin` with ON `RexNode` | `RexCall` with `RexTableArgCall` operand | +| Logical | `FlinkLogicalJoin` | `FlinkLogicalTableFunctionScan` | +| Physical | `StreamPhysicalJoin` | `StreamPhysicalProcessTableFunction` | +| Exec / plan | `StreamExecJoin` | `StreamExecProcessTableFunction` | +| Runtime | `StreamingJoinOperator` | `ProcessTableOperatorFactory` -> `ProcessSetTableOperator` | + +## Common workflows -### Extending SQL syntax +Recipes organized by the lifecycle stage they target. Stage numbering matches the lifecycle above; stages without recipes today are omitted. The final "Cross-stage" subsection groups workflows that touch more than one stage. + +### 1. Parsing + +#### 1.1 Extending SQL syntax 1. Modify parser grammar in `flink-sql-parser` (`parserImpls.ftl`) 2. Add operation conversion logic in `SqlNodeToOperationConversion.java` 3. Test with parser tests and SQL gateway integration tests (`.q` files) -### Code generation changes +### 6. Exec node & compiled plan + +#### 6.1 Plan serialization changes + +- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility. +- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced. + +#### 6.2 ExecNode versioning + +When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version. + +### 7. 
Runtime + +#### 7.1 Code generation changes - Cast rules live in `functions/casting/`. Each extends `AbstractExpressionCodeGeneratorCastRule` or similar. - Custom call generators for functions live in `codegen/calls/` (e.g., `JsonCallGen.scala`). Simple scalar functions typically don't need these; the planner handles them uniformly through the function definition. - Immutables library is used for rule configs (`@Value.Immutable`, `@Value.Enclosing`). See [README.md](README.md). -### Plan serialization changes +### Cross-stage -- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility. -- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced. +#### Adding a new table operator -### ExecNode versioning +Components involved (can be developed top-down or bottom-up): -When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version. +1. **(Runtime)** Runtime operator in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`; implement `OneInputStreamOperator`, `TwoInputStreamOperator`, or `MultipleInputStreamOperator` for N-ary inputs). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md). +2. **(Exec node & compiled plan)** ExecNode in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase`; implement `StreamExecNode` for streaming or `BatchExecNode` for batch; annotate with `@ExecNodeMetadata`; `T` is typically `RowData`)
**(Physical plan)** Physical Node + Physical Rules in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (physical rules usually extend `ConverterRule` via `Config.INSTANCE.withConversion(...)`; same-convention rewrites extend `RelRule` with an `@Value.Immutable` config) +4. **(Logical plan)** Logical Node + Planner rule +5. Tests: semantic tests, plan tests, restore tests (if stateful) + +Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both. + +#### Adding a planner optimization rule + +Pick the base class by what the rule does: +- Converts a node from one calling convention to another (for example, logical → stream physical, **Physical plan**): extend `ConverterRule`. Call `ConverterRule.Config.INSTANCE.withConversion(...)` in the constructor; do not define your own config. +- Rewrites nodes within the same convention (logical → logical at **Logical plan**, physical → physical at **Physical plan**): extend `RelRule` with an `@Value.Immutable` config. Some existing rules still use Calcite's older `RelOptRule`; prefer `RelRule` for new code. + +Then: +1. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala` +2. Plan tests with XML golden files — when the test fails, copy the framework's generated log file over the reference `.xml` (cases are ordered alphabetically by method name) +3. A same-convention rewrite needs no runtime changes. A `ConverterRule` that produces a new physical node also needs the physical node, ExecNode, and runtime operator — see "Adding a new table operator" above. -### Configuration options +## Configuration options New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes).
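A typical entry has the following shape — a sketch with a made-up option key and default, using the `ConfigOptions` builder from flink-core:

```java
// Sketch only — hypothetical option key and default value.
public static final ConfigOption<Integer> TABLE_EXEC_MY_OPERATOR_CACHE_SIZE =
        ConfigOptions.key("table.exec.my-operator.cache-size")
                .intType()
                .defaultValue(10_000)
                .withDescription(
                        "Maximum number of entries the (hypothetical) my-operator keeps in its cache.");
```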