Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,24 +97,42 @@ func (s *Server) removeSub(conn net.Conn) {
}
}

// publish fan-outs evt to every subscriber on evt.Topic and "*" (minus
// the sender). The subscriber set is snapshotted under a brief RLock
// and the lock is released before any WriteEvent I/O — otherwise a slow
// subscriber (full pipe / TCP send buffer) would block WriteEvent while
// the RLock is still held, starving every concurrent addSub/removeSub
// caller waiting on mu.Lock(). See broker.publishWith in service.go for
// the same pattern (P2-003).
func (s *Server) publish(evt *Event, sender net.Conn) {
s.mu.RLock()
defer s.mu.RUnlock()

// Send to topic-specific subscribers
targets := make([]net.Conn, 0, len(s.subs[evt.Topic])+len(s.subs["*"]))
for _, conn := range s.subs[evt.Topic] {
if conn != sender {
WriteEvent(conn, evt)
targets = append(targets, conn)
}
}
// Send to wildcard subscribers
if evt.Topic != "*" {
for _, conn := range s.subs["*"] {
if conn != sender {
WriteEvent(conn, evt)
targets = append(targets, conn)
}
}
}
s.mu.RUnlock()

for _, conn := range targets {
if err := WriteEvent(conn, evt); err != nil {
slog.Debug("eventstream write failed",
"remote", conn.RemoteAddr(),
"topic", evt.Topic,
"error", err)
}
}

slog.Debug("eventstream published", "topic", evt.Topic, "bytes", len(evt.Payload), "from", sender.RemoteAddr())
slog.Debug("eventstream published",
"topic", evt.Topic,
"bytes", len(evt.Payload),
"from", sender.RemoteAddr(),
"targets", len(targets))
}
203 changes: 203 additions & 0 deletions zz_server_internals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package eventstream

import (
"net"
"sync"
"testing"
"time"
)

// TestServer_AddRemoveSubDirect drives the bookkeeping methods on the
Expand Down Expand Up @@ -97,6 +99,207 @@ func TestServer_Publish_WildcardSubscriber(t *testing.T) {
}
}

