diff --git a/stream/stream.go b/stream/stream.go index 6b11e90..704d540 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -2,6 +2,7 @@ package stream import ( + "runtime" "sync" "github.com/sourcegraph/conc" @@ -109,7 +110,14 @@ func (s *Stream) WithMaxGoroutines(n int) *Stream { func (s *Stream) init() { s.initOnce.Do(func() { - s.queue = make(chan callbackCh, s.pool.MaxGoroutines()+1) + maxGoroutines := s.pool.MaxGoroutines() + if maxGoroutines == 0 { + // When no goroutine limit is set, use a buffer based on + // GOMAXPROCS so that task submission does not block on the + // queue, avoiding accidentally serialized execution. + maxGoroutines = runtime.GOMAXPROCS(0) + } + s.queue = make(chan callbackCh, maxGoroutines+1) // Start the callbacker. s.callbackerHandle.Go(s.callbacker)