eventservice: optimize scan window #4950
Conversation
Signed-off-by: dongmen <414110582@qq.com>
Skipping CI for Draft Pull Request.

> Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
📝 Walkthrough

Replaces the prior trend-based scan-interval logic with an adaptive scan-window controller: changefeed status now holds a controller and band state, memory-usage updates are normalized and reported to the controller, the controller computes EMAs/pressure scores and decides interval changes (with cooldowns, emergency braking, and floor/recovery rules), and new Prometheus metrics and broker metric lifecycle helpers were added alongside expanded tests.

Changes: Adaptive Scan Window Controller
🎯 4 (Complex) | ⏱️ ~45 minutes

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as UpdateMemoryUsage
    participant Controller as AdaptiveScanWindowController
    participant Metrics as EventServiceMetrics
    participant Status as ChangefeedStatus
    Client->>Controller: OnCongestionReport(normalizedUsage, releaseCount)
    Controller-->>Controller: update EMAs, update usage window, compute pressure score
    Controller->>Metrics: emit usage/EMA/pressure/target-band/adjust metrics
    Controller->>Status: decision(newScanInterval, reason)
    Status->>Status: persist/apply scanInterval if changed
```
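As a rough illustration of the EMA-and-pressure flow in the diagram above, here is a minimal Go sketch; all names, alpha values, and thresholds are illustrative assumptions, not the controller's actual identifiers or tuned constants:

```go
package main

import "fmt"

// emaUpdate folds a new sample into an exponential moving average.
// An alpha close to 1 reacts quickly (fast EMA); a small alpha smooths (slow EMA).
func emaUpdate(prev, sample, alpha float64) float64 {
	return alpha*sample + (1-alpha)*prev
}

type controller struct {
	fastEMA, slowEMA float64 // normalized memory usage in [0, 1]
	pressureScore    float64
}

// onReport mirrors the shape of the congestion handling above:
// update both EMAs, then move the pressure score up under load or
// decay it toward zero when usage is low.
func (c *controller) onReport(usage float64) {
	c.fastEMA = emaUpdate(c.fastEMA, usage, 0.5) // fast: reacts within a few reports
	c.slowEMA = emaUpdate(c.slowEMA, usage, 0.1) // slow: long-term trend
	switch {
	case c.fastEMA > 0.8: // high pressure: score rises, capped at a ceiling
		c.pressureScore = min(c.pressureScore+2, 10)
	case c.fastEMA < 0.3: // low pressure: score decays toward zero
		c.pressureScore = max(0, c.pressureScore-1)
	}
}

func main() {
	c := &controller{}
	for i := 0; i < 5; i++ {
		c.onReport(0.9) // sustained high usage
	}
	fmt.Printf("fast=%.2f slow=%.2f pressure=%.1f\n", c.fastEMA, c.slowEMA, c.pressureScore)
}
```

Note how the fast EMA crosses the high-pressure threshold only after a few sustained reports, so a single spike cannot immediately drive the score up.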
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Code Review
This pull request replaces the existing scan interval adjustment logic with a new adaptiveScanWindowController that utilizes Exponential Moving Averages (EMAs) and a pressure score for more stable memory pressure management. The update includes comprehensive simulation tests and enhanced Prometheus metrics for monitoring controller decisions. Review feedback highlights a non-monotonic discontinuity in the emergency brake calculation, potential over-throttling caused by latching peak usage values, the presence of magic numbers, and the use of a redundant maxFloat64 helper that should be replaced by the built-in max function.
Signed-off-by: dongmen <414110582@qq.com>
🧹 Nitpick comments (4)
pkg/eventservice/scan_window.go (4)
425-549: 💤 Low value: Consider factoring the repeated `scanWindowDecision{...}` literal.

`OnCongestionReport` returns the same `scanWindowDecision` shape from six branches, each repeating `usage`, `fastUsageEMA`, `slowUsageEMA`, `pressureScore`. A small builder method on the controller would let the function body focus on policy and reduce the surface for accidental field drift if a new metric/field is added later.

♻️ Sketch

```go
func (c *adaptiveScanWindowController) makeDecisionLocked(
	newInterval, maxInterval time.Duration,
	reason scanWindowDecisionReason,
	usage memoryUsageStats,
) scanWindowDecision {
	return scanWindowDecision{
		newInterval:   newInterval,
		maxInterval:   maxInterval,
		reason:        reason,
		usage:         usage,
		fastUsageEMA:  c.fastUsageEMA,
		slowUsageEMA:  c.slowUsageEMA,
		pressureScore: c.pressureScore,
	}
}
```

Each branch then becomes `return c.makeDecisionLocked(newInterval, maxInterval, reason, usage)`.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 425 - 549, The OnCongestionReport function repeats identical scanWindowDecision literals across multiple return sites; add a helper method on adaptiveScanWindowController (e.g., makeDecisionLocked(newInterval time.Duration, maxInterval time.Duration, reason scanWindowDecisionReason, usage memoryUsageStats) scanWindowDecision) that constructs and returns the scanWindowDecision using c.fastUsageEMA, c.slowUsageEMA, c.pressureScore and the supplied usage, newInterval, maxInterval, reason; then replace each repeated literal in OnCongestionReport with calls to c.makeDecisionLocked(...) (keep current names: OnCongestionReport, scanWindowDecision, usage, fastUsageEMA, slowUsageEMA, pressureScore).
388-406: ⚡ Quick win: Race on band state can double-count target-band crossings.

`observeScanWindowTargetBandMetrics` reads `state.Load()`, compares to `currentState`, then `state.Store(...)`. Because `handleCongestionControl` can be invoked concurrently from different `from` nodes for the same changefeed (each call iterates `changefeedMap.Range`), two goroutines can both observe the same `previousState`, both increment `EventServiceScanWindowTargetBandCrossCount`, and both store, over-counting transitions for the same actual crossing.

Use `Swap` to read-and-replace atomically so only one caller observes each prior state:

🔒 Proposed diff

```diff
-	previousState := scanWindowBandState(state.Load())
-	if previousState != scanWindowBandUnknown && previousState != currentState {
-		metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, metricType).Inc()
-	}
-	state.Store(int32(currentState))
+	previousState := scanWindowBandState(state.Swap(int32(currentState)))
+	if previousState != scanWindowBandUnknown && previousState != currentState {
+		metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, metricType).Inc()
+	}
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 388 - 406, The observeScanWindowTargetBandMetrics function currently uses state.Load() and state.Store(), which allows concurrent callers (e.g., handleCongestionControl) to both see the same previous state and double-count transitions; replace the Load/Store pair with an atomic swap so the read-and-replace is atomic: call state.Swap(int32(currentState)) (convert the returned int32 to scanWindowBandState) to obtain the previousState, then, if previousState != scanWindowBandUnknown && previousState != currentState, increment EventServiceScanWindowTargetBandCrossCount; keep the existing gauge Set(1)/Set(0) behavior and only change how previousState is read/stored.
321-362: 💤 Low value: Minor: `deleteScanWindowMetrics` also clears `EventServiceAvailableMemoryQuotaGaugeVec`.

That metric is not part of the scan-window family (registered separately, set in `handleCongestionControl`), but its lifecycle is naturally tied to the changefeed. Functionally fine; just be aware the helper name slightly under-promises what it deletes. If you keep this coupling, consider a short comment explaining why the available-memory-quota label is wiped here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 321 - 362, deleteScanWindowMetrics currently deletes EventServiceAvailableMemoryQuotaGaugeVec even though that metric is not part of the scan-window family; either move that deletion to the metric's owner (e.g. where handleCongestionControl manages the metric) or keep it here but add a short explanatory comment. Update the function deleteScanWindowMetrics to either remove the line metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeed) and place it in the lifecycle code that sets/clears available memory quota, or add a one-line comment above that DeleteLabelValues call explaining why the available-memory-quota label is cleared here (tie to changefeed lifecycle), referencing deleteScanWindowMetrics and handleCongestionControl so future readers can find the rationale.
796-801: ⚡ Quick win: Drop `maxFloat64` in favor of Go's built-in `max`.

This file already uses the Go 1.21+ built-in `min`/`max` for `float64` elsewhere (e.g., `min(c.pressureScore+2, scanWindowPressureScoreCeiling)` at line 673), so `maxFloat64` is inconsistent and unnecessary. Replacing all three call sites with the builtin removes one ad-hoc helper and matches the surrounding style.

♻️ Proposed diff

```diff
-func maxFloat64(a float64, b float64) float64 {
-	if a > b {
-		return a
-	}
-	return b
-}
```

Then at the call sites (lines 679, 681, 687):

```diff
-	c.pressureScore = maxFloat64(0, c.pressureScore-1.5)
+	c.pressureScore = max(0, c.pressureScore-1.5)
 	...
-	c.pressureScore = maxFloat64(0, c.pressureScore-0.5)
+	c.pressureScore = max(0, c.pressureScore-0.5)
 	...
-	c.pressureScore = maxFloat64(0, c.pressureScore-relief)
+	c.pressureScore = max(0, c.pressureScore-relief)
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 796 - 801, Remove the ad-hoc helper maxFloat64 and replace its call sites with the Go 1.21 built-in max: find all uses of maxFloat64(a, b) and change them to max(a, b), then delete the maxFloat64 function definition; no extra imports are needed—just remove the function maxFloat64 and update callers to use max directly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4190f76c-a953-4e4d-887a-59bfaeef4a9c
📒 Files selected for processing (6)
- pkg/eventservice/dispatcher_stat.go
- pkg/eventservice/event_broker.go
- pkg/eventservice/event_broker_test.go
- pkg/eventservice/scan_window.go
- pkg/eventservice/scan_window_test.go
- pkg/metrics/event_service.go
/test all
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/eventservice/scan_window_test.go (1)
179-181: ⚡ Quick win: Replace the hardcoded `30` with a duration-derived bound.

Line 179 bakes in a window-size assumption. Deriving the loop count from `memoryUsageWindowDuration` makes this test resilient to future constant tuning.

Suggested change

```diff
-	for i := 0; i <= 30; i++ {
+	for i := 0; i <= int(memoryUsageWindowDuration/time.Second); i++ {
 		status.updateMemoryUsage(start.Add(time.Duration(i)*time.Second), 1, 0)
 	}
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window_test.go` around lines 179 - 181, The test hardcodes 30 iterations when calling status.updateMemoryUsage, which couples it to a specific window size; change the loop bound to derive from memoryUsageWindowDuration (e.g., compute n := int(memoryUsageWindowDuration / time.Second) and use i := 0; i <= n; i++) so the test scales with the actual memoryUsageWindowDuration constant; update the loop surrounding status.updateMemoryUsage to use that computed n instead of the literal 30.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/eventservice/scan_window_test.go`:
- Around line 74-75: Replace hardcoded changefeed IDs created by
newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), ...) with a
unique per-test ID using t.Name() (e.g.,
newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), ...)) for
all occurrences (including the instances around lines 89-90, 134-135, 157-158,
166-167, 175-176, 189-190) so updateMemoryUsage emits metrics keyed to a
test-unique label; also make the tests deterministic by using testify/require
assertions where applicable instead of non-deterministic checks.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: de3ec83f-f0f9-438c-87dc-c06631609797
📒 Files selected for processing (2)
- pkg/eventservice/scan_window.go
- pkg/eventservice/scan_window_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/eventservice/scan_window.go
♻️ Duplicate comments (1)
pkg/eventservice/scan_window_test.go (1)
40-40: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win: Use unique changefeed IDs in parallel tests to avoid shared metric state.

This issue was raised in a previous review and remains unresolved for most tests. While the metric tests at lines 209, 233, and 250 correctly use `t.Name()` for isolation, tests at lines 40, 55, 70, 82, 91, 100, 112, 121, 144, 153, 162, 171, 185, 200, 272, 291, 306, 321, 334, 394, and 412 still use hardcoded `"default"`, `"test"`. Since `updateMemoryUsage` emits global metrics keyed by changefeed label, this creates cross-test metric pollution when tests run in parallel.

Suggested change for all affected tests

```diff
-	status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), ...)
+	status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), ...)
```

Apply this pattern consistently across all parallel tests that call `updateMemoryUsage`. As per coding guidelines, "Prefer focused deterministic tests"; the metric tests at lines 208-267 demonstrate the correct pattern with `t.Name()` and cleanup.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window_test.go` at line 40, Replace hardcoded changefeed IDs created via common.NewChangefeedID4Test("default","test") with unique IDs using the test name (e.g., common.NewChangefeedID4Test("default", t.Name())) wherever newChangefeedStatus is used in these tests so metrics emitted by updateMemoryUsage are isolated; update each test (including the ones at lines noted) to construct the changefeed ID from t.Name() and, where other metric tests do, ensure any metric cleanup/reset used in the correct tests is applied consistently after the test to avoid cross-test pollution.
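A minimal sketch of why hardcoded labels pollute shared metric state, using a toy map in place of a real Prometheus `GaugeVec` (all names below are hypothetical):

```go
package main

import "fmt"

// fakeGauge stands in for a Prometheus GaugeVec keyed by a changefeed label.
type fakeGauge struct{ values map[string]float64 }

func (g *fakeGauge) set(label string, v float64) { g.values[label] = v }

func main() {
	g := &fakeGauge{values: map[string]float64{}}

	// Hardcoded label: two different "tests" overwrite each other's series,
	// so whichever test's assertion runs last sees the other test's value.
	g.set("default/test", 1)
	g.set("default/test", 7)

	// Per-test labels (as t.Name() would provide): independent series.
	g.set("default/TestRecovery", 1)
	g.set("default/TestEmergencyBrake", 7)

	fmt.Println(len(g.values)) // 3: the hardcoded label collapsed into one series
}
```

In a real test, `common.NewChangefeedID4Test("default", t.Name())` plays the role of the per-test label, and a `t.Cleanup` that deletes the label values keeps the registry clean between runs.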
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 031c4e03-61d0-4370-88ad-5715e9a1a006
📒 Files selected for processing (3)
- pkg/eventservice/event_broker_test.go
- pkg/eventservice/scan_window.go
- pkg/eventservice/scan_window_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/eventservice/scan_window.go
- pkg/eventservice/event_broker_test.go
/test all
Code review: Found 3 issues.

🤖 Generated with Claude Code
/retest
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing

[LGTM Timeline notifier] Timeline:
What problem does this PR solve?
Issue Number: close #5041
The old scan window controller overreacted to memory feedback. A release pulse could reset the window, then later reports would shrink it again. Because it is a per-changefeed commit-ts span cap rather than a timer, this created sawtooth batching. Stale dispatchers could also pin the base ts and delay tables blocked by pending DDL.
What is changed and how it works?
This PR replaces the old policy with an adaptive controller while keeping scans event-driven. It combines a sliding usage window, EMAs, a pressure score, and cooldowns. High or critical pressure reduces the window in bounded steps. Low pressure recovers gradually and can leave the default floor faster. Release signals now relieve pressure instead of resetting the interval. The broker also skips stale dispatchers in the `minSentTs` refresh and allows local advance for pending DDL when the global window is pinned.

This smooths scan progress, reduces reset storms, and preserves DDL forward progress.
Before

After

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No compatibility break is expected. This changes only scan window control behavior and should stabilize throughput and memory pressure.
Do you need to update user documentation, design documentation or monitoring documentation?
Release note