3 changes: 2 additions & 1 deletion apis/placement/v1beta1/stageupdate_types.go
@@ -322,7 +322,8 @@ type StageConfig struct {
// Defaults to 1.
// +kubebuilder:default=1
// +kubebuilder:validation:XIntOrString
// +kubebuilder:validation:Pattern="^((100|[0-9]{1,2})%|[0-9]+)$"
// +kubebuilder:validation:Pattern="^((100|[1-9][0-9]?)%|[1-9][0-9]*)$"
// +kubebuilder:validation:XValidation:rule="self == null || type(self) != int || self >= 1",message="maxConcurrency must be at least 1"
// +kubebuilder:validation:Optional
MaxConcurrency *intstr.IntOrString `json:"maxConcurrency,omitempty"`

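For reference, here is a quick standalone Go check (not part of this change; sample values chosen for illustration) of which maxConcurrency strings the tightened pattern accepts; unlike the old pattern, it rejects "0", "0%", and zero-padded integers:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Tightened pattern from this change: 1%-100%, or a positive integer without leading zeros.
	newPattern := regexp.MustCompile(`^((100|[1-9][0-9]?)%|[1-9][0-9]*)$`)
	for _, v := range []string{"1", "3", "25%", "100%", "0", "0%", "007", "101%"} {
		fmt.Printf("%-5s -> %v\n", v, newPattern.MatchString(v))
	}
	// Prints true for "1", "3", "25%", "100%" and false for "0", "0%", "007", "101%".
}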
@@ -2040,8 +2040,11 @@ spec:
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$
x-kubernetes-int-or-string: true
x-kubernetes-validations:
- message: maxConcurrency must be at least 1
rule: self == null || type(self) != int || self >= 1
name:
description: The name of the stage. This MUST be unique
within the same StagedUpdateStrategy.
@@ -315,8 +315,11 @@ spec:
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$
x-kubernetes-int-or-string: true
x-kubernetes-validations:
- message: maxConcurrency must be at least 1
rule: self == null || type(self) != int || self >= 1
name:
description: The name of the stage. This MUST be unique within
the same StagedUpdateStrategy.
@@ -960,8 +960,11 @@ spec:
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$
x-kubernetes-int-or-string: true
x-kubernetes-validations:
- message: maxConcurrency must be at least 1
rule: self == null || type(self) != int || self >= 1
name:
description: The name of the stage. This MUST be unique
within the same StagedUpdateStrategy.
@@ -177,8 +177,11 @@ spec:
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
pattern: ^((100|[1-9][0-9]?)%|[1-9][0-9]*)$
x-kubernetes-int-or-string: true
x-kubernetes-validations:
- message: maxConcurrency must be at least 1
rule: self == null || type(self) != int || self >= 1
name:
description: The name of the stage. This MUST be unique within
the same StagedUpdateStrategy.
101 changes: 80 additions & 21 deletions pkg/controllers/updaterun/execution.go
@@ -22,12 +22,15 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -67,8 +70,12 @@ func (r *Reconciler) execute(

updateRunStatus := updateRun.GetUpdateRunStatus()
if updatingStageIndex < len(updateRunStatus.StagesStatus) {
maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex)
if err != nil {
return false, 0, err
}
updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex]
waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings)
waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency)
if errors.Is(execErr, errStagedUpdatedAborted) {
markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error())
return true, waitTime, execErr
@@ -91,6 +98,7 @@ func (r *Reconciler) executeUpdatingStage(
updateRun placementv1beta1.UpdateRunObj,
updatingStageIndex int,
toBeUpdatedBindings []placementv1beta1.BindingObj,
maxConcurrency int,
) (time.Duration, error) {
updateRunStatus := updateRun.GetUpdateRunStatus()
updateRunSpec := updateRun.GetUpdateRunSpec()
@@ -105,12 +113,14 @@ func (r *Reconciler) executeUpdatingStage(
bindingSpec := binding.GetBindingSpec()
toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding
}
finishedClusterCount := 0

// Go through each cluster in the stage and check if it's updated.
for i := range updatingStageStatus.Clusters {
finishedClusterCount := 0
clusterUpdatingCount := 0
var stuckClusterNames []string
var clusterUpdateErrors []error
// Go through each cluster in the stage and check if it's updating/succeeded/failed.
for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ {
clusterStatus := &updatingStageStatus.Clusters[i]
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster is marked as failed to update.
@@ -123,7 +133,9 @@ func (r *Reconciler) executeUpdatingStage(
finishedClusterCount++
continue
}
// The cluster is either updating or not started yet.
clusterUpdatingCount++
// The cluster needs to be processed.
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) {
// The cluster has not started updating yet.
@@ -138,11 +150,13 @@ func (r *Reconciler) executeUpdatingStage(
bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy
if err := r.Client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef)
return 0, controller.NewUpdateIgnoreConflictError(err)
clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
continue
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
clusterUpdateErrors = append(clusterUpdateErrors, err)
continue
}
} else {
klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
@@ -151,29 +165,33 @@ func (r *Reconciler) executeUpdatingStage(
bindingSpec.State = placementv1beta1.BindingStateBound
if err := r.Client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
return 0, controller.NewUpdateIgnoreConflictError(err)
clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
continue
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
clusterUpdateErrors = append(clusterUpdateErrors, err)
continue
}
} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) {
klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
clusterUpdateErrors = append(clusterUpdateErrors, err)
continue
}
} else {
if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
return clusterUpdatingWaitTime, updateErr
clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
continue
}
}
}
markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
if finishedClusterCount == 0 {
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, nil
// Continue to the next cluster; up to maxConcurrency clusters can be processed in parallel.
continue
}

// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
@@ -190,24 +208,35 @@ func (r *Reconciler) executeUpdatingStage(
"bindingSpecInSync", inSync, "bindingState", bindingSpec.State,
"bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "updateRun", updateRunRef)
markClusterUpdatingFailed(clusterStatus, updateRun.GetGeneration(), preemptedErr.Error())
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error())
clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error()))
continue
}

finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
if updateErr != nil {
clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
}
if finished {
finishedClusterCount++
markUpdateRunProgressing(updateRun)
// The cluster has finished successfully, so another cluster can be processed in this round.
clusterUpdatingCount--
continue
} else {
// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
if timeElapsed > updateRunStuckThreshold {
klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName)
}
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, updateErr
}

// After processing up to maxConcurrency clusters, check whether the update run needs to be marked as stuck or progressing.
aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount)

// Aggregate and return errors.
if len(clusterUpdateErrors) > 0 {
return 0, utilerrors.NewAggregate(clusterUpdateErrors)
}

if finishedClusterCount == len(updatingStageStatus.Clusters) {
@@ -232,6 +261,7 @@ func (r *Reconciler) executeUpdatingStage(
}
return waitTime, nil
}
// Some clusters are still updating.
return clusterUpdatingWaitTime, nil
}
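The loop above walks the stage's clusters in order and stops dispatching new updates once maxConcurrency clusters are in flight. A minimal standalone sketch of that scan pattern (simplified and hypothetical, not the controller code):

package main

import "fmt"

// clustersToProcess is a hypothetical, simplified version of the scan above:
// walk the clusters in order, skip the ones that already finished, and stop
// once maxConcurrency clusters are in flight.
func clustersToProcess(finished []bool, maxConcurrency int) []int {
	var picked []int
	inFlight := 0
	for i := 0; i < len(finished) && inFlight < maxConcurrency; i++ {
		if finished[i] {
			continue // finished clusters do not count against the concurrency budget
		}
		inFlight++
		picked = append(picked, i)
	}
	return picked
}

func main() {
	// Clusters 0 and 2 are already done; with maxConcurrency=2 the next candidates are 1 and 3.
	fmt.Println(clustersToProcess([]bool{true, false, true, false, false}, 2)) // prints [1 3]
}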

@@ -431,6 +461,35 @@ func (r *Reconciler) updateApprovalRequestAccepted(ctx context.Context, appReq p
return nil
}

// calculateMaxConcurrencyValue calculates the actual max concurrency value for a stage.
// It converts the IntOrString maxConcurrency (which can be an integer or percentage) to an integer value
// based on the total number of clusters in the stage. The value is rounded down.
func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stageIndex int) (int, error) {
specifiedMaxConcurrency := status.UpdateStrategySnapshot.Stages[stageIndex].MaxConcurrency
clusterCount := len(status.StagesStatus[stageIndex].Clusters)
// Round down the maxConcurrency to the number of clusters in the stage.
maxConcurrencyValue, err := intstr.GetScaledValueFromIntOrPercent(specifiedMaxConcurrency, clusterCount, false)
if err != nil {
return 0, err
}
// Handle the case where maxConcurrency is specified as percentage but results in 0 after scaling down.
if maxConcurrencyValue == 0 {
maxConcurrencyValue = 1
}
return maxConcurrencyValue, nil
}
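To illustrate the scaling for a stage with 10 clusters, a standalone sketch (example values only; the intstr helper is the apimachinery function used above):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	total := 10 // number of clusters in the stage
	for _, mc := range []intstr.IntOrString{
		intstr.FromString("25%"), // floor(0.25 * 10) = 2
		intstr.FromInt(3),        // integer values are used as-is -> 3
		intstr.FromString("5%"),  // floor(0.05 * 10) = 0 here; calculateMaxConcurrencyValue bumps this to 1
	} {
		v, err := intstr.GetScaledValueFromIntOrPercent(&mc, total, false) // false = round down
		if err != nil {
			panic(err)
		}
		fmt.Printf("maxConcurrency=%s -> %d\n", mc.String(), v)
	}
}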

// aggregateUpdateRunStatus aggregates the status of the update run based on the cluster update status.
// It marks the update run as stuck if any clusters are stuck, or as progressing if some clusters have finished updating.
func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) {
if len(stuckClusterNames) > 0 {
markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
} else if finishedClusterCount > 0 {
// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
markUpdateRunProgressing(updateRun)
}
}

// isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status.
func isBindingSyncedWithClusterStatus(resourceSnapshotName string, updateRun placementv1beta1.UpdateRunObj, binding placementv1beta1.BindingObj, cluster *placementv1beta1.ClusterUpdatingStatus) bool {
bindingSpec := binding.GetBindingSpec()
@@ -544,14 +603,14 @@ func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.Upda
}

// markUpdateRunStuck marks the updateRun as stuck in memory.
func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterName string) {
func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterNames string) {
updateRunStatus := updateRun.GetUpdateRunStatus()
meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.GetGeneration(),
Reason: condition.UpdateRunStuckReason,
Message: fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check placement status for potential errors", clusterName, stageName),
Message: fmt.Sprintf("The updateRun is stuck waiting for cluster(s) %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName),
})
}
