Commit 9351583

Author: Arvind Thirumurugan (committed)
feat: process clusters in parallel within stage
Signed-off-by: Arvind Thirumurugan <arvindth@microsoft.com>
Parent: 2fd4460 · Commit: 9351583

File tree: 1 file changed (+48, -20 lines)

pkg/controllers/updaterun/execution.go

Lines changed: 48 additions & 20 deletions
@@ -22,6 +22,7 @@ import (
     "fmt"
     "reflect"
     "strconv"
+    "strings"
     "time"
 
     apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -68,7 +69,7 @@ func (r *Reconciler) execute(
     updateRunStatus := updateRun.GetUpdateRunStatus()
     if updatingStageIndex < len(updateRunStatus.StagesStatus) {
         updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex]
-        waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings)
+        waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, 1)
         if errors.Is(execErr, errStagedUpdatedAborted) {
             markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error())
             return true, waitTime, execErr
@@ -91,6 +92,7 @@ func (r *Reconciler) executeUpdatingStage(
     updateRun placementv1beta1.UpdateRunObj,
     updatingStageIndex int,
     toBeUpdatedBindings []placementv1beta1.BindingObj,
+    maxConcurrency int,
 ) (time.Duration, error) {
     updateRunStatus := updateRun.GetUpdateRunStatus()
     updateRunSpec := updateRun.GetUpdateRunSpec()
@@ -106,23 +108,40 @@ func (r *Reconciler) executeUpdatingStage(
         toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding
     }
     finishedClusterCount := 0
+    clusterUpdatingCount := 0
 
-    // Go through each cluster in the stage and check if it's updated.
-    for i := range updatingStageStatus.Clusters {
+    // List of clusters that need to be processed in parallel in this execution.
+    var clustersToProcess []placementv1beta1.ClusterUpdatingStatus
+
+    // Go through each cluster in the stage and check if it's updating/succeeded/failed.
+    for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount <= maxConcurrency; i++ {
         clusterStatus := &updatingStageStatus.Clusters[i]
-        clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
+        //clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
         clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
-        if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
-            // The cluster is marked as failed to update.
-            failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
-            klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
-            return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
-        }
-        if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
-            // The cluster has been updated successfully.
-            finishedClusterCount++
-            continue
+        if clusterUpdateSucceededCond == nil {
+            // The cluster is either updating or not started yet.
+            clustersToProcess = append(clustersToProcess, updatingStageStatus.Clusters[i])
+            clusterUpdatingCount++
+        } else {
+            if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
+                // The cluster is marked as failed to update.
+                failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
+                klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
+                return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
+            }
+            if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
+                // The cluster has been updated successfully.
+                finishedClusterCount++
+                continue
+            }
         }
+    }
+
+    var stuckClusterNames []string
+    // Now go through each cluster that needs to be processed.
+    for i := range clustersToProcess {
+        clusterStatus := &clustersToProcess[i]
+        clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
         // The cluster is either updating or not started yet.
         binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
         if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) {
@@ -172,8 +191,8 @@ func (r *Reconciler) executeUpdatingStage(
             if finishedClusterCount == 0 {
                 markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
             }
-            // No need to continue as we only support one cluster updating at a time for now.
-            return clusterUpdatingWaitTime, nil
+            // Need to continue as we need to process at most maxConcurrency number of clusters in parallel.
+            continue
         }
 
         // Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
@@ -194,20 +213,29 @@ func (r *Reconciler) executeUpdatingStage(
         }
 
         finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
+        if updateErr != nil {
+            return clusterUpdatingWaitTime, updateErr
+        }
         if finished {
             finishedClusterCount++
-            markUpdateRunProgressing(updateRun)
             continue
         } else {
             // If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
             timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
             if timeElapsed > updateRunStuckThreshold {
                 klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-                markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
+                stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName)
             }
         }
-        // No need to continue as we only support one cluster updating at a time for now.
-        return clusterUpdatingWaitTime, updateErr
+        // Need to continue as we need to process at most maxConcurrency number of clusters in parallel.
+    }
+
+    // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing.
+    if len(stuckClusterNames) > 0 {
+        markUpdateRunStuck(updateRun, updatingStageStatus.StageName, strings.Join(stuckClusterNames, ", "))
+    } else if finishedClusterCount > 0 {
+        // If there is no stuck cluster but some progress has been made, mark the update run as progressing.
+        markUpdateRunProgressing(updateRun)
     }
 
     if finishedClusterCount == len(updatingStageStatus.Clusters) {
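
The core pattern in this change is a two-pass loop: the first pass collects up to maxConcurrency clusters whose Succeeded condition is not yet set (counting finished clusters and aborting on failed ones), and the second pass drives each selected cluster forward, aggregating stuck clusters so the run-level stuck/progressing condition is set once per pass instead of per cluster. Below is a minimal, self-contained Go sketch of that pattern; the types, field names, and print statements are simplified stand-ins for illustration only, not the actual fleet APIs or controller helpers.

package main

import (
    "fmt"
    "strings"
)

// Simplified stand-in for the controller's per-cluster status (the real code
// uses placementv1beta1.ClusterUpdatingStatus with metav1.Condition entries).
type clusterStatus struct {
    Name      string
    Succeeded *bool // nil = no Succeeded condition yet (updating or not started)
    Stuck     bool  // true if the update exceeded the stuck threshold
}

// processStage mirrors the control flow introduced by the commit: collect at
// most maxConcurrency clusters that have no Succeeded condition, abort the
// stage on any failed cluster, then report stuck vs. progressing once per pass.
func processStage(clusters []clusterStatus, maxConcurrency int) (int, error) {
    finished := 0
    var toProcess []clusterStatus
    for i := 0; i < len(clusters) && len(toProcess) < maxConcurrency; i++ {
        c := clusters[i]
        switch {
        case c.Succeeded == nil:
            toProcess = append(toProcess, c) // still updating or not started yet
        case !*c.Succeeded:
            return finished, fmt.Errorf("cluster %q in the stage has failed", c.Name)
        default:
            finished++ // already updated successfully, skip it
        }
    }

    // Drive the selected clusters and aggregate stuck ones instead of
    // returning after the first, so several clusters advance per pass.
    var stuck []string
    for _, c := range toProcess {
        if c.Stuck {
            stuck = append(stuck, c.Name)
        }
    }
    if len(stuck) > 0 {
        fmt.Printf("update run stuck on clusters: %s\n", strings.Join(stuck, ", "))
    } else if finished > 0 {
        fmt.Println("update run progressing")
    }
    return finished, nil
}

func main() {
    done := true
    clusters := []clusterStatus{
        {Name: "member-1", Succeeded: &done}, // already finished
        {Name: "member-2"},                   // updating
        {Name: "member-3", Stuck: true},      // updating but stuck
        {Name: "member-4"},                   // skipped: beyond maxConcurrency=2
    }
    finished, err := processStage(clusters, 2)
    fmt.Println("finished:", finished, "err:", err)
}

Replacing the early return with continue is what lets several clusters advance within a single reconcile pass, while deferring the stuck/progressing decision until the whole batch has been examined keeps the run-level condition consistent.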
