Support GROUP BY GROUPING SETS / ROLLUP / CUBE in the multi-stage query engine #18817
xiangfu0 wants to merge 11 commits
…ion until supported

Foundation for GROUP BY GROUPING SETS / ROLLUP / CUBE in the multi-stage engine.
- AggregateNode now carries per-set membership bitmasks over its union group keys (plan.proto field 8 + serde), mirroring the single-stage PinotQuery.groupingSetMasks so the per-set row expansion can later be pushed down to the single-stage (leaf) engine.
- Until the leaf pushdown + final-stage merge land, both AggregateNode converters reject ROLLUP/CUBE/GROUPING SETS with a clear error instead of silently collapsing to a plain GROUP BY (which dropped the subtotal/grand-total rows). These queries run on the single-stage engine, which supports them natively.
…ion to the single-stage leaf

Implements multi-stage execution of grouping-set aggregates by pushing the per-set row expansion down to the single-stage (leaf) engine (the same backend-expansion model used by Doris/StarRocks), reusing the single-stage GROUPING SETS support (apache#18662) wholesale.

Plan split (PinotAggregateExchangeNodeInsertRule): a grouping-set aggregate becomes LEAF -> EXCHANGE -> FINAL -> PROJECT.
- LEAF carries the grouping sets and emits the synthetic $groupingId discriminator column after the union group keys (PinotLogicalAggregate.deriveRowType).
- EXCHANGE hash-partitions by the union keys AND $groupingId so each (set, key) co-locates.
- FINAL is a plain SIMPLE aggregate grouping by [union keys..., $groupingId], so rows from different grouping sets stay distinct with no grouping-set-specific merge logic.
- PROJECT drops $groupingId to restore the original aggregate row type.

Leaf pushdown (ServerPlanRequestVisitor): sets PinotQuery.groupingSetMasks so the single-stage engine does the expansion + $groupingId, identical to the single-stage path.

RelToPlanNodeConverter encodes Calcite groupSets into the AggregateNode masks (carried via the plan.proto field added earlier). The v2 physical planner still rejects grouping sets.

Verified by MSE-vs-single-stage parity integration tests for ROLLUP, CUBE, mixed plain+ROLLUP, single-column ROLLUP, and ROLLUP+HAVING (GroupingSetsQueriesTest#testMultiStage*).

Known limitations (follow-ups): GROUPING() / GROUPING_ID(), explicit GROUP BY GROUPING SETS ((a,b),...) tuple syntax (parsed as a ROW by the multi-stage validator), the v2 physical planner, leaf group trim, and >31 grouping columns are rejected with a clear error so the query runs on the single-stage engine instead.
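To make the $groupingId discriminator concrete, the following is a minimal sketch of how ROLLUP and CUBE over n union group keys enumerate their grouping sets as bitmasks. It assumes the convention stated elsewhere in this PR (bit i set means union column i is rolled up); the class and method names are hypothetical and are not taken from the Pinot code base.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the PR): lists grouping sets as rolled-up bitmasks. */
final class GroupingSetMasksSketch {
  private GroupingSetMasksSketch() {
  }

  /** ROLLUP(c0..c{n-1}) yields n + 1 sets: all columns kept, then columns dropped from the right. */
  static List<Integer> rollup(int numColumns) {
    if (numColumns < 0 || numColumns > 31) {
      throw new IllegalArgumentException("Expected 0..31 grouping columns, got " + numColumns);
    }
    List<Integer> masks = new ArrayList<>();
    for (int kept = numColumns; kept >= 0; kept--) {
      int mask = 0;
      for (int col = kept; col < numColumns; col++) {
        mask |= 1 << col; // bit col set: column col is rolled up in this set
      }
      masks.add(mask);
    }
    return masks;
  }

  /** CUBE yields every subset of the columns, i.e. 2^n sets. The bound of 16 is a sketch-only limit. */
  static List<Integer> cube(int numColumns) {
    if (numColumns < 0 || numColumns > 16) {
      throw new IllegalArgumentException("Sketch supports 0..16 CUBE columns, got " + numColumns);
    }
    List<Integer> masks = new ArrayList<>();
    for (int mask = 0; mask < (1 << numColumns); mask++) {
      masks.add(mask);
    }
    return masks;
  }

  public static void main(String[] args) {
    System.out.println(rollup(2)); // [0, 2, 3]: (a, b), (a), ()
    System.out.println(cube(2));   // [0, 1, 2, 3]: (a, b), (b), (a), ()
  }
}
```

Because each mask is distinct per grouping set, grouping by [union keys..., $groupingId] in the FINAL stage keeps a subtotal row for (a) separate from a detail row whose b happens to be NULL.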
…ROLLUP / CUBE

Computes GROUPING() / GROUPING_ID() in the multi-stage engine from the $groupingId discriminator, mirroring the single-stage post-aggregation handler.
- Register SqlStdOperatorTable.GROUPING and GROUPING_ID in PinotOperatorTable so the validator resolves them.
- In the grouping-set plan split, GROUPING() / GROUPING_ID() are not pushed to the LEAF/FINAL aggregations (they are not real aggregations). They are split out and computed in the final PROJECT as bit expressions over the $groupingId column: GROUPING(col) = bitAnd(bitShiftRightUnsigned($groupingId, k), 1), where k is the column's index in the union group keys (matching the single-stage bit convention where a set bit means the column is rolled up). With multiple arguments, the per-column bits are packed with the first argument as the most significant bit. Real aggregate results are referenced from the FINAL output; $groupingId is dropped.

Verified by MSE-vs-single-stage parity tests for GROUPING()/GROUPING_ID() in SELECT and in HAVING (GroupingSetsQueriesTest#testMultiStageGroupingFunctionsParity, testMultiStageGroupingInHavingParity), with all existing ROLLUP/CUBE parity tests still green.
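The bit expressions described in this commit can be sketched in plain Java as follows. This is an illustration of the arithmetic only, not the PR's projection code: the class and method names are hypothetical, and the column indexes are assumed to be positions in the union group keys.

```java
/** Hypothetical illustration (not the PR's code) of GROUPING() / GROUPING_ID() over $groupingId. */
final class GroupingFunctionsSketch {
  private GroupingFunctionsSketch() {
  }

  /** GROUPING(col): 1 if the union column at columnIndex is rolled up in this row's set, else 0. */
  static int grouping(int groupingId, int columnIndex) {
    return (groupingId >>> columnIndex) & 1;
  }

  /** GROUPING_ID(c1, c2, ...): packs the per-column bits, first argument as most significant bit. */
  static int groupingId(int groupingId, int... columnIndexes) {
    int packed = 0;
    for (int columnIndex : columnIndexes) {
      packed = (packed << 1) | grouping(groupingId, columnIndex);
    }
    return packed;
  }

  public static void main(String[] args) {
    int subtotalRow = 0b10; // union keys (a, b); b (index 1) is rolled up
    System.out.println(grouping(subtotalRow, 0));      // 0: a is a real group key
    System.out.println(grouping(subtotalRow, 1));      // 1: b is rolled up
    System.out.println(groupingId(subtotalRow, 0, 1)); // 1: GROUPING_ID(a, b)
    System.out.println(groupingId(subtotalRow, 1, 0)); // 2: GROUPING_ID(b, a)
  }
}
```

The argument order matters for GROUPING_ID because the packing is positional, which is why the same row yields 1 for (a, b) and 2 for (b, a).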
… syntax

The multi-stage validator rejected explicit GROUP BY GROUPING SETS ((a, b), (a), ()) because the parenthesized grouping-set tuples are parsed as ROW expressions, and RowExpressionValidationVisitor only allowed ROW operands under VALUES / INSERT / ARRAY constructors. Allow them under the GROUPING_SETS / ROLLUP / CUBE constructs as well (the tuples are grouping sets, not row constructors). ROLLUP / CUBE already worked because their operands are bare column references.

With this, explicit GROUPING SETS syntax flows through to the leaf-pushdown execution path that already handles grouping sets, matching the single-stage engine.

Broaden the multi-stage parity tests: explicit GROUPING SETS syntax, a composite grouping-set level ((a, b)), and filtered aggregation combined with grouping sets.
Now that the multi-stage engine supports GROUP BY GROUPING SETS / ROLLUP / CUBE, run the order-independent aggregation and GROUPING() / GROUPING_ID() tests on both engines via the useBothQueryEngines data provider. This verifies correctness directly on each engine and replaces the v1-vs-v2 parity tests, which are now redundant.

The inherently engine-specific tests stay single-stage only, with comments explaining why: null-handling-disabled queries (the engines differ on reading genuine NULLs), multi-value grouping columns (rejected as an intermediate-stage key in the multi-stage engine), the single-stage ORDER BY comparator paths, and the compile-time rejection cases.
…t rejected

The increment that added multi-stage GROUPING SETS / ROLLUP / CUBE execution made the old QueryCompilationTest#testGroupingSetsRejectedInMultiStage assertion stale (it expected a rejection error). Replace it with testGroupingSetsSupportedInMultiStage, asserting that these queries (including GROUPING() / GROUPING_ID() and explicit GROUPING SETS tuple syntax) now plan successfully.
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## master #18817 +/- ##
============================================
+ Coverage 64.74% 64.75% +0.01%
Complexity 1319 1319
============================================
Files 3390 3393 +3
Lines 210693 211173 +480
Branches 33070 33155 +85
============================================
+ Hits 136414 136750 +336
- Misses 63287 63396 +109
- Partials 10992 11027 +35
xiangfu0
left a comment
Found one high-signal compatibility issue; see inline comment.
for (int i = 0; i < groupCount; i++) {
  builder.add(fields.get(i));
}
builder.add(GroupingSets.GROUPING_ID_COLUMN, typeFactory.createSqlType(SqlTypeName.INTEGER));
This changes the leaf-stage wire row layout, but there is no mixed-version gate for the new broker/server contract. Older servers will ignore AggregateNode.groupingSets, build a plain grouped leaf PinotQuery, and return the pre-$groupingId shape during a rolling upgrade. That breaks Pinot's mixed-version broker/server guarantee for this feature. Can we gate this plan shape on worker capability, or fall back to single-stage / explicit failure until all servers are upgraded?
…a a RepeatOperator

Grouping sets were only executable when the aggregate sat directly on a table scan (pushed down to the single-stage leaf, which expands rows per set). Over a JOIN (or any intermediate-stage input) the aggregate runs in the multi-stage runtime, which could not expand rows.

Add RepeatOperator: for each input row and grouping set it emits one row with the rolled-up columns set to NULL and the synthetic $groupingId discriminator appended (bit i set iff union column i is rolled up, matching the single-stage convention). This is the multi-stage equivalent of the single-stage per-set expansion / Doris's RepeatNode.

AggregateOperator wraps the input of a grouping-set aggregate in a RepeatOperator and runs an ordinary GROUP BY over the union columns plus $groupingId, so no grouping-set-specific aggregation logic is needed and the existing FINAL merge + GROUPING()/GROUPING_ID() projection handle the rest. The efficient single-stage leaf pushdown is preserved for aggregates directly over a scan (that path does not build AggregateOperator).

Verified end-to-end: testMultiStageRollupOverJoin and testMultiStageGroupingOverJoin (self-join + ROLLUP / GROUPING) alongside the existing scan-input parity tests.
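The per-set row expansion described above can be sketched as follows. This is not the RepeatOperator from the PR: it works on plain Object[] rows, and it assumes (for illustration only) that the union group keys occupy the first numUnionKeys positions of each row and that each grouping set is given directly as a rolled-up bitmask. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch (not the PR's RepeatOperator) of per-grouping-set row expansion. */
final class RepeatSketch {
  private RepeatSketch() {
  }

  /**
   * Emits one output row per (input row, grouping set): rolled-up key columns become null
   * and the set's bitmask is appended as the last column, standing in for $groupingId.
   */
  static List<Object[]> expand(List<Object[]> rows, int numUnionKeys, int[] rolledUpMasks) {
    List<Object[]> out = new ArrayList<>(rows.size() * rolledUpMasks.length);
    for (Object[] row : rows) {
      for (int mask : rolledUpMasks) {
        // Copy the row with one extra trailing slot for the discriminator.
        Object[] expanded = Arrays.copyOf(row, row.length + 1);
        for (int col = 0; col < numUnionKeys; col++) {
          if (((mask >>> col) & 1) == 1) {
            expanded[col] = null; // bit col set: column col is rolled up
          }
        }
        expanded[row.length] = mask;
        out.add(expanded);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Object[]> input = new ArrayList<>();
    input.add(new Object[] {"x", "y", 10});
    // ROLLUP(a, b) over union keys (a, b): sets (a, b), (a), ()
    for (Object[] row : expand(input, 2, new int[] {0b00, 0b10, 0b11})) {
      System.out.println(Arrays.toString(row));
    }
    // [x, y, 10, 0]
    // [x, null, 10, 2]
    // [null, null, 10, 3]
  }
}
```

After this expansion, a regular GROUP BY over the key columns plus the trailing discriminator yields the detail, subtotal, and grand-total rows without any grouping-set-aware aggregation code.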
513b62b to 616c781
… physical planner

The v2 physical optimizer (usePhysicalOptimizer=true) runs its own AggregatePushdownRule instead of the default planner's PinotAggregateExchangeNodeInsertRule, so grouping-set aggregates were previously rejected on the v2 path. Mirror the single-stage split here:
- PhysicalAggregate.deriveRowType() appends the synthetic $groupingId INT discriminator after the union group-by columns for LEAF non-SIMPLE aggregates.
- AggregatePushdownRule splits a grouping-set aggregate into LEAF (carries the grouping sets) -> EXCHANGE (repartitions by the union keys plus $groupingId) -> FINAL (SIMPLE aggregate grouping by [union keys, $groupingId]) -> PROJECT (drops $groupingId). The per-set row expansion itself happens at runtime in the planner-agnostic RepeatOperator.
- PRelToPlanNodeConverter no longer rejects grouping sets; the masks flow to the runtime.

Guards mirror the single-stage rule: reject more than MAX_GROUPING_SET_COLUMNS (31) distinct grouping columns (the $groupingId bitmask is 32-bit), and reject GROUPING() / GROUPING_ID() over grouping sets on the v2 path for now (run those on the default planner).

Tested via GroupingSetsQueriesTest: testV2PhysicalPlannerRollup verifies the genuine vs rolled-up NULL discrimination through the v2 plan, and testV2PhysicalPlannerRejectsGroupingFunction confirms the v2 path is actually engaged (the default planner accepts GROUPING()).
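A guard of the kind described above might look like the sketch below. Only the constant name MAX_GROUPING_SET_COLUMNS and its value 31 come from the commit message; the class, the method names, and the reading that the cap keeps the all-rolled-up id within a non-negative Java int are assumptions made for illustration.

```java
/** Hypothetical sketch of the grouping-column cap; only the constant's name and value are from the PR. */
final class GroupingColumnGuardSketch {
  static final int MAX_GROUPING_SET_COLUMNS = 31;

  private GroupingColumnGuardSketch() {
  }

  /** True if one bit per distinct grouping column fits under the cap. */
  static boolean fits(int distinctGroupingColumns) {
    return distinctGroupingColumns >= 0 && distinctGroupingColumns <= MAX_GROUPING_SET_COLUMNS;
  }

  /** Discriminator value of the grand-total set, where every grouping column is rolled up. */
  static int allRolledUpId(int distinctGroupingColumns) {
    if (!fits(distinctGroupingColumns)) {
      throw new IllegalArgumentException("GROUPING SETS supports at most " + MAX_GROUPING_SET_COLUMNS
          + " distinct grouping columns, got " + distinctGroupingColumns);
    }
    // Computed in long so that 31 columns yields Integer.MAX_VALUE without overflow.
    return (int) ((1L << distinctGroupingColumns) - 1);
  }

  public static void main(String[] args) {
    System.out.println(allRolledUpId(2));  // 3
    System.out.println(allRolledUpId(31)); // 2147483647
    System.out.println(fits(32));          // false
  }
}
```

With 31 columns the largest id is Integer.MAX_VALUE, so every id stays non-negative; a 32nd column would need the sign bit.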
xiangfu0
left a comment
Found one high-signal issue in the new v2 grouping-sets path; see inline comment.
  hintOptions = Map.of();
}
boolean isInputExchange = call._currentNode.unwrap().getInput(0) instanceof Exchange;
if (aggRel.getGroupType() != Aggregate.Group.SIMPLE) {
This new GROUPING SETS branch now returns addPartialAggregateForGroupingSets(...) before it checks withinGroupCollation, but the logical planner explicitly avoids leaf/final splitting when a WITHIN GROUP ordering is present. With usePhysicalOptimizer=true, a query like LISTAGG(...) WITHIN GROUP (...) under ROLLUP/CUBE can now be partially aggregated without preserving the required order, which is a silent wrong-result risk instead of the previous planner fallback. Can we keep the same withinGroupCollation escape hatch here and skip partial aggregation when ordered aggregates are involved?
Parameterize the v2 physical-planner ROLLUP test with the useBothQueryEngines data provider. On the multi-stage engine usePhysicalOptimizer engages the v2 split; the single-stage engine ignores the flag and runs the same ROLLUP, so the test now also cross-checks both engines return identical rows.
…vider

Replace the hardcoded setUseMultiStageQueryEngine(true/false) calls on single-engine tests with the useV1QueryEngine / useV2QueryEngine data providers, so engine selection is declarative and uniform across the suite (matching the useBothQueryEngines tests). Single-stage-specific tests (null-handling-off, multi-value keys, single-stage ORDER BY, compile-time rejections) use useV1QueryEngine; multi-stage-specific tests (grouping sets over a JOIN, the v2 physical planner cases) use useV2QueryEngine. No behavior change.
Code review + cleanup, no behavior change:
- Centralize the $groupingId LEAF row-type layout in GroupingSets.appendGroupingIdColumn and call it from both PinotLogicalAggregate and PhysicalAggregate, so the default and v2 planners can no longer drift on the column layout.
- Add RepeatOperatorTest covering ROLLUP, CUBE, and a non-contiguous explicit grouping set (verifies the rolled-up columns are NULLed and the $groupingId discriminator is correct).
- Note the rolling-upgrade behavior of the new plan.proto groupingSets field (upgrade servers before brokers; an old server runs a plain GROUP BY and drops the subtotal rows).
- Drop the runRows test helper in favor of the inherited postQuery; remove vendor product references from comments; use /// markdown doc comments consistently.
Summary
Adds native execution of GROUP BY GROUPING SETS (...) / ROLLUP(...) / CUBE(...) and the GROUPING() / GROUPING_ID() functions to the multi-stage query engine (MSE), building on the single-stage support in #18662.
Two execution paths, both producing identical results:
- Aggregate directly over a table scan: pushed down to the single-stage (leaf) engine, which expands each row across the grouping sets and appends the synthetic $groupingId discriminator. Reuses the single-stage engine wholesale (the Doris/StarRocks backend-expansion model).
- Aggregate over an intermediate-stage input (e.g. a JOIN): a new RepeatOperator in the multi-stage runtime performs the same per-set row expansion (NULLing rolled-up columns, appending $groupingId), then an ordinary GROUP BY over [union keys…, $groupingId] runs, with no grouping-set-specific aggregation logic.

Plan split (PinotAggregateExchangeNodeInsertRule): LEAF → EXCHANGE (hash union + $groupingId) → FINAL (group by union + $groupingId) → PROJECT. The PROJECT computes GROUPING() / GROUPING_ID() from $groupingId (bit extraction), mirroring the single-stage post-aggregation handler, and drops $groupingId.

GROUPING / GROUPING_ID are registered in PinotOperatorTable, and explicit GROUP BY GROUPING SETS ((a, b), …) tuple syntax is accepted by the validator.

Tests
GroupingSetsQueriesTest: grouping-set aggregation, GROUPING() / GROUPING_ID() (SELECT and HAVING), filtered aggregation, composite/nested sets, explicit GROUPING SETS syntax, and grouping sets over a JOIN, run against both engines (the order-independent cases via the useBothQueryEngines data provider), plus the single-stage genuine-vs-rolled-up-NULL discrimination cases. All green.
Known limitations (rejected with a clear error → never wrong results)
- The v2 physical planner (usePhysicalOptimizer=true, opt-in, default off) still rejects grouping sets; the default planner is fully supported. (Follow-up.)
- More than 31 grouping columns (the $groupingId bitmask is a 32-bit int): same cap as the single-stage engine.
- Leaf group trim (ORDER BY … LIMIT pushdown) is not pushed for grouping-set queries; results are still correct (the broker applies the final ORDER BY + LIMIT).