[WIP] log puller optimization#5498
Conversation
|
Skipping CI for Draft Pull Request. |
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request refactors the region request scheduling and flow control in the logpuller service by introducing a state-tracked requestCache, a non-blocking controlQueue for deregistration, and a deferred task scheduling mechanism in requestedStore to prevent overloading busy stores. The review feedback highlights several critical issues, including potential memory leaks in requestCache due to stale pointers not being set to nil during slice clearing and compaction, thread-safety data races from lazy initialization of controlQueue, and a resource leak where promoted store tasks are not properly finished when a region is stopped.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| c.ready = c.ready[:0] | ||
| c.readyIdx = 0 |
There was a problem hiding this comment.
In clear(), slicing c.ready to [:0] without setting the elements to nil first will leak memory because the underlying array of the slice still holds references to the *regionReq pointers. We should set the elements to nil before slicing to allow them to be garbage collected.
for i := range c.ready {
c.ready[i] = nil
}
c.ready = c.ready[:0]
c.readyIdx = 0| if removed > 0 { | ||
| c.compactReadyLocked() | ||
| } |
There was a problem hiding this comment.
In takeUnsentRegions(), since all queued requests are removed from c.requests, any remaining elements in c.ready are now invalid (lazily deleted). Calling c.compactReadyLocked() might return early and do nothing if c.readyIdx is small, leaving stale *regionReq pointers in the slice and causing a memory leak. We should explicitly clear c.ready and set its elements to nil.
if removed > 0 {
for i := range c.ready {
c.ready[i] = nil
}
c.ready = c.ready[:0]
c.readyIdx = 0
}| if s.controlQueue == nil { | ||
| s.controlQueue = newControlQueue() | ||
| } |
There was a problem hiding this comment.
The lazy initialization of s.controlQueue is not thread-safe and introduces a data race because Unsubscribe (which calls enqueueDeregisterToAllStores) can be called concurrently from any goroutine. Since controlQueue is already guaranteed to be initialized in newRegionRequestWorker and in all relevant tests, this lazy initialization check is unnecessary and should be removed.
| if worker.controlQueue == nil { | ||
| worker.controlQueue = newControlQueue() | ||
| } |
| region := regionTask.GetRegionInfo() | ||
| if region.isStopped() { | ||
| enqueued, err := s.enqueueRegionToAllStores(ctx, region) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if !enqueued { | ||
| log.Debug("enqueue stop request failed, retry later", | ||
| zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID))) | ||
| s.regionTaskQueue.Push(regionTask) | ||
| } | ||
| s.enqueueDeregisterToAllStores(region.subscribedSpan.subID, region.filterLoop) | ||
| continue | ||
| } |
There was a problem hiding this comment.
If region.isStopped() is true, the loop continues immediately. However, if this task was previously deferred and promoted, promotedStore will be leaked (never finished). We should ensure that the promoted task is finished and another task is promoted before continuing.
| region := regionTask.GetRegionInfo() | |
| if region.isStopped() { | |
| enqueued, err := s.enqueueRegionToAllStores(ctx, region) | |
| if err != nil { | |
| return err | |
| } | |
| if !enqueued { | |
| log.Debug("enqueue stop request failed, retry later", | |
| zap.Uint64("subscriptionID", uint64(region.subscribedSpan.subID))) | |
| s.regionTaskQueue.Push(regionTask) | |
| } | |
| s.enqueueDeregisterToAllStores(region.subscribedSpan.subID, region.filterLoop) | |
| continue | |
| } | |
| region := regionTask.GetRegionInfo() | |
| if region.isStopped() { | |
| if promotedStore := regionTask.deferredStore.Load(); promotedStore != nil { | |
| promotedStore.finishPromotedTask(regionTask) | |
| promotedStore.promoteDeferredTask(s.regionTaskQueue) | |
| } | |
| s.enqueueDeregisterToAllStores(region.subscribedSpan.subID, region.filterLoop) | |
| continue | |
| } |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note