diff --git a/CHANGELOG.md b/CHANGELOG.md index 87ea50ccbe..86f3a5c7ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file. ### Changes +- Client + - Gate the multicast publisher heartbeat on the BGP session reaching Up - Onchain programs - Validate the device `mgmt_vrf` field against the account-code charset (`[A-Za-z0-9:_-]`) and a 32-byte length cap, matching the device `code` field. Empty (the default VRF) is still accepted. - Controller diff --git a/client/doublezerod/internal/services/multicast.go b/client/doublezerod/internal/services/multicast.go index 70228fe5be..08d7c18a36 100644 --- a/client/doublezerod/internal/services/multicast.go +++ b/client/doublezerod/internal/services/multicast.go @@ -1,10 +1,12 @@ package services import ( + "context" "errors" "fmt" "log/slog" "net" + "sync" "syscall" "time" @@ -26,8 +28,24 @@ type MulticastService struct { MulticastPubGroups []net.IP MulticastSubGroups []net.IP provisionReq *api.ProvisionRequest + heartbeatCancel context.CancelFunc + heartbeatWatcherWG sync.WaitGroup + + // mu guards the heartbeat lifecycle: heartbeatStarted records whether the + // readiness watcher has started the sender, and heartbeatGroups is the group + // set the sender should use (updated by UpdateGroups before the sender starts). + mu sync.Mutex + heartbeatStarted bool + heartbeatGroups []net.IP } +// publisherReadyPollInterval is how often the readiness watcher checks the BGP +// session state before starting the publisher heartbeat. It is a poll cadence, +// not a readiness timeout: the watcher rides the existing BGP session lifecycle +// (the session resolves to Up or a failure within bgp.BGPSessionTimeout) and is +// cancelled on Teardown. +const publisherReadyPollInterval = 250 * time.Millisecond + func (s *MulticastService) UserType() api.UserType { return api.UserTypeMulticast } func (s *MulticastService) ServiceType() ServiceType { return ServiceTypeMulticast } @@ -87,11 +105,6 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error { return fmt.Errorf("error adding multicast route: %v", err) } } - - s.MulticastPubGroups = p.MulticastPubGroups - if err := s.heartbeat.Start(tun.Name, p.DoubleZeroIP, p.MulticastPubGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil { - return fmt.Errorf("error starting heartbeat sender: %v", err) - } } if isSubscriber { @@ -149,9 +162,60 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error { return fmt.Errorf("error adding peer: %v", err) } } + + // Defer the publisher heartbeat (which registers the source at the DZD) until + // the BGP session is Up, so the DZD never sees a source register before its + // tunnel/PIM plumbing is ready — the overlap that produces a wedged (S,G). + if isPublisher { + s.mu.Lock() + s.heartbeatGroups = p.MulticastPubGroups + s.mu.Unlock() + ctx, cancel := context.WithCancel(context.Background()) + s.heartbeatCancel = cancel + s.heartbeatWatcherWG.Add(1) + go func() { + defer s.heartbeatWatcherWG.Done() + s.startHeartbeatWhenReady(ctx, s.Tunnel.Name, p.DoubleZeroIP, s.Tunnel.RemoteOverlay) + }() + } + return nil } +// startHeartbeatWhenReady waits for the BGP session to reach Up, then starts the +// publisher heartbeat with the current group set. It rides the existing BGP +// session lifecycle: the session resolves to Up or a failure within +// bgp.BGPSessionTimeout, and a cycle that never reaches Up is retried when the +// manager reconcile re-provisions. It exits when the context is cancelled +// (Teardown), so no heartbeat start races with or follows a Close. +func (s *MulticastService) startHeartbeatWhenReady(ctx context.Context, iface string, srcIP net.IP, remoteOverlay net.IP) { + ticker := time.NewTicker(publisherReadyPollInterval) + defer ticker.Stop() + for { + if s.bgp.GetPeerStatus(remoteOverlay).SessionStatus == bgp.SessionStatusUp { + // Hold the lock across Start so a concurrent UpdateGroups can't touch + // the sender before it exists: UpdateGroups observes heartbeatStarted + // and either updates in place or leaves the group set for us to start + // with here. + s.mu.Lock() + if !s.heartbeatStarted { + if err := s.heartbeat.Start(iface, srcIP, s.heartbeatGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil { + slog.Error("error starting heartbeat sender", "error", err) + } else { + s.heartbeatStarted = true + } + } + s.mu.Unlock() + return + } + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } +} + func (s *MulticastService) Teardown() error { var errRemoveTunnel, errRemovePeer error if s.Tunnel == nil { @@ -159,6 +223,12 @@ func (s *MulticastService) Teardown() error { } if s.isPublisher() { + // Stop the readiness watcher and wait for it to exit before closing the + // heartbeat, so a pending start can't race with or follow the close. + if s.heartbeatCancel != nil { + s.heartbeatCancel() + } + s.heartbeatWatcherWG.Wait() if err := s.heartbeat.Close(); err != nil { slog.Error("error stopping heartbeat sender", "error", err) } @@ -261,10 +331,18 @@ func (s *MulticastService) UpdateGroups(newPR *api.ProvisionRequest) error { } } - // Restart heartbeat if publisher groups changed + // Apply publisher group changes to the heartbeat. If the readiness watcher + // hasn't started the sender yet (BGP not up), just record the new group set — + // the watcher will start with it; otherwise update the running sender in place. if isPublisher && (len(pubAdded) > 0 || len(pubRemoved) > 0) { - if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil { - return fmt.Errorf("error updating heartbeat groups: %v", err) + s.mu.Lock() + s.heartbeatGroups = newPR.MulticastPubGroups + started := s.heartbeatStarted + s.mu.Unlock() + if started { + if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil { + return fmt.Errorf("error updating heartbeat groups: %v", err) + } } } diff --git a/client/doublezerod/internal/services/services_test.go b/client/doublezerod/internal/services/services_test.go index 2de0960303..f151519e16 100644 --- a/client/doublezerod/internal/services/services_test.go +++ b/client/doublezerod/internal/services/services_test.go @@ -2,6 +2,7 @@ package services_test import ( "net" + "sync" "syscall" "testing" "time" @@ -17,6 +18,8 @@ import ( ) type MockBgpServer struct { + mu sync.Mutex + status bgp.Session deletedPeer net.IP addPeer *bgp.PeerConfig } @@ -31,9 +34,19 @@ func (m *MockBgpServer) DeletePeer(ip net.IP) error { m.deletedPeer = ip return nil } -func (m *MockBgpServer) GetPeerStatus(net.IP) bgp.Session { return bgp.Session{} } -func (m *MockBgpServer) Close() {} -func (m *MockBgpServer) GetPeers() []corebgp.PeerConfig { return []corebgp.PeerConfig{} } +func (m *MockBgpServer) GetPeerStatus(net.IP) bgp.Session { + m.mu.Lock() + defer m.mu.Unlock() + return m.status +} + +func (m *MockBgpServer) SetStatus(s bgp.Session) { + m.mu.Lock() + defer m.mu.Unlock() + m.status = s +} +func (m *MockBgpServer) Close() {} +func (m *MockBgpServer) GetPeers() []corebgp.PeerConfig { return []corebgp.PeerConfig{} } type MockNetlink struct { routes []*routing.Route @@ -124,15 +137,19 @@ func (m *MockPIMServer) Close() error { } type MockHeartbeatSender struct { - started bool - closed bool - iface string - srcIP net.IP - groups []net.IP - ttl int + mu sync.Mutex + started bool + closed bool + updateGroupsCalls int + iface string + srcIP net.IP + groups []net.IP + ttl int } func (m *MockHeartbeatSender) Start(iface string, srcIP net.IP, groups []net.IP, ttl int, interval time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() m.started = true m.iface = iface m.srcIP = srcIP @@ -142,17 +159,50 @@ func (m *MockHeartbeatSender) Start(iface string, srcIP net.IP, groups []net.IP, } func (m *MockHeartbeatSender) UpdateGroups(groups []net.IP) error { - m.closed = true - m.started = true + m.mu.Lock() + defer m.mu.Unlock() + m.updateGroupsCalls++ m.groups = groups return nil } func (m *MockHeartbeatSender) Close() error { + m.mu.Lock() + defer m.mu.Unlock() m.closed = true return nil } +func (m *MockHeartbeatSender) wasStarted() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.started +} + +func (m *MockHeartbeatSender) updateCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.updateGroupsCalls +} + +func (m *MockHeartbeatSender) getGroups() []net.IP { + m.mu.Lock() + defer m.mu.Unlock() + return m.groups +} + +func waitForHeartbeatStarted(t *testing.T, m *MockHeartbeatSender) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if m.wasStarted() { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatal("timed out waiting for heartbeat to start") +} + func TestServices(t *testing.T) { tests := []struct { name string @@ -721,11 +771,16 @@ func TestMulticastService_UpdateGroups_AddPubGroup(t *testing.T) { if err := svc.Setup(pr); err != nil { t.Fatalf("unexpected error: %v", err) } + t.Cleanup(func() { _ = svc.Teardown() }) + + // Bring BGP up and wait for the heartbeat to start so we exercise the + // running-heartbeat group-update path. + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusUp}) + waitForHeartbeatStarted(t, mockHeartbeat) + // Reset mock state after setup mockNetlink.routesAdded = nil mockNetlink.routesRemoved = nil - mockHeartbeat.started = false - mockHeartbeat.closed = false // Add a second pub group newPR := &api.ProvisionRequest{ @@ -765,15 +820,12 @@ func TestMulticastService_UpdateGroups_AddPubGroup(t *testing.T) { t.Fatalf("expected route src 7.7.7.7, got %v", addedRoute.Src) } - // Heartbeat should have been restarted - if !mockHeartbeat.closed { - t.Fatal("expected heartbeat to be closed during restart") + // Heartbeat groups should be updated in place (no teardown/restart). + if mockHeartbeat.updateCalls() != 1 { + t.Fatalf("expected heartbeat.UpdateGroups called once, got %d", mockHeartbeat.updateCalls()) } - if !mockHeartbeat.started { - t.Fatal("expected heartbeat to be restarted") - } - if len(mockHeartbeat.groups) != 2 { - t.Fatalf("expected heartbeat restarted with 2 groups, got %d", len(mockHeartbeat.groups)) + if len(mockHeartbeat.getGroups()) != 2 { + t.Fatalf("expected heartbeat updated with 2 groups, got %d", len(mockHeartbeat.getGroups())) } // ProvisionRequest should be updated @@ -811,6 +863,7 @@ func TestMulticastService_UpdateGroups_RemovePubGroup(t *testing.T) { if err := svc.Setup(pr); err != nil { t.Fatalf("unexpected error: %v", err) } + t.Cleanup(func() { _ = svc.Teardown() }) mockNetlink.routesAdded = nil mockNetlink.routesRemoved = nil @@ -948,3 +1001,130 @@ func TestMulticastService_DoubleTeardown(t *testing.T) { t.Fatalf("second Teardown() returned error: %v", err) } } + +// TestMulticastService_HeartbeatGatedOnBGPUp encodes the fix for the recurring +// wedged-(S,G) bug: the publisher source register (the heartbeat) must not start +// until the BGP session is Up, so the DZD never sees a source register before its +// plumbing is ready. On current main the heartbeat starts inline in Setup before +// the session is established, so this test fails. +func TestMulticastService_HeartbeatGatedOnBGPUp(t *testing.T) { + mockBgp := &MockBgpServer{} + mockNetlink := &MockNetlink{} + mockPim := &MockPIMServer{} + mockHeartbeat := &MockHeartbeatSender{} + + // Session starts Pending — the source must stay silent. + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusPending}) + + svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + + pr := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{{239, 0, 0, 1}}, + TunnelNet: &net.IPNet{IP: net.IPv4(169, 254, 0, 0), Mask: net.IPMask{255, 255, 255, 254}}, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + if err := svc.Setup(pr); err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Cleanup(func() { _ = svc.Teardown() }) + + // While BGP is not Up, the heartbeat must stay silent even after the readiness + // watcher has had time to poll. + time.Sleep(300 * time.Millisecond) + if mockHeartbeat.wasStarted() { + t.Fatal("heartbeat started before BGP session reached Up") + } + + // Once BGP reaches Up, the heartbeat should start. + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusUp}) + + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if mockHeartbeat.wasStarted() { + break + } + time.Sleep(20 * time.Millisecond) + } + if !mockHeartbeat.wasStarted() { + t.Fatal("heartbeat did not start after BGP session reached Up") + } +} + +// TestMulticastService_HeartbeatGroupChangeBeforeBGPUp guards the incremental +// group-update path (add/remove groups without a disconnect/reconnect) against +// the readiness gate: if pub groups change before BGP is Up, UpdateGroups must +// not touch the not-yet-started heartbeat (the real sender would panic on a nil +// conn), and the eventual start must use the updated group set. +func TestMulticastService_HeartbeatGroupChangeBeforeBGPUp(t *testing.T) { + mockBgp := &MockBgpServer{} + mockNetlink := &MockNetlink{} + mockPim := &MockPIMServer{} + mockHeartbeat := &MockHeartbeatSender{} + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusPending}) + + svc, err := manager.CreateService(api.UserTypeMulticast, mockBgp, mockNetlink, mockPim, mockHeartbeat) + if err != nil { + t.Fatalf("failed to create service: %v", err) + } + pr := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{{239, 0, 0, 1}}, + TunnelNet: &net.IPNet{IP: net.IPv4(169, 254, 0, 0), Mask: net.IPMask{255, 255, 255, 254}}, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + if err := svc.Setup(pr); err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Cleanup(func() { _ = svc.Teardown() }) + + gu, ok := svc.(interface { + UpdateGroups(*api.ProvisionRequest) error + }) + if !ok { + t.Fatal("service does not implement UpdateGroups") + } + + // Groups change while BGP is still down. + newPR := &api.ProvisionRequest{ + UserType: api.UserTypeMulticast, + TunnelSrc: net.IPv4(1, 1, 1, 1), + TunnelDst: net.IPv4(2, 2, 2, 2), + MulticastPubGroups: []net.IP{{239, 0, 0, 1}, {239, 0, 0, 3}}, + TunnelNet: &net.IPNet{IP: net.IPv4(169, 254, 0, 0), Mask: net.IPMask{255, 255, 255, 254}}, + DoubleZeroIP: net.IPv4(7, 7, 7, 7), + DoubleZeroPrefixes: []*net.IPNet{}, + BgpLocalAsn: 65000, + BgpRemoteAsn: 65001, + } + if err := gu.UpdateGroups(newPR); err != nil { + t.Fatalf("UpdateGroups failed: %v", err) + } + + if mockHeartbeat.wasStarted() { + t.Fatal("heartbeat started before BGP reached Up") + } + if mockHeartbeat.updateCalls() != 0 { + t.Fatalf("heartbeat.UpdateGroups called %d times before the heartbeat started; must be 0", mockHeartbeat.updateCalls()) + } + + // When BGP comes up, the heartbeat starts with the updated group set. + mockBgp.SetStatus(bgp.Session{SessionStatus: bgp.SessionStatusUp}) + waitForHeartbeatStarted(t, mockHeartbeat) + if got := mockHeartbeat.getGroups(); len(got) != 2 { + t.Fatalf("expected heartbeat to start with the updated 2 groups, got %d", len(got)) + } +}