diff --git a/CLAUDE.md b/CLAUDE.md
index aaa46f9..08e07cc 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -76,7 +76,7 @@ make docker-push IMG= # Push container image
 - **SeiNode** creates StatefulSets (replicas=1), headless Services, and PVCs via server-side apply (fieldOwner: `seinode-controller`).
 - **Plan-driven reconciliation** — Both controllers use ordered task plans (stored in `.status.plan`) to drive lifecycle. Plans are built by `internal/planner/` (`ResolvePlan` for nodes, `ForGroup` for deployments), executed by `planner.Executor`, with individual tasks in `internal/task/`. The reconcile loop is: `ResolvePlan → persist plan → ExecutePlan`. See `internal/planner/doc.go` for the full plan lifecycle.
 - **Init plans** transition nodes from Pending → Running. They include infrastructure tasks (`ensure-data-pvc`, `apply-statefulset`, `apply-service`) followed by sidecar tasks (`configure-genesis`, `config-apply`, etc.).
-- **NodeUpdate plans** roll out image changes on Running nodes. Built when `spec.image != status.currentImage`. Tasks: `apply-statefulset`, `apply-service`, `observe-image` (polls StatefulSet rollout, stamps `currentImage`), `mark-ready` (sidecar re-init). The planner sets `NodeUpdateInProgress` condition on creation and clears it on completion/failure. When no drift is detected, no plan is built — the node sits in steady state.
+- **NodeUpdate plans** roll out image changes on Running nodes. Built when `spec.image != status.currentImage`. Tasks: `apply-statefulset`, `apply-service`, `replace-pod` (proactively deletes pods at the old StatefulSet revision so the rollout proceeds even when seid is intentionally unready, e.g. halted at a chain upgrade height), `observe-image` (polls StatefulSet rollout, stamps `currentImage`), `mark-ready` (sidecar re-init). The planner sets `NodeUpdateInProgress` condition on creation and clears it on completion/failure. When no drift is detected, no plan is built — the node sits in steady state.
 - **Atomic plan creation** — New plans are persisted before any tasks execute. The reconciler flushes the plan, then requeues. Execution starts on the next reconcile. This guarantees external observers see the plan before side effects occur.
 - **Condition ownership** — The planner owns all condition management on the owning resource. It sets conditions when creating plans (e.g., `NodeUpdateInProgress=True`) and when observing terminal plans (e.g., `NodeUpdateInProgress=False`). The executor does not set conditions — it only mutates plan/task state and phase transitions.
 - **Single-patch model** — All status mutations (plan state, conditions, phase, currentImage) accumulate in-memory during a reconcile and are flushed in a single `Status().Patch()` at the end. Tasks mutate owned resources (StatefulSets, Services, PVCs); the executor mutates plan state in-memory; the reconciler flushes once.
diff --git a/cmd/main.go b/cmd/main.go
index afc8c55..f854988 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -182,6 +182,7 @@ func main() {
             return buildSidecarClient(node)
         },
         KubeClient: kc,
+        APIReader:  mgr.GetAPIReader(),
         Scheme:     mgr.GetScheme(),
         Resource:   node,
         Platform:   platformCfg,
@@ -225,6 +226,7 @@ func main() {
             return buildSidecarClient(assemblerNode)
         },
         KubeClient: kc,
+        APIReader:  mgr.GetAPIReader(),
         Scheme:     mgr.GetScheme(),
         Resource:   group,
         Platform:   platformCfg,
diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 64c715e..7fbd2f1 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -28,12 +28,20 @@ rules:
   - ""
   resources:
   - persistentvolumes
-  - pods
   - secrets
   verbs:
   - get
   - list
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  verbs:
+  - delete
+  - get
+  - list
+  - watch
 - apiGroups:
   - apps
   resources:
diff --git a/internal/controller/node/controller.go b/internal/controller/node/controller.go
index 99e6832..2efd804 100644
--- a/internal/controller/node/controller.go
+++ b/internal/controller/node/controller.go
@@ -58,7 +58,7 @@ type SeiNodeReconciler struct {
 // +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch
-// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
+// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
 // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
 // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
diff --git a/internal/controller/node/plan_execution_test.go b/internal/controller/node/plan_execution_test.go
index 939bca1..e541541 100644
--- a/internal/controller/node/plan_execution_test.go
+++ b/internal/controller/node/plan_execution_test.go
@@ -131,6 +131,7 @@ func newProgressionReconciler(t *testing.T, mock *mockSidecarClient, objs ...cli
         return task.ExecutionConfig{
             BuildSidecarClient: func() (task.SidecarClient, error) { return mock, nil },
             KubeClient: c,
+            APIReader:  c,
             Scheme:     s,
             Resource:   node,
             Platform:   platformtest.Config(),
@@ -796,6 +797,7 @@ func TestReconcileInitializing_SidecarClientError_Requeues(t *testing.T) {
                 return nil, fmt.Errorf("sidecar unavailable")
             },
             KubeClient: c,
+            APIReader:  c,
             Scheme:     s,
             Resource:   n,
             Platform:   platformtest.Config(),
diff --git a/internal/controller/node/reconciler_test.go b/internal/controller/node/reconciler_test.go
index 6de50d2..6a135df 100644
--- a/internal/controller/node/reconciler_test.go
+++ b/internal/controller/node/reconciler_test.go
@@ -55,6 +55,7 @@ func newNodeReconciler(t *testing.T, objs ...client.Object) (*SeiNodeReconciler,
         return task.ExecutionConfig{
             BuildSidecarClient: func() (task.SidecarClient, error) { return mock, nil },
             KubeClient: c,
+            APIReader:  c,
             Scheme:     s,
             Resource:   node,
             Platform:   platformtest.Config(),
diff --git a/internal/noderesource/noderesource.go b/internal/noderesource/noderesource.go
index 26501ae..879d752 100644
--- a/internal/noderesource/noderesource.go
+++ b/internal/noderesource/noderesource.go
@@ -142,6 +142,10 @@ func GenerateStatefulSet(node *seiv1alpha1.SeiNode, p PlatformConfig) *appsv1.St
             Selector: &metav1.LabelSelector{
                 MatchLabels: SelectorLabels(node),
             },
+            // Pod lifecycle is the SeiNode controller's responsibility (replace-pod).
+            UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+                Type: appsv1.OnDeleteStatefulSetStrategyType,
+            },
             Template: corev1.PodTemplateSpec{
                 ObjectMeta: metav1.ObjectMeta{
                     Labels: labels,
diff --git a/internal/noderesource/noderesource_test.go b/internal/noderesource/noderesource_test.go
index bb8b51e..bbeca08 100644
--- a/internal/noderesource/noderesource_test.go
+++ b/internal/noderesource/noderesource_test.go
@@ -5,6 +5,7 @@ import (
     . "github.com/onsi/gomega"
     seiconfig "github.com/sei-protocol/sei-config"
+    appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -143,6 +144,15 @@ func TestGenerateNodeStatefulSet_BasicFields(t *testing.T) {
     g.Expect(sts.Spec.VolumeClaimTemplates).To(BeEmpty())
 }
 
+func TestGenerateNodeStatefulSet_UsesOnDeleteUpdateStrategy(t *testing.T) {
+    g := NewWithT(t)
+    node := newGenesisNode("mynet-0", "default")
+
+    sts := GenerateStatefulSet(node, platformtest.Config())
+
+    g.Expect(sts.Spec.UpdateStrategy.Type).To(Equal(appsv1.OnDeleteStatefulSetStrategyType))
+}
+
 func TestGenerateNodeStatefulSet_AlwaysHasSidecar(t *testing.T) {
     g := NewWithT(t)
     node := newSnapshotNode("snap-0", "default")
diff --git a/internal/planner/node_update_test.go b/internal/planner/node_update_test.go
index 127f4ee..333bc66 100644
--- a/internal/planner/node_update_test.go
+++ b/internal/planner/node_update_test.go
@@ -68,6 +68,7 @@ func TestBuildRunningPlan_ImageDrift_ReturnsNodeUpdatePlan(t *testing.T) {
     want := []string{
         task.TaskTypeApplyStatefulSet,
         task.TaskTypeApplyService,
+        task.TaskTypeReplacePod,
         task.TaskTypeObserveImage,
         TaskMarkReady,
     }
@@ -380,10 +381,11 @@ func TestBuildRunningPlan_ImageDriftWinsOverSidecar(t *testing.T) {
     g.Expect(err).NotTo(HaveOccurred())
     g.Expect(plan).NotTo(BeNil())
     // Image update plan ends with MarkReady, which also resolves the sidecar.
-    g.Expect(plan.Tasks).To(HaveLen(4), "should be full node-update plan, not one-task mark-ready")
+    g.Expect(plan.Tasks).To(HaveLen(5), "should be full node-update plan, not one-task mark-ready")
     g.Expect(planTaskTypes(plan)).To(Equal([]string{
         task.TaskTypeApplyStatefulSet,
         task.TaskTypeApplyService,
+        task.TaskTypeReplacePod,
         task.TaskTypeObserveImage,
         TaskMarkReady,
     }))
diff --git a/internal/planner/planner.go b/internal/planner/planner.go
index 221b911..f6b8046 100644
--- a/internal/planner/planner.go
+++ b/internal/planner/planner.go
@@ -467,6 +467,8 @@ func taskMaxRetries(taskType string) int {
         return groupAssemblyMaxRetries
     case TaskDiscoverPeers:
         return discoverPeersMaxRetries
+    case task.TaskTypeReplacePod:
+        return 3
     default:
         return 0
     }
@@ -541,6 +543,8 @@ func paramsForTaskType(
         return &task.ApplyStatefulSetParams{NodeName: node.Name, Namespace: node.Namespace}
     case task.TaskTypeApplyService:
         return &task.ApplyServiceParams{NodeName: node.Name, Namespace: node.Namespace}
+    case task.TaskTypeReplacePod:
+        return &task.ReplacePodParams{NodeName: node.Name, Namespace: node.Namespace}
     case task.TaskTypeObserveImage:
         return &task.ObserveImageParams{NodeName: node.Name, Namespace: node.Namespace}
     case task.TaskTypeValidateSigningKey:
@@ -708,6 +712,7 @@ func buildNodeUpdatePlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, erro
     prog := []string{
         task.TaskTypeApplyStatefulSet,
         task.TaskTypeApplyService,
+        task.TaskTypeReplacePod,
         task.TaskTypeObserveImage,
         sidecar.TaskTypeMarkReady,
     }
diff --git a/internal/task/bootstrap_task_test.go b/internal/task/bootstrap_task_test.go
index 428e950..943d941 100644
--- a/internal/task/bootstrap_task_test.go
+++ b/internal/task/bootstrap_task_test.go
@@ -56,6 +56,7 @@ func testCfg(t *testing.T, objs ...client.Object) ExecutionConfig {
         Build()
     return ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   testNode(),
         Platform:   platformtest.Config(),
@@ -159,6 +160,7 @@ func TestDeployBootstrapJob_Execute_NilSnapshot(t *testing.T) {
     c := fake.NewClientBuilder().WithScheme(s).Build()
     cfg := ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   node,
         Platform:   platformtest.Config(),
diff --git a/internal/task/deployment_update_test.go b/internal/task/deployment_update_test.go
index 8442265..2fe0e7c 100644
--- a/internal/task/deployment_update_test.go
+++ b/internal/task/deployment_update_test.go
@@ -50,6 +50,7 @@ func testDeploymentCfg(t *testing.T, group *seiv1alpha1.SeiNodeDeployment, nodes
     c := builder.Build()
     return ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   group,
         Platform:   platformtest.Config(),
diff --git a/internal/task/ensure_pvc_test.go b/internal/task/ensure_pvc_test.go
index a8ac65f..430b841 100644
--- a/internal/task/ensure_pvc_test.go
+++ b/internal/task/ensure_pvc_test.go
@@ -105,6 +105,7 @@ func newEnsurePVCExec(t *testing.T, node *seiv1alpha1.SeiNode, objs ...client.Ob
     c := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
     cfg := ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   node,
         Platform:   platformtest.Config(),
@@ -197,6 +198,7 @@ func TestEnsureDataPVC_Create_AlreadyExistsRace_Requeues(t *testing.T) {
     cfg := ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   node,
         Platform:   platformtest.Config(),
@@ -269,7 +271,7 @@ func TestEnsureDataPVC_Import_PVCNotFound_ThenAppears_Completes(t *testing.T) {
     s := ensurePVCScheme(t)
     c := fake.NewClientBuilder().WithScheme(s).Build()
     cfg := ExecutionConfig{
-        KubeClient: c, Scheme: s, Resource: node, Platform: platformtest.Config(),
+        KubeClient: c, APIReader: c, Scheme: s, Resource: node, Platform: platformtest.Config(),
     }
     raw, _ := json.Marshal(EnsureDataPVCParams{NodeName: node.Name, Namespace: node.Namespace})
     exec, err := deserializeEnsureDataPVC("ensure-1", raw, cfg)
@@ -465,7 +467,7 @@ func TestEnsureDataPVC_Import_TransientValidationRepeats(t *testing.T) {
     base := fake.NewClientBuilder().WithScheme(s).Build()
     counter := &countingClient{Client: base}
     cfg := ExecutionConfig{
-        KubeClient: counter, Scheme: s, Resource: node, Platform: platformtest.Config(),
+        KubeClient: counter, APIReader: counter, Scheme: s, Resource: node, Platform: platformtest.Config(),
     }
     raw, _ := json.Marshal(EnsureDataPVCParams{NodeName: node.Name, Namespace: node.Namespace})
diff --git a/internal/task/observe_image.go b/internal/task/observe_image.go
index 24fb393..68cb18f 100644
--- a/internal/task/observe_image.go
+++ b/internal/task/observe_image.go
@@ -54,7 +54,8 @@ func (e *observeImageExecution) Execute(ctx context.Context) error {
     sts := &appsv1.StatefulSet{}
     key := types.NamespacedName{Name: node.Name, Namespace: node.Namespace}
-    if err := e.cfg.KubeClient.Get(ctx, key, sts); err != nil {
+    // Bypass cache: apply-statefulset just wrote this in the same reconcile.
+    if err := e.cfg.APIReader.Get(ctx, key, sts); err != nil {
         if apierrors.IsNotFound(err) {
             return nil
         }
diff --git a/internal/task/observe_image_test.go b/internal/task/observe_image_test.go
index 617d21e..6576071 100644
--- a/internal/task/observe_image_test.go
+++ b/internal/task/observe_image_test.go
@@ -46,6 +46,7 @@ func observeImageCfg(t *testing.T, node *seiv1alpha1.SeiNode, sts *appsv1.Statef
     c := builder.Build()
     return ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   node,
     }
diff --git a/internal/task/replace_pod.go b/internal/task/replace_pod.go
new file mode 100644
index 0000000..384d1e8
--- /dev/null
+++ b/internal/task/replace_pod.go
@@ -0,0 +1,131 @@
+package task
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    "k8s.io/apimachinery/pkg/types"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+
+    seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
+)
+
+const TaskTypeReplacePod = "replace-pod"
+
+type ReplacePodParams struct {
+    NodeName  string `json:"nodeName"`
+    Namespace string `json:"namespace"`
+}
+
+type replacePodExecution struct {
+    taskBase
+    params ReplacePodParams
+    cfg    ExecutionConfig
+}
+
+func deserializeReplacePod(id string, params json.RawMessage, cfg ExecutionConfig) (TaskExecution, error) {
+    var p ReplacePodParams
+    if len(params) > 0 {
+        if err := json.Unmarshal(params, &p); err != nil {
+            return nil, fmt.Errorf("deserializing replace-pod params: %w", err)
+        }
+    }
+    return &replacePodExecution{
+        taskBase: taskBase{id: id, status: ExecutionRunning},
+        params:   p,
+        cfg:      cfg,
+    }, nil
+}
+
+// Execute deletes pods at the StatefulSet's old revision. Pairs with
+// the StatefulSet's OnDelete update strategy: pod lifecycle is the
+// SeiNode controller's responsibility, not the StatefulSet controller's.
+func (e *replacePodExecution) Execute(ctx context.Context) error {
+    node, err := ResourceAs[*seiv1alpha1.SeiNode](e.cfg)
+    if err != nil {
+        return Terminal(err)
+    }
+
+    sts := &appsv1.StatefulSet{}
+    key := types.NamespacedName{Name: node.Name, Namespace: node.Namespace}
+    // Bypass cache: apply-statefulset just wrote this in the same reconcile.
+    if err := e.cfg.APIReader.Get(ctx, key, sts); err != nil {
+        if apierrors.IsNotFound(err) {
+            return nil
+        }
+        return fmt.Errorf("getting statefulset: %w", err)
+    }
+
+    if sts.Status.ObservedGeneration < sts.Generation {
+        return nil
+    }
+    if sts.Status.UpdateRevision == "" {
+        return nil
+    }
+    if sts.Status.CurrentRevision == sts.Status.UpdateRevision {
+        e.complete()
+        return nil
+    }
+
+    if sts.Spec.Selector == nil || len(sts.Spec.Selector.MatchLabels) == 0 {
+        return Terminal(fmt.Errorf("statefulset %q has no selector; cannot identify owned pods", node.Name))
+    }
+
+    // Multi-replica needs ordinal-aware deletion (reverse-ordinal). Fail
+    // loud rather than silently violate StatefulSet rolling-update semantics.
+    if sts.Spec.Replicas != nil && *sts.Spec.Replicas > 1 {
+        return Terminal(fmt.Errorf(
+            "replace-pod does not support multi-replica StatefulSets (got replicas=%d); "+
+                "add ordinal-aware deletion before scaling up", *sts.Spec.Replicas))
+    }
+
+    pods := &corev1.PodList{}
+    if err := e.cfg.KubeClient.List(ctx, pods,
+        client.InNamespace(node.Namespace),
+        client.MatchingLabels(sts.Spec.Selector.MatchLabels),
+    ); err != nil {
+        return fmt.Errorf("listing pods for statefulset %q: %w", node.Name, err)
+    }
+
+    updateRev := sts.Status.UpdateRevision
+    for i := range pods.Items {
+        pod := &pods.Items[i]
+        if !ownedByStatefulSet(pod, sts) {
+            continue
+        }
+        hash, hasHash := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
+        if !hasHash {
+            continue
+        }
+        if hash == updateRev {
+            continue
+        }
+        if pod.DeletionTimestamp != nil {
+            continue
+        }
+        if err := e.cfg.KubeClient.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
+            return fmt.Errorf("deleting pod %q: %w", pod.Name, err)
+        }
+    }
+
+    e.complete()
+    return nil
+}
+
+func ownedByStatefulSet(pod *corev1.Pod, sts *appsv1.StatefulSet) bool {
+    for _, ref := range pod.OwnerReferences {
+        if ref.Kind == "StatefulSet" && ref.UID == sts.UID {
+            return true
+        }
+    }
+    return false
+}
+
+// Status returns the cached execution status.
+func (e *replacePodExecution) Status(_ context.Context) ExecutionStatus {
+    return e.DefaultStatus()
+}
diff --git a/internal/task/replace_pod_test.go b/internal/task/replace_pod_test.go
new file mode 100644
index 0000000..144fd28
--- /dev/null
+++ b/internal/task/replace_pod_test.go
@@ -0,0 +1,277 @@
+package task
+
+import (
+    "context"
+    "encoding/json"
+    "testing"
+
+    . "github.com/onsi/gomega"
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
+    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
+
+    seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
+)
+
+const (
+    stsUID         = types.UID("sts-uid-1")
+    testReplaceNs  = "default"
+    testReplaceSTS = "node-1"
+)
+
+func replacePodNode() *seiv1alpha1.SeiNode {
+    return &seiv1alpha1.SeiNode{
+        ObjectMeta: metav1.ObjectMeta{Name: testReplaceSTS, Namespace: testReplaceNs},
+        Spec: seiv1alpha1.SeiNodeSpec{
+            ChainID:  "test-chain",
+            Image:    "test:v1",
+            FullNode: &seiv1alpha1.FullNodeSpec{},
+        },
+        Status: seiv1alpha1.SeiNodeStatus{Phase: seiv1alpha1.PhaseRunning},
+    }
+}
+
+func stsForReplace(currentRev, updateRev string) *appsv1.StatefulSet {
+    one := int32(1)
+    return &appsv1.StatefulSet{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: testReplaceSTS, Namespace: testReplaceNs, UID: stsUID, Generation: 1,
+        },
+        Spec: appsv1.StatefulSetSpec{
+            Replicas: &one,
+            Selector: &metav1.LabelSelector{MatchLabels: map[string]string{
+                "app":            "seinode",
+                "sei.io/seinode": testReplaceSTS,
+            }},
+        },
+        Status: appsv1.StatefulSetStatus{
+            ObservedGeneration: 1,
+            CurrentRevision:    currentRev,
+            UpdateRevision:     updateRev,
+        },
+    }
+}
+
+func podForReplace(revisionHash string, terminating bool) *corev1.Pod {
+    controller := true
+    pod := &corev1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      testReplaceSTS + "-0",
+            Namespace: testReplaceNs,
+            Labels: map[string]string{
+                "app":                                 "seinode",
+                "sei.io/seinode":                      testReplaceSTS,
+                appsv1.ControllerRevisionHashLabelKey: revisionHash,
+            },
+            OwnerReferences: []metav1.OwnerReference{{
+                APIVersion: "apps/v1",
+                Kind:       "StatefulSet",
+                Name:       testReplaceSTS,
+                UID:        stsUID,
+                Controller: &controller,
+            }},
+        },
+    }
+    if terminating {
+        now := metav1.Now()
+        pod.DeletionTimestamp = &now
+        pod.Finalizers = []string{"test/keep-around"}
+    }
+    return pod
+}
+
+func replacePodCfg(t *testing.T, node *seiv1alpha1.SeiNode, objs ...client.Object) ExecutionConfig {
+    t.Helper()
+    s := runtime.NewScheme()
+    if err := clientgoscheme.AddToScheme(s); err != nil {
+        t.Fatal(err)
+    }
+    if err := seiv1alpha1.AddToScheme(s); err != nil {
+        t.Fatal(err)
+    }
+    c := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
+    return ExecutionConfig{KubeClient: c, APIReader: c, Scheme: s, Resource: node}
+}
+
+func newReplacePodExec(t *testing.T, cfg ExecutionConfig) TaskExecution {
+    t.Helper()
+    raw, _ := json.Marshal(ReplacePodParams{NodeName: testReplaceSTS, Namespace: testReplaceNs})
+    exec, err := deserializeReplacePod("rp-test", raw, cfg)
+    if err != nil {
+        t.Fatal(err)
+    }
+    return exec
+}
+
+// Stale-revision pod present → task deletes it and completes.
+func TestReplacePod_StalePod_DeletesAndCompletes(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("old-rev", "new-rev")
+    stalePod := podForReplace("old-rev", false)
+
+    cfg := replacePodCfg(t, node, sts, stalePod)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionComplete))
+
+    // Stale pod should be gone (or marked for deletion if a finalizer existed,
+    // but ours has none, so the fake client deletes it outright).
+    got := &corev1.Pod{}
+    err := cfg.KubeClient.Get(context.Background(),
+        types.NamespacedName{Name: stalePod.Name, Namespace: stalePod.Namespace}, got)
+    g.Expect(apierrors.IsNotFound(err)).To(BeTrue(),
+        "expected stale pod to be deleted, got err=%v", err)
+}
+
+// Already at update revision → task is no-op (pod preserved, status complete).
+func TestReplacePod_AlreadyAtUpdateRevision_NoOp(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("new-rev", "new-rev")
+    currentPod := podForReplace("new-rev", false)
+
+    cfg := replacePodCfg(t, node, sts, currentPod)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionComplete))
+
+    got := &corev1.Pod{}
+    g.Expect(cfg.KubeClient.Get(context.Background(),
+        types.NamespacedName{Name: currentPod.Name, Namespace: currentPod.Namespace}, got)).To(Succeed())
+}
+
+// Pod already terminating (deletionTimestamp present) → task skips it.
+// We assert the pod's finalizer wasn't stripped (i.e. task didn't double-delete).
+func TestReplacePod_TerminatingPod_Skipped(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("old-rev", "new-rev")
+    terminatingPod := podForReplace("old-rev", true)
+
+    cfg := replacePodCfg(t, node, sts, terminatingPod)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionComplete))
+
+    got := &corev1.Pod{}
+    g.Expect(cfg.KubeClient.Get(context.Background(),
+        types.NamespacedName{Name: terminatingPod.Name, Namespace: terminatingPod.Namespace}, got)).To(Succeed())
+    g.Expect(got.DeletionTimestamp).NotTo(BeNil(), "terminating pod should still exist (finalizer holds it)")
+}
+
+// Empty UpdateRevision (StatefulSet status not yet populated) → task waits.
+func TestReplacePod_NoUpdateRevisionYet_TransientWait(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("rev-1", "") // UpdateRevision not yet set
+
+    cfg := replacePodCfg(t, node, sts)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionRunning))
+}
+
+// StatefulSet missing entirely → task is a transient wait (apply-statefulset
+// preceded us; the controller will retry on the next reconcile).
+func TestReplacePod_StatefulSetMissing_TransientWait(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+
+    cfg := replacePodCfg(t, node) // no STS, no pods
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionRunning))
+}
+
+// STS controller hasn't observed the latest spec yet — its reported revisions
+// are stale. Task must wait, not delete.
+func TestReplacePod_StatefulSetGenerationStale_TransientWait(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("old-rev", "new-rev")
+    sts.Generation = 2
+    sts.Status.ObservedGeneration = 1 // stale
+    stalePod := podForReplace("old-rev", false)
+
+    cfg := replacePodCfg(t, node, sts, stalePod)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionRunning))
+
+    // Pod must NOT have been deleted while STS revisions are stale.
+    got := &corev1.Pod{}
+    g.Expect(cfg.KubeClient.Get(context.Background(),
+        types.NamespacedName{Name: stalePod.Name, Namespace: stalePod.Namespace}, got)).To(Succeed())
+}
+
+// Pod has no controller-revision-hash label — task skips it (don't delete
+// pods we don't recognize).
+func TestReplacePod_PodMissingRevisionHashLabel_Skipped(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("old-rev", "new-rev")
+    pod := podForReplace("old-rev", false)
+    delete(pod.Labels, appsv1.ControllerRevisionHashLabelKey)
+
+    cfg := replacePodCfg(t, node, sts, pod)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionComplete))
+
+    got := &corev1.Pod{}
+    g.Expect(cfg.KubeClient.Get(context.Background(),
+        types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, got)).To(Succeed())
+}
+
+// Pod matches the STS selector by labels but isn't actually owned by the STS
+// (e.g. a manually-applied pod with the same labels). Task skips it.
+func TestReplacePod_PodNotOwnedByStatefulSet_Skipped(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("old-rev", "new-rev")
+    pod := podForReplace("old-rev", false)
+    pod.OwnerReferences = nil // not owned by STS
+
+    cfg := replacePodCfg(t, node, sts, pod)
+    exec := newReplacePodExec(t, cfg)
+
+    g.Expect(exec.Execute(context.Background())).To(Succeed())
+    g.Expect(exec.Status(context.Background())).To(Equal(ExecutionComplete))
+
+    got := &corev1.Pod{}
+    g.Expect(cfg.KubeClient.Get(context.Background(),
+        types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, got)).To(Succeed())
+}
+
+// Multi-replica StatefulSets need ordinal-aware deletion; the task fails
+// loud rather than silently violating reverse-ordinal rolling-update semantics.
+func TestReplacePod_MultiReplica_TerminalError(t *testing.T) {
+    g := NewWithT(t)
+    node := replacePodNode()
+    sts := stsForReplace("old-rev", "new-rev")
+    three := int32(3)
+    sts.Spec.Replicas = &three
+
+    cfg := replacePodCfg(t, node, sts)
+    exec := newReplacePodExec(t, cfg)
+
+    err := exec.Execute(context.Background())
+    g.Expect(err).To(HaveOccurred())
+    var termErr *TerminalError
+    g.Expect(err).To(BeAssignableToTypeOf(termErr))
+    g.Expect(err.Error()).To(ContainSubstring("multi-replica"))
+}
diff --git a/internal/task/task.go b/internal/task/task.go
index a4ac622..ebc3a99 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -151,10 +151,14 @@ type SidecarClient interface {
 type ExecutionConfig struct {
     BuildSidecarClient func() (SidecarClient, error)
     KubeClient         client.Client
-    Scheme             *runtime.Scheme
-    Resource           client.Object
-    Platform           platform.Config
-    ObjectStore        platform.ObjectStore
+    // APIReader bypasses the controller-runtime cache. Use when reading a
+    // resource the same plan just wrote — the cached client lags behind
+    // SSA writes by milliseconds-to-unbounded.
+    APIReader   client.Reader
+    Scheme      *runtime.Scheme
+    Resource    client.Object
+    Platform    platform.Config
+    ObjectStore platform.ObjectStore
 }
 
 // ResourceAs is a generic helper that type-asserts the Resource field.
@@ -203,6 +207,7 @@ var registry = map[string]taskDeserializer{
     TaskTypeEnsureDataPVC:      deserializeEnsureDataPVC,
     TaskTypeApplyStatefulSet:   deserializeApplyStatefulSet,
     TaskTypeApplyService:       deserializeApplyService,
+    TaskTypeReplacePod:         deserializeReplacePod,
     TaskTypeObserveImage:       deserializeObserveImage,
     TaskTypeValidateSigningKey: deserializeValidateSigningKey,
     TaskTypeValidateNodeKey:    deserializeValidateNodeKey,
diff --git a/internal/task/validate_node_key_test.go b/internal/task/validate_node_key_test.go
index 1387a48..ec8cb5d 100644
--- a/internal/task/validate_node_key_test.go
+++ b/internal/task/validate_node_key_test.go
@@ -55,6 +55,7 @@ func newValidateNodeKeyExec(t *testing.T, node *seiv1alpha1.SeiNode, objs ...cli
     c := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
     cfg := ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   node,
     }
diff --git a/internal/task/validate_signing_key_test.go b/internal/task/validate_signing_key_test.go
index cf5fcdb..a70a5a8 100644
--- a/internal/task/validate_signing_key_test.go
+++ b/internal/task/validate_signing_key_test.go
@@ -81,6 +81,7 @@ func newValidateSigningKeyExec(t *testing.T, node *seiv1alpha1.SeiNode, objs ...
     c := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
     cfg := ExecutionConfig{
         KubeClient: c,
+        APIReader:  c,
         Scheme:     s,
         Resource:   node,
     }
diff --git a/manifests/role.yaml b/manifests/role.yaml
index 64c715e..7fbd2f1 100644
--- a/manifests/role.yaml
+++ b/manifests/role.yaml
@@ -28,12 +28,20 @@ rules:
   - ""
   resources:
   - persistentvolumes
-  - pods
   - secrets
   verbs:
   - get
   - list
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  verbs:
+  - delete
+  - get
+  - list
+  - watch
 - apiGroups:
   - apps
   resources:
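
Not part of the patch above, just an illustration for reviewers: the multi-replica guard in replace_pod.go defers "ordinal-aware deletion" to future work. A minimal sketch of one way that could look, assuming the standard <statefulset-name>-<ordinal> pod naming convention and the same stale-revision/terminating filters replace-pod already applies. The helpers staleByOrdinalDesc and ordinalOf are hypothetical names and do not exist anywhere in this change.

import (
    "sort"
    "strconv"
    "strings"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
)

// staleByOrdinalDesc (hypothetical) collects pods that are not at the
// StatefulSet's update revision and not already terminating, ordered by
// ordinal, highest first, mirroring the StatefulSet controller's
// reverse-ordinal rolling-update order.
func staleByOrdinalDesc(pods []corev1.Pod, updateRev string) []*corev1.Pod {
    stale := make([]*corev1.Pod, 0, len(pods))
    for i := range pods {
        pod := &pods[i]
        if pod.Labels[appsv1.ControllerRevisionHashLabelKey] == updateRev ||
            pod.DeletionTimestamp != nil {
            continue
        }
        stale = append(stale, pod)
    }
    sort.Slice(stale, func(i, j int) bool {
        return ordinalOf(stale[i].Name) > ordinalOf(stale[j].Name)
    })
    return stale
}

// ordinalOf (hypothetical) parses the trailing "-<n>" of a StatefulSet pod
// name; it returns -1 if no ordinal suffix is present.
func ordinalOf(name string) int {
    idx := strings.LastIndex(name, "-")
    if idx < 0 {
        return -1
    }
    n, err := strconv.Atoi(name[idx+1:])
    if err != nil {
        return -1
    }
    return n
}

Deleting only the first pod in that order on each reconcile would preserve the one-pod-at-a-time behavior a StatefulSet rolling update normally provides.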