From 97e5f2d913cabaa334ab42f361cbf8e96edc597c Mon Sep 17 00:00:00 2001 From: Yanhu007 Date: Tue, 14 Apr 2026 11:37:08 +0800 Subject: [PATCH] fix: prevent sequential execution in Stream without goroutine limit When no max goroutines is set, pool.MaxGoroutines() returns 0 (the cap of a nil channel), causing the internal queue buffer to be make(chan callbackCh, 0+1) = 1. This effectively serializes all tasks because Go() blocks on the queue after just one inflight task. Use runtime.GOMAXPROCS as a reasonable default queue buffer when no explicit goroutine limit is configured, matching the pool's unlimited concurrency behavior. Fixes #153 --- stream/stream.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/stream/stream.go b/stream/stream.go index 6b11e90..704d540 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -2,6 +2,7 @@ package stream import ( + "runtime" "sync" "github.com/sourcegraph/conc" @@ -109,7 +110,14 @@ func (s *Stream) WithMaxGoroutines(n int) *Stream { func (s *Stream) init() { s.initOnce.Do(func() { - s.queue = make(chan callbackCh, s.pool.MaxGoroutines()+1) + maxGoroutines := s.pool.MaxGoroutines() + if maxGoroutines == 0 { + // When no goroutine limit is set, use a buffer based on + // GOMAXPROCS so that task submission does not block on the + // queue, avoiding accidentally serialized execution. + maxGoroutines = runtime.GOMAXPROCS(0) + } + s.queue = make(chan callbackCh, maxGoroutines+1) // Start the callbacker. s.callbackerHandle.Go(s.callbacker)