Skip to content

Commit 912d794

Browse files
authored
CLOUDP-289150: Fix private endpoint conflict between sub-resource and new CRD (#1998)
1 parent 2d8525f commit 912d794

File tree

6 files changed

+391
-67
lines changed

6 files changed

+391
-67
lines changed

pkg/controller/atlasproject/atlasproject_controller.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package atlasproject
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
2223

2324
"go.uber.org/zap"
@@ -333,3 +334,19 @@ func logIfWarning(ctx *workflow.Context, result workflow.Result) {
333334
ctx.Log.Warnw(result.GetMessage())
334335
}
335336
}
337+
338+
func lastSpecFrom(atlasProject *akov2.AtlasProject, annotation string) (*akov2.AtlasProjectSpec, error) {
339+
var lastApplied akov2.AtlasProject
340+
ann, ok := atlasProject.GetAnnotations()[annotation]
341+
342+
if !ok {
343+
return nil, nil
344+
}
345+
346+
err := json.Unmarshal([]byte(ann), &lastApplied.Spec)
347+
if err != nil {
348+
return nil, fmt.Errorf("error reading AtlasProject Spec from annotation [%s]: %w", annotation, err)
349+
}
350+
351+
return &lastApplied.Spec, nil
352+
}

pkg/controller/atlasproject/atlasproject_controller_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,3 +388,36 @@ func TestFindProjectsForTeams(t *testing.T) {
388388
})
389389
}
390390
}
391+
392+
func TestLastSpecFrom(t *testing.T) {
393+
tests := map[string]struct {
394+
annotations map[string]string
395+
expectedLastSpec *akov2.AtlasProjectSpec
396+
expectedError string
397+
}{
398+
399+
"should return nil when there is no last spec": {},
400+
"should return error when last spec annotation is wrong": {
401+
annotations: map[string]string{"mongodb.com/last-applied-configuration": "{wrong}"},
402+
expectedError: "error reading AtlasProject Spec from annotation [mongodb.com/last-applied-configuration]:" +
403+
" invalid character 'w' looking for beginning of object key string",
404+
},
405+
"should return last spec": {
406+
annotations: map[string]string{"mongodb.com/last-applied-configuration": "{\"name\": \"my-project\"}"},
407+
expectedLastSpec: &akov2.AtlasProjectSpec{
408+
Name: "my-project",
409+
},
410+
},
411+
}
412+
for name, tt := range tests {
413+
t.Run(name, func(t *testing.T) {
414+
p := &akov2.AtlasProject{}
415+
p.WithAnnotations(tt.annotations)
416+
lastSpec, err := lastSpecFrom(p, "mongodb.com/last-applied-configuration")
417+
if err != nil {
418+
assert.ErrorContains(t, err, tt.expectedError)
419+
}
420+
assert.Equal(t, tt.expectedLastSpec, lastSpec)
421+
})
422+
}
423+
}

pkg/controller/atlasproject/private_endpoint.go

Lines changed: 74 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,17 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr
3636

3737
specPEs := project.Spec.DeepCopy().PrivateEndpoints
3838

39+
lastAppliedPEs, err := mapLastAppliedPrivateEndpoint(project)
40+
if err != nil {
41+
return workflow.Terminate(workflow.Internal, err.Error())
42+
}
43+
3944
atlasPEs, err := getAllPrivateEndpoints(workflowCtx.Context, workflowCtx.SdkClient, project.ID())
4045
if err != nil {
4146
return workflow.Terminate(workflow.Internal, err.Error())
4247
}
4348

