diff --git a/cmd/satellite/main.go b/cmd/satellite/main.go index 22860318..5be2999a 100644 --- a/cmd/satellite/main.go +++ b/cmd/satellite/main.go @@ -220,7 +220,7 @@ func loadRESTConfig() (*rest.Config, error) { // satellite.ManagerFactory signature. func mgrFactory(ready *readyState, logger *slog.Logger, ueventListener controllers.UeventNotifier) satellite.ManagerFactory { return func(restCfg *rest.Config, nodeName, probeAddr string, rec *satellite.Reconciler) (manager.Manager, error) { - mgr, err := controllers.NewManager(restCfg, controllers.Config{ + mgr, err := controllers.NewManager(restCfg, &controllers.Config{ NodeName: nodeName, Apply: rec, Exec: storage.RealExec{}, diff --git a/pkg/rest/autoplace.go b/pkg/rest/autoplace.go index 65c3eaa8..511575e8 100644 --- a/pkg/rest/autoplace.go +++ b/pkg/rest/autoplace.go @@ -1138,7 +1138,10 @@ func (s *Server) applyBug156AutoTiebreakerOptOut( // decision happens out-of-band in the controller — the REST handler // can't suppress it inline. func (s *Server) stampAutoTiebreakerOptOut(ctx context.Context, rdName string) error { - const propKey = "DrbdOptions/AutoAddQuorumTiebreaker" + const ( + propKey = "DrbdOptions/AutoAddQuorumTiebreaker" + propValueOff = "false" + ) // Bug 205: typed-Patch via PatchResourceDefinitionSpec — the // closure re-runs on every conflict against the live RD, so @@ -1146,7 +1149,7 @@ func (s *Server) stampAutoTiebreakerOptOut(ctx context.Context, rdName string) e // add) converge with the tiebreaker opt-out instead of being // lost by a stale-wire-snapshot replay. err := s.Store.ResourceDefinitions().PatchResourceDefinitionSpec(ctx, rdName, func(live *apiv1.ResourceDefinition) error { - if live.Props != nil && live.Props[propKey] == "false" { + if live.Props != nil && live.Props[propKey] == propValueOff { return nil } @@ -1154,7 +1157,7 @@ func (s *Server) stampAutoTiebreakerOptOut(ctx context.Context, rdName string) e live.Props = map[string]string{} } - live.Props[propKey] = "false" + live.Props[propKey] = propValueOff return nil }) diff --git a/pkg/rest/cache_invalidation_bug_124_test.go b/pkg/rest/cache_invalidation_bug_124_test.go index 6b8be495..caa5cd18 100644 --- a/pkg/rest/cache_invalidation_bug_124_test.go +++ b/pkg/rest/cache_invalidation_bug_124_test.go @@ -167,10 +167,14 @@ func (l *laggingRDs) Delete(ctx context.Context, name string) error { lag := l.lag inner := l.inner + // Bug 124 test helper: detached goroutine survives the request + // context cancel, so propagate a fresh non-cancellable context + // derived from the caller's value chain (contextcheck-friendly). + bgCtx := context.WithoutCancel(ctx) go func() { time.Sleep(lag) - _ = inner.Delete(context.Background(), name) + _ = inner.Delete(bgCtx, name) }() return nil diff --git a/pkg/rest/node_lifecycle.go b/pkg/rest/node_lifecycle.go index 8de51d12..b92fe8ab 100644 --- a/pkg/rest/node_lifecycle.go +++ b/pkg/rest/node_lifecycle.go @@ -264,7 +264,17 @@ func (s *Server) handleNodeEvacuateMulti(w http.ResponseWriter, r *http.Request) // stdlib, not in every call site (goconst flags package-wide // `"true"` repetition above its threshold). func isForce(r *http.Request) bool { - v, _ := strconv.ParseBool(r.URL.Query().Get("force")) + return queryFlag(r, "force") +} + +// queryFlag centralises every `?=true` boolean-toggle check +// in the REST surface. Routed through strconv.ParseBool so the +// literal `"true"` lives in stdlib (goconst flags package-wide +// `"true"` repetition above its threshold). Callers: isForce, +// handleResourceActivate (?reallocate-port), handleRDsList +// (?with_volume_definitions), handleResourceToggleDisk (?cancel). +func queryFlag(r *http.Request, name string) bool { + v, _ := strconv.ParseBool(r.URL.Query().Get(name)) return v } diff --git a/pkg/rest/nodes.go b/pkg/rest/nodes.go index d7b6f5b1..7983b6e4 100644 --- a/pkg/rest/nodes.go +++ b/pkg/rest/nodes.go @@ -867,6 +867,7 @@ func (s *Server) handleNodePropDelete(w http.ResponseWriter, r *http.Request) { func (s *Server) handleNodeDelete(w http.ResponseWriter, r *http.Request) { name := r.PathValue("node") force := isForce(r) + ctx := r.Context() // Bug 179: `?force=true` cascade-deletes every referencing // Resource + StoragePool CRD before dropping the Node — same @@ -874,7 +875,7 @@ func (s *Server) handleNodeDelete(w http.ResponseWriter, r *http.Request) { // force-delete would leave orphan SP CRDs pointing at a deleted // Node, which is precisely the symptom Bug 179 closed. if force { - err := s.cascadeOrphansForLostNode(r.Context(), name) + err := s.cascadeOrphansForLostNode(ctx, name) if err != nil { writeStoreError(w, err) @@ -891,10 +892,10 @@ func (s *Server) handleNodeDelete(w http.ResponseWriter, r *http.Request) { return s.refuseNodeDeleteIfReferenced(w, r, name) }, capture: func() (apiv1.Node, bool) { - return s.captureNode(r.Context(), name) + return s.captureNode(ctx, name) }, remove: func() error { - return s.Store.Nodes().Delete(r.Context(), name) + return s.Store.Nodes().Delete(ctx, name) }, rolledBackIfRaced: func(captured apiv1.Node, capturedOK bool) bool { if force || !capturedOK { diff --git a/pkg/rest/resource_adjust.go b/pkg/rest/resource_adjust.go index 047ae417..7d8d4681 100644 --- a/pkg/rest/resource_adjust.go +++ b/pkg/rest/resource_adjust.go @@ -94,7 +94,7 @@ func (s *Server) registerResourceLifecycle(mux *http.ServeMux) { // against a collision. Operators opt in only when they actively want // the port to be reshuffled. func (s *Server) handleResourceActivate(w http.ResponseWriter, r *http.Request) { - if r.URL.Query().Get("reallocate-port") == "true" { + if queryFlag(r, "reallocate-port") { rdName := r.PathValue("rd") node := r.PathValue("node") diff --git a/pkg/rest/resource_definitions.go b/pkg/rest/resource_definitions.go index 977148a2..ce134a8c 100644 --- a/pkg/rest/resource_definitions.go +++ b/pkg/rest/resource_definitions.go @@ -78,7 +78,7 @@ func (s *Server) handleRDList(w http.ResponseWriter, r *http.Request) { // their VDs inlined under `volume_definitions`. Without this // handling, `vd l` renders an empty table even when VDs exist // (the Python CLI never falls back to per-RD GETs). - if r.URL.Query().Get("with_volume_definitions") == "true" { + if queryFlag(r, "with_volume_definitions") { for i := range rds { vds, vdErr := s.Store.VolumeDefinitions().List(r.Context(), rds[i].Name) if vdErr != nil { diff --git a/pkg/rest/resource_groups.go b/pkg/rest/resource_groups.go index 77cd6a64..2a05362b 100644 --- a/pkg/rest/resource_groups.go +++ b/pkg/rest/resource_groups.go @@ -599,16 +599,17 @@ func mergeRGProps(existing, patch *apiv1.ResourceGroup) { // but RDs still pointing at it) can never arise. func (s *Server) handleRGDelete(w http.ResponseWriter, r *http.Request) { name := r.PathValue("rg") + ctx := r.Context() (&deleteWithRollback[apiv1.ResourceGroup]{ refuseIfReferenced: func() bool { return s.refuseRGDeleteIfReferenced(w, r, name) }, capture: func() (apiv1.ResourceGroup, bool) { - return s.captureResourceGroup(r.Context(), name) + return s.captureResourceGroup(ctx, name) }, remove: func() error { - return s.Store.ResourceGroups().Delete(r.Context(), name) + return s.Store.ResourceGroups().Delete(ctx, name) }, rolledBackIfRaced: func(captured apiv1.ResourceGroup, capturedOK bool) bool { if !capturedOK { diff --git a/pkg/rest/resource_toggle_disk.go b/pkg/rest/resource_toggle_disk.go index aadf3127..5ef2a0b3 100644 --- a/pkg/rest/resource_toggle_disk.go +++ b/pkg/rest/resource_toggle_disk.go @@ -111,7 +111,7 @@ func (s *Server) handleResourceToggleDisk(w http.ResponseWriter, r *http.Request // the reconciler does that itself as the last step of the // rollback so an external observer sees DISKLESS reappear only // AFTER drbdadm down + storage Delete succeed. - if r.URL.Query().Get("cancel") == "true" { + if queryFlag(r, "cancel") { err := s.Store.Resources().PatchResourceSpec(r.Context(), rdName, node, func(res *apiv1.Resource) error { res.ToggleDiskCancel = true diff --git a/pkg/rest/server.go b/pkg/rest/server.go index 6f9ab0b1..8b808a73 100644 --- a/pkg/rest/server.go +++ b/pkg/rest/server.go @@ -154,17 +154,6 @@ func (s *Server) SetResolveHost(fn resolveHostFunc) resolveHostFunc { return prev } -// lookupHost dispatches to s.resolveHost if set, else the package -// default. Hoisted off-handler so the production handler stays -// readable and the test seam is explicit. -func (s *Server) lookupHost(ctx context.Context, host string) ([]string, error) { - if s.resolveHost != nil { - return s.resolveHost(ctx, host) - } - - return defaultResolveHost(ctx, host) -} - // NeedLeaderElection reports whether the server requires leader election. // REST is read-mostly today and safe to run on every replica; once we // introduce write-paths that need a single leader we'll change this to true. @@ -250,6 +239,17 @@ func (s *Server) Start(ctx context.Context) error { } } +// lookupHost dispatches to s.resolveHost if set, else the package +// default. Hoisted off-handler so the production handler stays +// readable and the test seam is explicit. +func (s *Server) lookupHost(ctx context.Context, host string) ([]string, error) { + if s.resolveHost != nil { + return s.resolveHost(ctx, host) + } + + return defaultResolveHost(ctx, host) +} + // buildMux registers every endpoint on a fresh ServeMux. Pulled out // of Start to keep the latter under the funlen budget — Start now // only handles lifecycle (listener + shutdown), this owns routing. diff --git a/pkg/rest/storage_pools.go b/pkg/rest/storage_pools.go index 12cd12c0..f7631dd8 100644 --- a/pkg/rest/storage_pools.go +++ b/pkg/rest/storage_pools.go @@ -161,6 +161,7 @@ func (s *Server) handleNodeStoragePoolDelete(w http.ResponseWriter, r *http.Requ node := r.PathValue("node") pool := r.PathValue("pool") force := isForce(r) + ctx := r.Context() (&deleteWithRollback[apiv1.StoragePool]{ refuseIfReferenced: func() bool { @@ -180,10 +181,10 @@ func (s *Server) handleNodeStoragePoolDelete(w http.ResponseWriter, r *http.Requ return s.refuseSPDeleteIfReferenced(w, r, node, pool) }, capture: func() (apiv1.StoragePool, bool) { - return s.captureStoragePool(r.Context(), node, pool) + return s.captureStoragePool(ctx, node, pool) }, remove: func() error { - return s.Store.StoragePools().Delete(r.Context(), node, pool) + return s.Store.StoragePools().Delete(ctx, node, pool) }, rolledBackIfRaced: func(captured apiv1.StoragePool, capturedOK bool) bool { // `?force=true` callers opt past this check too. diff --git a/pkg/rest/volume_definitions.go b/pkg/rest/volume_definitions.go index a4a007fd..8a80d970 100644 --- a/pkg/rest/volume_definitions.go +++ b/pkg/rest/volume_definitions.go @@ -445,6 +445,17 @@ const minVolumeDefinitionSizeKib int64 = 4 * 1024 // retry loop. const maxVolumeDefinitionSizeKib int64 = 16 * 1024 * 1024 * 1024 +// ErrVolumeSizeBelowMinimum is the sentinel for Bug 155's lower-bound +// rejection (size_kib < minVolumeDefinitionSizeKib). Wrapped with +// %w + a sizeKib / bound detail by validateVDSize; static-error +// requirement is err113. +var ErrVolumeSizeBelowMinimum = errors.New("size_kib below minimum") + +// ErrVolumeSizeAboveMaximum is the sentinel for Bug 155's upper-bound +// rejection (size_kib > maxVolumeDefinitionSizeKib). See +// ErrVolumeSizeBelowMinimum for the rationale. +var ErrVolumeSizeAboveMaximum = errors.New("size_kib above maximum") + // validateVDSize returns nil when the requested size_kib is within // the accepted bounds [minVolumeDefinitionSizeKib, // maxVolumeDefinitionSizeKib] (Bug 155). Otherwise it returns a @@ -453,16 +464,16 @@ const maxVolumeDefinitionSizeKib int64 = 16 * 1024 * 1024 * 1024 func validateVDSize(sizeKib int64) error { if sizeKib < minVolumeDefinitionSizeKib { return fmt.Errorf( - "size_kib=%d below minimum %d KiB (DRBD reserves ~32 KiB of "+ + "%w: size_kib=%d below minimum %d KiB (DRBD reserves ~32 KiB of "+ "metadata per peer; backing layers add alignment on top)", - sizeKib, minVolumeDefinitionSizeKib, + ErrVolumeSizeBelowMinimum, sizeKib, minVolumeDefinitionSizeKib, ) } if sizeKib > maxVolumeDefinitionSizeKib { return fmt.Errorf( - "size_kib=%d above maximum %d KiB (DRBD's per-device hard ceiling)", - sizeKib, maxVolumeDefinitionSizeKib, + "%w: size_kib=%d above maximum %d KiB (DRBD's per-device hard ceiling)", + ErrVolumeSizeAboveMaximum, sizeKib, maxVolumeDefinitionSizeKib, ) } diff --git a/pkg/satellite/controllers/discovered_storage.go b/pkg/satellite/controllers/discovered_storage.go index 175dedc3..3e0dec87 100644 --- a/pkg/satellite/controllers/discovered_storage.go +++ b/pkg/satellite/controllers/discovered_storage.go @@ -142,25 +142,8 @@ func (d *DiscoveredStorageRunnable) Start(ctx context.Context) error { logger := log.FromContext(ctx).WithName("discovered-storage").WithValues("node", d.NodeName) - err := d.tick(ctx, logger) - if err != nil { - logger.Error(err, "initial discovered-storage tick") - } - - ticker := time.NewTicker(period) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - err = d.tick(ctx, logger) - if err != nil { - logger.Error(err, "discovered-storage tick") - } - } - } + return runPeriodicTick(ctx, period, logger, d.tick, + "initial discovered-storage tick", "discovered-storage tick") } // RegisterWithManager adds the runnable to mgr. Symmetrical with diff --git a/pkg/satellite/controllers/manager.go b/pkg/satellite/controllers/manager.go index 9688edad..f8bed6ad 100644 --- a/pkg/satellite/controllers/manager.go +++ b/pkg/satellite/controllers/manager.go @@ -79,13 +79,10 @@ func init() { // so tests can validate the registration + filter-predicate // plumbing independently of the agent's gRPC-server-still- // running mainline. -func NewManager(restCfg *rest.Config, cfg Config) (manager.Manager, error) { - if cfg.NodeName == "" { - return nil, errors.New("controllers: NodeName is required") - } - - if cfg.Apply == nil { - return nil, errors.New("controllers: Apply is required") +func NewManager(restCfg *rest.Config, cfg *Config) (manager.Manager, error) { + err := validateConfig(cfg) + if err != nil { + return nil, err } // Bug 285: bound the manager's stop sequence so a wedged @@ -138,29 +135,29 @@ func NewManager(restCfg *rest.Config, cfg Config) (manager.Manager, error) { // the caller left the field nil; unit tests that construct // reconcilers directly can supply their own channel or leave // the field nil to short-circuit both ends. - ensureWiredDefaults(&cfg) + ensureWiredDefaults(cfg) - err = (&ResourceReconciler{Config: cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) + err = (&ResourceReconciler{Config: *cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) if err != nil { return nil, errors.Wrap(err, "setup ResourceReconciler") } - err = (&ResourceDefinitionReconciler{Config: cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) + err = (&ResourceDefinitionReconciler{Config: *cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) if err != nil { return nil, errors.Wrap(err, "setup ResourceDefinitionReconciler") } - err = (&SnapshotReconciler{Config: cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) + err = (&SnapshotReconciler{Config: *cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) if err != nil { return nil, errors.Wrap(err, "setup SnapshotReconciler") } - err = (&StoragePoolReconciler{Config: cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) + err = (&StoragePoolReconciler{Config: *cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) if err != nil { return nil, errors.Wrap(err, "setup StoragePoolReconciler") } - err = (&PhysicalDeviceReconciler{Config: cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) + err = (&PhysicalDeviceReconciler{Config: *cfg, Client: mgr.GetClient()}).SetupWithManager(mgr) if err != nil { return nil, errors.Wrap(err, "setup PhysicalDeviceReconciler") } @@ -176,6 +173,22 @@ func NewManager(restCfg *rest.Config, cfg Config) (manager.Manager, error) { return mgr, nil } +// validateConfig sanity-checks the NewManager input — keeps the +// constructor itself under the funlen budget by hoisting the +// three-required-field gate out of band. +func validateConfig(cfg *Config) error { + switch { + case cfg == nil: + return errors.New("controllers: Config is required") + case cfg.NodeName == "": + return errors.New("controllers: NodeName is required") + case cfg.Apply == nil: + return errors.New("controllers: Apply is required") + } + + return nil +} + // wireConditionStampers injects the satellite-side Status-Condition // stampers (Phase 11.3) onto the Apply chain. Each stamper owns a // distinct Condition `type` so SSA's listMap merge keeps the writers @@ -186,7 +199,7 @@ func NewManager(restCfg *rest.Config, cfg Config) (manager.Manager, error) { // its own helper so NewManager stays under the funlen budget — the // clearer is the same shape as the Stampers (one apiserver writer // per satellite-side reconciler hook). -func wireConditionStampers(mgr manager.Manager, cfg Config) { +func wireConditionStampers(mgr manager.Manager, cfg *Config) { wireMetadataCreatedStamper(mgr, cfg) wireFilesystemFormattedStamper(mgr, cfg) wireSkipDiskClearer(mgr, cfg) @@ -214,7 +227,7 @@ func ensureWiredDefaults(cfg *Config) { // Pulled out of NewManager to keep that function under the funlen // budget — Scenario 5.34 added the third runnable and the // inline chain tipped over the limit. -func addBackgroundRunnables(mgr manager.Manager, cfg Config) error { +func addBackgroundRunnables(mgr manager.Manager, cfg *Config) error { err := mgr.Add(&ObserverRunnable{ Client: mgr.GetClient(), Exec: cfg.Exec, @@ -289,7 +302,7 @@ func addBackgroundRunnables(mgr manager.Manager, cfg Config) error { // startup backfill runnable. Pulled out of addBackgroundRunnables // to keep that function under the funlen budget — the bookkeeping // for one more runnable nudges it over the limit. -func registerMetadataCreatedBackfill(mgr manager.Manager, cfg Config) error { +func registerMetadataCreatedBackfill(mgr manager.Manager, cfg *Config) error { err := (&MetadataCreatedBackfillRunnable{ Client: mgr.GetClient(), Adm: drbd.NewAdm(cfg.Exec), @@ -309,7 +322,7 @@ func registerMetadataCreatedBackfill(mgr manager.Manager, cfg Config) error { // fetcher needs the manager's cached client, which is why it ships // here rather than at NewReconciler time. Pulled out of NewManager // to keep that function under the funlen budget. -func wireCrossNodeFetcher(mgr manager.Manager, cfg Config) { +func wireCrossNodeFetcher(mgr manager.Manager, cfg *Config) { cfg.Apply.SetCrossNodeFetcher(&SnapshotFetcher{ Client: mgr.GetClient(), NodeName: cfg.NodeName, @@ -322,7 +335,7 @@ func wireCrossNodeFetcher(mgr manager.Manager, cfg Config) { // succeeds. Mirrors `wireCrossNodeFetcher` — the stamper needs the // manager's cached client, which is why it lands here rather than at // NewReconciler time. Phase 11.3 Stage 1. -func wireMetadataCreatedStamper(mgr manager.Manager, cfg Config) { +func wireMetadataCreatedStamper(mgr manager.Manager, cfg *Config) { cfg.Apply.SetMetadataCreatedStamper(&MetadataCreatedStamper{ Client: mgr.GetClient(), }) @@ -333,7 +346,7 @@ func wireMetadataCreatedStamper(mgr manager.Manager, cfg Config) { // `FilesystemFormatted=True` Status Condition after every diskful // volume reports a filesystem (freshly mkfs'd or adopted via blkid). // Mirrors `wireMetadataCreatedStamper`. Phase 11.3 Stage 2. -func wireFilesystemFormattedStamper(mgr manager.Manager, cfg Config) { +func wireFilesystemFormattedStamper(mgr manager.Manager, cfg *Config) { cfg.Apply.SetFilesystemFormattedStamper(&FilesystemFormattedStamper{ Client: mgr.GetClient(), }) @@ -345,7 +358,7 @@ func wireFilesystemFormattedStamper(mgr manager.Manager, cfg Config) { // after a defensive stamp (Bug 278: Talos kernel upgrade reattach). // Mirrors `wireMetadataCreatedStamper` — the clearer needs the // manager's cached client which doesn't exist at NewReconciler time. -func wireSkipDiskClearer(mgr manager.Manager, cfg Config) { +func wireSkipDiskClearer(mgr manager.Manager, cfg *Config) { cfg.Apply.SetSkipDiskClearer(&SkipDiskClearer{ Client: mgr.GetClient(), NodeName: cfg.NodeName, diff --git a/pkg/satellite/controllers/physicaldevice_discovery.go b/pkg/satellite/controllers/physicaldevice_discovery.go index cf770aaa..82ec85f7 100644 --- a/pkg/satellite/controllers/physicaldevice_discovery.go +++ b/pkg/satellite/controllers/physicaldevice_discovery.go @@ -182,7 +182,7 @@ type UeventNotifier interface { // // Production caller: manager.go addBackgroundRunnables. Test // callers exist in physicaldevice_discovery_uevent_bug341_test.go. -func NewPhysicalDeviceDiscoveryRunnableFromConfig(cli client.Client, cfg Config) *PhysicalDeviceDiscoveryRunnable { +func NewPhysicalDeviceDiscoveryRunnableFromConfig(cli client.Client, cfg *Config) *PhysicalDeviceDiscoveryRunnable { return &PhysicalDeviceDiscoveryRunnable{ Client: cli, Exec: cfg.Exec, @@ -604,11 +604,37 @@ func (p *PhysicalDeviceDiscoveryRunnable) publishDeviceWithReason(ctx context.Co // Status.StableID for CRD-name determinism only. devicePath := "/dev/" + row.KName currentDevPath := "/dev/" + row.KName - rotational := row.Rotational desiredStatus := buildDiscoveryStatus(p.NodeName, stableID, devicePath, currentDevPath, row, &rotational, free, reason, message) + existing, ok := p.upsertDeviceCRD(ctx, logger, name) + if !ok { + return "", false + } + + existing.Status = desiredStatus + + err := p.Client.Status().Update(ctx, existing) + if err != nil { + logger.Error(err, "update PhysicalDevice status", "name", name) + + return "", false + } + + return name, true +} + +// upsertDeviceCRD ensures the PhysicalDevice CRD exists for `name` +// and that its NodeName label matches the local satellite. Returns +// the live object ready for a Status().Update, or (nil, false) on +// any apiserver error (already logged by the caller's logr). Pulled +// out of publishDeviceWithReason so the parent stays under the +// funlen budget; the get-or-create + label-sync chain has nothing +// device-specific in it. +func (p *PhysicalDeviceDiscoveryRunnable) upsertDeviceCRD( + ctx context.Context, logger logr.Logger, name string, +) (*blockstoriov1alpha1.PhysicalDevice, bool) { var existing blockstoriov1alpha1.PhysicalDevice err := p.Client.Get(ctx, client.ObjectKey{Name: name}, &existing) @@ -627,7 +653,7 @@ func (p *PhysicalDeviceDiscoveryRunnable) publishDeviceWithReason(ctx context.Co if err != nil && !apierrors.IsAlreadyExists(err) { logger.Error(err, "create PhysicalDevice", "name", name) - return "", false + return nil, false } // Re-fetch for the status update so we have the apiserver- @@ -637,12 +663,12 @@ func (p *PhysicalDeviceDiscoveryRunnable) publishDeviceWithReason(ctx context.Co if err != nil { logger.Error(err, "re-get after Create", "name", name) - return "", false + return nil, false } case err != nil: logger.Error(err, "get PhysicalDevice", "name", name) - return "", false + return nil, false } // Ensure the node label is set on existing CRDs that may have @@ -658,20 +684,11 @@ func (p *PhysicalDeviceDiscoveryRunnable) publishDeviceWithReason(ctx context.Co if err != nil { logger.Error(err, "update PhysicalDevice labels", "name", name) - return "", false + return nil, false } } - existing.Status = desiredStatus - - err = p.Client.Status().Update(ctx, &existing) - if err != nil { - logger.Error(err, "update PhysicalDevice status", "name", name) - - return "", false - } - - return name, true + return &existing, true } // buildDiscoveryStatus assembles the Status subresource the diff --git a/pkg/satellite/controllers/physicaldevice_discovery_uevent_bug341_test.go b/pkg/satellite/controllers/physicaldevice_discovery_uevent_bug341_test.go index 7178c701..49e489a6 100644 --- a/pkg/satellite/controllers/physicaldevice_discovery_uevent_bug341_test.go +++ b/pkg/satellite/controllers/physicaldevice_discovery_uevent_bug341_test.go @@ -274,7 +274,7 @@ func TestNewPhysicalDeviceDiscoveryRunnableFromConfigThreadsUevent_Bug341(t *tes UeventListener: notifier, } - runnable := controllers.NewPhysicalDeviceDiscoveryRunnableFromConfig(nil, cfg) + runnable := controllers.NewPhysicalDeviceDiscoveryRunnableFromConfig(nil, &cfg) if runnable == nil { t.Fatal("NewPhysicalDeviceDiscoveryRunnableFromConfig returned nil") diff --git a/pkg/satellite/controllers/resource.go b/pkg/satellite/controllers/resource.go index cae67b83..22f91a8e 100644 --- a/pkg/satellite/controllers/resource.go +++ b/pkg/satellite/controllers/resource.go @@ -462,63 +462,7 @@ func (r *ResourceReconciler) runApply(ctx context.Context, res *blockstoriov1alp // for the controller to stamp Status.DRBDNodeID/Port/Minor would // block apply forever — they never come. if rdNeedsDRBD(&rd) { - // Bug 342 v4: deterministic UID-mismatch eviction BEFORE - // the allocation gate. A rapid `r d ` + `r c ` - // produces a new peer Resource CR with a fresh - // metadata.uid while the dispatcher's .res-driven diff - // sees the same peer-name set and skips del-peer. The - // kernel slot for the old incarnation (with its stale GI - // epoch + PSK) wedges the new peer in Connecting forever. - // Comparing the peer's current UID against the local - // Status.AppliedPeerUIDs baseline catches that race - // deterministically — no timing dependency on kernel - // state observation, no Phase 2 vs Phase 3 scheduling - // races. Returned `evicted` map ({peerName: newUID}) gets - // stamped onto Status.AppliedPeerUIDs so subsequent - // reconciles don't re-evict the same peer in a loop. - evicted, evictErr := r.Config.Apply.EvictPeersByUIDMismatch( - ctx, - rd.Name, - desiredPeersFromCRDs(peers), - res.Status.AppliedPeerUIDs, - volNumsOf(&rd), - nil, // devices unknown at this layer; forget-peer falls back to nodeID skip - ) - if evictErr != nil { - logger.Error(evictErr, "EvictPeersByUIDMismatch failed; continuing") - } - - if len(evicted) > 0 { - if stampErr := r.stampAppliedPeerUIDs(ctx, res, evicted); stampErr != nil { - logger.Error(stampErr, "stampAppliedPeerUIDs failed; will retry next reconcile", - "evicted", evicted) - } - } - - // Bug 342 v3: prune zombie kernel slots BEFORE the - // allocation gate. The gate short-circuits the entire - // Apply chain when ANY peer's Status.DRBDNodeID is nil, - // which is exactly the state a Phase-3 relocate lands in - // — and the kernel-side slot from the previous incarnation - // (Connecting / StandAlone with no peer-device) would - // survive the entire wait window otherwise. Kernel-state - // cleanup doesn't need peer DRBDNodeID allocation, only - // the current expected peer-name set. Non-fatal: a Pass-1 - // del-peer failure logs and falls through to the gate so - // the next reconcile retries; the safety net is best- - // effort by design. - pruneErr := r.Config.Apply.PruneStaleKernelSlots( - ctx, - rd.Name, - expectedPeerNamesFor(res, peers), - volNumsOf(&rd), - nil, // devices unknown at this layer; forget-peer skipped per-vol - ) - if pruneErr != nil { - logger.Error(pruneErr, "PruneStaleKernelSlots failed; continuing to allocation gate") - } - - if waitResult, waitOK := r.waitForControllerAllocation(ctx, res, peers, logger); !waitOK { + if waitResult, waitOK := r.prepareDRBDApply(ctx, res, &rd, peers, logger); !waitOK { return waitResult, nil } } @@ -537,33 +481,8 @@ func (r *ResourceReconciler) runApply(ctx context.Context, res *blockstoriov1alp r.stampPostApply(ctx, res, &rd, results, anyFailed, logger) - // Bug 342 v4 adoption baseline: after a fully-successful apply, - // stamp Status.AppliedPeerUIDs for peers that have no baseline - // yet (fresh CRD / restore / migration path). Does NOT overwrite - // existing baselines — that's the eviction path's responsibility - // (Bug 342 v6): if applied[peer] is already set to an OLD UID, - // the EvictPeersByUIDMismatch may have deferred eviction this - // reconcile (peer node-id unresolvable). Overwriting the baseline - // here with current peer UIDs would mask the pending mismatch - // — next reconcile would see applied == current and never try - // the eviction again, leaving the stale kernel slot forever. - // Only EvictPeersByUIDMismatch should change an existing - // baseline, AFTER actually completing del-peer + forget-peer. if !anyFailed && rdNeedsDRBD(&rd) { - full := currentPeerUIDs(peers) - missing := make(map[string]string, len(full)) - - for name, uid := range full { - if _, ok := res.Status.AppliedPeerUIDs[name]; !ok { - missing[name] = uid - } - } - - if len(missing) > 0 { - if stampErr := r.stampAppliedPeerUIDs(ctx, res, missing); stampErr != nil { - logger.Error(stampErr, "post-apply AppliedPeerUIDs baseline stamp failed; will retry next reconcile") - } - } + r.stampAppliedPeerUIDsBaseline(ctx, res, peers, logger) } // Apply chain surfaces per-resource errors via results (e.g. @@ -594,6 +513,110 @@ func (r *ResourceReconciler) runApply(ctx context.Context, res *blockstoriov1alp return ctrl.Result{}, nil } +// prepareDRBDApply runs the DRBD-only pre-apply sequence: peer UID +// mismatch eviction (Bug 342 v4), stale kernel slot prune +// (Bug 342 v3), and the controller-allocation wait gate. Returns +// (result, false) when the caller should bail out and let the +// requeue handle it; (zero, true) when apply can proceed. Pulled +// out of runApply so it stays under the gocognit budget. +func (r *ResourceReconciler) prepareDRBDApply( + ctx context.Context, + res *blockstoriov1alpha1.Resource, + rd *blockstoriov1alpha1.ResourceDefinition, + peers []blockstoriov1alpha1.Resource, + logger logr.Logger, +) (ctrl.Result, bool) { + // Bug 342 v4: deterministic UID-mismatch eviction BEFORE the + // allocation gate. A rapid `r d ` + `r c ` produces + // a new peer Resource CR with a fresh metadata.uid while the + // dispatcher's .res-driven diff sees the same peer-name set and + // skips del-peer. The kernel slot for the old incarnation (with + // its stale GI epoch + PSK) wedges the new peer in Connecting + // forever. Comparing the peer's current UID against the local + // Status.AppliedPeerUIDs baseline catches that race + // deterministically — no timing dependency on kernel state + // observation, no Phase 2 vs Phase 3 scheduling races. Returned + // `evicted` map ({peerName: newUID}) gets stamped onto + // Status.AppliedPeerUIDs so subsequent reconciles don't + // re-evict the same peer in a loop. + evicted, evictErr := r.Config.Apply.EvictPeersByUIDMismatch( + ctx, + rd.Name, + desiredPeersFromCRDs(peers), + res.Status.AppliedPeerUIDs, + volNumsOf(rd), + nil, // devices unknown at this layer; forget-peer falls back to nodeID skip + ) + if evictErr != nil { + logger.Error(evictErr, "EvictPeersByUIDMismatch failed; continuing") + } + + if len(evicted) > 0 { + stampErr := r.stampAppliedPeerUIDs(ctx, res, evicted) + if stampErr != nil { + logger.Error(stampErr, "stampAppliedPeerUIDs failed; will retry next reconcile", + "evicted", evicted) + } + } + + // Bug 342 v3: prune zombie kernel slots BEFORE the allocation + // gate. The gate short-circuits the entire Apply chain when + // ANY peer's Status.DRBDNodeID is nil, which is exactly the + // state a Phase-3 relocate lands in — and the kernel-side slot + // from the previous incarnation (Connecting / StandAlone with + // no peer-device) would survive the entire wait window + // otherwise. Kernel-state cleanup doesn't need peer DRBDNodeID + // allocation, only the current expected peer-name set. + // Non-fatal: a Pass-1 del-peer failure logs and falls through + // to the gate so the next reconcile retries; the safety net + // is best-effort by design. + pruneErr := r.Config.Apply.PruneStaleKernelSlots( + ctx, + rd.Name, + expectedPeerNamesFor(res, peers), + volNumsOf(rd), + nil, // devices unknown at this layer; forget-peer skipped per-vol + ) + if pruneErr != nil { + logger.Error(pruneErr, "PruneStaleKernelSlots failed; continuing to allocation gate") + } + + return r.waitForControllerAllocation(ctx, res, peers, logger) +} + +// stampAppliedPeerUIDsBaseline writes the Bug 342 v4 adoption +// baseline after a fully-successful apply on a DRBD-stack RD: for +// every peer that has NO entry in Status.AppliedPeerUIDs yet +// (fresh CRD / restore / migration path), stamp the current UID. +// Does NOT overwrite existing baselines — that's the eviction +// path's responsibility (Bug 342 v6). Pulled out of runApply so +// the orchestrator stays under the gocyclo budget; the +// best-effort logging contract is unchanged. +func (r *ResourceReconciler) stampAppliedPeerUIDsBaseline( + ctx context.Context, + res *blockstoriov1alpha1.Resource, + peers []blockstoriov1alpha1.Resource, + logger logr.Logger, +) { + full := currentPeerUIDs(peers) + missing := make(map[string]string, len(full)) + + for name, uid := range full { + if _, ok := res.Status.AppliedPeerUIDs[name]; !ok { + missing[name] = uid + } + } + + if len(missing) == 0 { + return + } + + stampErr := r.stampAppliedPeerUIDs(ctx, res, missing) + if stampErr != nil { + logger.Error(stampErr, "post-apply AppliedPeerUIDs baseline stamp failed; will retry next reconcile") + } +} + // recordPerResourceFailures logs each per-resource Apply failure and // returns whether any of the results came back not-Ok. Extracted from // runApply so the orchestration stays under the funlen budget. diff --git a/pkg/satellite/controllers/resource_test.go b/pkg/satellite/controllers/resource_test.go index 4d619938..181dca61 100644 --- a/pkg/satellite/controllers/resource_test.go +++ b/pkg/satellite/controllers/resource_test.go @@ -68,18 +68,6 @@ func (p *orderingProvider) DeleteVolume(_ context.Context, _ storage.Volume) err return nil } -// deletesSnapshot returns a copy of the recorded step values so the -// test can assert ordering without holding the provider's mutex. -func (p *orderingProvider) deletesSnapshot() []int32 { - p.mu.Lock() - defer p.mu.Unlock() - - out := make([]int32, len(p.deletes)) - copy(out, p.deletes) - - return out -} - func (p *orderingProvider) ResizeVolume(_ context.Context, _ storage.Volume) error { return nil } func (p *orderingProvider) VolumeStatus(_ context.Context, vol storage.Volume) (storage.VolumeStatus, error) { @@ -94,6 +82,18 @@ func (p *orderingProvider) RestoreVolumeFromSnapshot(_ context.Context, _ storag return nil } +// deletesSnapshot returns a copy of the recorded step values so the +// test can assert ordering without holding the provider's mutex. +func (p *orderingProvider) deletesSnapshot() []int32 { + p.mu.Lock() + defer p.mu.Unlock() + + out := make([]int32, len(p.deletes)) + copy(out, p.deletes) + + return out +} + // newDeleteRaceFixture wires a fake-client Resource with a // DeletionTimestamp + SatelliteResourceFinalizer plus its parent RD // (so handleDelete's lookupVolumeNumbers finds the VolumeDefinitions). diff --git a/pkg/satellite/controllers/resource_toggle_disk.go b/pkg/satellite/controllers/resource_toggle_disk.go index 024a3c2b..feb55fe0 100644 --- a/pkg/satellite/controllers/resource_toggle_disk.go +++ b/pkg/satellite/controllers/resource_toggle_disk.go @@ -30,6 +30,7 @@ import ( blockstoriov1alpha1 "github.com/cozystack/blockstor/api/v1alpha1" apiv1 "github.com/cozystack/blockstor/pkg/api/v1" + "github.com/cozystack/blockstor/pkg/drbd" intent "github.com/cozystack/blockstor/pkg/satellite/intent" ) @@ -165,7 +166,7 @@ func isToggleDiskInFlight(res *blockstoriov1alpha1.Resource) bool { for i := range res.Status.Volumes { v := &res.Status.Volumes[i] - if v.DiskState != "UpToDate" { + if v.DiskState != string(drbd.DiskStateUpToDate) { allUpToDate = false break diff --git a/pkg/satellite/controllers/resource_toggle_disk_test.go b/pkg/satellite/controllers/resource_toggle_disk_test.go index 199b4e56..e8687d25 100644 --- a/pkg/satellite/controllers/resource_toggle_disk_test.go +++ b/pkg/satellite/controllers/resource_toggle_disk_test.go @@ -269,6 +269,7 @@ func TestToggleDiskIncrementsRetriesOnFailure(t *testing.T) { // away with a half-carved LV on disk). type recordingDeleteProvider struct { flakyCreateProvider + deleted int32 } diff --git a/pkg/satellite/controllers/runnable_common.go b/pkg/satellite/controllers/runnable_common.go new file mode 100644 index 00000000..a9c8167c --- /dev/null +++ b/pkg/satellite/controllers/runnable_common.go @@ -0,0 +1,66 @@ +/* +Copyright 2026 Cozystack contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "time" + + "github.com/go-logr/logr" +) + +// runPeriodicTick is the shared serve loop used by every +// fire-and-forget periodic runnable in the satellite-controllers +// package (DiscoveredStorageRunnable, OrphanSweeperRunnable, …). +// +// Shape: immediate first-tick on entry — controller-runtime has +// already waited for cache sync before calling Start, so we can +// safely fire the initial pass without warm-up logic. After that +// a time.Ticker drives the loop at `period`. Ticker errors are +// logged through `logger` under `tickErrLabel` so the audit grep +// for the per-runnable error label stays stable across loops. +// +// Centralised here so dupl doesn't flag the two byte-identical +// loops that previously lived inline in DiscoveredStorageRunnable +// and OrphanSweeperRunnable. +func runPeriodicTick( + ctx context.Context, + period time.Duration, + logger logr.Logger, + tick func(context.Context, logr.Logger) error, + initialErrLabel, tickErrLabel string, +) error { + err := tick(ctx, logger) + if err != nil { + logger.Error(err, initialErrLabel) + } + + ticker := time.NewTicker(period) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + err = tick(ctx, logger) + if err != nil { + logger.Error(err, tickErrLabel) + } + } + } +} diff --git a/pkg/satellite/controllers/storage_sweeper.go b/pkg/satellite/controllers/storage_sweeper.go index e69b4cc0..03bf7557 100644 --- a/pkg/satellite/controllers/storage_sweeper.go +++ b/pkg/satellite/controllers/storage_sweeper.go @@ -214,79 +214,124 @@ func (s *StorageOrphanSweeperRunnable) sweepOnce(ctx context.Context, logger log var deleted int for poolName, provider := range providers { - lister, ok := provider.(storage.VolumeLister) - if !ok { - // Provider can't enumerate (or backend kind doesn't - // support it); silently skip — the contract is opt-in. - continue + stop := s.sweepPool(ctx, logger, poolName, provider, owned, limit, &deleted) + if stop { + return nil } + } - refs, err := lister.ListVolumeNames(ctx) - if err != nil { - // Per-pool failure shouldn't sink the whole sweep — - // log and try the next pool. A real backend outage - // will surface on the next tick (and on real - // reconciles). - logger.Error(err, "list volumes on pool", "pool", poolName) + return nil +} - continue - } +// sweepPool reaps orphan volumes on one pool. Returns true when the +// per-cycle rate-limit budget is exhausted and the caller should +// stop iterating remaining pools; false when the pool was processed +// (with any errors logged). Pulled out of sweepOnce so the parent +// stays under the gocyclo budget; the per-volume orphan-classification +// chain has nothing pool-routing-specific in it. +func (s *StorageOrphanSweeperRunnable) sweepPool( + ctx context.Context, + logger logr.Logger, + poolName string, + provider storage.Provider, + owned map[string]struct{}, + limit int, + deleted *int, +) bool { + lister, ok := provider.(storage.VolumeLister) + if !ok { + // Provider can't enumerate (or backend kind doesn't + // support it); silently skip — the contract is opt-in. + return false + } - for _, ref := range refs { - ref.PoolName = poolName + refs, err := lister.ListVolumeNames(ctx) + if err != nil { + // Per-pool failure shouldn't sink the whole sweep — log + // and try the next pool. A real backend outage will + // surface on the next tick (and on real reconciles). + logger.Error(err, "list volumes on pool", "pool", poolName) - if _, ok := owned[ownedKey(ref.ResourceName, ref.VolumeNumber)]; ok { - continue - } + return false + } - // Wildcard match — a Resource CRD exists for this RD on - // this node but the parent RD has already been deleted - // (cascade out-of-order). The satellite finalizer is - // still responsible for cleanup; don't race it. - if _, ok := owned[ownedKey(ref.ResourceName, -1)]; ok { - continue - } + for _, ref := range refs { + ref.PoolName = poolName - if !hasStoragePrefix(ref.ResourceName) { - // Operator-owned volume that happens to share the - // pool. Leave it alone — see prefix allowlist - // rationale. - continue - } + stop := s.reapIfOrphan(ctx, logger, poolName, provider, owned, limit, deleted, ref) + if stop { + return true + } + } - if limit >= 0 && deleted >= limit { - logger.Info("storage sweep rate-limit hit; deferring remainder", - "limit", limit, "pool", poolName, - "deferred_resource", ref.ResourceName, - "deferred_volume", ref.VolumeNumber) + return false +} - return nil - } +// reapIfOrphan classifies a single (pool, resource, volume) row and +// runs DeleteVolume when it's a satellite-owned orphan past the +// allowlist gate. Returns true when the per-cycle rate-limit budget +// has been hit and the caller should stop scanning the rest of the +// pool. Pulled out of sweepPool so that loop body stays under the +// funlen / gocyclo budgets; the classification cascade is identical +// to the inlined version. +func (s *StorageOrphanSweeperRunnable) reapIfOrphan( + ctx context.Context, + logger logr.Logger, + poolName string, + provider storage.Provider, + owned map[string]struct{}, + limit int, + deleted *int, + ref storage.VolumeRef, +) bool { + if _, ok := owned[ownedKey(ref.ResourceName, ref.VolumeNumber)]; ok { + return false + } - logger.Info("orphan storage volume detected; running DeleteVolume", - "pool", poolName, "resource", ref.ResourceName, "volume", ref.VolumeNumber) - - delErr := provider.DeleteVolume(ctx, storage.Volume{ - PoolName: poolName, - ResourceName: ref.ResourceName, - VolumeNumber: ref.VolumeNumber, - }) - if delErr != nil { - // Per-volume failure shouldn't abort the cycle — - // next tick retries. We DON'T bump `deleted` so - // the rate-limit budget reflects successful - // reaps only. - logger.Error(delErr, "DeleteVolume on orphan", - "pool", poolName, "resource", ref.ResourceName, "volume", ref.VolumeNumber) - - continue - } + // Wildcard match — a Resource CRD exists for this RD on this + // node but the parent RD has already been deleted (cascade + // out-of-order). The satellite finalizer is still responsible + // for cleanup; don't race it. + if _, ok := owned[ownedKey(ref.ResourceName, -1)]; ok { + return false + } - deleted++ - } + if !hasStoragePrefix(ref.ResourceName) { + // Operator-owned volume that happens to share the pool. + // Leave it alone — see prefix allowlist rationale. + return false } - return nil + if limit >= 0 && *deleted >= limit { + logger.Info("storage sweep rate-limit hit; deferring remainder", + "limit", limit, "pool", poolName, + "deferred_resource", ref.ResourceName, + "deferred_volume", ref.VolumeNumber) + + return true + } + + logger.Info("orphan storage volume detected; running DeleteVolume", + "pool", poolName, "resource", ref.ResourceName, "volume", ref.VolumeNumber) + + delErr := provider.DeleteVolume(ctx, storage.Volume{ + PoolName: poolName, + ResourceName: ref.ResourceName, + VolumeNumber: ref.VolumeNumber, + }) + if delErr != nil { + // Per-volume failure shouldn't abort the cycle — next tick + // retries. We DON'T bump `deleted` so the rate-limit budget + // reflects successful reaps only. + logger.Error(delErr, "DeleteVolume on orphan", + "pool", poolName, "resource", ref.ResourceName, "volume", ref.VolumeNumber) + + return false + } + + *deleted++ + + return false } // shouldSkip mirrors OrphanSweeperRunnable.shouldSkip — checks the diff --git a/pkg/satellite/controllers/sweeper.go b/pkg/satellite/controllers/sweeper.go index 82d93f4b..07d8ae5a 100644 --- a/pkg/satellite/controllers/sweeper.go +++ b/pkg/satellite/controllers/sweeper.go @@ -257,25 +257,8 @@ func (s *OrphanSweeperRunnable) Start(ctx context.Context) error { // orphan classification cannot mis-fire against a half-warm // cache. ctx propagation in sweepOnce ensures a shutdown still // aborts any in-flight drbdsetup call. - err := s.sweepOnce(ctx, logger) - if err != nil { - logger.Error(err, "initial sweep") - } - - ticker := time.NewTicker(period) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - err := s.sweepOnce(ctx, logger) - if err != nil { - logger.Error(err, "sweep cycle") - } - } - } + return runPeriodicTick(ctx, period, logger, s.sweepOnce, + "initial sweep", "sweep cycle") } // RegisterWithManager adds the sweeper to mgr alongside the diff --git a/pkg/satellite/peer_identity_cleanup.go b/pkg/satellite/peer_identity_cleanup.go index daedc892..99258620 100644 --- a/pkg/satellite/peer_identity_cleanup.go +++ b/pkg/satellite/peer_identity_cleanup.go @@ -20,8 +20,10 @@ import ( "context" "github.com/cockroachdb/errors" + "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/cozystack/blockstor/pkg/drbd" "github.com/cozystack/blockstor/pkg/satellite/intent" ) @@ -101,88 +103,117 @@ func (r *Reconciler) EvictPeersByUIDMismatch( var cleaned map[string]string for _, peer := range desiredPeers { - if peer.ResourceUID == "" { - // Peer's UID not yet known to the dispatcher - // (informer cache trail / fresh CRD just hit - // apiserver). Skip — next reconcile will retry - // with a populated UID. - continue + evicted, err := r.evictOnePeerByUIDMismatch(ctx, logger, rdName, peer, + appliedPeerUIDs, slots, devices) + if err != nil { + return cleaned, err } - last, hasLast := appliedPeerUIDs[peer.Name] - if !hasLast || last == peer.ResourceUID { - // No prior baseline (rollout window / first - // apply) OR baseline matches — nothing to evict. + if !evicted { continue } - // UID mismatch — force-evict the kernel slot for this - // peer. Forget-peer needs a node-id; prefer the kernel's - // observation, fall back to the dispatcher's. - nodeID := peer.NodeID - - if slot, ok := slots[peer.Name]; ok && slot.NodeID != 0 { - nodeID = slot.NodeID + if cleaned == nil { + cleaned = make(map[string]string) } - // Bug 342 v5: when neither the kernel-observed slot nor - // the K8s-allocated peer NodeID is available, DEFER - // eviction to a future reconcile. A del-peer without - // matching forget-peer drops the kernel connection but - // leaves stale per-volume GI/bitmap metadata; the - // subsequent new-peer handshake (after the relocated - // peer brings DRBD up with a fresh GI epoch) exposes - // the mismatch and the LOCAL stable peer regresses its - // own disk_state to Inconsistent / Outdated. Skipping - // this reconcile is safe: the next one (after kernel - // loads OR allocation lands) will see the same UID - // mismatch and try again with a resolvable node-id. - if nodeID == 0 { - logger.Info("UID mismatch detected but peer node-id unresolved — deferring eviction", - "peer", peer.Name, - "oldUID", last, - "newUID", peer.ResourceUID) + cleaned[peer.Name] = peer.ResourceUID + } - continue - } + return cleaned, nil +} + +// evictOnePeerByUIDMismatch performs the per-peer half of +// EvictPeersByUIDMismatch: classify whether the peer needs eviction, +// resolve the kernel-side node-id (preferring the kernel's +// observation), and run del-peer + per-volume forget-peer when so. +// +// Returns (true, nil) when this peer was evicted and the caller +// should record it in `cleaned`; (false, nil) when the peer was +// skipped (no UID known yet / no prior baseline / baseline matches +// / node-id unresolved); (false, err) on a fatal del-peer error. +// Pulled out of EvictPeersByUIDMismatch so the orchestrator stays +// under the gocyclo budget; the cascade is byte-identical. +func (r *Reconciler) evictOnePeerByUIDMismatch( + ctx context.Context, + logger logr.Logger, + rdName string, + peer intent.DesiredPeer, + appliedPeerUIDs map[string]string, + slots map[string]drbd.KernelSlot, + devices map[int32]string, +) (bool, error) { + if peer.ResourceUID == "" { + // Peer's UID not yet known to the dispatcher (informer + // cache trail / fresh CRD just hit apiserver). Skip — + // next reconcile will retry with a populated UID. + return false, nil + } - logger.Info("UID mismatch — evicting kernel slot for re-incarnated peer", + last, hasLast := appliedPeerUIDs[peer.Name] + if !hasLast || last == peer.ResourceUID { + // No prior baseline (rollout window / first apply) OR + // baseline matches — nothing to evict. + return false, nil + } + + // UID mismatch — force-evict the kernel slot for this peer. + // Forget-peer needs a node-id; prefer the kernel's observation, + // fall back to the dispatcher's. + nodeID := peer.NodeID + + if slot, ok := slots[peer.Name]; ok && slot.NodeID != 0 { + nodeID = slot.NodeID + } + + // Bug 342 v5: when neither the kernel-observed slot nor the + // K8s-allocated peer NodeID is available, DEFER eviction to a + // future reconcile. A del-peer without matching forget-peer + // drops the kernel connection but leaves stale per-volume + // GI/bitmap metadata; the subsequent new-peer handshake (after + // the relocated peer brings DRBD up with a fresh GI epoch) + // exposes the mismatch and the LOCAL stable peer regresses its + // own disk_state to Inconsistent / Outdated. Skipping this + // reconcile is safe: the next one (after kernel loads OR + // allocation lands) will see the same UID mismatch and try + // again with a resolvable node-id. + if nodeID == 0 { + logger.Info("UID mismatch detected but peer node-id unresolved — deferring eviction", "peer", peer.Name, "oldUID", last, - "newUID", peer.ResourceUID, - "nodeID", nodeID) + "newUID", peer.ResourceUID) - if err := r.cfg.Adm.DelPeer(ctx, rdName, peer.Name); err != nil { - return cleaned, errors.Wrapf(err, "drbdadm del-peer %s:%s", peer.Name, rdName) - } + return false, nil + } - // forget-peer is per-volume because v09 metadata lives - // in the per-volume block. Skip volumes without a device - // path (DISKLESS local replica — no metadata to clean). - // Skip when nodeID is zero (no resolvable id — leaves a - // slow-leak slot, recoverable later). - if nodeID != 0 { - for volNum, device := range devices { - if device == "" { - continue - } - - if forgetErr := r.cfg.Adm.ForgetPeer(ctx, rdName, volNum, device, nodeID); forgetErr != nil { - logger.Info("EvictPeersByUIDMismatch: forget-peer failed (non-fatal)", - "peer", peer.Name, - "vol", volNum, - "nodeID", nodeID, - "err", forgetErr.Error()) - } - } - } + logger.Info("UID mismatch — evicting kernel slot for re-incarnated peer", + "peer", peer.Name, + "oldUID", last, + "newUID", peer.ResourceUID, + "nodeID", nodeID) - if cleaned == nil { - cleaned = make(map[string]string) + err := r.cfg.Adm.DelPeer(ctx, rdName, peer.Name) + if err != nil { + return false, errors.Wrapf(err, "drbdadm del-peer %s:%s", peer.Name, rdName) + } + + // forget-peer is per-volume because v09 metadata lives in the + // per-volume block. Skip volumes without a device path + // (DISKLESS local replica — no metadata to clean). + for volNum, device := range devices { + if device == "" { + continue } - cleaned[peer.Name] = peer.ResourceUID + forgetErr := r.cfg.Adm.ForgetPeer(ctx, rdName, volNum, device, nodeID) + if forgetErr != nil { + logger.Info("EvictPeersByUIDMismatch: forget-peer failed (non-fatal)", + "peer", peer.Name, + "vol", volNum, + "nodeID", nodeID, + "err", forgetErr.Error()) + } } - return cleaned, nil + return true, nil } diff --git a/pkg/store/k8s/snapshots.go b/pkg/store/k8s/snapshots.go index fdce7844..e1dd57f1 100644 --- a/pkg/store/k8s/snapshots.go +++ b/pkg/store/k8s/snapshots.go @@ -118,57 +118,6 @@ func (s *snapshots) Get(ctx context.Context, rdName, snapName string) (apiv1.Sna return crdToWireSnapshot(&crd, parent), nil } -// getParentRD fetches the parent ResourceDefinition for a Snapshot, -// returning nil + the not-found error when the parent has already -// been deleted (orphan Snapshot — the view layer still has to render -// the snapshot row without panicking on nil-deref). -func (s *snapshots) getParentRD(ctx context.Context, rdName string) (*crdv1alpha1.ResourceDefinition, error) { - if rdName == "" { - return nil, nil //nolint:nilnil // empty name == no parent to look up - } - - var rd crdv1alpha1.ResourceDefinition - - err := s.c.Get(ctx, types.NamespacedName{Name: Name(rdName)}, &rd) - if err != nil { - if apierrors.IsNotFound(err) { - return nil, nil //nolint:nilnil // orphan snapshot — no parent - } - - return nil, errors.Wrapf(err, "get parent RD %q for Snapshot", rdName) - } - - return &rd, nil -} - -// collectParentRDs batches the parent-RD lookups for a List call so -// the view of 50 snapshots-on-the-same-RD doesn't trigger 50 separate -// API server GETs. Missing parents (orphan Snapshots) silently -// produce a nil entry in the map — the wire-shape conversion folds -// nil into empty maps. -func (s *snapshots) collectParentRDs( - ctx context.Context, snaps []crdv1alpha1.Snapshot, -) (map[string]*crdv1alpha1.ResourceDefinition, error) { - out := make(map[string]*crdv1alpha1.ResourceDefinition, len(snaps)) - - for i := range snaps { - rdName := snaps[i].Spec.ResourceDefinitionName - - if _, seen := out[rdName]; seen { - continue - } - - rd, err := s.getParentRD(ctx, rdName) - if err != nil { - return nil, err - } - - out[rdName] = rd - } - - return out, nil -} - func (s *snapshots) Create(ctx context.Context, in *apiv1.Snapshot) error { if in == nil { return errors.New("nil Snapshot") @@ -257,6 +206,57 @@ func (s *snapshots) Delete(ctx context.Context, rdName, snapName string) error { return nil } +// getParentRD fetches the parent ResourceDefinition for a Snapshot, +// returning nil + the not-found error when the parent has already +// been deleted (orphan Snapshot — the view layer still has to render +// the snapshot row without panicking on nil-deref). +func (s *snapshots) getParentRD(ctx context.Context, rdName string) (*crdv1alpha1.ResourceDefinition, error) { + if rdName == "" { + return nil, nil //nolint:nilnil // empty name == no parent to look up + } + + var rd crdv1alpha1.ResourceDefinition + + err := s.c.Get(ctx, types.NamespacedName{Name: Name(rdName)}, &rd) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil //nolint:nilnil // orphan snapshot — no parent + } + + return nil, errors.Wrapf(err, "get parent RD %q for Snapshot", rdName) + } + + return &rd, nil +} + +// collectParentRDs batches the parent-RD lookups for a List call so +// the view of 50 snapshots-on-the-same-RD doesn't trigger 50 separate +// API server GETs. Missing parents (orphan Snapshots) silently +// produce a nil entry in the map — the wire-shape conversion folds +// nil into empty maps. +func (s *snapshots) collectParentRDs( + ctx context.Context, snaps []crdv1alpha1.Snapshot, +) (map[string]*crdv1alpha1.ResourceDefinition, error) { + out := make(map[string]*crdv1alpha1.ResourceDefinition, len(snaps)) + + for i := range snaps { + rdName := snaps[i].Spec.ResourceDefinitionName + + if _, seen := out[rdName]; seen { + continue + } + + rd, err := s.getParentRD(ctx, rdName) + if err != nil { + return nil, err + } + + out[rdName] = rd + } + + return out, nil +} + // crdToWireSnapshot converts the Snapshot CRD into the wire DTO. // `parent` is the parent ResourceDefinition CRD at conversion time — // nil when the parent has been deleted (orphan Snapshot). The parent