Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file.

### Changes

- Client
- Gate the multicast publisher heartbeat on the BGP session reaching Up
- Onchain programs
- Validate the device `mgmt_vrf` field against the account-code charset (`[A-Za-z0-9:_-]`) and a 32-byte length cap, matching the device `code` field. Empty (the default VRF) is still accepted.
- Controller
Expand Down
94 changes: 86 additions & 8 deletions client/doublezerod/internal/services/multicast.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package services

import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"sync"
"syscall"
"time"

Expand All @@ -26,8 +28,24 @@ type MulticastService struct {
MulticastPubGroups []net.IP
MulticastSubGroups []net.IP
provisionReq *api.ProvisionRequest
heartbeatCancel context.CancelFunc
heartbeatWatcherWG sync.WaitGroup

// mu guards the heartbeat lifecycle: heartbeatStarted records whether the
// readiness watcher has started the sender, and heartbeatGroups is the group
// set the sender should use (updated by UpdateGroups before the sender starts).
mu sync.Mutex
heartbeatStarted bool
heartbeatGroups []net.IP
}

// publisherReadyPollInterval is how often the readiness watcher checks the BGP
// session state before starting the publisher heartbeat. It is a poll cadence,
// not a readiness timeout: the watcher rides the existing BGP session lifecycle
// (the session resolves to Up or a failure within bgp.BGPSessionTimeout) and is
// cancelled on Teardown.
const publisherReadyPollInterval = 250 * time.Millisecond

func (s *MulticastService) UserType() api.UserType { return api.UserTypeMulticast }
func (s *MulticastService) ServiceType() ServiceType { return ServiceTypeMulticast }

Expand Down Expand Up @@ -87,11 +105,6 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error {
return fmt.Errorf("error adding multicast route: %v", err)
}
}

s.MulticastPubGroups = p.MulticastPubGroups
if err := s.heartbeat.Start(tun.Name, p.DoubleZeroIP, p.MulticastPubGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil {
return fmt.Errorf("error starting heartbeat sender: %v", err)
}
}

if isSubscriber {
Expand Down Expand Up @@ -149,16 +162,73 @@ func (s *MulticastService) Setup(p *api.ProvisionRequest) error {
return fmt.Errorf("error adding peer: %v", err)
}
}

// Defer the publisher heartbeat (which registers the source at the DZD) until
// the BGP session is Up, so the DZD never sees a source register before its
// tunnel/PIM plumbing is ready — the overlap that produces a wedged (S,G).
if isPublisher {
s.mu.Lock()
s.heartbeatGroups = p.MulticastPubGroups
s.mu.Unlock()
ctx, cancel := context.WithCancel(context.Background())
s.heartbeatCancel = cancel
s.heartbeatWatcherWG.Add(1)
go func() {
defer s.heartbeatWatcherWG.Done()
s.startHeartbeatWhenReady(ctx, s.Tunnel.Name, p.DoubleZeroIP, s.Tunnel.RemoteOverlay)
}()
}

return nil
}

// startHeartbeatWhenReady waits for the BGP session to reach Up, then starts the
// publisher heartbeat with the current group set. It rides the existing BGP
// session lifecycle: the session resolves to Up or a failure within
// bgp.BGPSessionTimeout, and a cycle that never reaches Up is retried when the
// manager reconcile re-provisions. It exits when the context is cancelled
// (Teardown), so no heartbeat start races with or follows a Close.
func (s *MulticastService) startHeartbeatWhenReady(ctx context.Context, iface string, srcIP net.IP, remoteOverlay net.IP) {
ticker := time.NewTicker(publisherReadyPollInterval)
defer ticker.Stop()
for {
if s.bgp.GetPeerStatus(remoteOverlay).SessionStatus == bgp.SessionStatusUp {
// Hold the lock across Start so a concurrent UpdateGroups can't touch
// the sender before it exists: UpdateGroups observes heartbeatStarted
// and either updates in place or leaves the group set for us to start
// with here.
s.mu.Lock()
if !s.heartbeatStarted {
if err := s.heartbeat.Start(iface, srcIP, s.heartbeatGroups, multicast.DefaultHeartbeatTTL, multicast.DefaultHeartbeatInterval); err != nil {
slog.Error("error starting heartbeat sender", "error", err)
} else {
s.heartbeatStarted = true
}
}
s.mu.Unlock()
return
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}

func (s *MulticastService) Teardown() error {
var errRemoveTunnel, errRemovePeer error
if s.Tunnel == nil {
return nil
}

if s.isPublisher() {
// Stop the readiness watcher and wait for it to exit before closing the
// heartbeat, so a pending start can't race with or follow the close.
if s.heartbeatCancel != nil {
s.heartbeatCancel()
}
s.heartbeatWatcherWG.Wait()
if err := s.heartbeat.Close(); err != nil {
slog.Error("error stopping heartbeat sender", "error", err)
}
Expand Down Expand Up @@ -261,10 +331,18 @@ func (s *MulticastService) UpdateGroups(newPR *api.ProvisionRequest) error {
}
}

// Restart heartbeat if publisher groups changed
// Apply publisher group changes to the heartbeat. If the readiness watcher
// hasn't started the sender yet (BGP not up), just record the new group set —
// the watcher will start with it; otherwise update the running sender in place.
if isPublisher && (len(pubAdded) > 0 || len(pubRemoved) > 0) {
if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil {
return fmt.Errorf("error updating heartbeat groups: %v", err)
s.mu.Lock()
s.heartbeatGroups = newPR.MulticastPubGroups
started := s.heartbeatStarted
s.mu.Unlock()
if started {
if err := s.heartbeat.UpdateGroups(newPR.MulticastPubGroups); err != nil {
return fmt.Errorf("error updating heartbeat groups: %v", err)
}
}
}

Expand Down
Loading
Loading