// TestServer_PublishDoesNotBlockOnSlowSubscriber is a regression test
// for the bug where Server.publish held mu.RLock() across WriteEvent
// I/O. With the bug, a slow subscriber (full pipe buffer) wedged the
// RLock, blocking every concurrent addSub/removeSub waiting on
// mu.Lock(). The fix snapshots the subscriber list under a brief lock
// then releases before iterating, so subscription mutations stay
// responsive even while a peer is wedged.
func TestServer_PublishDoesNotBlockOnSlowSubscriber(t *testing.T) {
t.Parallel()
s := &Server{subs: map[string][]net.Conn{}}

// Fast subscriber: drain immediately in a goroutine.
fast, fastPeer := net.Pipe()
defer fast.Close()
defer fastPeer.Close()
go func() {
for {
if _, err := ReadEvent(fastPeer); err != nil {
return
}
}
}()

// Slow subscriber: never read until the test releases it. With the
// old code, WriteEvent blocks on the pipe write, and the RLock is
// held the whole time.
slow, slowPeer := net.Pipe()
defer slow.Close()
defer slowPeer.Close()

release := make(chan struct{})
go func() {
// Drain the slow side only after the test signals.
<-release
for {
if _, err := ReadEvent(slowPeer); err != nil {
return
}
}
}()

s.addSub("topic", fast)
s.addSub("topic", slow)

// Drive a publish from a sender that isn't subscribed.
sender, _ := net.Pipe()
defer sender.Close()

publishDone := make(chan struct{})
go func() {
defer close(publishDone)
s.publish(&Event{Topic: "topic", Payload: []byte("payload")}, sender)
}()

// Give publish a moment to enter — with the buggy implementation
// it acquires RLock and starts writing to fast (which drains),
// then blocks on slow's WriteEvent while still holding RLock.
// With the fixed implementation it has already snapshotted and
// released RLock before either WriteEvent call.
time.Sleep(20 * time.Millisecond)

// Concurrently try to mutate the subscription set. With the bug,
// addSub blocks on mu.Lock() while the publish holds RLock.
extra, _ := net.Pipe()
defer extra.Close()

addDone := make(chan struct{})
addStart := time.Now()
go func() {
s.addSub("other-topic", extra)
close(addDone)
}()

select {
case <-addDone:
elapsed := time.Since(addStart)
if elapsed > 100*time.Millisecond {
t.Fatalf("addSub took %v with slow subscriber present; want <100ms", elapsed)
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("addSub blocked >100ms behind slow subscriber — publish is holding RLock across I/O")
}

// Drain the slow side so publish can finish; the test goroutines clean up.
close(release)
select {
case <-publishDone:
case <-time.After(2 * time.Second):
t.Fatal("publish never completed after slow subscriber drained")
}
}

// TestServer_PublishHandlesSlowSubscriberWithoutDeadlock exercises the
// happy-path semantics alongside the slow-subscriber scenario: a fast
// subscriber still receives the event promptly, multiple publishers
// can run concurrently against a wedged peer, and removeSub during the
// wedge succeeds without deadlock.
func TestServer_PublishHandlesSlowSubscriberWithoutDeadlock(t *testing.T) {
t.Parallel()
s := &Server{subs: map[string][]net.Conn{}}

fast, fastPeer := net.Pipe()
defer fast.Close()
defer fastPeer.Close()

slow, slowPeer := net.Pipe()
defer slow.Close()
defer slowPeer.Close()

// Fast peer drains continuously; it asserts the first event was
// delivered, then keeps reading to unblock the rest of the publishers.
received := make(chan *Event, 8)
fastDone := make(chan struct{})
go func() {
defer close(fastDone)
for {
evt, err := ReadEvent(fastPeer)
if err != nil {
return
}
select {
case received <- evt:
default:
}
}
}()

// Slow peer never reads — its pipe stays wedged for the lifetime
// of the test. We rely on Close() at the end to unblock anything
// still stuck inside WriteEvent.

s.addSub("t", fast)
s.addSub("t", slow)

sender, _ := net.Pipe()
defer sender.Close()

// Kick off three publishers in parallel. None of them must
// deadlock the broker; the fast peer must get all three events.
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s.publish(&Event{Topic: "t", Payload: []byte("p")}, sender)
}()
}

// The fast peer must receive the first event quickly.
select {
case got := <-received:
if got.Topic != "t" || string(got.Payload) != "p" {
t.Errorf("fast peer got unexpected event: %+v", got)
}
case <-time.After(500 * time.Millisecond):
t.Fatal("fast peer never received event — broker is wedged by slow peer")
}

// removeSub on the slow peer must not deadlock even while
// publish goroutines are mid-flight blocked on the pipe.
removeDone := make(chan struct{})
go func() {
s.removeSub(slow)
close(removeDone)
}()
select {
case <-removeDone:
case <-time.After(200 * time.Millisecond):
t.Fatal("removeSub blocked while slow subscriber wedged the broker")
}

// Drain the slow side so the in-flight publishers can complete.
// We use a goroutine so closing slow/slowPeer in defer also
// works as a safety net.
drainDone := make(chan struct{})
go func() {
defer close(drainDone)
for {
if _, err := ReadEvent(slowPeer); err != nil {
return
}
}
}()

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("publishers never drained after slow peer started draining")
}
// Close both sides to terminate the drain + fast reader goroutines.
slowPeer.Close()
fastPeer.Close()
<-drainDone
<-fastDone
}

// TestNewServer covers the driver-less constructor branch.
func TestNewServer_NilDriverPanicsOnUse(t *testing.T) {
t.Parallel()
Expand Down
Loading