From 0eeae11ee837aca00f4990f96226c929e6b05a77 Mon Sep 17 00:00:00 2001 From: Ben Blier Date: Mon, 15 Jun 2026 16:59:30 -0400 Subject: [PATCH] doublezerod: gate multicast publisher heartbeat on BGP session up MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The publisher heartbeat is what registers the multicast source at the DZD and builds the (S,G). It was started inline in MulticastService.Setup before the BGP session was established, so on a reprovision after a host network reload the new source register could overlap the DZD tearing down the prior PIM-register state — the window that leaves a wedged (S,G) (no N/notify-MSDP flag, no OIF) and stalls delivery until a manual publisher reconnect. Defer the heartbeat behind a readiness watcher that starts it only once the BGP session reports Up, riding the existing 30s session lifecycle and 10s reconcile (no new timeout). Teardown cancels and awaits the watcher before closing the heartbeat. UpdateGroups (incremental add/remove without a disconnect/reconnect) now respects readiness: it records the new group set and updates the running sender in place, or defers to the watcher when the heartbeat has not started yet, avoiding a nil-conn panic in that window. --- CHANGELOG.md | 2 + .../internal/services/multicast.go | 94 +++++++- .../internal/services/services_test.go | 222 ++++++++++++++++-- 3 files changed, 289 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ea50ccbe..86f3a5c7ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file. ### Changes +- Client + - Gate the multicast publisher heartbeat on the BGP session reaching Up - Onchain programs - Validate the device `mgmt_vrf` field against the account-code charset (`[A-Za-z0-9:_-]`) and a 32-byte length cap, matching the device `code` field. Empty (the default VRF) is still accepted. 
- Controller diff --git a/client/doublezerod/internal/services/multicast.go b/client/doublezerod/internal/services/multicast.go index 70228fe5be..08d7c18a36 100644 --- a/client/doublezerod/internal/services/multicast.go +++ b/client/doublezerod/internal/services/multicast.go @@ -1,10 +1,12 @@ package services import ( + "context" "errors" "fmt" "log/slog" "net" + "sync" "syscall" "time" @@ -26,8 +28,24 @@ type MulticastService struct { MulticastPubGroups []net.IP MulticastSubGroups []net.IP provisionReq *api.ProvisionRequest + heartbeatCancel context.CancelFunc + heartbeatWatcherWG sync.WaitGroup + + // mu guards the heartbeat lifecycle: heartbeatStarted records whether the + // readiness watcher has started the sender, and heartbeatGroups is the group + // set the sender should use (updated by UpdateGroups before the sender starts). + mu sync.Mutex + heartbeatStarted bool + heartbeatGroups []net.IP } +// publisherReadyPollInterval is how often the readiness watcher checks the BGP +// session state before starting the publisher heartbeat. It is a poll cadence, +// not a readiness timeout: the watcher rides the existing BGP session lifecycle +// (the session resolves to Up or a failure within bgp.BGPSessionTimeout) and is +// cancelled on Teardown. 
+const publisherReadyPollInterval = 250 * time.Millisecond + func (s *MulticastService) UserType() api.UserType { return api.UserTypeMulticast } func (s *MulticastService) ServiceType() ServiceType { return ServiceTypeMulticast } @@ -87,11 +105,6 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error { return fmt.Errorf("error adding multicast route: %v", err) } } - - s.MulticastPubGroups = p.MulticastPubGroups - if err := s.heartbeat.Start(tun.Name, p.DoubleZeroIP, p.MulticastPubGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil { - return fmt.Errorf("error starting heartbeat sender: %v", err) - } } if isSubscriber { @@ -149,9 +162,60 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error { return fmt.Errorf("error adding peer: %v", err) } } + + // Defer the publisher heartbeat (which registers the source at the DZD) until + // the BGP session is Up, so the DZD never sees a source register before its + // tunnel/PIM plumbing is ready — the overlap that produces a wedged (S,G). + if isPublisher { + s.mu.Lock() + s.heartbeatGroups = p.MulticastPubGroups + s.mu.Unlock() + ctx, cancel := context.WithCancel(context.Background()) + s.heartbeatCancel = cancel + s.heartbeatWatcherWG.Add(1) + go func() { + defer s.heartbeatWatcherWG.Done() + s.startHeartbeatWhenReady(ctx, s.Tunnel.Name, p.DoubleZeroIP, s.Tunnel.RemoteOverlay) + }() + } + return nil } +// startHeartbeatWhenReady waits for the BGP session to reach Up, then starts the +// publisher heartbeat with the current group set. It rides the existing BGP +// session lifecycle: the session resolves to Up or a failure within +// bgp.BGPSessionTimeout, and a cycle that never reaches Up is retried when the +// manager reconcile re-provisions. It exits when the context is cancelled +// (Teardown), so no heartbeat start races with or follows a Close. 
+func (s *MulticastService) startHeartbeatWhenReady(ctx context.Context, iface string, srcIP net.IP, remoteOverlay net.IP) { + ticker := time.NewTicker(publisherReadyPollInterval) + defer ticker.Stop() + for { + if s.bgp.GetPeerStatus(remoteOverlay).SessionStatus == bgp.SessionStatusUp { + // Hold the lock across Start so a concurrent UpdateGroups can't touch + // the sender before it exists: UpdateGroups observes heartbeatStarted + // and either updates in place or leaves the group set for us to start + // with here. + s.mu.Lock() + if !s.heartbeatStarted { + if err := s.heartbeat.Start(iface, srcIP, s.heartbeatGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil { + slog.Error("error starting heartbeat sender", "error", err) + } else { + s.heartbeatStarted = true + } + } + s.mu.Unlock() + return + } + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } +} + func (s *MulticastService) Teardown() error { var errRemoveTunnel, errRemovePeer error if s.Tunnel == nil { @@ -159,6 +223,12 @@ func (s *MulticastService) Teardown() error { } if s.isPublisher() { + // Stop the readiness watcher and wait for it to exit before closing the + // heartbeat, so a pending start can't race with or follow the close. + if s.heartbeatCancel != nil { + s.heartbeatCancel() + } + s.heartbeatWatcherWG.Wait() if err := s.heartbeat.Close(); err != nil { slog.Error("error stopping heartbeat sender", "error", err) } @@ -261,10 +331,18 @@ func (s *MulticastService) UpdateGroups(newPR *api.ProvisionRequest) error { } } - // Restart heartbeat if publisher groups changed + // Apply publisher group changes to the heartbeat. If the readiness watcher + // hasn't started the sender yet (BGP not up), just record the new group set — + // the watcher will start with it; otherwise update the running sender in place. 
if isPublisher && (len(pubAdded) > 0 || len(pubRemoved) > 0) { - if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil { - return fmt.Errorf("error updating heartbeat groups: %v", err) + s.mu.Lock() + s.heartbeatGroups = newPR.MulticastPubGroups + started := s.heartbeatStarted + s.mu.Unlock() + if started { + if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil { + return fmt.Errorf("error updating heartbeat groups: %v", err) + } } } diff --git a/client/doublezerod/internal/services/services_test.go b/client/doublezerod/internal/services/services_test.go index 2de0960303..f151519e16 100644 --- a/client/doublezerod/internal/services/services_test.go +++ b/client/doublezerod/internal/services/services_test.go @@ -2,6 +2,7 @@ package services_test import ( "net" + "sync" "syscall" "testing" "time" @@ -17,6 +18,8 @@ import ( ) type MockBgpServer struct { + mu sync.Mutex + status bgp.Session deletedPeer net.IP addPeer *bgp.PeerConfig } @@ -31,9 +34,19 @@ func (m *MockBgpServer) DeletePeer(ip net.IP) error { m.deletedPeer = ip return nil } -func (m *MockBgpServer) GetPeerStatus(net.IP) bgp.Session { return bgp.Session{} } -func (m *MockBgpServer) Close() {} -func (m *MockBgpServer) GetPeers() []corebgp.PeerConfig { return []corebgp.PeerConfig{} } +func (m *MockBgpServer) GetPeerStatus(net.IP) bgp.Session { + m.mu.Lock() + defer m.mu.Unlock() + return m.status +} + +func (m *MockBgpServer) SetStatus(s bgp.Session) { + m.mu.Lock() + defer m.mu.Unlock() + m.status = s +} +func (m *MockBgpServer) Close() {} +func (m *MockBgpServer) GetPeers() []corebgp.PeerConfig { return []corebgp.PeerConfig{} } type MockNetlink struct { routes []*routing.Route @@ -124,15 +137,19 @@ func (m *MockPIMServer) Close() error { } type MockHeartbeatSender struct { - started bool - closed bool - iface string - srcIP net.IP - groups []net.IP - ttl int + mu sync.Mutex + started bool + closed bool + updateGroupsCalls int + iface string + srcIP net.IP + groups 
[]net.IP + ttl int } func (m *MockHeartbeatSender) Start(iface string, srcIP net.IP, groups []net.IP, ttl int, interval time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() m.started = true m.iface = iface m.srcIP = srcIP @@ -142,17 +159,50 @@ func (m *MockHeartbeatSender) Start(iface string, srcIP net.IP, groups []net.IP, } func (m *MockHeartbeatSender) UpdateGroups(groups []net.IP) error { - m.closed = true - m.started = true + m.mu.Lock() + defer m.mu.Unlock() + m.updateGroupsCalls++ m.groups = groups return nil } func (m *MockHeartbeatSender) Close() error { + m.mu.Lock() + defer m.mu.Unlock() m.closed = true return nil } +func (m *MockHeartbeatSender) wasStarted() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.started +} + +func (m *MockHeartbeatSender) updateCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.updateGroupsCalls +} + +func (m *MockHeartbeatSender) getGroups() []net.IP { + m.mu.Lock() + defer m.mu.Unlock() + return m.groups +} + +func waitForHeartbeatStarted(t *testing.T, m *MockHeartbeatSender) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if m.wasStarted() { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatal("timed out waiting for heartbeat to start") +} + func TestServices(t *testing.T) { tests := []struct { name string @@ -721,11 +771,16 @@ func TestMulticastService_UpdateGroups_AddPubGroup(t *testing.T) { if err := svc.Setup(pr); err != nil { t.Fatalf("unexpected error: %v", err) } + t.Cleanup(func() { _ = svc.Teardown() }) + + // Bring BGP up and wait for the heartbeat to start so we exercise the + // running-heartbeat group-update path. 
+ mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusUp}) + waitForHeartbeatStarted(t, mockHeartbeat) + // Reset mock state after setup mockNetlink.routesAdded = nil mockNetlink.routesRemoved = nil - mockHeartbeat.started = false - mockHeartbeat.closed = false // Add a second pub group newPR := &api.ProvisionRequest{ @@ -765,15 +820,12 @@ func TestMulticastService_UpdateGroups_AddPubGroup(t *testing.T) { t.Fatalf("expected route src 7.7.7.7, got %v", addedRoute.Src) } - // Heartbeat should have been restarted - if !mockHeartbeat.closed { - t.Fatal("expected heartbeat to be closed during restart") + // Heartbeat groups should be updated in place (no teardown/restart). + if mockHeartbeat.updateCalls() != 1 { + t.Fatalf("expected heartbeat.UpdateGroups called once, got %d", mockHeartbeat.updateCalls()) } - if !mockHeartbeat.started { - t.Fatal("expected heartbeat to be restarted") - } - if len(mockHeartbeat.groups) != 2 { - t.Fatalf("expected heartbeat restarted with 2 groups, got %d", len(mockHeartbeat.groups)) + if len(mockHeartbeat.getGroups()) != 2 { + t.Fatalf("expected heartbeat updated with 2 groups, got %d", len(mockHeartbeat.getGroups())) } // ProvisionRequest should be updated @@ -811,6 +863,7 @@ func TestMulticastService_UpdateGroups_RemovePubGroup(t *testing.T) { if err := svc.Setup(pr); err != nil { t.Fatalf("unexpected error: %v", err) } + t.Cleanup(func() { _ = svc.Teardown() }) mockNetlink.routesAdded = nil mockNetlink.routesRemoved = nil @@ -948,3 +1001,130 @@ func TestMulticastService_DoubleTeardown(t *testing.T) { t.Fatalf("second Teardown() returned error: %v", err) } } + +// TestMulticastService_HeartbeatGatedOnBGPUp encodes the fix for the recurring +// wedged-(S,G) bug: the publisher source register (the heartbeat) must not start +// until the BGP session is Up, so the DZD never sees a source register before its +// plumbing is ready. 
Prior to this fix the heartbeat started inline in Setup before +// the session was established, so this test would have failed. +func TestMulticastService_HeartbeatGatedOnBGPUp(t *testing.T) { + mockBgp := &MockBgpServer{} + mockNetlink := &MockNetlink{} + mockPim := &MockPIMServer{} + mockHeartbeat := &MockHeartbeatSender{} + + // Session starts Pending — the source must stay silent. + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusPending}) + + svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + + pr := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{{239, 0, 0, 1}}, + TunnelNet: &net.IPNet{IP: net.IPv4(169, 254, 0, 0), Mask: net.IPMask{255, 255, 255, 254}}, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + if err := svc.Setup(pr); err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Cleanup(func() { _ = svc.Teardown() }) + + // While BGP is not Up, the heartbeat must stay silent even after the readiness + // watcher has had time to poll. + time.Sleep(300 * time.Millisecond) + if mockHeartbeat.wasStarted() { + t.Fatal("heartbeat started before BGP session reached Up") + } + + // Once BGP reaches Up, the heartbeat should start. 
+ mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusUp}) + + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if mockHeartbeat.wasStarted() { + break + } + time.Sleep(20 * time.Millisecond) + } + if !mockHeartbeat.wasStarted() { + t.Fatal("heartbeat did not start after BGP session reached Up") + } +} + +// TestMulticastService_HeartbeatGroupChangeBeforeBGPUp guards the incremental +// group-update path (add/remove groups without a disconnect/reconnect) against +// the readiness gate: if pub groups change before BGP is Up, UpdateGroups must +// not touch the not-yet-started heartbeat (the real sender would panic on a nil +// conn), and the eventual start must use the updated group set. +func TestMulticastService_HeartbeatGroupChangeBeforeBGPUp(t *testing.T) { + mockBgp := &MockBgpServer{} + mockNetlink := &MockNetlink{} + mockPim := &MockPIMServer{} + mockHeartbeat := &MockHeartbeatSender{} + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusPending}) + + svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + pr := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{{239, 0, 0, 1}}, + TunnelNet: &net.IPNet{IP: net.IPv4(169, 254, 0, 0), Mask: net.IPMask{255, 255, 255, 254}}, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + if err := svc.Setup(pr); err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Cleanup(func() { _ = svc.Teardown() }) + + gu, ok := svc.(interface { + UpdateGroups(*api.ProvisionRequest) error + }) + if !ok { + t.Fatal("service does not implement UpdateGroups") + } + + // Groups change while BGP is still down. 
+ newPR := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{{239, 0, 0, 1}, {239, 0, 0, 3}}, + TunnelNet: &net.IPNet{IP: net.IPv4(169, 254, 0, 0), Mask: net.IPMask{255, 255, 255, 254}}, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + if err := gu.UpdateGroups(newPR); err != nil { + t.Fatalf("UpdateGroups failed: %v", err) + } + + if mockHeartbeat.wasStarted() { + t.Fatal("heartbeat started before BGP reached Up") + } + if mockHeartbeat.updateCalls() != 0 { + t.Fatalf("heartbeat.UpdateGroups called %d times before the heartbeat started; must be 0", mockHeartbeat.updateCalls()) + } + + // When BGP comes up, the heartbeat starts with the updated group set. + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusUp}) + waitForHeartbeatStarted(t, mockHeartbeat) + if got := mockHeartbeat.getGroups(); len(got) != 2 { + t.Fatalf("expected heartbeat to start with the updated 2 groups, got %d", len(got)) + } +}