diff --git a/client.go b/client.go index 1fb7260b..e5523c1e 100644 --- a/client.go +++ b/client.go @@ -1325,8 +1325,8 @@ const subscribeChanSizeDefault = 1_000 // Client.SubscribeConfig. type SubscribeConfig struct { // ChanSize is the size of the buffered channel that will be created for the - // subscription. Incoming events that overall this number because a listener - // isn't reading from the channel in a timely manner will be dropped. + // subscription. Incoming events that would overflow this buffer because a + // listener isn't reading from the channel in a timely manner will be dropped. // // Defaults to 1000. ChanSize int diff --git a/subscription_manager_test.go b/subscription_manager_test.go index c8a04509..57d4a133 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -169,4 +169,31 @@ func Test_SubscriptionManager(t *testing.T) { require.Contains(t, strings.TrimSpace(logBuf.String()), "Subscription event dropped due to full buffer") require.Contains(t, logBuf.String(), "event_kind=queue_paused") }) + + t.Run("PanicOnNegativeChanSize", func(t *testing.T) { + t.Parallel() + + manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), nil) + + require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 1", func() { + _, _ = manager.SubscribeConfig(&SubscribeConfig{ + ChanSize: -1, + Kinds: []EventKind{EventKindQueuePaused}, + }) + }) + }) + + t.Run("UsesDefaultChanSizeWhenZero", func(t *testing.T) { + t.Parallel() + + manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), nil) + + sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ + ChanSize: 0, + Kinds: []EventKind{EventKindQueuePaused}, + }) + t.Cleanup(cancelSub) + + require.Equal(t, subscribeChanSizeDefault, cap(sub)) + }) }