@@ -27,6 +27,7 @@ import (
2727 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
2828 "k8s.io/apimachinery/pkg/api/meta"
2929 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+ "k8s.io/apimachinery/pkg/util/intstr"
3031 "k8s.io/utils/ptr"
3132 "sigs.k8s.io/controller-runtime/pkg/client"
3233
@@ -37,13 +38,13 @@ import (
3738
3839const (
3940 // The current stage wait between clusters are 15 seconds
40- updateRunEventuallyDuration = time .Minute
41-
42- resourceSnapshotIndex1st = "0"
43- resourceSnapshotIndex2nd = "1"
44- policySnapshotIndex1st = "0"
45- policySnapshotIndex2nd = "1"
46- policySnapshotIndex3rd = "2"
41+ updateRunEventuallyDuration = time .Minute
42+ updateRunParallelEventuallyDuration = 20 * time . Second
43+ resourceSnapshotIndex1st = "0"
44+ resourceSnapshotIndex2nd = "1"
45+ policySnapshotIndex1st = "0"
46+ policySnapshotIndex2nd = "1"
47+ policySnapshotIndex3rd = "2"
4748
4849 testConfigMapDataValue = "new"
4950)
@@ -1270,6 +1271,95 @@ var _ = Describe("test CRP rollout with staged update run", func() {
12701271 }
12711272 })
12721273 })
1274+
1275+ Context ("Test parallel cluster updates with maxConcurrency set to 3" , Ordered , func () {
1276+ var strategy * placementv1beta1.ClusterStagedUpdateStrategy
1277+ updateRunName := fmt .Sprintf (clusterStagedUpdateRunNameWithSubIndexTemplate , GinkgoParallelProcess (), 0 )
1278+
1279+ BeforeAll (func () {
1280+ // Create a test namespace and a configMap inside it on the hub cluster.
1281+ createWorkResources ()
1282+
1283+ // Create the CRP with external rollout strategy.
1284+ crp := & placementv1beta1.ClusterResourcePlacement {
1285+ ObjectMeta : metav1.ObjectMeta {
1286+ Name : crpName ,
1287+ // Add a custom finalizer; this would allow us to better observe
1288+ // the behavior of the controllers.
1289+ Finalizers : []string {customDeletionBlockerFinalizer },
1290+ },
1291+ Spec : placementv1beta1.PlacementSpec {
1292+ ResourceSelectors : workResourceSelector (),
1293+ Strategy : placementv1beta1.RolloutStrategy {
1294+ Type : placementv1beta1 .ExternalRolloutStrategyType ,
1295+ },
1296+ },
1297+ }
1298+ Expect (hubClient .Create (ctx , crp )).To (Succeed (), "Failed to create CRP" )
1299+
1300+ // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency=3
1301+ strategy = & placementv1beta1.ClusterStagedUpdateStrategy {
1302+ ObjectMeta : metav1.ObjectMeta {
1303+ Name : strategyName ,
1304+ },
1305+ Spec : placementv1beta1.UpdateStrategySpec {
1306+ Stages : []placementv1beta1.StageConfig {
1307+ {
1308+ Name : "parallel" ,
1309+ // Pick all clusters in a single stage
1310+ LabelSelector : & metav1.LabelSelector {},
1311+ MaxConcurrency : & intstr.IntOrString {Type : intstr .Int , IntVal : 3 },
1312+ },
1313+ },
1314+ },
1315+ }
1316+ Expect (hubClient .Create (ctx , strategy )).To (Succeed (), "Failed to create ClusterStagedUpdateStrategy with maxConcurrency=3" )
1317+ })
1318+
1319+ AfterAll (func () {
1320+ // Remove the custom deletion blocker finalizer from the CRP.
1321+ ensureCRPAndRelatedResourcesDeleted (crpName , allMemberClusters )
1322+
1323+ // Delete the clusterStagedUpdateRun.
1324+ ensureClusterStagedUpdateRunDeletion (updateRunName )
1325+
1326+ // Delete the clusterStagedUpdateStrategy.
1327+ ensureClusterUpdateRunStrategyDeletion (strategyName )
1328+ })
1329+
1330+ It ("Should not rollout any resources to member clusters as there's no update run yet" , checkIfRemovedWorkResourcesFromAllMemberClustersConsistently )
1331+
1332+ It ("Should have the latest resource snapshot" , func () {
1333+ validateLatestClusterResourceSnapshot (crpName , resourceSnapshotIndex1st )
1334+ })
1335+
1336+ It ("Should successfully schedule the crp" , func () {
1337+ validateLatestClusterSchedulingPolicySnapshot (crpName , policySnapshotIndex1st , 3 )
1338+ })
1339+
1340+ It ("Should update crp status as pending rollout" , func () {
1341+ crpStatusUpdatedActual := crpStatusWithExternalStrategyActual (nil , "" , false , allMemberClusterNames , []string {"" , "" , "" }, []bool {false , false , false }, nil , nil )
1342+ Eventually (crpStatusUpdatedActual , eventuallyDuration , eventuallyInterval ).Should (Succeed (), "Failed to update CRP %s status as expected" , crpName )
1343+ })
1344+
1345+ It ("Should create a cluster staged update run successfully" , func () {
1346+ createClusterStagedUpdateRunSucceed (updateRunName , crpName , resourceSnapshotIndex1st , strategyName )
1347+ })
1348+
1349+ It ("Should complete the cluster staged update run in 15s with all 3 clusters updated in parallel" , func () {
1350+ // With maxConcurrency=3, all 3 clusters should be updated in parallel.
1351+ // Each cluster waits 15 seconds, so total time should be under 20s.
1352+ csurSucceededActual := clusterStagedUpdateRunStatusSucceededActual (updateRunName , policySnapshotIndex1st , len (allMemberClusters ), defaultApplyStrategy , & strategy .Spec , [][]string {{allMemberClusterNames [0 ], allMemberClusterNames [1 ], allMemberClusterNames [2 ]}}, nil , nil , nil )
1353+ Eventually (csurSucceededActual , updateRunParallelEventuallyDuration , eventuallyInterval ).Should (Succeed (), "Failed to validate updateRun %s succeeded within 15s" , updateRunName )
1354+ checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun (allMemberClusters )
1355+ })
1356+
1357+ It ("Should update crp status as completed" , func () {
1358+ crpStatusUpdatedActual := crpStatusWithExternalStrategyActual (workResourceIdentifiers (), resourceSnapshotIndex1st , true , allMemberClusterNames ,
1359+ []string {resourceSnapshotIndex1st , resourceSnapshotIndex1st , resourceSnapshotIndex1st }, []bool {true , true , true }, nil , nil )
1360+ Eventually (crpStatusUpdatedActual , eventuallyDuration , eventuallyInterval ).Should (Succeed (), "Failed to update CRP %s status as expected" , crpName )
1361+ })
1362+ })
12731363})
12741364
12751365// Note that this container cannot run in parallel with other containers.
0 commit comments