Add broker statistics foundation and cost-based join reordering for the multi-stage engine#18741
Add broker statistics foundation and cost-based join reordering for the multi-stage engine#18741gortiz wants to merge 17 commits into
Conversation
Planner-facing read API (PinotStatisticsProvider, TableStatistics, ColumnStatistics, StatConfidence) in pinot-query-planner-spi, and broker-side persistence/source contracts (StatsStore, ColumnStatsSource, segment stat row values) in pinot-broker. Contracts only; no implementations except NoOpStatisticsProvider.
SqliteStatsStore persists per-segment stats off-heap (WAL mode, prepared statements, Flyway-migrated schema). Aggregated table/column reads, time- range row estimation with interpolation, crc-based reconciliation support, corruption auto-recovery, purge. 18 unit tests.
BrokerTableStatsManager owns the SQLite StatsStore and produces per-table SegmentZkMetadataFetchListeners that persist segment totalDocs/size/time bounds on init/onAssignmentChange/refreshSegment, with crc-based restart reconciliation. Gated by pinot.broker.stats.enabled (default false); stats failures never propagate to routing or query handling.
LogicalTableStatsResolver merges hybrid OFFLINE+REALTIME stats at the broker time boundary (no overlap double-count), marks upsert/dedup row counts LOW confidence (physical docs over-count logical rows), and downgrades realtime tables with consuming segments to ESTIMATED. Raw table names resolve the merged logical view; suffixed names the physical one.
PinotStatisticsProvider threads from BaseBrokerStarter through MultiStageBrokerRequestHandler into QueryEnvironment.Config, PinotCatalog and PinotTable. PinotTable.getStatistic() surfaces row counts for EXACT/ESTIMATED confidence only; LOW/UNKNOWN behave as before (unknown). Defaults to NoOpStatisticsProvider: zero behavior change when disabled.
PinotRelMdSelectivity estimates time-predicate selectivity from segment time boundaries via estimateRowsInTimeRange (range, equality and SEARCH/ Sarg forms, with inclusive-to-half-open bound conversion), and equality selectivity from column NDV when available. The primary time column is resolved from the table config (segment boundaries are organized by it) and only used when its unit is epoch millis. Registered on the planner cluster as a stateless singleton chained before Calcite defaults; NoOp stats fall back to previous behavior.
PinotRelOptCost orders costs lexicographically by (rows, cpu, io) — unlike Calcite's VolcanoCost which compares rows only — giving a deterministic total order where cpu/io break row-count ties. Includes factory with standard ZERO/TINY/HUGE/INFINITY constants.
JoinReorderOptimizer runs JOIN_TO_MULTI_JOIN + MULTI_JOIN_OPTIMIZE (with project/filter merge) driven by statistics-backed RelMetadataQuery row counts and the rows-dominated cost factory, between the logical Hep program and the trait planner. Gated by the useJoinReorder query option / pinot.broker.multistage.use.join.reorder (default false). Skips unless all joins are INNER, un-hinted, and every scan has a known row count; any failure falls back to the un-reordered plan. The facade isolates the reorder strategy so a scoped VolcanoPlanner can replace the internals.
Max-join-count cap (joinReorderMaxJoins query option / pinot.broker.multistage.join.reorder.max.joins, default 10) skips the reorder phase for oversized join trees; slow reorders log a WARN canary with the join count; skip decisions log a DEBUG reason (NO_JOINS, NON_INNER_JOIN, HINTED_JOIN, UNKNOWN_ROW_COUNT, TOO_MANY_JOINS, ERROR). Plan-level tests cover cap boundary/exceeded, EXPLAIN diff enabled vs disabled, and per-query option plumbing.
- Read schema from TableCache instead of ZooKeeper in the time-boundary provider (no synchronous ZK reads on the planning path) - Honor Calcite's Filter selectivity contract: subtract the filter's own condition from passed predicates (no double counting) and delegate to the input for unresolvable shapes - Memoize PinotTable.getStatistic() (Calcite calls it repeatedly per planning) - Clamp unknown (-1) segment sizes out of table size sums - Document that enabling broker stats changes cardinality estimates for all MSE queries; correct the stats-dir default docs - Catch Exception|StackOverflowError (not Throwable) in the reorder phase - Add LICENSE-binary entries for sqlite-jdbc and flyway-core; justify the Flyway 8.x pin - Drop unrelated local files accidentally committed; prefer imports over inline FQCNs
All new classes use /// (JEP 467) doc comments; branch-added comments in files already using /// (QueryEnvironment, CommonConstants, broker starter/routing/handler) converted to match.
MultistageEngineQuickStart.getConfigOverrides() dropped the base implementation's config-file overrides, so -configFile was silently ignored by every quickstart extending it (TPCH, COLOCATED_JOIN, ...). Merge them, with config-file values taking precedence.
The option already declared arity 1..* but bootstrapOfflineTableDirectories only consumed the first directory (and getBootstrapDataDir() returns null for more than one, leading to an NPE). Bootstrap every provided directory.
Apply the super.getConfigOverrides() merge to RealtimeQuickStart and TimeSeriesEngineQuickStart (same dropped -configFile defect), add regression tests for the configFile merge and multi-directory bootstrap, remove the dead no-arg getTableName() (NPEs with multiple bootstrap dirs), and fix a dangling doc-comment reference.
bd19480 to
88286e4
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18741 +/- ##
===========================================
Coverage 64.82% 64.83%
+ Complexity 1319 1318 -1
===========================================
Files 3388 3404 +16
Lines 210228 211524 +1296
Branches 32948 33156 +208
===========================================
+ Hits 136282 137139 +857
- Misses 62978 63308 +330
- Partials 10968 11077 +109
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
- Avoid the banned com.google.common.collect.ImmutableList import: build PinotDefaultRelMetadataProvider via the ChainedRelMetadataProvider.of static factory instead of subclassing (whose protected ctor required ImmutableList). - Wrap two Javadoc lines exceeding 120 chars in QueryEnvironment. These passed locally before due to the Develocity build cache; verified with the cache disabled.
Rename testNoConsumingSegment_confidenceExact -> testNoConsumingSegment ConfidenceExact: underscores are disallowed by the MethodName rule (^[a-z][a-zA-Z0-9]*$), which checkstyle enforces on test sources too. Verified the full lint (checkstyle + spotless + license + rat) with the pinot-fastdev profile disabled, which is what masked these locally.
…oundation # Conflicts: # pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
|
This is exciting! I haven't looked through the code yet, but some higher-level thoughts
|
Contributes to #18740 (umbrella: cost-based optimization for the multi-stage query engine).
This PR implements Phase 1 of the umbrella issue: the broker statistics foundation and a
gated, cost-based join-reorder phase for the multi-stage engine. Everything is off by default
and broker-local (no wire-format or mixed-version impact). The commits are structured so the PR
can be split into the stacked PRs listed in the umbrella issue if reviewers prefer.
What's included
Statistics foundation
PinotStatisticsProvider,TableStatistics,ColumnStatistics,StatConfidence(planner SPI) andStatsStore/ColumnStatsSource(broker SPIs).StatsStore: off-heap persistence (WAL, prepared statements, Flyway-migratedschema), crc-based restart reconciliation, corruption auto-recovery, purge.
(
pinot.broker.stats.enabled, default false): per-segment row count, size and time boundsvia a
SegmentZkMetadataFetchListener, with no extra ZK reads on the query path.broker time boundary (no double counting), upsert/dedup row counts marked LOW confidence
(physical docs over-count logical rows), consuming segments downgrade confidence. The planner
treats LOW/UNKNOWN confidence as "no stats".
Planner integration
PinotTable#getStatistic()(memoized) exposes row counts to Calcite; statistics provider isthreaded through
QueryEnvironment.Config(default no-op ⇒ zero behavior change).RelMetadataProviderwith stats-backed row counts and selectivity, includingtime-range selectivity computed from segment time boundaries (the primary time column from the
table config, applied only when its unit is epoch millis).
RelOptCostimplementation with a deterministic total order.Cost-based join reordering (
useJoinReorderquery option, default false)JOIN_TO_MULTI_JOIN+MULTI_JOIN_OPTIMIZE(with project/filter merge) off thestatistics-backed row counts.
configurable join-count cap (
joinReorderMaxJoins, default 10), and any failure falls back tothe un-reordered plan (a reorder problem must never fail a query).
Independent fixes found along the way (can be extracted to a separate PR on request)
-configFilewas silently dropped by quickstarts overridinggetConfigOverrides()(
MultistageEngineQuickStart,RealtimeQuickStart,TimeSeriesEngineQuickStart).-bootstrapTableDirdirectories were not bootstrapped despite the option declaringarity
1..*.Measured impact
TPC-H SF=1 (quickstart, 150 samples per variant, median with bootstrap 95% CI), queries written
in a poor syntactic join order:
useJoinReordercustomer⋈orders⋈lineitem(filtered)region⋈nation⋈supplier⋈lineitem(filtered)The second query's reordered plan (reduce the dimension chain first, single probe over the fact
table) beats even the hand-optimized left-deep SQL by 2.6×. Results are identical across all
variants; with the option disabled, plans are byte-identical to master (verified against the
full plan-resource test suite).
Testing
semantics), table-type stat semantics, metadata provider/selectivity (incl. bound-conversion
regression tests), cost ordering, and plan-level join-reorder tests (EXPLAIN diffs, hint veto,
join-cap fallback, option plumbing).
pinot-query-plannermodule green (1339 tests), including the 574 resource-based plantests confirming default-off changes nothing.
Notes for reviewers
pinot.broker.stats.enabledchanges cardinality estimates visible to all MSEplanner rules (documented on the config key); it never affects correctness. The join-reorder
phase is additionally gated by its own query option.
org.xerial:sqlite-jdbc,org.flywaydb:flyway-core(8.x — the lastline with built-in SQLite support).
LICENSE-binaryupdated.StatsStoreis an interface; SQLite is the default implementation and the storage engine isswappable.