fix: allow using in select#17
Conversation
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughIntroduces asynchronous min extraction to BinHeap by adding Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Client
participant BH as BinHeap
participant BG as extractMin() goroutine
participant CH as minCh (chan T)
rect rgb(240,248,255)
note over Client,BH: Creation & initialization
Client->>BH: NewBinHeap(...)
BH->>BH: make(minCh), init stopCtx/stopCf
BH-->>BG: start extractMin() goroutine
BH-->>Client: ExtractMinCh() returns receive-only CH
end
rect rgb(245,255,240)
note over BG,CH: Background extraction loop
BG->>BG: wait for heap state / removals
BG->>BG: remove min, update exists map
alt not stopped
BG-->>CH: non-blocking send min item
else stopped
BG-->>BH: exit goroutine
end
end
rect rgb(255,250,240)
note over Client,CH: Consumption
Client->>CH: receive (select { case x := <-CH: ... })
Client-->>Client: process min item
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull Request Overview
This PR modifies the binary heap implementation to support non-blocking extraction operations in select statements by introducing a channel-based approach. Instead of the blocking ExtractMin() method, it adds ExtractMinCh() that returns a channel for receiving minimum items.
- Replaced blocking
ExtractMin()calls with channel-based extraction usingExtractMinCh() - Added a background goroutine that continuously extracts items and sends them to a buffered channel
- Updated all test cases to use the new channel-based extraction pattern
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| binary_heap.go | Added channel field, background goroutine, and ExtractMinCh() method to enable non-blocking extraction |
| binary_heap_test.go | Updated all test cases to use the new channel-based extraction pattern instead of direct ExtractMin() calls |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| len: 0, | ||
| maxLen: maxLen, | ||
| cond: sync.Cond{L: &sync.Mutex{}}, | ||
| minCh: make(chan T, 5), |
There was a problem hiding this comment.
[nitpick] The buffer size of 5 appears to be a magic number. Consider defining this as a named constant to improve maintainability and make the reasoning behind this specific buffer size clear.
| cond: sync.Cond{L: &sync.Mutex{}}, | ||
| minCh: make(chan T, 5), | ||
| } | ||
| go bh.extractMin() |
There was a problem hiding this comment.
The goroutine started in the constructor has no mechanism to be stopped, which could lead to goroutine leaks. Consider adding a context or stop channel to allow proper cleanup of this background goroutine.
There was a problem hiding this comment.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
binary_heap.go(2 hunks)binary_heap_test.go(7 hunks)
🧰 Additional context used
🪛 GitHub Actions: tests
binary_heap_test.go
[error] 250-250: TestItemPeek failed: Not equal: expected: 5 actual: 6
[error] 377-377: TestExists failed: Should be true
| func (bh *BinHeap[T]) extractMin() { | ||
| for { | ||
| bh.cond.L.Lock() | ||
|
|
||
| bh.swap(0, bh.len-1) | ||
| // if len == 0, wait for the signal | ||
| for bh.Len() == 0 { | ||
| bh.cond.Wait() | ||
| } | ||
|
|
||
| item := (bh.items)[int(bh.len)-1] //nolint:gosec | ||
| bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec | ||
| bh.fixDown(0, int(bh.len-2)) //nolint:gosec | ||
| bh.swap(0, bh.len-1) | ||
|
|
||
| // reduce len | ||
| atomic.AddUint64(&bh.len, ^uint64(0)) | ||
| item := (bh.items)[int(bh.len)-1] //nolint:gosec | ||
| bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec | ||
| bh.fixDown(0, int(bh.len-2)) //nolint:gosec | ||
|
|
||
| // remove item | ||
| delete(bh.exists, item.ID()) | ||
| // reduce len | ||
| atomic.AddUint64(&bh.len, ^uint64(0)) | ||
|
|
||
| bh.cond.L.Unlock() | ||
| return item | ||
| // remove item | ||
| delete(bh.exists, item.ID()) | ||
| bh.cond.L.Unlock() | ||
| // send item to the channel | ||
| bh.minCh <- item | ||
| } |
There was a problem hiding this comment.
Stop draining the heap without a waiting receiver.
extractMin keeps popping items and pushing them into minCh even when nobody is receiving. That drains the heap ahead of consumers, so invariants like PeekPriority and Exists observe already-removed elements. The failing pipeline tests (TestItemPeek expecting 5 but seeing 6, and TestExists) demonstrate the breakage. We need to couple the removal with an actual receiver—e.g. block on the send before mutating the heap, or gate the pop behind an explicit request path—so the heap state only advances when a consumer is ready.
🤖 Prompt for AI Agents
In binary_heap.go around lines 192 to 215, extractMin currently removes items
and then sends them to minCh, which lets the heap be drained ahead of receivers;
instead, change the sequence so you peek the minimum item first and perform the
blocking send to bh.minCh while still holding bh.cond.L, and only after the send
succeeds mutate the heap (swap/remove/fixDown/atomic len decrement/delete from
exists) and then unlock; this couples removal with an actual receiver and
prevents advancing heap state before a consumer is ready.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
binary_heap_test.go (2)
82-82: Remove redundantPriority()call.Line 82 calls
item.Priority()but discards the result. The next line (83) calls it again for the actual append operation, making this call unnecessary.Apply this diff:
for range 11 { item := <-itemCh - item.Priority() res = append(res, item.Priority()) }
296-297: Simplify unused receive.The extracted item is immediately discarded. Consider using the blank identifier directly in the receive operation.
Apply this diff:
defer wg.Done() for range 11 { - min := <-bh.ExtractMinCh() - _ = min + _ = <-bh.ExtractMinCh() }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
binary_heap_test.go(7 hunks)
🔇 Additional comments (3)
binary_heap_test.go (3)
106-115: LGTM!The channel-based extraction properly tests concurrent insert/extract behavior with a constrained heap size.
161-169: LGTM!The select statement properly enables cancellation and demonstrates the non-blocking extraction pattern that is the goal of this PR.
346-349: LGTM!Channel-based extraction correctly validates the remaining heap contents after removal.
| itemCh := bh.ExtractMinCh() | ||
| item := <-itemCh | ||
| assert.Equal(t, int64(5), item.Priority()) | ||
| assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) |
There was a problem hiding this comment.
Fix inverted timing assertion.
The assertion logic is inverted. Line 221 checks that 5 >= elapsed_time, but the comment on line 208 and test intent require that the elapsed time be at least 5 seconds.
Apply this diff:
tn := time.Now()
itemCh := bh.ExtractMinCh()
item := <-itemCh
assert.Equal(t, int64(5), item.Priority())
- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
+ assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| itemCh := bh.ExtractMinCh() | |
| item := <-itemCh | |
| assert.Equal(t, int64(5), item.Priority()) | |
| assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) | |
| tn := time.Now() | |
| itemCh := bh.ExtractMinCh() | |
| item := <-itemCh | |
| assert.Equal(t, int64(5), item.Priority()) | |
| assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5)) |
🤖 Prompt for AI Agents
In binary_heap_test.go around lines 218 to 221, the timing assertion is
inverted: it currently asserts 5 >= elapsed, but the test expects elapsed >= 5
seconds. Replace the assertion so it checks time.Since(tn).Seconds() is greater
than or equal to 5 (e.g., call assert.GreaterOrEqual(t,
time.Since(tn).Seconds(), float64(5))) to ensure the elapsed time is at least 5
seconds.
| item := <-bh.ExtractMinCh() | ||
| assert.Equal(t, int64(5), item.Priority()) | ||
| assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) |
There was a problem hiding this comment.
Fix inverted timing assertion.
The assertion has the same inversion issue as TestNewItemWithTimeout. Line 255 checks that 5 >= elapsed_time, but should verify that elapsed time is at least 5 seconds.
Apply this diff:
tn := time.Now()
item := <-bh.ExtractMinCh()
assert.Equal(t, int64(5), item.Priority())
- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())
+ assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| item := <-bh.ExtractMinCh() | |
| assert.Equal(t, int64(5), item.Priority()) | |
| assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) | |
| tn := time.Now() | |
| item := <-bh.ExtractMinCh() | |
| assert.Equal(t, int64(5), item.Priority()) | |
| assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5)) |
🤖 Prompt for AI Agents
In binary_heap_test.go around lines 253 to 255, the timing assertion is
inverted: it currently asserts 5 >= elapsed but should assert elapsed >= 5;
change the assertion to compare the elapsed seconds as the first value and 5 as
the second (e.g. assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5)))
so the test verifies that at least 5 seconds have passed.
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
binary_heap_test.go (2)
223-226: Fix inverted timing assertion.The assertion order is still reversed, so the test incorrectly passes when the elapsed time is less than the required 5 s. Swap the arguments so we verify
elapsed >= 5.- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) + assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
257-260: Fix inverted timing assertion here as well.Same inversion bug as above: the check currently enforces
5 ≥ elapsed. Flip the arguments so we assert the elapsed time meets the 5 s floor.- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) + assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
.github/ISSUE_TEMPLATE/.github/pull_request_template.md(1 hunks)binary_heap.go(5 hunks)binary_heap_test.go(9 hunks)
🧰 Additional context used
🪛 GitHub Actions: tests
binary_heap_test.go
[error] 340-340: TestBinHeap_Remove: should be 5
[error] 382-382: TestExists: Should be true
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build (Go stable, OS ubuntu-latest)
| func (bh *BinHeap[T]) Stop() { | ||
| bh.stopCf() | ||
| } |
There was a problem hiding this comment.
Stop must wake the extractor before channel consumers hang.
If Stop() is called while the heap is empty, extractMin() stays blocked on cond.Wait() because the loop never checks stopCtx and Stop() never signals the condition. As a result the goroutine leaks, minCh is never closed, and any consumer waiting in a select will block forever. Please broadcast the condition inside Stop() and short-circuit the wait loop when the context is canceled.
func (bh *BinHeap[T]) Stop() {
- bh.stopCf()
+ bh.stopCf()
+ bh.cond.L.Lock()
+ bh.cond.Broadcast()
+ bh.cond.L.Unlock()
}
…
- for bh.Len() == 0 {
- bh.cond.Wait()
- }
+ for bh.Len() == 0 {
+ if bh.stopCtx.Err() != nil {
+ bh.cond.L.Unlock()
+ close(bh.minCh)
+ return
+ }
+ bh.cond.Wait()
+ }Also applies to: 207-209
🤖 Prompt for AI Agents
In binary_heap.go around lines 109-111 (and similarly 207-209), Stop() currently
only cancels the context but does not wake any goroutine waiting on the
condition, so extractMin() can block on cond.Wait() forever; modify Stop() to
call bh.stopCf() and then bh.cond.Broadcast() (or Signal() if only one waiter)
so any waiting extractor wakes, and update extractMin()'s wait loop to
short-circuit when the stop context is canceled by checking bh.stopCtx.Done() or
bh.stopCtx.Err() after waking (break/return to allow minCh to be closed) instead
of waiting indefinitely; apply the same broadcast + stop-check fix to the other
Stop() occurrence at lines 207-209.
| bh.swap(0, bh.len-1) | ||
|
|
||
| // reduce len | ||
| atomic.AddUint64(&bh.len, ^uint64(0)) | ||
| item := (bh.items)[int(bh.len)-1] //nolint:gosec | ||
| bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec | ||
| bh.fixDown(0, int(bh.len-2)) //nolint:gosec | ||
|
|
||
| // remove item | ||
| delete(bh.exists, item.ID()) | ||
| // reduce len | ||
| atomic.AddUint64(&bh.len, ^uint64(0)) | ||
|
|
||
| bh.cond.L.Unlock() | ||
| return item | ||
| // remove item | ||
| delete(bh.exists, item.ID()) | ||
| bh.cond.L.Unlock() | ||
| // send item to the channel | ||
| select { | ||
| case bh.minCh <- item: | ||
| case <-bh.stopCtx.Done(): | ||
| close(bh.minCh) | ||
| return | ||
| } |
There was a problem hiding this comment.
Don’t remove heap entries until a receiver is ready.
This loop swaps, truncates, decrements len, and deletes from exists before confirming that anyone is ready to receive from minCh. When no consumer is listening (the common case in tests such as TestBinHeap_Remove and TestExists), the goroutine still drains the heap and the affected item disappears from Remove/Exists queries while the send remains blocked. That’s exactly why the pipeline is failing (TestBinHeap_Remove: should be 5, TestExists: Should be true). Please rework the extraction so the heap state is mutated only after a corresponding receive succeeds (e.g., guard the removal behind a handshake/pending buffer, or otherwise delay the swap/truncate/delete until the channel send completes).
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
binary_heap_test.go (2)
229-233: Fix inverted elapsed-time assertion.
assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds())still checks5 ≥ elapsed, so the test passes even if the wait is shorter than 5 s. Swap the arguments so we assertelapsed ≥ 5.- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) + assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
264-267: Same elapsed-time assertion inversion here.This assertion also verifies the opposite inequality. Please swap the operands so it enforces
elapsed ≥ 5.- assert.GreaterOrEqual(t, float64(5), time.Since(tn).Seconds()) + assert.GreaterOrEqual(t, time.Since(tn).Seconds(), float64(5))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
.github/workflows/linux.yml(1 hunks)binary_heap_test.go(9 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
binary_heap_test.go (1)
binary_heap.go (1)
Item(14-21)
🪛 GitHub Actions: tests
binary_heap_test.go
[error] 388-388: TestExists failed: Should be true
| itemCh := bh.ExtractMinCh() | ||
| for range 11 { | ||
| item := bh.ExtractMin() | ||
| item := <-itemCh | ||
| item.Priority() | ||
| res = append(res, item.Priority()) |
There was a problem hiding this comment.
Background extractor is removing items prematurely.
After the switch to ExtractMinCh, the background goroutine is now popping the min element even when nobody is reading from the channel, so tests that only call Exists (see the TestExists failure in CI) lose items immediately. The extractor must block before removing from the heap (e.g., coordinate on demand or buffer the request, not the item) so items remain present until a consumer is ready.
Reason for This PR
Description of Changes
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.][Reviewer TODO: Verify that these criteria are met. Request changes if not]git commit -s).CHANGELOG.md.Summary by CodeRabbit
New Features
Breaking Changes
Behavior
Refactor
Documentation