From 1176d606f1314ec94a7e01a10b8f9495472d2268 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Sun, 9 Nov 2025 14:28:39 -0800 Subject: [PATCH 01/25] feat: process clusters in parallel within stage Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 68 ++++++++++++++++++-------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index c69e34f11..53a97b700 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "strconv" + "strings" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -68,7 +69,7 @@ func (r *Reconciler) execute( updateRunStatus := updateRun.GetUpdateRunStatus() if updatingStageIndex < len(updateRunStatus.StagesStatus) { updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex] - waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings) + waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, 1) if errors.Is(execErr, errStagedUpdatedAborted) { markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error()) return true, waitTime, execErr @@ -91,6 +92,7 @@ func (r *Reconciler) executeUpdatingStage( updateRun placementv1beta1.UpdateRunObj, updatingStageIndex int, toBeUpdatedBindings []placementv1beta1.BindingObj, + maxConcurrency int, ) (time.Duration, error) { updateRunStatus := updateRun.GetUpdateRunStatus() updateRunSpec := updateRun.GetUpdateRunSpec() @@ -106,23 +108,40 @@ func (r *Reconciler) executeUpdatingStage( toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding } finishedClusterCount := 0 + clusterUpdatingCount := 0 - // Go through each cluster in the stage and check if it's updated. - for i := range updatingStageStatus.Clusters { + // List of clusters that need to be processed in parallel in this execution. + var clustersToProcess []placementv1beta1.ClusterUpdatingStatus + + // Go through each cluster in the stage and check if it's updating/succeeded/failed. + for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount <= maxConcurrency; i++ { clusterStatus := &updatingStageStatus.Clusters[i] - clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) + //clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) - if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { - // The cluster is marked as failed to update. - failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) - klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef) - return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) - } - if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) { - // The cluster has been updated successfully. - finishedClusterCount++ - continue + if clusterUpdateSucceededCond == nil { + // The cluster is either updating or not started yet. 
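+				// Queue the cluster for processing in this round; clusterUpdatingCount counts the clusters picked up so far and is capped by maxConcurrency in the loop condition.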
+ clustersToProcess = append(clustersToProcess, updatingStageStatus.Clusters[i]) + clusterUpdatingCount++ + } else { + if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { + // The cluster is marked as failed to update. + failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) + klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef) + return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) + } + if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) { + // The cluster has been updated successfully. + finishedClusterCount++ + continue + } } + } + + var stuckClusterNames []string + // Now go through each cluster that needs to be processed. + for i := range clustersToProcess { + clusterStatus := &clustersToProcess[i] + clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) // The cluster is either updating or not started yet. binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) { @@ -172,8 +191,8 @@ func (r *Reconciler) executeUpdatingStage( if finishedClusterCount == 0 { markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration()) } - // No need to continue as we only support one cluster updating at a time for now. - return clusterUpdatingWaitTime, nil + // Need to continue as we need to process at most maxConcurrency number of clusters in parallel. + continue } // Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound. @@ -194,20 +213,29 @@ func (r *Reconciler) executeUpdatingStage( } finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun) + if updateErr != nil { + return clusterUpdatingWaitTime, updateErr + } if finished { finishedClusterCount++ - markUpdateRunProgressing(updateRun) continue } else { // If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck. timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time) if timeElapsed > updateRunStuckThreshold { klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) - markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName) + stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName) } } - // No need to continue as we only support one cluster updating at a time for now. - return clusterUpdatingWaitTime, updateErr + // Need to continue as we need to process at most maxConcurrency number of clusters in parallel. + } + + // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing. + if len(stuckClusterNames) > 0 { + markUpdateRunStuck(updateRun, updatingStageStatus.StageName, strings.Join(stuckClusterNames, ", ")) + } else if finishedClusterCount > 0 { + // If there is no stuck cluster but some progress has been made, mark the update run as progressing. 
+ markUpdateRunProgressing(updateRun) } if finishedClusterCount == len(updatingStageStatus.Clusters) { From 73cc31c51ba2cd4ee13725c07b06e21a12b6ae59 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Sun, 9 Nov 2025 14:45:21 -0800 Subject: [PATCH 02/25] address cyclomatic complexity Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 53a97b700..49c28e005 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -231,12 +231,7 @@ func (r *Reconciler) executeUpdatingStage( } // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing. - if len(stuckClusterNames) > 0 { - markUpdateRunStuck(updateRun, updatingStageStatus.StageName, strings.Join(stuckClusterNames, ", ")) - } else if finishedClusterCount > 0 { - // If there is no stuck cluster but some progress has been made, mark the update run as progressing. - markUpdateRunProgressing(updateRun) - } + aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount) if finishedClusterCount == len(updatingStageStatus.Clusters) { // All the clusters in the stage have been updated. @@ -263,6 +258,15 @@ func (r *Reconciler) executeUpdatingStage( return clusterUpdatingWaitTime, nil } +func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) { + if len(stuckClusterNames) > 0 { + markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", ")) + } else if finishedClusterCount > 0 { + // If there is no stuck cluster but some progress has been made, mark the update run as progressing. + markUpdateRunProgressing(updateRun) + } +} + // executeDeleteStage executes the delete stage by deleting the bindings. func (r *Reconciler) executeDeleteStage( ctx context.Context, From ffa59339e237936eaa67c6b527d75e4b2f920c42 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Sun, 9 Nov 2025 16:22:55 -0800 Subject: [PATCH 03/25] copy pointer Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 49c28e005..1e294b723 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -111,16 +111,15 @@ func (r *Reconciler) executeUpdatingStage( clusterUpdatingCount := 0 // List of clusters that need to be processed in parallel in this execution. - var clustersToProcess []placementv1beta1.ClusterUpdatingStatus + var clustersToProcess []*placementv1beta1.ClusterUpdatingStatus // Go through each cluster in the stage and check if it's updating/succeeded/failed. for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount <= maxConcurrency; i++ { clusterStatus := &updatingStageStatus.Clusters[i] - //clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) if clusterUpdateSucceededCond == nil { // The cluster is either updating or not started yet. 
- clustersToProcess = append(clustersToProcess, updatingStageStatus.Clusters[i]) + clustersToProcess = append(clustersToProcess, &updatingStageStatus.Clusters[i]) clusterUpdatingCount++ } else { if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { @@ -140,7 +139,7 @@ func (r *Reconciler) executeUpdatingStage( var stuckClusterNames []string // Now go through each cluster that needs to be processed. for i := range clustersToProcess { - clusterStatus := &clustersToProcess[i] + clusterStatus := clustersToProcess[i] clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) // The cluster is either updating or not started yet. binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] From 2029e02e906cf2e56a3c747aae20cb5710cd93c5 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Sun, 9 Nov 2025 16:33:22 -0800 Subject: [PATCH 04/25] remove multiple cluster update check Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/validation.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/controllers/updaterun/validation.go b/pkg/controllers/updaterun/validation.go index 27d557b77..068b97f9e 100644 --- a/pkg/controllers/updaterun/validation.go +++ b/pkg/controllers/updaterun/validation.go @@ -244,13 +244,6 @@ func validateClusterUpdatingStatus( updatingClusters = append(updatingClusters, stageStatus.Clusters[j].ClusterName) } } - // We don't allow more than one clusters to be updating at the same time. - // TODO(wantjian): support multiple clusters updating at the same time. - if len(updatingClusters) > 1 { - unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("more than one cluster is updating in the stage `%s`, clusters: %v", stageStatus.StageName, updatingClusters)) - klog.ErrorS(unexpectedErr, "Detected more than one updating clusters in the stage", "updateRun", klog.KObj(updateRun)) - return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) - } } return updatingStageIndex, lastFinishedStageIndex, nil } From 7e0f3d2ef1781221d8a8ead3e597addb917b4475 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Sun, 9 Nov 2025 16:43:51 -0800 Subject: [PATCH 05/25] fix concurrency check Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 1e294b723..dbe840a23 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -114,7 +114,7 @@ func (r *Reconciler) executeUpdatingStage( var clustersToProcess []*placementv1beta1.ClusterUpdatingStatus // Go through each cluster in the stage and check if it's updating/succeeded/failed. 
- for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount <= maxConcurrency; i++ { + for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ { clusterStatus := &updatingStageStatus.Clusters[i] clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) if clusterUpdateSucceededCond == nil { From 7cf7ec3be7859d0650a04a489bc7503d1e2ef6e5 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Sun, 9 Nov 2025 16:50:43 -0800 Subject: [PATCH 06/25] lint fix Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/validation.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/controllers/updaterun/validation.go b/pkg/controllers/updaterun/validation.go index 068b97f9e..b3c5e5e0a 100644 --- a/pkg/controllers/updaterun/validation.go +++ b/pkg/controllers/updaterun/validation.go @@ -234,16 +234,6 @@ func validateClusterUpdatingStatus( return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) } updatingStageIndex = curStage - // Collect the updating clusters. - var updatingClusters []string - for j := range stageStatus.Clusters { - clusterStartedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) - clusterFinishedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) - if condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) && - !(condition.IsConditionStatusTrue(clusterFinishedCond, updateRun.GetGeneration()) || condition.IsConditionStatusFalse(clusterFinishedCond, updateRun.GetGeneration())) { - updatingClusters = append(updatingClusters, stageStatus.Clusters[j].ClusterName) - } - } } return updatingStageIndex, lastFinishedStageIndex, nil } From 9ff1399e5d39ec11c2b73379ea9aaeeae4bc7725 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Mon, 10 Nov 2025 16:16:29 -0800 Subject: [PATCH 07/25] aggregate cluster update errors Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 30 +++++++++++++++++++------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index dbe840a23..9c070781c 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -137,6 +138,7 @@ func (r *Reconciler) executeUpdatingStage( } var stuckClusterNames []string + var clusterUpdateErrors []error // Now go through each cluster that needs to be processed. 
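+	// Errors from individual clusters are collected into clusterUpdateErrors and aggregated after the loop, so one failing cluster does not block the other clusters processed in this round.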
for i := range clustersToProcess { clusterStatus := clustersToProcess[i] @@ -156,11 +158,13 @@ func (r *Reconciler) executeUpdatingStage( bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy if err := r.Client.Update(ctx, binding); err != nil { klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef) - return 0, controller.NewUpdateIgnoreConflictError(err) + clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err)) + continue } klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil { - return 0, err + clusterUpdateErrors = append(clusterUpdateErrors, err) + continue } } else { klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) @@ -169,20 +173,24 @@ func (r *Reconciler) executeUpdatingStage( bindingSpec.State = placementv1beta1.BindingStateBound if err := r.Client.Update(ctx, binding); err != nil { klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) - return 0, controller.NewUpdateIgnoreConflictError(err) + clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err)) + continue } klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil { - return 0, err + clusterUpdateErrors = append(clusterUpdateErrors, err) + continue } } else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) { klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil { - return 0, err + clusterUpdateErrors = append(clusterUpdateErrors, err) + continue } } else { if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil { - return clusterUpdatingWaitTime, updateErr + clusterUpdateErrors = append(clusterUpdateErrors, updateErr) + continue } } } @@ -208,12 +216,13 @@ func (r *Reconciler) executeUpdatingStage( "bindingSpecInSync", inSync, "bindingState", bindingSpec.State, "bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "updateRun", updateRunRef) markClusterUpdatingFailed(clusterStatus, updateRun.GetGeneration(), preemptedErr.Error()) - return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error()) + clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error())) + continue } finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, 
updatingStageStatus, updateRun) if updateErr != nil { - return clusterUpdatingWaitTime, updateErr + clusterUpdateErrors = append(clusterUpdateErrors, updateErr) } if finished { finishedClusterCount++ @@ -232,6 +241,11 @@ func (r *Reconciler) executeUpdatingStage( // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing. aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount) + // After processing all clusters, aggregate and return errors + if len(clusterUpdateErrors) > 0 { + return 0, utilerrors.NewAggregate(clusterUpdateErrors) + } + if finishedClusterCount == len(updatingStageStatus.Clusters) { // All the clusters in the stage have been updated. markUpdateRunWaiting(updateRun, updatingStageStatus.StageName) From c7b4b5461cb1a475fd93f69ba922340d79dd555d Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Mon, 10 Nov 2025 18:20:12 -0800 Subject: [PATCH 08/25] fix UT Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/validation_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/updaterun/validation_test.go b/pkg/controllers/updaterun/validation_test.go index 0f01168e3..d6ad8215d 100644 --- a/pkg/controllers/updaterun/validation_test.go +++ b/pkg/controllers/updaterun/validation_test.go @@ -145,7 +145,7 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { wantLastFinishedStageIndex: -1, }, { - name: "determineUpdatignStage should return error if there are multiple clusters updating in an updating stage", + name: "determineUpdatignStage should not return error if there are multiple clusters updating in an updating stage", curStage: 0, updatingStageIndex: -1, lastFinishedStageIndex: -1, @@ -163,8 +163,8 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { }, }, }, - wantErr: wrapErr(true, fmt.Errorf("more than one cluster is updating in the stage `test-stage`, clusters: [cluster-1 cluster-2]")), - wantUpdatingStageIndex: -1, + wantErr: nil, + wantUpdatingStageIndex: 0, wantLastFinishedStageIndex: -1, }, { From 0130772d8ed64cc1008742d417774d39f37be493 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Wed, 12 Nov 2025 14:53:14 -0800 Subject: [PATCH 09/25] handle finished clusters Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 9c070781c..9fc530326 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -108,19 +108,17 @@ func (r *Reconciler) executeUpdatingStage( bindingSpec := binding.GetBindingSpec() toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding } + finishedClusterCount := 0 clusterUpdatingCount := 0 - - // List of clusters that need to be processed in parallel in this execution. - var clustersToProcess []*placementv1beta1.ClusterUpdatingStatus - + var stuckClusterNames []string + var clusterUpdateErrors []error // Go through each cluster in the stage and check if it's updating/succeeded/failed. 
for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ { clusterStatus := &updatingStageStatus.Clusters[i] clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) if clusterUpdateSucceededCond == nil { // The cluster is either updating or not started yet. - clustersToProcess = append(clustersToProcess, &updatingStageStatus.Clusters[i]) clusterUpdatingCount++ } else { if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { @@ -135,13 +133,7 @@ func (r *Reconciler) executeUpdatingStage( continue } } - } - - var stuckClusterNames []string - var clusterUpdateErrors []error - // Now go through each cluster that needs to be processed. - for i := range clustersToProcess { - clusterStatus := clustersToProcess[i] + // The cluster needs to be processed. clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) // The cluster is either updating or not started yet. binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] @@ -226,6 +218,8 @@ func (r *Reconciler) executeUpdatingStage( } if finished { finishedClusterCount++ + // The cluster has finished successfully, we can process another cluster in this round. + clusterUpdatingCount-- continue } else { // If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck. @@ -268,6 +262,7 @@ func (r *Reconciler) executeUpdatingStage( } return waitTime, nil } + // Some clusters are still updating. return clusterUpdatingWaitTime, nil } From 2e17ac98f00654296a9ed36bf7de52ade7ea9cb9 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Wed, 12 Nov 2025 17:18:37 -0800 Subject: [PATCH 10/25] parse maxConcurrency Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 9fc530326..b0ee04783 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -69,8 +70,13 @@ func (r *Reconciler) execute( updateRunStatus := updateRun.GetUpdateRunStatus() if updatingStageIndex < len(updateRunStatus.StagesStatus) { + // Round down the maxConcurrency to the number of clusters in the stage. 
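+		// MaxConcurrency is an IntOrString: an integer value is used as-is, while a percentage (e.g. "50%") is scaled against the number of clusters in the stage and rounded down, since roundUp is false.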
+ maxConcurrency, err := intstr.GetScaledValueFromIntOrPercent(updateRunStatus.UpdateStrategySnapshot.Stages[updatingStageIndex].MaxConcurrency, len(updateRunStatus.StagesStatus[updatingStageIndex].Clusters), false) + if err != nil { + return false, 0, err + } updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex] - waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, 1) + waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency) if errors.Is(execErr, errStagedUpdatedAborted) { markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error()) return true, waitTime, execErr From 82eacbd20f8948c381e71bdafdefe03838d0a612 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 10:50:56 -0800 Subject: [PATCH 11/25] add E2E Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 2 +- test/e2e/cluster_staged_updaterun_test.go | 104 ++++++++++++++++++++-- 2 files changed, 98 insertions(+), 8 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index b0ee04783..5b16ae761 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -29,8 +29,8 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" diff --git a/test/e2e/cluster_staged_updaterun_test.go b/test/e2e/cluster_staged_updaterun_test.go index 9880c2eef..49634cbf0 100644 --- a/test/e2e/cluster_staged_updaterun_test.go +++ b/test/e2e/cluster_staged_updaterun_test.go @@ -27,6 +27,7 @@ import ( apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -37,13 +38,13 @@ import ( const ( // The current stage wait between clusters are 15 seconds - updateRunEventuallyDuration = time.Minute - - resourceSnapshotIndex1st = "0" - resourceSnapshotIndex2nd = "1" - policySnapshotIndex1st = "0" - policySnapshotIndex2nd = "1" - policySnapshotIndex3rd = "2" + updateRunEventuallyDuration = time.Minute + updateRunParallelEventuallyDuration = 20 * time.Second + resourceSnapshotIndex1st = "0" + resourceSnapshotIndex2nd = "1" + policySnapshotIndex1st = "0" + policySnapshotIndex2nd = "1" + policySnapshotIndex3rd = "2" testConfigMapDataValue = "new" ) @@ -1270,6 +1271,95 @@ var _ = Describe("test CRP rollout with staged update run", func() { } }) }) + + Context("Test parallel cluster updates with maxConcurrency set to 3", Ordered, func() { + var strategy *placementv1beta1.ClusterStagedUpdateStrategy + updateRunName := fmt.Sprintf(clusterStagedUpdateRunNameWithSubIndexTemplate, GinkgoParallelProcess(), 0) + + BeforeAll(func() { + // Create a test namespace and a configMap inside it on the hub cluster. + createWorkResources() + + // Create the CRP with external rollout strategy. + crp := &placementv1beta1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: crpName, + // Add a custom finalizer; this would allow us to better observe + // the behavior of the controllers. 
+ Finalizers: []string{customDeletionBlockerFinalizer}, + }, + Spec: placementv1beta1.PlacementSpec{ + ResourceSelectors: workResourceSelector(), + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.ExternalRolloutStrategyType, + }, + }, + } + Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP") + + // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency=3 + strategy = &placementv1beta1.ClusterStagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: strategyName, + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "parallel", + // Pick all clusters in a single stage + LabelSelector: &metav1.LabelSelector{}, + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + }, + }, + }, + } + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create ClusterStagedUpdateStrategy with maxConcurrency=3") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the CRP. + ensureCRPAndRelatedResourcesDeleted(crpName, allMemberClusters) + + // Delete the clusterStagedUpdateRun. + ensureClusterStagedUpdateRunDeletion(updateRunName) + + // Delete the clusterStagedUpdateStrategy. + ensureClusterUpdateRunStrategyDeletion(strategyName) + }) + + It("Should not rollout any resources to member clusters as there's no update run yet", checkIfRemovedWorkResourcesFromAllMemberClustersConsistently) + + It("Should have the latest resource snapshot", func() { + validateLatestClusterResourceSnapshot(crpName, resourceSnapshotIndex1st) + }) + + It("Should successfully schedule the crp", func() { + validateLatestClusterSchedulingPolicySnapshot(crpName, policySnapshotIndex1st, 3) + }) + + It("Should update crp status as pending rollout", func() { + crpStatusUpdatedActual := crpStatusWithExternalStrategyActual(nil, "", false, allMemberClusterNames, []string{"", "", ""}, []bool{false, false, false}, nil, nil) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) + }) + + It("Should create a cluster staged update run successfully", func() { + createClusterStagedUpdateRunSucceed(updateRunName, crpName, resourceSnapshotIndex1st, strategyName) + }) + + It("Should complete the cluster staged update run in 15s with all 3 clusters updated in parallel", func() { + // With maxConcurrency=3, all 3 clusters should be updated in parallel. + // Each cluster waits 15 seconds, so total time should be under 20s. 
+ csurSucceededActual := clusterStagedUpdateRunStatusSucceededActual(updateRunName, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) + Eventually(csurSucceededActual, updateRunParallelEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s succeeded within 15s", updateRunName) + checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) + }) + + It("Should update crp status as completed", func() { + crpStatusUpdatedActual := crpStatusWithExternalStrategyActual(workResourceIdentifiers(), resourceSnapshotIndex1st, true, allMemberClusterNames, + []string{resourceSnapshotIndex1st, resourceSnapshotIndex1st, resourceSnapshotIndex1st}, []bool{true, true, true}, nil, nil) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) + }) + }) }) // Note that this container cannot run in parallel with other containers. From a39d8beb32a05cd13063ab857d042d11dc5ae660 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 11:58:57 -0800 Subject: [PATCH 12/25] add e2e Signed-off-by: Arvind Thirumurugan --- test/e2e/cluster_staged_updaterun_test.go | 15 ++-- test/e2e/staged_updaterun_test.go | 89 +++++++++++++++++++++++ 2 files changed, 97 insertions(+), 7 deletions(-) diff --git a/test/e2e/cluster_staged_updaterun_test.go b/test/e2e/cluster_staged_updaterun_test.go index 49634cbf0..974f33903 100644 --- a/test/e2e/cluster_staged_updaterun_test.go +++ b/test/e2e/cluster_staged_updaterun_test.go @@ -40,11 +40,12 @@ const ( // The current stage wait between clusters are 15 seconds updateRunEventuallyDuration = time.Minute updateRunParallelEventuallyDuration = 20 * time.Second - resourceSnapshotIndex1st = "0" - resourceSnapshotIndex2nd = "1" - policySnapshotIndex1st = "0" - policySnapshotIndex2nd = "1" - policySnapshotIndex3rd = "2" + + resourceSnapshotIndex1st = "0" + resourceSnapshotIndex2nd = "1" + policySnapshotIndex1st = "0" + policySnapshotIndex2nd = "1" + policySnapshotIndex3rd = "2" testConfigMapDataValue = "new" ) @@ -1346,11 +1347,11 @@ var _ = Describe("test CRP rollout with staged update run", func() { createClusterStagedUpdateRunSucceed(updateRunName, crpName, resourceSnapshotIndex1st, strategyName) }) - It("Should complete the cluster staged update run in 15s with all 3 clusters updated in parallel", func() { + It("Should complete the cluster staged update run with all 3 clusters updated in parallel", func() { // With maxConcurrency=3, all 3 clusters should be updated in parallel. // Each cluster waits 15 seconds, so total time should be under 20s. 
csurSucceededActual := clusterStagedUpdateRunStatusSucceededActual(updateRunName, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) - Eventually(csurSucceededActual, updateRunParallelEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s succeeded within 15s", updateRunName) + Eventually(csurSucceededActual, updateRunParallelEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s succeeded", updateRunName) checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) }) diff --git a/test/e2e/staged_updaterun_test.go b/test/e2e/staged_updaterun_test.go index b5a5428b1..28ea6cba9 100644 --- a/test/e2e/staged_updaterun_test.go +++ b/test/e2e/staged_updaterun_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -1101,6 +1102,94 @@ var _ = Describe("test RP rollout with staged update run", Label("resourceplacem } }) }) + + Context("Test parallel cluster updates with maxConcurrency set to 3", Ordered, func() { + var strategy *placementv1beta1.StagedUpdateStrategy + updateRunName := fmt.Sprintf(stagedUpdateRunNameWithSubIndexTemplate, GinkgoParallelProcess(), 0) + + BeforeAll(func() { + // Create the RP with external rollout strategy. + rp := &placementv1beta1.ResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: rpName, + Namespace: testNamespace, + // Add a custom finalizer; this would allow us to better observe + // the behavior of the controllers. + Finalizers: []string{customDeletionBlockerFinalizer}, + }, + Spec: placementv1beta1.PlacementSpec{ + ResourceSelectors: configMapSelector(), + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.ExternalRolloutStrategyType, + }, + }, + } + Expect(hubClient.Create(ctx, rp)).To(Succeed(), "Failed to create RP") + + // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency=3 + strategy = &placementv1beta1.StagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: strategyName, + Namespace: testNamespace, + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "parallel", + // Pick all clusters in a single stage + LabelSelector: &metav1.LabelSelector{}, + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + }, + }, + }, + } + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create StagedUpdateStrategy with maxConcurrency=3") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the RP. + ensureRPAndRelatedResourcesDeleted(types.NamespacedName{Name: rpName, Namespace: testNamespace}, allMemberClusters) + + // Delete the stagedUpdateRun. + ensureStagedUpdateRunDeletion(updateRunName, testNamespace) + + // Delete the stagedUpdateStrategy. 
+ ensureStagedUpdateRunStrategyDeletion(strategyName, testNamespace) + }) + + It("Should not rollout any resources to member clusters as there's no update run yet", checkIfRemovedConfigMapFromAllMemberClustersConsistently) + + It("Should have the latest resource snapshot", func() { + validateLatestResourceSnapshot(rpName, testNamespace, resourceSnapshotIndex1st) + }) + + It("Should successfully schedule the rp", func() { + validateLatestSchedulingPolicySnapshot(rpName, testNamespace, policySnapshotIndex1st, 3) + }) + + It("Should update rp status as pending rollout", func() { + rpStatusUpdatedActual := rpStatusWithExternalStrategyActual(nil, "", false, allMemberClusterNames, []string{"", "", ""}, []bool{false, false, false}, nil, nil) + Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) + }) + + It("Should create a staged update run successfully", func() { + createStagedUpdateRunSucceed(updateRunName, testNamespace, rpName, resourceSnapshotIndex1st, strategyName) + }) + + It("Should complete the staged update run with all 3 clusters updated in parallel", func() { + // With maxConcurrency=3, all 3 clusters should be updated in parallel. + // Each cluster waits 15 seconds, so total time should be under 20s. + surSucceededActual := stagedUpdateRunStatusSucceededActual(updateRunName, testNamespace, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) + Eventually(surSucceededActual, updateRunParallelEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s/%s succeeded", testNamespace, updateRunName) + checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) + }) + + It("Should update rp status as completed", func() { + rpStatusUpdatedActual := rpStatusWithExternalStrategyActual(appConfigMapIdentifiers(), resourceSnapshotIndex1st, true, allMemberClusterNames, + []string{resourceSnapshotIndex1st, resourceSnapshotIndex1st, resourceSnapshotIndex1st}, []bool{true, true, true}, nil, nil) + Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) + }) + }) }) func createStagedUpdateStrategySucceed(strategyName, namespace string) *placementv1beta1.StagedUpdateStrategy { From 7ece01c684e953ca6acbc39fac3b729697db81f3 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 12:37:24 -0800 Subject: [PATCH 13/25] minor fixes Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 3 +-- test/e2e/cluster_staged_updaterun_test.go | 2 +- test/e2e/staged_updaterun_test.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 5b16ae761..c374a0ecf 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -235,13 +235,12 @@ func (r *Reconciler) executeUpdatingStage( stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName) } } - // Need to continue as we need to process at most maxConcurrency number of clusters in parallel. } // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing. 
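+	// A stuck cluster takes precedence: if any cluster is stuck the run is marked stuck; otherwise, if any cluster finished, the run is marked progressing.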
aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount) - // After processing all clusters, aggregate and return errors + // After processing all clusters, aggregate and return errors. if len(clusterUpdateErrors) > 0 { return 0, utilerrors.NewAggregate(clusterUpdateErrors) } diff --git a/test/e2e/cluster_staged_updaterun_test.go b/test/e2e/cluster_staged_updaterun_test.go index 974f33903..555ac7202 100644 --- a/test/e2e/cluster_staged_updaterun_test.go +++ b/test/e2e/cluster_staged_updaterun_test.go @@ -1314,7 +1314,7 @@ var _ = Describe("test CRP rollout with staged update run", func() { }, }, } - Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create ClusterStagedUpdateStrategy with maxConcurrency=3") + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create ClusterStagedUpdateStrategy") }) AfterAll(func() { diff --git a/test/e2e/staged_updaterun_test.go b/test/e2e/staged_updaterun_test.go index 28ea6cba9..9c880bf18 100644 --- a/test/e2e/staged_updaterun_test.go +++ b/test/e2e/staged_updaterun_test.go @@ -1143,7 +1143,7 @@ var _ = Describe("test RP rollout with staged update run", Label("resourceplacem }, }, } - Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create StagedUpdateStrategy with maxConcurrency=3") + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create StagedUpdateStrategy") }) AfterAll(func() { From 61c084190ae5442d174a484b8fddedf16a98cdc6 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 12:42:26 -0800 Subject: [PATCH 14/25] update stuck err msg Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index c374a0ecf..212f7eb1b 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -589,14 +589,14 @@ func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.Upda } // markUpdateRunStuck marks the updateRun as stuck in memory. 
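+// clusterNames is a comma-separated list of the clusters in the stage that are stuck.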
-func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterName string) { +func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterNames string) { updateRunStatus := updateRun.GetUpdateRunStatus() meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{ Type: string(placementv1beta1.StagedUpdateRunConditionProgressing), Status: metav1.ConditionFalse, ObservedGeneration: updateRun.GetGeneration(), Reason: condition.UpdateRunStuckReason, - Message: fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check placement status for potential errors", clusterName, stageName), + Message: fmt.Sprintf("The updateRun is stuck waiting for cluster/clusters %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName), }) } From 6e667c36b35a071479fc01283b80ab3e6a7196de Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 14:46:18 -0800 Subject: [PATCH 15/25] add UT for test coverage Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 18 +- pkg/controllers/updaterun/execution_test.go | 249 ++++++++++++++++++++ 2 files changed, 258 insertions(+), 9 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 212f7eb1b..fe14ecd18 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -271,15 +271,6 @@ func (r *Reconciler) executeUpdatingStage( return clusterUpdatingWaitTime, nil } -func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) { - if len(stuckClusterNames) > 0 { - markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", ")) - } else if finishedClusterCount > 0 { - // If there is no stuck cluster but some progress has been made, mark the update run as progressing. - markUpdateRunProgressing(updateRun) - } -} - // executeDeleteStage executes the delete stage by deleting the bindings. func (r *Reconciler) executeDeleteStage( ctx context.Context, @@ -476,6 +467,15 @@ func (r *Reconciler) updateApprovalRequestAccepted(ctx context.Context, appReq p return nil } +func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) { + if len(stuckClusterNames) > 0 { + markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", ")) + } else if finishedClusterCount > 0 { + // If there is no stuck cluster but some progress has been made, mark the update run as progressing. + markUpdateRunProgressing(updateRun) + } +} + // isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status. func isBindingSyncedWithClusterStatus(resourceSnapshotName string, updateRun placementv1beta1.UpdateRunObj, binding placementv1beta1.BindingObj, cluster *placementv1beta1.ClusterUpdatingStatus) bool { bindingSpec := binding.GetBindingSpec() diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 2dbf4dcff..62b7b66d4 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -17,12 +17,21 @@ limitations under the License. 
package updaterun import ( + "context" + "errors" + "fmt" "testing" + "time" "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" @@ -398,3 +407,243 @@ func TestBuildApprovalRequestObject(t *testing.T) { }) } } + +func TestExecuteUpdatingStage_Error(t *testing.T) { + tests := []struct { + name string + updateRun *placementv1beta1.ClusterStagedUpdateRun + bindings []placementv1beta1.BindingObj + interceptorFunc *interceptor.Funcs + expectError bool + expectStagedAbortedError bool + expectWaitTime time.Duration + verifyClusterFailed bool + }{ + { + name: "cluster update failed", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ClusterUpdatingConditionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.ClusterUpdatingFailedReason, + Message: "cluster update failed", + }, + }, + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: nil, + interceptorFunc: nil, + expectError: true, + expectStagedAbortedError: true, + expectWaitTime: 0, + verifyClusterFailed: false, + }, + { + name: "binding update failure", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + State: placementv1beta1.BindingStateScheduled, + }, + }, + }, + interceptorFunc: &interceptor.Funcs{ + Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + return fmt.Errorf("simulated update error") + }, + }, + expectError: true, + expectStagedAbortedError: false, + expectWaitTime: 0, + verifyClusterFailed: false, + }, + { + name: "binding 
preemption", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ClusterUpdatingConditionStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.ClusterUpdatingStartedReason, + }, + }, + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "wrong-snapshot", + State: placementv1beta1.BindingStateBound, + }, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + }, + }, + }, + }, + }, + interceptorFunc: nil, + expectError: true, + expectStagedAbortedError: true, + expectWaitTime: 0, + verifyClusterFailed: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + scheme := runtime.NewScheme() + _ = placementv1beta1.AddToScheme(scheme) + + var fakeClient client.Client + objs := make([]client.Object, len(tt.bindings)) + for i := range tt.bindings { + objs[i] = tt.bindings[i] + } + if tt.interceptorFunc != nil { + fakeClient = interceptor.NewClient( + fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(), + *tt.interceptorFunc, + ) + } else { + fakeClient = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + } + + r := &Reconciler{ + Client: fakeClient, + } + + // Execute the stage. + waitTime, err := r.executeUpdatingStage(ctx, tt.updateRun, 0, tt.bindings, 1) + + // Verify error expectation. + if tt.expectError && err == nil { + t.Fatal("executeUpdatingStage() expected error, got nil") + } + if !tt.expectError && err != nil { + t.Fatalf("executeUpdatingStage() unexpected error: %v", err) + } + + // Verify specific error type. + if tt.expectStagedAbortedError && !errors.Is(err, errStagedUpdatedAborted) { + t.Fatalf("executeUpdatingStage() expected errStagedUpdatedAborted, got: %v", err) + } + + // Verify wait time. + if waitTime != tt.expectWaitTime { + t.Fatalf("executeUpdatingStage() expected waitTime=%v, got: %v", tt.expectWaitTime, waitTime) + } + + // Verify cluster failed status if needed. 
+ if tt.verifyClusterFailed { + clusterStatus := &tt.updateRun.Status.StagesStatus[0].Clusters[0] + failedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) + if failedCond == nil || failedCond.Status != metav1.ConditionFalse { + t.Fatal("executeUpdatingStage() failed to mark cluster as failed") + } + } + }) + } +} From 3cd1e489b0c6f015c1c796bb1823a8dc02b5686b Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 15:48:47 -0800 Subject: [PATCH 16/25] minor fixes Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution_test.go | 63 ++++++++++----------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 62b7b66d4..832ee05b4 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strings" "testing" "time" @@ -410,14 +411,13 @@ func TestBuildApprovalRequestObject(t *testing.T) { func TestExecuteUpdatingStage_Error(t *testing.T) { tests := []struct { - name string - updateRun *placementv1beta1.ClusterStagedUpdateRun - bindings []placementv1beta1.BindingObj - interceptorFunc *interceptor.Funcs - expectError bool - expectStagedAbortedError bool - expectWaitTime time.Duration - verifyClusterFailed bool + name string + updateRun *placementv1beta1.ClusterStagedUpdateRun + bindings []placementv1beta1.BindingObj + interceptorFunc *interceptor.Funcs + wantErr error + expectWaitTime time.Duration + verifyClusterFailed bool }{ { name: "cluster update failed", @@ -460,12 +460,11 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, }, - bindings: nil, - interceptorFunc: nil, - expectError: true, - expectStagedAbortedError: true, - expectWaitTime: 0, - verifyClusterFailed: false, + bindings: nil, + interceptorFunc: nil, + wantErr: errors.New("the cluster `cluster-1` in the stage test-stage has failed"), + expectWaitTime: 0, + verifyClusterFailed: false, }, { name: "binding update failure", @@ -516,10 +515,9 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { return fmt.Errorf("simulated update error") }, }, - expectError: true, - expectStagedAbortedError: false, - expectWaitTime: 0, - verifyClusterFailed: false, + wantErr: errors.New("simulated update error"), + expectWaitTime: 0, + verifyClusterFailed: false, }, { name: "binding preemption", @@ -583,11 +581,10 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, }, - interceptorFunc: nil, - expectError: true, - expectStagedAbortedError: true, - expectWaitTime: 0, - verifyClusterFailed: true, + interceptorFunc: nil, + wantErr: errors.New("the binding of the updating cluster `cluster-1` in the stage `test-stage` is not up-to-date with the desired status"), + expectWaitTime: 0, + verifyClusterFailed: true, }, } @@ -616,19 +613,21 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { } // Execute the stage. - waitTime, err := r.executeUpdatingStage(ctx, tt.updateRun, 0, tt.bindings, 1) + waitTime, gotErr := r.executeUpdatingStage(ctx, tt.updateRun, 0, tt.bindings, 1) - // Verify error expectation. 
- if tt.expectError && err == nil { - t.Fatal("executeUpdatingStage() expected error, got nil") + // Verify error expectation.x + if tt.wantErr != nil && gotErr == nil { + t.Fatalf("executeUpdatingStage() expected error containing %v, got nil", tt.wantErr) } - if !tt.expectError && err != nil { - t.Fatalf("executeUpdatingStage() unexpected error: %v", err) + if tt.wantErr == nil && gotErr != nil { + t.Fatalf("executeUpdatingStage() unexpected error: %v", gotErr) } - // Verify specific error type. - if tt.expectStagedAbortedError && !errors.Is(err, errStagedUpdatedAborted) { - t.Fatalf("executeUpdatingStage() expected errStagedUpdatedAborted, got: %v", err) + // Verify error message contains expected substring. + if tt.wantErr != nil && gotErr != nil { + if !strings.Contains(gotErr.Error(), tt.wantErr.Error()) { + t.Fatalf("executeUpdatingStage() expected error containing %v, got: %v", tt.wantErr, gotErr) + } } // Verify wait time. From 55223760be073e3e1e6b41716166d059fa241899 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 16:10:42 -0800 Subject: [PATCH 17/25] add more UTs Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution_test.go | 165 ++++++++++++++++---- 1 file changed, 136 insertions(+), 29 deletions(-) diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 832ee05b4..1c615bb99 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -411,13 +411,12 @@ func TestBuildApprovalRequestObject(t *testing.T) { func TestExecuteUpdatingStage_Error(t *testing.T) { tests := []struct { - name string - updateRun *placementv1beta1.ClusterStagedUpdateRun - bindings []placementv1beta1.BindingObj - interceptorFunc *interceptor.Funcs - wantErr error - expectWaitTime time.Duration - verifyClusterFailed bool + name string + updateRun *placementv1beta1.ClusterStagedUpdateRun + bindings []placementv1beta1.BindingObj + interceptorFunc *interceptor.Funcs + wantErr error + expectWaitTime time.Duration }{ { name: "cluster update failed", @@ -460,11 +459,10 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, }, - bindings: nil, - interceptorFunc: nil, - wantErr: errors.New("the cluster `cluster-1` in the stage test-stage has failed"), - expectWaitTime: 0, - verifyClusterFailed: false, + bindings: nil, + interceptorFunc: nil, + wantErr: errors.New("the cluster `cluster-1` in the stage test-stage has failed"), + expectWaitTime: 0, }, { name: "binding update failure", @@ -515,9 +513,8 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { return fmt.Errorf("simulated update error") }, }, - wantErr: errors.New("simulated update error"), - expectWaitTime: 0, - verifyClusterFailed: false, + wantErr: errors.New("simulated update error"), + expectWaitTime: 0, }, { name: "binding preemption", @@ -581,10 +578,129 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, }, - interceptorFunc: nil, - wantErr: errors.New("the binding of the updating cluster `cluster-1` in the stage `test-stage` is not up-to-date with the desired status"), - expectWaitTime: 0, - verifyClusterFailed: true, + interceptorFunc: nil, + wantErr: errors.New("the binding of the updating cluster `cluster-1` in the stage `test-stage` is not up-to-date with the desired status"), + expectWaitTime: 0, + }, + { + name: "binding synced but state not bound - update binding state fails", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
"test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + // No conditions - cluster has not started updating yet. + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "test-placement-1-snapshot", // Already synced. + State: placementv1beta1.BindingStateScheduled, // But not Bound yet. + }, + }, + }, + interceptorFunc: &interceptor.Funcs{ + Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + return fmt.Errorf("failed to update binding state") + }, + }, + wantErr: errors.New("failed to update binding state"), + expectWaitTime: 0, + }, + { + name: "binding synced and bound but generation updated - update rolloutStarted fails", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + // No conditions - cluster has not started updating yet. + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 2, // Generation updated by scheduler. + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "test-placement-1-snapshot", // Already synced. + State: placementv1beta1.BindingStateBound, // Already Bound. + }, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, // Old generation - needs update. + Reason: "RolloutStarted", + Message: "Rollout started", + }, + }, + }, + }, + }, + interceptorFunc: &interceptor.Funcs{ + SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { + // Fail the status update for rolloutStarted. + return fmt.Errorf("failed to update binding rolloutStarted status") + }, + }, + wantErr: errors.New("failed to update binding rolloutStarted status"), + expectWaitTime: 0, }, } @@ -615,7 +731,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { // Execute the stage. 
waitTime, gotErr := r.executeUpdatingStage(ctx, tt.updateRun, 0, tt.bindings, 1) - // Verify error expectation.x + // Verify error expectation. if tt.wantErr != nil && gotErr == nil { t.Fatalf("executeUpdatingStage() expected error containing %v, got nil", tt.wantErr) } @@ -634,15 +750,6 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { if waitTime != tt.expectWaitTime { t.Fatalf("executeUpdatingStage() expected waitTime=%v, got: %v", tt.expectWaitTime, waitTime) } - - // Verify cluster failed status if needed. - if tt.verifyClusterFailed { - clusterStatus := &tt.updateRun.Status.StagesStatus[0].Clusters[0] - failedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) - if failedCond == nil || failedCond.Status != metav1.ConditionFalse { - t.Fatal("executeUpdatingStage() failed to mark cluster as failed") - } - } }) } } From 7a2f5820dfadd3df22a28a5f973b2789e691e6b2 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 16:33:29 -0800 Subject: [PATCH 18/25] add UT Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution_test.go | 76 +++++++++++++++++++-- 1 file changed, 70 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 1c615bb99..8e7ab2ebb 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -19,7 +19,6 @@ package updaterun import ( "context" "errors" - "fmt" "strings" "testing" "time" @@ -510,7 +509,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, interceptorFunc: &interceptor.Funcs{ Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { - return fmt.Errorf("simulated update error") + return errors.New("simulated update error") }, }, wantErr: errors.New("simulated update error"), @@ -630,7 +629,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, interceptorFunc: &interceptor.Funcs{ Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { - return fmt.Errorf("failed to update binding state") + return errors.New("failed to update binding state") }, }, wantErr: errors.New("failed to update binding state"), @@ -686,8 +685,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { Type: string(placementv1beta1.ResourceBindingRolloutStarted), Status: metav1.ConditionTrue, ObservedGeneration: 1, // Old generation - needs update. - Reason: "RolloutStarted", - Message: "Rollout started", + Reason: condition.RolloutStartedReason, }, }, }, @@ -696,12 +694,78 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { interceptorFunc: &interceptor.Funcs{ SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { // Fail the status update for rolloutStarted. 
- return fmt.Errorf("failed to update binding rolloutStarted status") + return errors.New("failed to update binding rolloutStarted status") }, }, wantErr: errors.New("failed to update binding rolloutStarted status"), expectWaitTime: 0, }, + { + name: "binding synced, bound, rolloutStarted true, but binding has failed condition", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + // No conditions - cluster has not started updating yet. + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "test-placement-1-snapshot", // Already synced. + State: placementv1beta1.BindingStateBound, // Already Bound. + }, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.RolloutStartedReason, + }, + { + Type: string(placementv1beta1.ResourceBindingApplied), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.ApplyFailedReason, + }, + }, + }, + }, + }, + interceptorFunc: nil, + wantErr: errors.New("cluster updating encountered an error at stage"), + expectWaitTime: 0, + }, } for _, tt := range tests { From 14fc2eaa87f7b9beaca47219fd7f5da25dff91e7 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Thu, 13 Nov 2025 17:08:24 -0800 Subject: [PATCH 19/25] add UTs for maxConcurrency calculation Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 19 +++- pkg/controllers/updaterun/execution_test.go | 105 ++++++++++++++++++++ 2 files changed, 122 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index fe14ecd18..3284b68e0 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -70,8 +70,7 @@ func (r *Reconciler) execute( updateRunStatus := updateRun.GetUpdateRunStatus() if updatingStageIndex < len(updateRunStatus.StagesStatus) { - // Round down the maxConcurrency to the number of clusters in the stage. - maxConcurrency, err := intstr.GetScaledValueFromIntOrPercent(updateRunStatus.UpdateStrategySnapshot.Stages[updatingStageIndex].MaxConcurrency, len(updateRunStatus.StagesStatus[updatingStageIndex].Clusters), false) + maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex) if err != nil { return false, 0, err } @@ -467,6 +466,22 @@ func (r *Reconciler) updateApprovalRequestAccepted(ctx context.Context, appReq p return nil } +// calculateMaxConcurrencyValue calculates the actual max concurrency value for a stage. 
+// It converts the IntOrString maxConcurrency (which can be an integer or percentage) to an integer value +// based on the total number of clusters in the stage. The value is rounded down. +func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stageIndex int) (int, error) { + specifiedMaxConcurrency := status.UpdateStrategySnapshot.Stages[stageIndex].MaxConcurrency + clusterCount := len(status.StagesStatus[stageIndex].Clusters) + // Round down the maxConcurrency to the number of clusters in the stage. + maxConcurrencyValue, err := intstr.GetScaledValueFromIntOrPercent(specifiedMaxConcurrency, clusterCount, false) + if err != nil { + return 0, err + } + return maxConcurrencyValue, nil +} + +// aggregateUpdateRunStatus aggregates the status of the update run based on the cluster update status. +// It marks the update run as stuck if any clusters are stuck, or as progressing if some clusters have finished updating. func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) { if len(stuckClusterNames) > 0 { markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", ")) diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 8e7ab2ebb..55646e679 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -817,3 +817,108 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }) } } + +func TestCalculateMaxConcurrencyValue(t *testing.T) { + tests := []struct { + name string + maxConcurrency *intstr.IntOrString + clusterCount int + wantValue int + wantErr bool + }{ + { + name: "integer value - less than cluster count", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + clusterCount: 10, + wantValue: 3, + wantErr: false, + }, + { + name: "integer value - equal to cluster count", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, + clusterCount: 10, + wantValue: 10, + wantErr: false, + }, + { + name: "integer value - greater than cluster count", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 15}, + clusterCount: 10, + wantValue: 15, + wantErr: false, + }, + { + name: "percentage value - 50%", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, + clusterCount: 10, + wantValue: 5, + wantErr: false, + }, + { + name: "percentage value - 33% rounds down", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "33%"}, + clusterCount: 10, + wantValue: 3, + wantErr: false, + }, + { + name: "percentage value - 100%", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "100%"}, + clusterCount: 10, + wantValue: 10, + wantErr: false, + }, + { + name: "percentage value - 25% with 7 clusters", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"}, + clusterCount: 7, + wantValue: 1, + wantErr: false, + }, + { + name: "zero clusters", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + clusterCount: 0, + wantValue: 3, + wantErr: false, + }, + { + name: "non-zero percentage with zero clusters", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, + clusterCount: 0, + wantValue: 0, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + status := &placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: 
make([]placementv1beta1.ClusterUpdatingStatus, tt.clusterCount), + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: tt.maxConcurrency, + }, + }, + }, + } + + gotValue, gotErr := calculateMaxConcurrencyValue(status, 0) + + if (gotErr != nil) != tt.wantErr { + t.Fatalf("calculateMaxConcurrencyValue() error = %v, wantErr %v", gotErr, tt.wantErr) + } + + if gotValue != tt.wantValue { + t.Fatalf("calculateMaxConcurrencyValue() = %v, want %v", gotValue, tt.wantValue) + } + }) + } +} From cb5f88b7b0b3e984625a2f4c49d87f74e876049d Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Fri, 14 Nov 2025 10:34:34 -0800 Subject: [PATCH 20/25] minor fixes Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 3 +-- test/e2e/cluster_staged_updaterun_test.go | 4 ++-- test/e2e/staged_updaterun_test.go | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 3284b68e0..469bbdd96 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -140,7 +140,6 @@ func (r *Reconciler) executeUpdatingStage( } // The cluster needs to be processed. clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) - // The cluster is either updating or not started yet. binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) { // The cluster has not started updating yet. @@ -239,7 +238,7 @@ func (r *Reconciler) executeUpdatingStage( // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing. aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount) - // After processing all clusters, aggregate and return errors. + // Aggregate and return errors. if len(clusterUpdateErrors) > 0 { return 0, utilerrors.NewAggregate(clusterUpdateErrors) } diff --git a/test/e2e/cluster_staged_updaterun_test.go b/test/e2e/cluster_staged_updaterun_test.go index 555ac7202..81dd10a00 100644 --- a/test/e2e/cluster_staged_updaterun_test.go +++ b/test/e2e/cluster_staged_updaterun_test.go @@ -1298,7 +1298,7 @@ var _ = Describe("test CRP rollout with staged update run", func() { } Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP") - // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency=3 + // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency specified. strategy = &placementv1beta1.ClusterStagedUpdateStrategy{ ObjectMeta: metav1.ObjectMeta{ Name: strategyName, @@ -1307,7 +1307,7 @@ var _ = Describe("test CRP rollout with staged update run", func() { Stages: []placementv1beta1.StageConfig{ { Name: "parallel", - // Pick all clusters in a single stage + // Pick all clusters in a single stage. 
LabelSelector: &metav1.LabelSelector{}, MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, }, diff --git a/test/e2e/staged_updaterun_test.go b/test/e2e/staged_updaterun_test.go index 9c880bf18..357ec5759 100644 --- a/test/e2e/staged_updaterun_test.go +++ b/test/e2e/staged_updaterun_test.go @@ -1126,7 +1126,7 @@ var _ = Describe("test RP rollout with staged update run", Label("resourceplacem } Expect(hubClient.Create(ctx, rp)).To(Succeed(), "Failed to create RP") - // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency=3 + // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency specified. strategy = &placementv1beta1.StagedUpdateStrategy{ ObjectMeta: metav1.ObjectMeta{ Name: strategyName, @@ -1136,7 +1136,7 @@ var _ = Describe("test RP rollout with staged update run", Label("resourceplacem Stages: []placementv1beta1.StageConfig{ { Name: "parallel", - // Pick all clusters in a single stage + // Pick all clusters in a single stage. LabelSelector: &metav1.LabelSelector{}, MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, }, From e89653512fac8767e1847c5d5c0176daf133c176 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Fri, 14 Nov 2025 15:46:53 -0800 Subject: [PATCH 21/25] address comments Signed-off-by: Arvind Thirumurugan --- apis/placement/v1beta1/stageupdate_types.go | 3 +- ...etes-fleet.io_clusterstagedupdateruns.yaml | 5 +- ...leet.io_clusterstagedupdatestrategies.yaml | 5 +- ....kubernetes-fleet.io_stagedupdateruns.yaml | 5 +- ...netes-fleet.io_stagedupdatestrategies.yaml | 5 +- pkg/controllers/updaterun/execution.go | 6 +- pkg/controllers/updaterun/execution_test.go | 11 +++- .../api_validation_integration_test.go | 63 +++++++++++++++++++ 8 files changed, 95 insertions(+), 8 deletions(-) diff --git a/apis/placement/v1beta1/stageupdate_types.go b/apis/placement/v1beta1/stageupdate_types.go index 1f03fb625..cb7bb166e 100644 --- a/apis/placement/v1beta1/stageupdate_types.go +++ b/apis/placement/v1beta1/stageupdate_types.go @@ -322,7 +322,8 @@ type StageConfig struct { // Defaults to 1. // +kubebuilder:default=1 // +kubebuilder:validation:XIntOrString - // +kubebuilder:validation:Pattern="^((100|[0-9]{1,2})%|[0-9]+)$" + // +kubebuilder:validation:Pattern="^((100|[1-9][0-9]?)%|[1-9][0-9]*)$" + // +kubebuilder:validation:XValidation:rule="self == null || type(self) != int || self >= 1",message="maxConcurrency must be at least 1" // +kubebuilder:validation:Optional MaxConcurrency *intstr.IntOrString `json:"maxConcurrency,omitempty"` diff --git a/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdateruns.yaml b/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdateruns.yaml index bf75d613d..f492a4c9d 100644 --- a/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdateruns.yaml +++ b/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdateruns.yaml @@ -2040,8 +2040,11 @@ spec: Fractional results are rounded down. A minimum of 1 update is enforced. If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1). Defaults to 1. - pattern: ^((100|[0-9]{1,2})%|[0-9]+)$ + pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$ x-kubernetes-int-or-string: true + x-kubernetes-validations: + - message: maxConcurrency must be at least 1 + rule: self == null || type(self) != int || self >= 1 name: description: The name of the stage. This MUST be unique within the same StagedUpdateStrategy. 
diff --git a/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdatestrategies.yaml b/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdatestrategies.yaml index 4d088a0ce..e94b72d5d 100644 --- a/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdatestrategies.yaml +++ b/config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdatestrategies.yaml @@ -315,8 +315,11 @@ spec: Fractional results are rounded down. A minimum of 1 update is enforced. If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1). Defaults to 1. - pattern: ^((100|[0-9]{1,2})%|[0-9]+)$ + pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$ x-kubernetes-int-or-string: true + x-kubernetes-validations: + - message: maxConcurrency must be at least 1 + rule: self == null || type(self) != int || self >= 1 name: description: The name of the stage. This MUST be unique within the same StagedUpdateStrategy. diff --git a/config/crd/bases/placement.kubernetes-fleet.io_stagedupdateruns.yaml b/config/crd/bases/placement.kubernetes-fleet.io_stagedupdateruns.yaml index 10fe5f738..f421cc0a9 100644 --- a/config/crd/bases/placement.kubernetes-fleet.io_stagedupdateruns.yaml +++ b/config/crd/bases/placement.kubernetes-fleet.io_stagedupdateruns.yaml @@ -960,8 +960,11 @@ spec: Fractional results are rounded down. A minimum of 1 update is enforced. If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1). Defaults to 1. - pattern: ^((100|[0-9]{1,2})%|[0-9]+)$ + pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$ x-kubernetes-int-or-string: true + x-kubernetes-validations: + - message: maxConcurrency must be at least 1 + rule: self == null || type(self) != int || self >= 1 name: description: The name of the stage. This MUST be unique within the same StagedUpdateStrategy. diff --git a/config/crd/bases/placement.kubernetes-fleet.io_stagedupdatestrategies.yaml b/config/crd/bases/placement.kubernetes-fleet.io_stagedupdatestrategies.yaml index 898f92a88..68b93f92b 100644 --- a/config/crd/bases/placement.kubernetes-fleet.io_stagedupdatestrategies.yaml +++ b/config/crd/bases/placement.kubernetes-fleet.io_stagedupdatestrategies.yaml @@ -177,8 +177,11 @@ spec: Fractional results are rounded down. A minimum of 1 update is enforced. If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1). Defaults to 1. - pattern: ^((100|[0-9]{1,2})%|[0-9]+)$ + pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$ x-kubernetes-int-or-string: true + x-kubernetes-validations: + - message: maxConcurrency must be at least 1 + rule: self == null || type(self) != int || self >= 1 name: description: The name of the stage. This MUST be unique within the same StagedUpdateStrategy. diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index 469bbdd96..f3d5f2f59 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -476,6 +476,10 @@ func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stag if err != nil { return 0, err } + // Handle the case where maxConcurrency is specified as percentage but results in 0 after scaling down. 
+ if maxConcurrencyValue == 0 { + maxConcurrencyValue = 1 + } return maxConcurrencyValue, nil } @@ -610,7 +614,7 @@ func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clus Status: metav1.ConditionFalse, ObservedGeneration: updateRun.GetGeneration(), Reason: condition.UpdateRunStuckReason, - Message: fmt.Sprintf("The updateRun is stuck waiting for cluster/clusters %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName), + Message: fmt.Sprintf("The updateRun is stuck waiting for cluster(s) %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName), }) } diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 55646e679..8c474ec6e 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -848,12 +848,19 @@ func TestCalculateMaxConcurrencyValue(t *testing.T) { wantErr: false, }, { - name: "percentage value - 50%", + name: "percentage value - 50% with cluster count > 1", maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, clusterCount: 10, wantValue: 5, wantErr: false, }, + { + name: "percentage value - non zero percentage with cluster count equal to 1", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "10%"}, + clusterCount: 1, + wantValue: 1, + wantErr: false, + }, { name: "percentage value - 33% rounds down", maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "33%"}, @@ -886,7 +893,7 @@ func TestCalculateMaxConcurrencyValue(t *testing.T) { name: "non-zero percentage with zero clusters", maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, clusterCount: 0, - wantValue: 0, + wantValue: 1, wantErr: false, }, } diff --git a/test/apis/placement/v1beta1/api_validation_integration_test.go b/test/apis/placement/v1beta1/api_validation_integration_test.go index d8eeb7d0c..b21b6e35b 100644 --- a/test/apis/placement/v1beta1/api_validation_integration_test.go +++ b/test/apis/placement/v1beta1/api_validation_integration_test.go @@ -1567,6 +1567,69 @@ var _ = Describe("Test placement v1beta1 API validation", func() { Expect(errors.As(err, &statusErr)).To(BeTrue(), fmt.Sprintf("Create updateRunStrategy call produced error %s. Error type wanted is %s.", reflect.TypeOf(err), reflect.TypeOf(&k8sErrors.StatusError{}))) Expect(statusErr.ErrStatus.Message).Should(MatchRegexp("Too many: 2: must have at most 1 items")) }) + + It("Should deny creation of ClusterStagedUpdateStrategy with MaxConcurrency set to 0", func() { + maxConcurrency := intstr.FromInt(0) + strategy := placementv1beta1.ClusterStagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(updateRunStrategyNameTemplate, GinkgoParallelProcess()), + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: fmt.Sprintf(updateRunStageNameTemplate, GinkgoParallelProcess(), 1), + MaxConcurrency: &maxConcurrency, + }, + }, + }, + } + err := hubClient.Create(ctx, &strategy) + var statusErr *k8sErrors.StatusError + Expect(errors.As(err, &statusErr)).To(BeTrue(), fmt.Sprintf("Create updateRunStrategy call produced error %s. 
Error type wanted is %s.", reflect.TypeOf(err), reflect.TypeOf(&k8sErrors.StatusError{}))) + Expect(statusErr.ErrStatus.Message).Should(MatchRegexp("maxConcurrency must be at least 1")) + }) + + It("Should deny creation of ClusterStagedUpdateStrategy with MaxConcurrency set to 0%", func() { + maxConcurrency := intstr.FromString("0%") + strategy := placementv1beta1.ClusterStagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(updateRunStrategyNameTemplate, GinkgoParallelProcess()), + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: fmt.Sprintf(updateRunStageNameTemplate, GinkgoParallelProcess(), 1), + MaxConcurrency: &maxConcurrency, + }, + }, + }, + } + err := hubClient.Create(ctx, &strategy) + var statusErr *k8sErrors.StatusError + Expect(errors.As(err, &statusErr)).To(BeTrue(), fmt.Sprintf("Create updateRunStrategy call produced error %s. Error type wanted is %s.", reflect.TypeOf(err), reflect.TypeOf(&k8sErrors.StatusError{}))) + Expect(statusErr.ErrStatus.Message).Should(MatchRegexp("spec.stages\\[0\\].maxConcurrency in body should match")) + }) + + It("Should deny creation of ClusterStagedUpdateStrategy with MaxConcurrency set to 0%", func() { + maxConcurrency := intstr.FromString("0") + strategy := placementv1beta1.ClusterStagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(updateRunStrategyNameTemplate, GinkgoParallelProcess()), + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: fmt.Sprintf(updateRunStageNameTemplate, GinkgoParallelProcess(), 1), + MaxConcurrency: &maxConcurrency, + }, + }, + }, + } + err := hubClient.Create(ctx, &strategy) + var statusErr *k8sErrors.StatusError + Expect(errors.As(err, &statusErr)).To(BeTrue(), fmt.Sprintf("Create updateRunStrategy call produced error %s. 
Error type wanted is %s.", reflect.TypeOf(err), reflect.TypeOf(&k8sErrors.StatusError{}))) + Expect(statusErr.ErrStatus.Message).Should(MatchRegexp("spec.stages\\[0\\].maxConcurrency in body should match")) + }) }) Context("Test ClusterApprovalRequest API validation - valid cases", func() { From 7352cf108d4534b1010dd8bfbcc65303cf255234 Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Fri, 14 Nov 2025 16:01:39 -0800 Subject: [PATCH 22/25] address UT comment Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution_test.go | 27 +++++++++------------ 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 8c474ec6e..0a59ce71c 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -415,7 +415,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { bindings []placementv1beta1.BindingObj interceptorFunc *interceptor.Funcs wantErr error - expectWaitTime time.Duration + wantWaitTime time.Duration }{ { name: "cluster update failed", @@ -461,7 +461,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { bindings: nil, interceptorFunc: nil, wantErr: errors.New("the cluster `cluster-1` in the stage test-stage has failed"), - expectWaitTime: 0, + wantWaitTime: 0, }, { name: "binding update failure", @@ -513,7 +513,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, wantErr: errors.New("simulated update error"), - expectWaitTime: 0, + wantWaitTime: 0, }, { name: "binding preemption", @@ -579,7 +579,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, interceptorFunc: nil, wantErr: errors.New("the binding of the updating cluster `cluster-1` in the stage `test-stage` is not up-to-date with the desired status"), - expectWaitTime: 0, + wantWaitTime: 0, }, { name: "binding synced but state not bound - update binding state fails", @@ -633,7 +633,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, wantErr: errors.New("failed to update binding state"), - expectWaitTime: 0, + wantWaitTime: 0, }, { name: "binding synced and bound but generation updated - update rolloutStarted fails", @@ -698,7 +698,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, }, wantErr: errors.New("failed to update binding rolloutStarted status"), - expectWaitTime: 0, + wantWaitTime: 0, }, { name: "binding synced, bound, rolloutStarted true, but binding has failed condition", @@ -764,7 +764,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, interceptorFunc: nil, wantErr: errors.New("cluster updating encountered an error at stage"), - expectWaitTime: 0, + wantWaitTime: 0, }, } @@ -796,23 +796,20 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { waitTime, gotErr := r.executeUpdatingStage(ctx, tt.updateRun, 0, tt.bindings, 1) // Verify error expectation. - if tt.wantErr != nil && gotErr == nil { - t.Fatalf("executeUpdatingStage() expected error containing %v, got nil", tt.wantErr) - } - if tt.wantErr == nil && gotErr != nil { - t.Fatalf("executeUpdatingStage() unexpected error: %v", gotErr) + if (tt.wantErr != nil) != (gotErr != nil) { + t.Fatalf("executeUpdatingStage() want error: %v, got error: %v", tt.wantErr, gotErr) } // Verify error message contains expected substring. 
if tt.wantErr != nil && gotErr != nil { if !strings.Contains(gotErr.Error(), tt.wantErr.Error()) { - t.Fatalf("executeUpdatingStage() expected error containing %v, got: %v", tt.wantErr, gotErr) + t.Fatalf("executeUpdatingStage() want error: %v, got error: %v", tt.wantErr, gotErr) } } // Verify wait time. - if waitTime != tt.expectWaitTime { - t.Fatalf("executeUpdatingStage() expected waitTime=%v, got: %v", tt.expectWaitTime, waitTime) + if waitTime != tt.wantWaitTime { + t.Fatalf("executeUpdatingStage() want waitTime: %v, got waitTime: %v", tt.wantWaitTime, waitTime) } }) } From 18e26440c1a1eb6da4a5630379aa821f3a386fdc Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Fri, 14 Nov 2025 16:10:57 -0800 Subject: [PATCH 23/25] address comments Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/execution.go | 26 +++++++++------------ pkg/controllers/updaterun/execution_test.go | 14 +++++------ 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index f3d5f2f59..60a6d2afd 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -122,22 +122,18 @@ func (r *Reconciler) executeUpdatingStage( for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ { clusterStatus := &updatingStageStatus.Clusters[i] clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) - if clusterUpdateSucceededCond == nil { - // The cluster is either updating or not started yet. - clusterUpdatingCount++ - } else { - if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { - // The cluster is marked as failed to update. - failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) - klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef) - return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) - } - if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) { - // The cluster has been updated successfully. - finishedClusterCount++ - continue - } + if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { + // The cluster is marked as failed to update. + failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) + klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef) + return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) + } + if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) { + // The cluster has been updated successfully. + finishedClusterCount++ + continue } + clusterUpdatingCount++ // The cluster needs to be processed. 
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 0a59ce71c..78f46567a 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -415,7 +415,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { bindings []placementv1beta1.BindingObj interceptorFunc *interceptor.Funcs wantErr error - wantWaitTime time.Duration + wantWaitTime time.Duration }{ { name: "cluster update failed", @@ -461,7 +461,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { bindings: nil, interceptorFunc: nil, wantErr: errors.New("the cluster `cluster-1` in the stage test-stage has failed"), - wantWaitTime: 0, + wantWaitTime: 0, }, { name: "binding update failure", @@ -512,7 +512,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { return errors.New("simulated update error") }, }, - wantErr: errors.New("simulated update error"), + wantErr: errors.New("simulated update error"), wantWaitTime: 0, }, { @@ -579,7 +579,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, interceptorFunc: nil, wantErr: errors.New("the binding of the updating cluster `cluster-1` in the stage `test-stage` is not up-to-date with the desired status"), - wantWaitTime: 0, + wantWaitTime: 0, }, { name: "binding synced but state not bound - update binding state fails", @@ -632,7 +632,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { return errors.New("failed to update binding state") }, }, - wantErr: errors.New("failed to update binding state"), + wantErr: errors.New("failed to update binding state"), wantWaitTime: 0, }, { @@ -697,7 +697,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { return errors.New("failed to update binding rolloutStarted status") }, }, - wantErr: errors.New("failed to update binding rolloutStarted status"), + wantErr: errors.New("failed to update binding rolloutStarted status"), wantWaitTime: 0, }, { @@ -764,7 +764,7 @@ func TestExecuteUpdatingStage_Error(t *testing.T) { }, interceptorFunc: nil, wantErr: errors.New("cluster updating encountered an error at stage"), - wantWaitTime: 0, + wantWaitTime: 0, }, } From 278c0ce92fa0b6d5067bff2610a9ae9a18eafccb Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Fri, 14 Nov 2025 16:36:13 -0800 Subject: [PATCH 24/25] address comment, add E2E Signed-off-by: Arvind Thirumurugan --- test/e2e/cluster_staged_updaterun_test.go | 91 +++++++++++++++++++++++ test/e2e/staged_updaterun_test.go | 90 ++++++++++++++++++++++ 2 files changed, 181 insertions(+) diff --git a/test/e2e/cluster_staged_updaterun_test.go b/test/e2e/cluster_staged_updaterun_test.go index 81dd10a00..7f81de108 100644 --- a/test/e2e/cluster_staged_updaterun_test.go +++ b/test/e2e/cluster_staged_updaterun_test.go @@ -1361,6 +1361,97 @@ var _ = Describe("test CRP rollout with staged update run", func() { Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) }) }) + + Context("Test parallel cluster updates with maxConcurrency set to 70%", Ordered, func() { + var strategy *placementv1beta1.ClusterStagedUpdateStrategy + updateRunName := fmt.Sprintf(clusterStagedUpdateRunNameWithSubIndexTemplate, GinkgoParallelProcess(), 0) + + BeforeAll(func() { + // Create a test namespace and a configMap inside it on the hub 
cluster. + createWorkResources() + + // Create the CRP with external rollout strategy. + crp := &placementv1beta1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: crpName, + // Add a custom finalizer; this would allow us to better observe + // the behavior of the controllers. + Finalizers: []string{customDeletionBlockerFinalizer}, + }, + Spec: placementv1beta1.PlacementSpec{ + ResourceSelectors: workResourceSelector(), + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.ExternalRolloutStrategyType, + }, + }, + } + Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP") + + // Create a strategy with maxConcurrency set to 70%. + // With 3 clusters, 70% = 2.1, rounded down to 2 clusters. + strategy = &placementv1beta1.ClusterStagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: strategyName, + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "percentage", + // Pick all clusters in a single stage. + LabelSelector: &metav1.LabelSelector{}, + MaxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "70%"}, + }, + }, + }, + } + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create ClusterStagedUpdateStrategy") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the CRP. + ensureCRPAndRelatedResourcesDeleted(crpName, allMemberClusters) + + // Delete the clusterStagedUpdateRun. + ensureClusterStagedUpdateRunDeletion(updateRunName) + + // Delete the clusterStagedUpdateStrategy. + ensureClusterUpdateRunStrategyDeletion(strategyName) + }) + + It("Should not rollout any resources to member clusters as there's no update run yet", checkIfRemovedWorkResourcesFromAllMemberClustersConsistently) + + It("Should have the latest resource snapshot", func() { + validateLatestClusterResourceSnapshot(crpName, resourceSnapshotIndex1st) + }) + + It("Should successfully schedule the crp", func() { + validateLatestClusterSchedulingPolicySnapshot(crpName, policySnapshotIndex1st, 3) + }) + + It("Should update crp status as pending rollout", func() { + crpStatusUpdatedActual := crpStatusWithExternalStrategyActual(nil, "", false, allMemberClusterNames, []string{"", "", ""}, []bool{false, false, false}, nil, nil) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) + }) + + It("Should create a cluster staged update run successfully", func() { + createClusterStagedUpdateRunSucceed(updateRunName, crpName, resourceSnapshotIndex1st, strategyName) + }) + + It("Should complete the cluster staged update run with all 3 clusters", func() { + // Since maxConcurrency=70% each round we process 2 clusters in parallel, + // so all 3 clusters should be updated in 2 rounds. + // Each cluster waits 15 seconds, so total time should be under 40s. 
+ csurSucceededActual := clusterStagedUpdateRunStatusSucceededActual(updateRunName, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) + Eventually(csurSucceededActual, updateRunParallelEventuallyDuration*2, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s succeeded", updateRunName) + checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) + }) + + It("Should update crp status as completed", func() { + crpStatusUpdatedActual := crpStatusWithExternalStrategyActual(workResourceIdentifiers(), resourceSnapshotIndex1st, true, allMemberClusterNames, + []string{resourceSnapshotIndex1st, resourceSnapshotIndex1st, resourceSnapshotIndex1st}, []bool{true, true, true}, nil, nil) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) + }) + }) }) // Note that this container cannot run in parallel with other containers. diff --git a/test/e2e/staged_updaterun_test.go b/test/e2e/staged_updaterun_test.go index 357ec5759..13710ac6e 100644 --- a/test/e2e/staged_updaterun_test.go +++ b/test/e2e/staged_updaterun_test.go @@ -1190,6 +1190,96 @@ var _ = Describe("test RP rollout with staged update run", Label("resourceplacem Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) }) }) + + Context("Test parallel cluster updates with maxConcurrency set to 70%", Ordered, func() { + var strategy *placementv1beta1.StagedUpdateStrategy + updateRunName := fmt.Sprintf(stagedUpdateRunNameWithSubIndexTemplate, GinkgoParallelProcess(), 0) + + BeforeAll(func() { + // Create the RP with external rollout strategy. + rp := &placementv1beta1.ResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: rpName, + Namespace: testNamespace, + // Add a custom finalizer; this would allow us to better observe + // the behavior of the controllers. + Finalizers: []string{customDeletionBlockerFinalizer}, + }, + Spec: placementv1beta1.PlacementSpec{ + ResourceSelectors: configMapSelector(), + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.ExternalRolloutStrategyType, + }, + }, + } + Expect(hubClient.Create(ctx, rp)).To(Succeed(), "Failed to create RP") + + // Create a strategy with maxConcurrency set to 70%. + // With 3 clusters, 70% = 2.1, rounded down to 2 clusters. + strategy = &placementv1beta1.StagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: strategyName, + Namespace: testNamespace, + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "parallel", + // Pick all clusters in a single stage. + LabelSelector: &metav1.LabelSelector{}, + MaxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "70%"}, + }, + }, + }, + } + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create StagedUpdateStrategy") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the RP. + ensureRPAndRelatedResourcesDeleted(types.NamespacedName{Name: rpName, Namespace: testNamespace}, allMemberClusters) + + // Delete the stagedUpdateRun. + ensureStagedUpdateRunDeletion(updateRunName, testNamespace) + + // Delete the stagedUpdateStrategy. 
+ ensureStagedUpdateRunStrategyDeletion(strategyName, testNamespace) + }) + + It("Should not rollout any resources to member clusters as there's no update run yet", checkIfRemovedConfigMapFromAllMemberClustersConsistently) + + It("Should have the latest resource snapshot", func() { + validateLatestResourceSnapshot(rpName, testNamespace, resourceSnapshotIndex1st) + }) + + It("Should successfully schedule the rp", func() { + validateLatestSchedulingPolicySnapshot(rpName, testNamespace, policySnapshotIndex1st, 3) + }) + + It("Should update rp status as pending rollout", func() { + rpStatusUpdatedActual := rpStatusWithExternalStrategyActual(nil, "", false, allMemberClusterNames, []string{"", "", ""}, []bool{false, false, false}, nil, nil) + Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) + }) + + It("Should create a staged update run successfully", func() { + createStagedUpdateRunSucceed(updateRunName, testNamespace, rpName, resourceSnapshotIndex1st, strategyName) + }) + + It("Should complete the staged update run with all 3 clusters", func() { + // Since maxConcurrency=70% each round we process 2 clusters in parallel, + // so all 3 clusters should be updated in 2 rounds. + // Each cluster waits 15 seconds, so total time should be under 40s. + surSucceededActual := stagedUpdateRunStatusSucceededActual(updateRunName, testNamespace, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) + Eventually(surSucceededActual, updateRunParallelEventuallyDuration*2, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s/%s succeeded", testNamespace, updateRunName) + checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) + }) + + It("Should update rp status as completed", func() { + rpStatusUpdatedActual := rpStatusWithExternalStrategyActual(appConfigMapIdentifiers(), resourceSnapshotIndex1st, true, allMemberClusterNames, + []string{resourceSnapshotIndex1st, resourceSnapshotIndex1st, resourceSnapshotIndex1st}, []bool{true, true, true}, nil, nil) + Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) + }) + }) }) func createStagedUpdateStrategySucceed(strategyName, namespace string) *placementv1beta1.StagedUpdateStrategy { From ab0382a56a892abedde13df843f7797bb783a21d Mon Sep 17 00:00:00 2001 From: Arvind Thirumurugan Date: Fri, 14 Nov 2025 16:59:22 -0800 Subject: [PATCH 25/25] add TODO Signed-off-by: Arvind Thirumurugan --- pkg/controllers/updaterun/validation.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/controllers/updaterun/validation.go b/pkg/controllers/updaterun/validation.go index b3c5e5e0a..ffa5ea3c2 100644 --- a/pkg/controllers/updaterun/validation.go +++ b/pkg/controllers/updaterun/validation.go @@ -234,6 +234,7 @@ func validateClusterUpdatingStatus( return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) } updatingStageIndex = curStage + // TODO(arvindth): add validation to ensure updating cluster count should not exceed maxConcurrency. } return updatingStageIndex, lastFinishedStageIndex, nil }