diff --git a/server.go b/server.go index 0ebf558..b306567 100644 --- a/server.go +++ b/server.go @@ -97,24 +97,42 @@ func (s *Server) removeSub(conn net.Conn) { } } +// publish fan-outs evt to every subscriber on evt.Topic and "*" (minus +// the sender). The subscriber set is snapshotted under a brief RLock +// and the lock is released before any WriteEvent I/O — otherwise a slow +// subscriber (full pipe / TCP send buffer) would block WriteEvent while +// the RLock is still held, starving every concurrent addSub/removeSub +// caller waiting on mu.Lock(). See broker.publishWith in service.go for +// the same pattern (P2-003). func (s *Server) publish(evt *Event, sender net.Conn) { s.mu.RLock() - defer s.mu.RUnlock() - - // Send to topic-specific subscribers + targets := make([]net.Conn, 0, len(s.subs[evt.Topic])+len(s.subs["*"])) for _, conn := range s.subs[evt.Topic] { if conn != sender { - WriteEvent(conn, evt) + targets = append(targets, conn) } } - // Send to wildcard subscribers if evt.Topic != "*" { for _, conn := range s.subs["*"] { if conn != sender { - WriteEvent(conn, evt) + targets = append(targets, conn) } } } + s.mu.RUnlock() + + for _, conn := range targets { + if err := WriteEvent(conn, evt); err != nil { + slog.Debug("eventstream write failed", + "remote", conn.RemoteAddr(), + "topic", evt.Topic, + "error", err) + } + } - slog.Debug("eventstream published", "topic", evt.Topic, "bytes", len(evt.Payload), "from", sender.RemoteAddr()) + slog.Debug("eventstream published", + "topic", evt.Topic, + "bytes", len(evt.Payload), + "from", sender.RemoteAddr(), + "targets", len(targets)) } diff --git a/zz_server_internals_test.go b/zz_server_internals_test.go index a402ee0..9e9264d 100644 --- a/zz_server_internals_test.go +++ b/zz_server_internals_test.go @@ -4,7 +4,9 @@ package eventstream import ( "net" + "sync" "testing" + "time" ) // TestServer_AddRemoveSubDirect drives the bookkeeping methods on the @@ -97,6 +99,207 @@ func TestServer_Publish_WildcardSubscriber(t *testing.T) { } } +// TestServer_PublishDoesNotBlockOnSlowSubscriber is a regression test +// for the bug where Server.publish held mu.RLock() across WriteEvent +// I/O. With the bug, a slow subscriber (full pipe buffer) wedged the +// RLock, blocking every concurrent addSub/removeSub waiting on +// mu.Lock(). The fix snapshots the subscriber list under a brief lock +// then releases before iterating, so subscription mutations stay +// responsive even while a peer is wedged. +func TestServer_PublishDoesNotBlockOnSlowSubscriber(t *testing.T) { + t.Parallel() + s := &Server{subs: map[string][]net.Conn{}} + + // Fast subscriber: drain immediately in a goroutine. + fast, fastPeer := net.Pipe() + defer fast.Close() + defer fastPeer.Close() + go func() { + for { + if _, err := ReadEvent(fastPeer); err != nil { + return + } + } + }() + + // Slow subscriber: never read until the test releases it. With the + // old code, WriteEvent blocks on the pipe write, and the RLock is + // held the whole time. + slow, slowPeer := net.Pipe() + defer slow.Close() + defer slowPeer.Close() + + release := make(chan struct{}) + go func() { + // Drain the slow side only after the test signals. + <-release + for { + if _, err := ReadEvent(slowPeer); err != nil { + return + } + } + }() + + s.addSub("topic", fast) + s.addSub("topic", slow) + + // Drive a publish from a sender that isn't subscribed. + sender, _ := net.Pipe() + defer sender.Close() + + publishDone := make(chan struct{}) + go func() { + defer close(publishDone) + s.publish(&Event{Topic: "topic", Payload: []byte("payload")}, sender) + }() + + // Give publish a moment to enter — with the buggy implementation + // it acquires RLock and starts writing to fast (which drains), + // then blocks on slow's WriteEvent while still holding RLock. + // With the fixed implementation it has already snapshotted and + // released RLock before either WriteEvent call. + time.Sleep(20 * time.Millisecond) + + // Concurrently try to mutate the subscription set. With the bug, + // addSub blocks on mu.Lock() while the publish holds RLock. + extra, _ := net.Pipe() + defer extra.Close() + + addDone := make(chan struct{}) + addStart := time.Now() + go func() { + s.addSub("other-topic", extra) + close(addDone) + }() + + select { + case <-addDone: + elapsed := time.Since(addStart) + if elapsed > 100*time.Millisecond { + t.Fatalf("addSub took %v with slow subscriber present; want <100ms", elapsed) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("addSub blocked >100ms behind slow subscriber — publish is holding RLock across I/O") + } + + // Drain the slow side so publish can finish; the test goroutines clean up. + close(release) + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("publish never completed after slow subscriber drained") + } +} + +// TestServer_PublishHandlesSlowSubscriberWithoutDeadlock exercises the +// happy-path semantics alongside the slow-subscriber scenario: a fast +// subscriber still receives the event promptly, multiple publishers +// can run concurrently against a wedged peer, and removeSub during the +// wedge succeeds without deadlock. +func TestServer_PublishHandlesSlowSubscriberWithoutDeadlock(t *testing.T) { + t.Parallel() + s := &Server{subs: map[string][]net.Conn{}} + + fast, fastPeer := net.Pipe() + defer fast.Close() + defer fastPeer.Close() + + slow, slowPeer := net.Pipe() + defer slow.Close() + defer slowPeer.Close() + + // Fast peer drains continuously; it asserts the first event was + // delivered, then keeps reading to unblock the rest of the publishers. + received := make(chan *Event, 8) + fastDone := make(chan struct{}) + go func() { + defer close(fastDone) + for { + evt, err := ReadEvent(fastPeer) + if err != nil { + return + } + select { + case received <- evt: + default: + } + } + }() + + // Slow peer never reads — its pipe stays wedged for the lifetime + // of the test. We rely on Close() at the end to unblock anything + // still stuck inside WriteEvent. + + s.addSub("t", fast) + s.addSub("t", slow) + + sender, _ := net.Pipe() + defer sender.Close() + + // Kick off three publishers in parallel. None of them must + // deadlock the broker; the fast peer must get all three events. + var wg sync.WaitGroup + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + s.publish(&Event{Topic: "t", Payload: []byte("p")}, sender) + }() + } + + // The fast peer must receive the first event quickly. + select { + case got := <-received: + if got.Topic != "t" || string(got.Payload) != "p" { + t.Errorf("fast peer got unexpected event: %+v", got) + } + case <-time.After(500 * time.Millisecond): + t.Fatal("fast peer never received event — broker is wedged by slow peer") + } + + // removeSub on the slow peer must not deadlock even while + // publish goroutines are mid-flight blocked on the pipe. + removeDone := make(chan struct{}) + go func() { + s.removeSub(slow) + close(removeDone) + }() + select { + case <-removeDone: + case <-time.After(200 * time.Millisecond): + t.Fatal("removeSub blocked while slow subscriber wedged the broker") + } + + // Drain the slow side so the in-flight publishers can complete. + // We use a goroutine so closing slow/slowPeer in defer also + // works as a safety net. + drainDone := make(chan struct{}) + go func() { + defer close(drainDone) + for { + if _, err := ReadEvent(slowPeer); err != nil { + return + } + } + }() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("publishers never drained after slow peer started draining") + } + // Close both sides to terminate the drain + fast reader goroutines. + slowPeer.Close() + fastPeer.Close() + <-drainDone + <-fastDone +} + // TestNewServer covers the driver-less constructor branch. func TestNewServer_NilDriverPanicsOnUse(t *testing.T) { t.Parallel()