44-
result, conditionType := syncPrivateEndpointsWithAtlas(workflowCtx, project.ID(), specPEs, atlasPEs)
49+
result, conditionType := syncPrivateEndpointsWithAtlas(workflowCtx, project.ID(), specPEs, atlasPEs, lastAppliedPEs)
4550
if !result.IsOk() {
4651
if conditionType == api.PrivateEndpointServiceReadyType {
4752
workflowCtx.UnsetCondition(api.PrivateEndpointReadyType)
@@ -50,13 +55,13 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr
5055
return result
5156
}
5257

53-
if len(specPEs) == 0 && len(atlasPEs) == 0 {
58+
if (len(specPEs) == 0 && len(atlasPEs) == 0) || !hasManagedPrivateEndpoints(specPEs, atlasPEs, lastAppliedPEs) {
5459
workflowCtx.UnsetCondition(api.PrivateEndpointServiceReadyType)
5560
workflowCtx.UnsetCondition(api.PrivateEndpointReadyType)
5661
return workflow.OK()
5762
}
5863

59-
serviceStatus := getStatusForServices(workflowCtx, atlasPEs)
64+
serviceStatus := getStatusForServices(atlasPEs)
6065
if !serviceStatus.IsOk() {
6166
workflowCtx.SetConditionFromResult(api.PrivateEndpointServiceReadyType, serviceStatus)
6267
return serviceStatus
@@ -81,11 +86,17 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr
8186
return interfaceStatus
8287
}
8388

84-
func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) (workflow.Result, api.ConditionType) {
89+
func syncPrivateEndpointsWithAtlas(
90+
ctx *workflow.Context,
91+
projectID string,
92+
specPEs []akov2.PrivateEndpoint,
93+
atlasPEs []atlasPE,
94+
lastAppliedPEs map[string]akov2.PrivateEndpoint,
95+
) (workflow.Result, api.ConditionType) {
8596
log := ctx.Log
8697

8798
log.Debugw("PE Connections", "atlasPEs", atlasPEs, "specPEs", specPEs)
88-
endpointsToDelete := getEndpointsNotInSpec(specPEs, atlasPEs)
99+
endpointsToDelete := getEndpointsNotInSpec(specPEs, atlasPEs, lastAppliedPEs)
89100
log.Debugf("Number of Private Endpoints to delete: %d", len(endpointsToDelete))
90101
if result := deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete); !result.IsOk() {
91102
return result, api.PrivateEndpointServiceReadyType
@@ -115,12 +126,18 @@ func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, spec
115126
return workflow.OK(), api.PrivateEndpointReadyType
116127
}
117128

118-
func getStatusForServices(ctx *workflow.Context, atlasPEs []atlasPE) workflow.Result {
119-
allAvailable, failureMessage := areServicesAvailableOrFailed(atlasPEs)
120-
ctx.Log.Debugw("Get Status for Services", "allAvailable", allAvailable, "failureMessage", failureMessage)
121-
if failureMessage != "" {
122-
return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, failureMessage)
129+
func getStatusForServices(atlasPEs []atlasPE) workflow.Result {
130+
allAvailable := true
131+
for _, conn := range atlasPEs {
132+
if isFailed(conn.GetStatus()) {
133+
return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, conn.GetErrorMessage())
134+
}
135+
136+
if !isAvailable(conn.GetStatus()) {
137+
allAvailable = false
138+
}
123139
}
140+
124141
if !allAvailable {
125142
return notReadyServiceResult
126143
}
@@ -167,22 +184,6 @@ func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []a
167184
return workflow.OK()
168185
}
169186

170-
func areServicesAvailableOrFailed(atlasPeConnections []atlasPE) (allAvailable bool, failureMessage string) {
171-
allAvailable = true
172-
173-
for _, conn := range atlasPeConnections {
174-
if isFailed(conn.GetStatus()) {
175-
failureMessage = conn.GetErrorMessage()
176-
return
177-
}
178-
if !isAvailable(conn.GetStatus()) {
179-
allAvailable = false
180-
}
181-
}
182-
183-
return
184-
}
185-
186187
func updatePEStatusOption(ctx *workflow.Context, projectID string, newConnections, syncedConnections []atlasPE) {
187188
setPEStatusOption(ctx, projectID, syncedConnections)
188189
addPEStatusOption(ctx, projectID, newConnections)
@@ -332,14 +333,19 @@ func endpointDefinedInSpec(specEndpoint akov2.PrivateEndpoint) bool {
332333
return specEndpoint.ID != "" || specEndpoint.EndpointGroupName != ""
333334
}
334335

335-
func DeleteAllPrivateEndpoints(ctx *workflow.Context, projectID string) workflow.Result {
336-
atlasPEs, err := getAllPrivateEndpoints(ctx.Context, ctx.SdkClient, projectID)
336+
func DeleteAllPrivateEndpoints(ctx *workflow.Context, atlasProject *akov2.AtlasProject) workflow.Result {
337+
atlasPEs, err := getAllPrivateEndpoints(ctx.Context, ctx.SdkClient, atlasProject.ID())
337338
if err != nil {
338339
return workflow.Terminate(workflow.Internal, err.Error())
339340
}
340341

341-
endpointsToDelete := getEndpointsNotInSpec([]akov2.PrivateEndpoint{}, atlasPEs)
342-
return deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete)
342+
lastAppliedSpecPEs, err := mapLastAppliedPrivateEndpoint(atlasProject)
343+
if err != nil {
344+
return workflow.Terminate(workflow.Internal, err.Error())
345+
}
346+
347+
endpointsToDelete := getEndpointsNotInSpec([]akov2.PrivateEndpoint{}, atlasPEs, lastAppliedSpecPEs)
348+
return deletePrivateEndpointsFromAtlas(ctx, atlasProject.ID(), endpointsToDelete)
343349
}
344350

345351
func deletePrivateEndpointsFromAtlas(ctx *workflow.Context, projectID string, listsToRemove []atlasPE) workflow.Result {
@@ -516,9 +522,16 @@ func terminateWithError(ctx *workflow.Context, conditionType api.ConditionType,
516522
var notReadyServiceResult = workflow.InProgress(workflow.ProjectPEServiceIsNotReadyInAtlas, "Private Endpoint Service is not ready")
517523
var notReadyInterfaceResult = workflow.InProgress(workflow.ProjectPEInterfaceIsNotReadyInAtlas, "Interface Private Endpoint is not ready")
518524

519-
func getEndpointsNotInSpec(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) []atlasPE {
520-
uniqueItems, _ := getUniqueDifference(atlasPEs, specPEs)
521-
return uniqueItems
525+
func getEndpointsNotInSpec(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) []atlasPE {
526+
notInSpecItems, _ := getUniqueDifference(atlasPEs, specPEs)
527+
toDelete := make([]atlasPE, 0, len(notInSpecItems))
528+
for _, item := range notInSpecItems {
529+
if _, ok := lastAppliedPEs[item.Identifier().(string)]; ok {
530+
toDelete = append(toDelete, item)
531+
}
532+
}
533+
534+
return toDelete
522535
}
523536

524537
func getEndpointsNotInAtlas(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) (toCreate []akov2.PrivateEndpoint, counts []int) {
@@ -585,3 +598,31 @@ func hasSkippedPrivateEndpointConfiguration(atlasProject *akov2.AtlasProject) (b
585598

586599
return false, nil
587600
}
601+
602+
func mapLastAppliedPrivateEndpoint(atlasProject *akov2.AtlasProject) (map[string]akov2.PrivateEndpoint, error) {
603+
lastApplied, err := lastSpecFrom(atlasProject, customresource.AnnotationLastAppliedConfiguration)
604+
if err != nil {
605+
return nil, err
606+
}
607+
608+
if lastApplied == nil || len(lastApplied.PrivateEndpoints) == 0 {
609+
return nil, nil
610+
}
611+
612+
result := map[string]akov2.PrivateEndpoint{}
613+
for _, pe := range lastApplied.PrivateEndpoints {
614+
result[pe.Identifier().(string)] = pe
615+
}
616+
617+
return result, nil
618+
}
619+
620+
func hasManagedPrivateEndpoints(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) bool {
621+
for _, pe := range atlasPEs {
622+
if _, ok := lastAppliedPEs[pe.Identifier().(string)]; ok {
623+
return true
624+
}
625+
}
626+
627+
return len(set.DeprecatedDifference(specPEs, atlasPEs)) == 0
628+
}

0 commit comments

Comments
 (0)