2 changes: 1 addition & 1 deletion CLAUDE.md
@@ -76,7 +76,7 @@ make docker-push IMG=<image> # Push container image
- **SeiNode** creates StatefulSets (replicas=1), headless Services, and PVCs via server-side apply (fieldOwner: `seinode-controller`).
- **Plan-driven reconciliation** — Both controllers use ordered task plans (stored in `.status.plan`) to drive lifecycle. Plans are built by `internal/planner/` (`ResolvePlan` for nodes, `ForGroup` for deployments), executed by `planner.Executor`, with individual tasks in `internal/task/`. The reconcile loop is: `ResolvePlan → persist plan → ExecutePlan`. See `internal/planner/doc.go` for the full plan lifecycle.
- **Init plans** transition nodes from Pending → Running. They include infrastructure tasks (`ensure-data-pvc`, `apply-statefulset`, `apply-service`) followed by sidecar tasks (`configure-genesis`, `config-apply`, etc.).
- **NodeUpdate plans** roll out image changes on Running nodes. Built when `spec.image != status.currentImage`. Tasks: `apply-statefulset`, `apply-service`, `observe-image` (polls StatefulSet rollout, stamps `currentImage`), `mark-ready` (sidecar re-init). The planner sets `NodeUpdateInProgress` condition on creation and clears it on completion/failure. When no drift is detected, no plan is built — the node sits in steady state.
- **NodeUpdate plans** roll out image changes on Running nodes. Built when `spec.image != status.currentImage`. Tasks: `apply-statefulset`, `apply-service`, `replace-pod` (proactively deletes pods at the old StatefulSet revision so the rollout proceeds even when seid is intentionally unready, e.g. halted at a chain upgrade height), `observe-image` (polls StatefulSet rollout, stamps `currentImage`), `mark-ready` (sidecar re-init). The planner sets `NodeUpdateInProgress` condition on creation and clears it on completion/failure. When no drift is detected, no plan is built — the node sits in steady state.
- **Atomic plan creation** — New plans are persisted before any tasks execute. The reconciler flushes the plan, then requeues. Execution starts on the next reconcile. This guarantees external observers see the plan before side effects occur.
- **Condition ownership** — The planner owns all condition management on the owning resource. It sets conditions when creating plans (e.g., `NodeUpdateInProgress=True`) and when observing terminal plans (e.g., `NodeUpdateInProgress=False`). The executor does not set conditions — it only mutates plan/task state and phase transitions.
- **Single-patch model** — All status mutations (plan state, conditions, phase, currentImage) accumulate in-memory during a reconcile and are flushed in a single `Status().Patch()` at the end. Tasks mutate owned resources (StatefulSets, Services, PVCs); the executor mutates plan state in-memory; the reconciler flushes once.
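The plan lifecycle described in the bullets above reduces to a small reconcile shape. A minimal sketch, not the repository's actual code: `ResolvePlan`/`ExecutePlan` and the `planner.Executor` come from the doc above, while `patchStatus`, the `executor` field, and the exact signatures are illustrative assumptions.

```go
package node

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"

	seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
	"github.com/sei-protocol/sei-k8s-controller/internal/planner"
)

// reconcile sketches the ResolvePlan → persist plan → ExecutePlan loop.
func (r *SeiNodeReconciler) reconcile(ctx context.Context, node *seiv1alpha1.SeiNode) (ctrl.Result, error) {
	if node.Status.Plan == nil {
		plan, err := planner.ResolvePlan(node) // nil plan when no drift: steady state
		if err != nil || plan == nil {
			return ctrl.Result{}, err
		}
		node.Status.Plan = plan
		// Atomic plan creation: flush the plan and requeue before executing,
		// so external observers see the plan before any side effects.
		return ctrl.Result{Requeue: true}, r.patchStatus(ctx, node)
	}
	execErr := r.executor.ExecutePlan(ctx, node.Status.Plan)
	// Single-patch model: plan state, conditions, phase, and currentImage
	// all land in one Status().Patch() at the end of the reconcile.
	if err := r.patchStatus(ctx, node); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{Requeue: true}, execErr
}
```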
2 changes: 2 additions & 0 deletions cmd/main.go
@@ -182,6 +182,7 @@ func main() {
            return buildSidecarClient(node)
        },
        KubeClient: kc,
        APIReader:  mgr.GetAPIReader(),
        Scheme:     mgr.GetScheme(),
        Resource:   node,
        Platform:   platformCfg,
@@ -225,6 +226,7 @@
            return buildSidecarClient(assemblerNode)
        },
        KubeClient: kc,
        APIReader:  mgr.GetAPIReader(),
        Scheme:     mgr.GetScheme(),
        Resource:   group,
        Platform:   platformCfg,
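The new `APIReader` field is wired to `mgr.GetAPIReader()`, controller-runtime's uncached reader. Unlike `mgr.GetClient()`, whose reads are served from the shared informer cache and can lag a write made moments earlier, the API reader round-trips to the API server. A minimal sketch of the difference — the function is illustrative, but the two manager accessors are real controller-runtime API:

```go
package main

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
)

// freshGet demonstrates the read-path difference the APIReader field exists for.
func freshGet(ctx context.Context, mgr ctrl.Manager, key types.NamespacedName) (*appsv1.StatefulSet, error) {
	stale := &appsv1.StatefulSet{}
	// Cache-backed: may not yet reflect an apply from this same reconcile.
	if err := mgr.GetClient().Get(ctx, key, stale); err != nil {
		return nil, err
	}
	fresh := &appsv1.StatefulSet{}
	// Uncached: always a live GET against the API server.
	if err := mgr.GetAPIReader().Get(ctx, key, fresh); err != nil {
		return nil, err
	}
	return fresh, nil
}
```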
10 changes: 9 additions & 1 deletion config/rbac/role.yaml
@@ -28,12 +28,20 @@ rules:
- ""
resources:
- persistentvolumes
- pods
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- watch
- apiGroups:
- apps
resources:
2 changes: 1 addition & 1 deletion internal/controller/node/controller.go
@@ -58,7 +58,7 @@ type SeiNodeReconciler struct {
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch

2 changes: 2 additions & 0 deletions internal/controller/node/plan_execution_test.go
@@ -131,6 +131,7 @@ func newProgressionReconciler(t *testing.T, mock *mockSidecarClient, objs ...cli
        return task.ExecutionConfig{
            BuildSidecarClient: func() (task.SidecarClient, error) { return mock, nil },
            KubeClient:         c,
            APIReader:          c,
            Scheme:             s,
            Resource:           node,
            Platform:           platformtest.Config(),
@@ -796,6 +797,7 @@ func TestReconcileInitializing_SidecarClientError_Requeues(t *testing.T) {
            return nil, fmt.Errorf("sidecar unavailable")
        },
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   n,
        Platform:   platformtest.Config(),
1 change: 1 addition & 0 deletions internal/controller/node/reconciler_test.go
@@ -55,6 +55,7 @@ func newNodeReconciler(t *testing.T, objs ...client.Object) (*SeiNodeReconciler,
        return task.ExecutionConfig{
            BuildSidecarClient: func() (task.SidecarClient, error) { return mock, nil },
            KubeClient:         c,
            APIReader:          c,
            Scheme:             s,
            Resource:           node,
            Platform:           platformtest.Config(),
4 changes: 4 additions & 0 deletions internal/noderesource/noderesource.go
@@ -142,6 +142,10 @@ func GenerateStatefulSet(node *seiv1alpha1.SeiNode, p PlatformConfig) *appsv1.St
            Selector: &metav1.LabelSelector{
                MatchLabels: SelectorLabels(node),
            },
            // Pod lifecycle is the SeiNode controller's responsibility (replace-pod).
            UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
                Type: appsv1.OnDeleteStatefulSetStrategyType,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
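With `OnDelete`, a pod-template change makes the StatefulSet controller publish a new `status.updateRevision`, but it never deletes running pods; they stay at `status.currentRevision` until something else — here, the replace-pod task — deletes them. A small helper sketch of the observable state, written against standard StatefulSet status semantics (the helper itself is not part of this PR):

```go
package noderesource

import appsv1 "k8s.io/api/apps/v1"

// rolloutPending reports whether the StatefulSet has an updated template
// whose rollout is stalled waiting for pod deletion (OnDelete semantics).
func rolloutPending(sts *appsv1.StatefulSet) bool {
	return sts.Status.ObservedGeneration >= sts.Generation && // status is fresh
		sts.Status.UpdateRevision != "" &&
		sts.Status.CurrentRevision != sts.Status.UpdateRevision
}
```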
10 changes: 10 additions & 0 deletions internal/noderesource/noderesource_test.go
@@ -5,6 +5,7 @@ import (

. "github.com/onsi/gomega"
seiconfig "github.com/sei-protocol/sei-config"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -143,6 +144,15 @@ func TestGenerateNodeStatefulSet_BasicFields(t *testing.T) {
    g.Expect(sts.Spec.VolumeClaimTemplates).To(BeEmpty())
}

func TestGenerateNodeStatefulSet_UsesOnDeleteUpdateStrategy(t *testing.T) {
    g := NewWithT(t)
    node := newGenesisNode("mynet-0", "default")

    sts := GenerateStatefulSet(node, platformtest.Config())

    g.Expect(sts.Spec.UpdateStrategy.Type).To(Equal(appsv1.OnDeleteStatefulSetStrategyType))
}

func TestGenerateNodeStatefulSet_AlwaysHasSidecar(t *testing.T) {
    g := NewWithT(t)
    node := newSnapshotNode("snap-0", "default")
4 changes: 3 additions & 1 deletion internal/planner/node_update_test.go
@@ -68,6 +68,7 @@ func TestBuildRunningPlan_ImageDrift_ReturnsNodeUpdatePlan(t *testing.T) {
    want := []string{
        task.TaskTypeApplyStatefulSet,
        task.TaskTypeApplyService,
        task.TaskTypeReplacePod,
        task.TaskTypeObserveImage,
        TaskMarkReady,
    }
@@ -380,10 +381,11 @@ func TestBuildRunningPlan_ImageDriftWinsOverSidecar(t *testing.T) {
    g.Expect(err).NotTo(HaveOccurred())
    g.Expect(plan).NotTo(BeNil())
    // Image update plan ends with MarkReady, which also resolves the sidecar.
    g.Expect(plan.Tasks).To(HaveLen(4), "should be full node-update plan, not one-task mark-ready")
    g.Expect(plan.Tasks).To(HaveLen(5), "should be full node-update plan, not one-task mark-ready")
    g.Expect(planTaskTypes(plan)).To(Equal([]string{
        task.TaskTypeApplyStatefulSet,
        task.TaskTypeApplyService,
        task.TaskTypeReplacePod,
        task.TaskTypeObserveImage,
        TaskMarkReady,
    }))
5 changes: 5 additions & 0 deletions internal/planner/planner.go
@@ -467,6 +467,8 @@ func taskMaxRetries(taskType string) int {
        return groupAssemblyMaxRetries
    case TaskDiscoverPeers:
        return discoverPeersMaxRetries
    case task.TaskTypeReplacePod:
        return 3
    default:
        return 0
    }
@@ -541,6 +543,8 @@ func paramsForTaskType(
        return &task.ApplyStatefulSetParams{NodeName: node.Name, Namespace: node.Namespace}
    case task.TaskTypeApplyService:
        return &task.ApplyServiceParams{NodeName: node.Name, Namespace: node.Namespace}
    case task.TaskTypeReplacePod:
        return &task.ReplacePodParams{NodeName: node.Name, Namespace: node.Namespace}
    case task.TaskTypeObserveImage:
        return &task.ObserveImageParams{NodeName: node.Name, Namespace: node.Namespace}
    case task.TaskTypeValidateSigningKey:
@@ -708,6 +712,7 @@ func buildNodeUpdatePlan(node *seiv1alpha1.SeiNode) (*seiv1alpha1.TaskPlan, erro
    prog := []string{
        task.TaskTypeApplyStatefulSet,
        task.TaskTypeApplyService,
        task.TaskTypeReplacePod,
        task.TaskTypeObserveImage,
        sidecar.TaskTypeMarkReady,
    }
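For the new task type to execute, `internal/task` presumably maps `TaskTypeReplacePod` to `deserializeReplacePod` somewhere outside this diff. A hypothetical sketch of what such a registry could look like — the `Deserializer` type, `deserializers` map, and `deserializerFor` are invented for illustration; only `TaskTypeReplacePod` and `deserializeReplacePod` are confirmed by the diff:

```go
package task

import "encoding/json"

// Deserializer rehydrates a persisted task record into an executable task.
// Hypothetical: the real dispatch mechanism is not shown in this diff.
type Deserializer func(id string, params json.RawMessage, cfg ExecutionConfig) (TaskExecution, error)

var deserializers = map[string]Deserializer{
	// ...existing task types...
	TaskTypeReplacePod: deserializeReplacePod,
}

func deserializerFor(taskType string) (Deserializer, bool) {
	d, ok := deserializers[taskType]
	return d, ok
}
```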
2 changes: 2 additions & 0 deletions internal/task/bootstrap_task_test.go
@@ -56,6 +56,7 @@ func testCfg(t *testing.T, objs ...client.Object) ExecutionConfig {
        Build()
    return ExecutionConfig{
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   testNode(),
        Platform:   platformtest.Config(),
@@ -159,6 +160,7 @@ func TestDeployBootstrapJob_Execute_NilSnapshot(t *testing.T) {
    c := fake.NewClientBuilder().WithScheme(s).Build()
    cfg := ExecutionConfig{
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   node,
        Platform:   platformtest.Config(),
1 change: 1 addition & 0 deletions internal/task/deployment_update_test.go
@@ -50,6 +50,7 @@ func testDeploymentCfg(t *testing.T, group *seiv1alpha1.SeiNodeDeployment, nodes
    c := builder.Build()
    return ExecutionConfig{
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   group,
        Platform:   platformtest.Config(),
6 changes: 4 additions & 2 deletions internal/task/ensure_pvc_test.go
@@ -105,6 +105,7 @@ func newEnsurePVCExec(t *testing.T, node *seiv1alpha1.SeiNode, objs ...client.Ob
    c := fake.NewClientBuilder().WithScheme(s).WithObjects(objs...).Build()
    cfg := ExecutionConfig{
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   node,
        Platform:   platformtest.Config(),
@@ -197,6 +198,7 @@ func TestEnsureDataPVC_Create_AlreadyExistsRace_Requeues(t *testing.T) {

    cfg := ExecutionConfig{
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   node,
        Platform:   platformtest.Config(),
@@ -269,7 +271,7 @@ func TestEnsureDataPVC_Import_PVCNotFound_ThenAppears_Completes(t *testing.T) {
    s := ensurePVCScheme(t)
    c := fake.NewClientBuilder().WithScheme(s).Build()
    cfg := ExecutionConfig{
        KubeClient: c, Scheme: s, Resource: node, Platform: platformtest.Config(),
        KubeClient: c, APIReader: c, Scheme: s, Resource: node, Platform: platformtest.Config(),
    }
    raw, _ := json.Marshal(EnsureDataPVCParams{NodeName: node.Name, Namespace: node.Namespace})
    exec, err := deserializeEnsureDataPVC("ensure-1", raw, cfg)
@@ -465,7 +467,7 @@ func TestEnsureDataPVC_Import_TransientValidationRepeats(t *testing.T) {
    base := fake.NewClientBuilder().WithScheme(s).Build()
    counter := &countingClient{Client: base}
    cfg := ExecutionConfig{
        KubeClient: counter, Scheme: s, Resource: node, Platform: platformtest.Config(),
        KubeClient: counter, APIReader: counter, Scheme: s, Resource: node, Platform: platformtest.Config(),
    }
    raw, _ := json.Marshal(EnsureDataPVCParams{NodeName: node.Name, Namespace: node.Namespace})

3 changes: 2 additions & 1 deletion internal/task/observe_image.go
@@ -54,7 +54,8 @@ func (e *observeImageExecution) Execute(ctx context.Context) error {

    sts := &appsv1.StatefulSet{}
    key := types.NamespacedName{Name: node.Name, Namespace: node.Namespace}
    if err := e.cfg.KubeClient.Get(ctx, key, sts); err != nil {
    // Bypass cache: apply-statefulset just wrote this in the same reconcile.
    if err := e.cfg.APIReader.Get(ctx, key, sts); err != nil {
        if apierrors.IsNotFound(err) {
            return nil
        }
1 change: 1 addition & 0 deletions internal/task/observe_image_test.go
@@ -46,6 +46,7 @@ func observeImageCfg(t *testing.T, node *seiv1alpha1.SeiNode, sts *appsv1.Statef
    c := builder.Build()
    return ExecutionConfig{
        KubeClient: c,
        APIReader:  c,
        Scheme:     s,
        Resource:   node,
    }
131 changes: 131 additions & 0 deletions internal/task/replace_pod.go
@@ -0,0 +1,131 @@
package task

import (
    "context"
    "encoding/json"
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"

    seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1"
)

// TaskTypeReplacePod identifies the node-update task that deletes pods
// still running at the old StatefulSet revision.
const TaskTypeReplacePod = "replace-pod"

// ReplacePodParams identifies the SeiNode whose pods should be replaced.
type ReplacePodParams struct {
    NodeName  string `json:"nodeName"`
    Namespace string `json:"namespace"`
}

type replacePodExecution struct {
    taskBase
    params ReplacePodParams
    cfg    ExecutionConfig
}

// deserializeReplacePod rehydrates a replace-pod execution from its
// persisted task parameters.
func deserializeReplacePod(id string, params json.RawMessage, cfg ExecutionConfig) (TaskExecution, error) {
    var p ReplacePodParams
    if len(params) > 0 {
        if err := json.Unmarshal(params, &p); err != nil {
            return nil, fmt.Errorf("deserializing replace-pod params: %w", err)
        }
    }
    return &replacePodExecution{
        taskBase: taskBase{id: id, status: ExecutionRunning},
        params:   p,
        cfg:      cfg,
    }, nil
}

// Execute deletes pods at the StatefulSet's old revision. Pairs with
// the StatefulSet's OnDelete update strategy: pod lifecycle is the
// SeiNode controller's responsibility, not the StatefulSet controller's.
func (e *replacePodExecution) Execute(ctx context.Context) error {
    node, err := ResourceAs[*seiv1alpha1.SeiNode](e.cfg)
    if err != nil {
        return Terminal(err)
    }

    sts := &appsv1.StatefulSet{}
    key := types.NamespacedName{Name: node.Name, Namespace: node.Namespace}
    // Bypass cache: apply-statefulset just wrote this in the same reconcile.
    if err := e.cfg.APIReader.Get(ctx, key, sts); err != nil {
        if apierrors.IsNotFound(err) {
            return nil
        }
        return fmt.Errorf("getting statefulset: %w", err)
    }

    // Gate on fresh revision data: until the StatefulSet controller has
    // observed the latest generation and published an update revision, the
    // status fields below are stale. Returning without complete() leaves
    // the task running, so it is re-polled on a later reconcile.
    if sts.Status.ObservedGeneration < sts.Generation {
        return nil
    }
    if sts.Status.UpdateRevision == "" {
        return nil
    }
    if sts.Status.CurrentRevision == sts.Status.UpdateRevision {
        e.complete()
        return nil
    }

    if sts.Spec.Selector == nil || len(sts.Spec.Selector.MatchLabels) == 0 {
        return Terminal(fmt.Errorf("statefulset %q has no selector; cannot identify owned pods", node.Name))
    }

    // Multi-replica needs ordinal-aware deletion (reverse-ordinal). Fail
    // loud rather than silently violate StatefulSet rolling-update semantics.
    if sts.Spec.Replicas != nil && *sts.Spec.Replicas > 1 {
        return Terminal(fmt.Errorf(
            "replace-pod does not support multi-replica StatefulSets (got replicas=%d); "+
                "add ordinal-aware deletion before scaling up", *sts.Spec.Replicas))
    }

    pods := &corev1.PodList{}
    if err := e.cfg.KubeClient.List(ctx, pods,
        client.InNamespace(node.Namespace),
        client.MatchingLabels(sts.Spec.Selector.MatchLabels),
    ); err != nil {
        return fmt.Errorf("listing pods for statefulset %q: %w", node.Name, err)
    }

    // Delete every owned pod whose revision hash lags the update revision,
    // skipping pods that are already terminating.
    updateRev := sts.Status.UpdateRevision
    for i := range pods.Items {
        pod := &pods.Items[i]
        if !ownedByStatefulSet(pod, sts) {
            continue
        }
        hash, hasHash := pod.Labels[appsv1.ControllerRevisionHashLabelKey]
        if !hasHash {
            continue
        }
        if hash == updateRev {
            continue
        }
        if pod.DeletionTimestamp != nil {
            continue
        }
        if err := e.cfg.KubeClient.Delete(ctx, pod); err != nil && !apierrors.IsNotFound(err) {
            return fmt.Errorf("deleting pod %q: %w", pod.Name, err)
        }
    }

    e.complete()
    return nil
}

// ownedByStatefulSet reports whether pod carries an owner reference to sts.
func ownedByStatefulSet(pod *corev1.Pod, sts *appsv1.StatefulSet) bool {
    for _, ref := range pod.OwnerReferences {
        if ref.Kind == "StatefulSet" && ref.UID == sts.UID {
            return true
        }
    }
    return false
}

// Status returns the cached execution status.
func (e *replacePodExecution) Status(_ context.Context) ExecutionStatus {
    return e.DefaultStatus()
}
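This view shows no dedicated test file for replace-pod. Following the patterns above (the `testCfg` helper from `bootstrap_task_test.go`, whose fake client doubles as the `APIReader`), a unit test could plausibly look like the sketch below. The fixture labels, UID, and revision names are invented; it assumes `testCfg`'s scheme registers apps/v1 and core/v1, and uses the surrounding test files' imports plus `k8s.io/utils/ptr`:

```go
func TestReplacePod_DeletesPodAtOldRevision(t *testing.T) {
	g := NewWithT(t)
	node := testNode()

	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: node.Name, Namespace: node.Namespace,
			Generation: 2, UID: "sts-uid",
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: ptr.To(int32(1)),
			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": node.Name}},
		},
		Status: appsv1.StatefulSetStatus{
			ObservedGeneration: 2,
			CurrentRevision:    "rev-1",
			UpdateRevision:     "rev-2", // rollout pending
		},
	}
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      node.Name + "-0",
			Namespace: node.Namespace,
			Labels: map[string]string{
				"app":                                 node.Name,
				appsv1.ControllerRevisionHashLabelKey: "rev-1", // still at old revision
			},
			OwnerReferences: []metav1.OwnerReference{
				{APIVersion: "apps/v1", Kind: "StatefulSet", Name: node.Name, UID: "sts-uid"},
			},
		},
	}

	cfg := testCfg(t, sts, pod)
	raw, _ := json.Marshal(ReplacePodParams{NodeName: node.Name, Namespace: node.Namespace})
	exec, err := deserializeReplacePod("replace-1", raw, cfg)
	g.Expect(err).NotTo(HaveOccurred())

	g.Expect(exec.Execute(context.Background())).To(Succeed())

	pods := &corev1.PodList{}
	g.Expect(cfg.KubeClient.List(context.Background(), pods)).To(Succeed())
	g.Expect(pods.Items).To(BeEmpty(), "old-revision pod should be deleted")
}
```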