From 901a6acbc7ea004b92bd3c83c07f0e09a8d0d713 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 19:35:11 +0800 Subject: [PATCH 1/4] Align subscribe config channel size contract --- client.go | 6 +++--- client_test.go | 2 +- subscription_manager.go | 2 +- subscription_manager_test.go | 27 +++++++++++++++++++++++++++ 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index 1fb7260b..3bf3dd9b 100644 --- a/client.go +++ b/client.go @@ -1325,10 +1325,10 @@ const subscribeChanSizeDefault = 1_000 // Client.SubscribeConfig. type SubscribeConfig struct { // ChanSize is the size of the buffered channel that will be created for the - // subscription. Incoming events that overall this number because a listener - // isn't reading from the channel in a timely manner will be dropped. + // subscription. Incoming events that would overflow this buffer because a + // listener isn't reading from the channel in a timely manner will be dropped. // - // Defaults to 1000. + // Defaults to 1000 when set to 0. ChanSize int // Kinds are the kinds of events that the subscription will receive. diff --git a/client_test.go b/client_test.go index 2a7dd702..f19b1051 100644 --- a/client_test.go +++ b/client_test.go @@ -7291,7 +7291,7 @@ func Test_Client_SubscribeConfig(t *testing.T) { client := newTestClient(t, dbPool, config) - require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 1", func() { + require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 0", func() { _, _ = client.SubscribeConfig(&SubscribeConfig{ ChanSize: -1, }) diff --git a/subscription_manager.go b/subscription_manager.go index c74ef4d4..4c403a45 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -222,7 +222,7 @@ func (sm *subscriptionManager) distributeQueueEventWithContext(ctx context.Conte // an overridden size. func (sm *subscriptionManager) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) { if config.ChanSize < 0 { - panic("SubscribeConfig.ChanSize must be greater or equal to 1") + panic("SubscribeConfig.ChanSize must be greater or equal to 0") } if config.ChanSize == 0 { config.ChanSize = subscribeChanSizeDefault diff --git a/subscription_manager_test.go b/subscription_manager_test.go index c8a04509..92956b65 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -169,4 +169,31 @@ func Test_SubscriptionManager(t *testing.T) { require.Contains(t, strings.TrimSpace(logBuf.String()), "Subscription event dropped due to full buffer") require.Contains(t, logBuf.String(), "event_kind=queue_paused") }) + + t.Run("PanicOnNegativeChanSize", func(t *testing.T) { + t.Parallel() + + manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), nil) + + require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 0", func() { + _, _ = manager.SubscribeConfig(&SubscribeConfig{ + ChanSize: -1, + Kinds: []EventKind{EventKindQueuePaused}, + }) + }) + }) + + t.Run("UsesDefaultChanSizeWhenZero", func(t *testing.T) { + t.Parallel() + + manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), nil) + + sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ + ChanSize: 0, + Kinds: []EventKind{EventKindQueuePaused}, + }) + t.Cleanup(cancelSub) + + require.Equal(t, subscribeChanSizeDefault, cap(sub)) + }) } From 56fd461c813086c59f06def2d353008d90463dc1 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 20:25:43 +0800 Subject: [PATCH 2/4] Revert explicit zero-value chan size wording --- client.go | 2 +- subscription_manager.go | 2 +- subscription_manager_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index 3bf3dd9b..e5523c1e 100644 --- a/client.go +++ b/client.go @@ -1328,7 +1328,7 @@ type SubscribeConfig struct { // subscription. Incoming events that would overflow this buffer because a // listener isn't reading from the channel in a timely manner will be dropped. // - // Defaults to 1000 when set to 0. + // Defaults to 1000. ChanSize int // Kinds are the kinds of events that the subscription will receive. diff --git a/subscription_manager.go b/subscription_manager.go index 4c403a45..c74ef4d4 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -222,7 +222,7 @@ func (sm *subscriptionManager) distributeQueueEventWithContext(ctx context.Conte // an overridden size. func (sm *subscriptionManager) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) { if config.ChanSize < 0 { - panic("SubscribeConfig.ChanSize must be greater or equal to 0") + panic("SubscribeConfig.ChanSize must be greater or equal to 1") } if config.ChanSize == 0 { config.ChanSize = subscribeChanSizeDefault diff --git a/subscription_manager_test.go b/subscription_manager_test.go index 92956b65..57d4a133 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -175,7 +175,7 @@ func Test_SubscriptionManager(t *testing.T) { manager := newSubscriptionManager(riversharedtest.BaseServiceArchetype(t), nil) - require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 0", func() { + require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 1", func() { _, _ = manager.SubscribeConfig(&SubscribeConfig{ ChanSize: -1, Kinds: []EventKind{EventKindQueuePaused}, From aec2483c97ac718cc8e56fcfd113c18b79ccf7ec Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 20:36:21 +0800 Subject: [PATCH 3/4] Revert remaining chan size test assertion --- client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client_test.go b/client_test.go index f19b1051..2a7dd702 100644 --- a/client_test.go +++ b/client_test.go @@ -7291,7 +7291,7 @@ func Test_Client_SubscribeConfig(t *testing.T) { client := newTestClient(t, dbPool, config) - require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 0", func() { + require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 1", func() { _, _ = client.SubscribeConfig(&SubscribeConfig{ ChanSize: -1, }) From 3a3a9170bcec09064cfe7a0c55e442666beefc0f Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 21:31:24 +0800 Subject: [PATCH 4/4] Retrigger CI after flaky race timeout