From bf59a4462dd4b00133995dd902a37e6c356b7d2f Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 19:35:11 +0800 Subject: [PATCH 1/4] Log dropped subscription events --- subscription_manager.go | 7 +++++++ subscription_manager_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/subscription_manager.go b/subscription_manager.go index 3afda983..a7610b9b 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -3,6 +3,7 @@ package river import ( "context" "fmt" + "log/slog" "sync" "time" @@ -186,6 +187,9 @@ func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats * select { case sub.Chan <- event: default: + sm.Logger.WarnContext(context.Background(), sm.Name+": Subscription event dropped due to full buffer", + slog.String("event_kind", string(event.Kind)), + ) } } } @@ -202,6 +206,9 @@ func (sm *subscriptionManager) distributeQueueEvent(event *Event) { select { case sub.Chan <- event: default: + sm.Logger.WarnContext(context.Background(), sm.Name+": Subscription event dropped due to full buffer", + slog.String("event_kind", string(event.Kind)), + ) } } } diff --git a/subscription_manager_test.go b/subscription_manager_test.go index 1d667ad5..2cbdc2fa 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -1,7 +1,10 @@ package river import ( + "bytes" "context" + "log/slog" + "strings" "testing" "time" @@ -13,6 +16,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivershared/testfactory" @@ -138,4 +142,31 @@ func Test_SubscriptionManager(t *testing.T) { startstoptest.Stress(ctx, t, svc) }) + + t.Run("LogsDroppedQueueEvents", func(t *testing.T) { + 
t.Parallel() + + var logBuf bytes.Buffer + + manager := newSubscriptionManager(&baseservice.Archetype{ + Logger: slog.New(slog.NewTextHandler(&logBuf, &slog.HandlerOptions{Level: slog.LevelWarn})), + Time: riversharedtest.BaseServiceArchetype(t).Time, + }, nil) + + sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 1, Kinds: []EventKind{EventKindQueuePaused}}) + t.Cleanup(cancelSub) + + manager.distributeQueueEvent(&Event{ + Kind: EventKindQueuePaused, + Queue: &rivertype.Queue{Name: "default"}, + }) + manager.distributeQueueEvent(&Event{ + Kind: EventKindQueuePaused, + Queue: &rivertype.Queue{Name: "default"}, + }) + + require.Len(t, sub, 1) + require.Contains(t, strings.TrimSpace(logBuf.String()), "Subscription event dropped due to full buffer") + require.Contains(t, logBuf.String(), "event_kind=queue_paused") + }) } From 97eb71a883c96f83460c805bafcee29fb84ecc9b Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 19:47:04 +0800 Subject: [PATCH 2/4] Thread context through subscription drop logs --- subscription_manager.go | 18 +++++++++++------- subscription_manager_test.go | 4 ++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/subscription_manager.go b/subscription_manager.go index a7610b9b..c74ef4d4 100644 --- a/subscription_manager.go +++ b/subscription_manager.go @@ -80,13 +80,13 @@ func (sm *subscriptionManager) Start(ctx context.Context) error { // one has to be careful in tests. sm.Logger.DebugContext(ctx, sm.Name+": Stopping; distributing subscriptions until channel is closed") for updates := range sm.subscribeCh { - sm.distributeJobUpdates(updates) + sm.distributeJobUpdates(ctx, updates) } return case updates := <-sm.subscribeCh: - sm.distributeJobUpdates(updates) + sm.distributeJobUpdates(ctx, updates) } } }() @@ -119,7 +119,7 @@ func (sm *subscriptionManager) safeDurationAverage(d time.Duration, n int) time. 
// Receives updates from the completer and prompts the client to update // statistics and distribute jobs into any listening subscriber channels. // (Subscriber channels are non-blocking so this should be quite fast.) -func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.CompleterJobUpdated) { +func (sm *subscriptionManager) distributeJobUpdates(ctx context.Context, updates []jobcompleter.CompleterJobUpdated) { func() { sm.statsMu.Lock() defer sm.statsMu.Unlock() @@ -142,7 +142,7 @@ func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.Compl } for _, update := range updates { - sm.distributeJobEvent(update.Job, jobStatisticsFromInternal(update.JobStats), update.Snoozed) + sm.distributeJobEvent(ctx, update.Job, jobStatisticsFromInternal(update.JobStats), update.Snoozed) } } @@ -152,7 +152,7 @@ func (sm *subscriptionManager) distributeJobUpdates(updates []jobcompleter.Compl // the queue. // // MUST be called with sm.mu already held. -func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats *JobStatistics, snoozed bool) { +func (sm *subscriptionManager) distributeJobEvent(ctx context.Context, job *rivertype.JobRow, stats *JobStatistics, snoozed bool) { var event *Event if snoozed { event = &Event{Kind: EventKindJobSnoozed, Job: job, JobStats: stats} @@ -187,7 +187,7 @@ func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats * select { case sub.Chan <- event: default: - sm.Logger.WarnContext(context.Background(), sm.Name+": Subscription event dropped due to full buffer", + sm.Logger.WarnContext(ctx, sm.Name+": Subscription event dropped due to full buffer", slog.String("event_kind", string(event.Kind)), ) } @@ -196,6 +196,10 @@ func (sm *subscriptionManager) distributeJobEvent(job *rivertype.JobRow, stats * } func (sm *subscriptionManager) distributeQueueEvent(event *Event) { + sm.distributeQueueEventWithContext(context.Background(), event) +} + +func (sm *subscriptionManager) 
distributeQueueEventWithContext(ctx context.Context, event *Event) { sm.mu.Lock() defer sm.mu.Unlock() @@ -206,7 +210,7 @@ func (sm *subscriptionManager) distributeQueueEvent(event *Event) { select { case sub.Chan <- event: default: - sm.Logger.WarnContext(context.Background(), sm.Name+": Subscription event dropped due to full buffer", + sm.Logger.WarnContext(ctx, sm.Name+": Subscription event dropped due to full buffer", slog.String("event_kind", string(event.Kind)), ) } diff --git a/subscription_manager_test.go b/subscription_manager_test.go index 2cbdc2fa..c8a04509 100644 --- a/subscription_manager_test.go +++ b/subscription_manager_test.go @@ -156,11 +156,11 @@ func Test_SubscriptionManager(t *testing.T) { sub, cancelSub := manager.SubscribeConfig(&SubscribeConfig{ChanSize: 1, Kinds: []EventKind{EventKindQueuePaused}}) t.Cleanup(cancelSub) - manager.distributeQueueEvent(&Event{ + manager.distributeQueueEventWithContext(ctx, &Event{ Kind: EventKindQueuePaused, Queue: &rivertype.Queue{Name: "default"}, }) - manager.distributeQueueEvent(&Event{ + manager.distributeQueueEventWithContext(ctx, &Event{ Kind: EventKindQueuePaused, Queue: &rivertype.Queue{Name: "default"}, }) From d6d53a4c4acb8da5c4a517c1eccf173d66e777a3 Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 20:35:46 +0800 Subject: [PATCH 3/4] Add changelog entry for dropped subscription logs --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f52c6b08..6d734eb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. 
[PR #1259](https://github.com/riverqueue/river/pull/1259) - When using `(*Migrator[TTx]).Migrate` with a `TargetVersion` that's already applied, River now no-ops idempotently instead of returning an error as a user convenience. [PR #1260](https://github.com/riverqueue/river/pull/1260) +- Dropped job and queue subscription events are now logged at warn level when a subscriber's buffer is full, including an `event_kind` attribute for easier diagnosis. [PR #1271](https://github.com/riverqueue/river/pull/1271) ### Fixed From 55e5eed9988246e0dfaa0ac8b9a18f4c504ed65a Mon Sep 17 00:00:00 2001 From: "Peter Chen J." Date: Wed, 3 Jun 2026 20:46:27 +0800 Subject: [PATCH 4/4] Retrigger CI for subscription drop observability