From f593f2a98e052d27e7a7d2c391224c742dd5d924 Mon Sep 17 00:00:00 2001 From: Jose Vazquez Date: Wed, 9 Aug 2023 11:26:21 +0200 Subject: [PATCH] draft Signed-off-by: Jose Vazquez --- .../atlasdeployment_controller.go | 33 +- .../atlasdeployment/serverless_deployment.go | 16 +- .../serverless_private_endpoint.go | 103 +++++- .../serverless_private_endpoint_mock_test.go | 34 ++ .../serverless_private_endpoint_test.go | 301 ++++++++++++++++++ test/e2e/actions/project_flow.go | 2 + test/e2e/k8s/operator.go | 18 +- test/e2e/model/dataprovider.go | 53 ++- test/e2e/private_link_test.go | 4 +- test/e2e/serverless_pe_test.go | 32 +- 10 files changed, 544 insertions(+), 52 deletions(-) create mode 100644 pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go create mode 100644 pkg/controller/atlasdeployment/serverless_private_endpoint_test.go diff --git a/pkg/controller/atlasdeployment/atlasdeployment_controller.go b/pkg/controller/atlasdeployment/atlasdeployment_controller.go index 21ed7843a5..d02664cca3 100644 --- a/pkg/controller/atlasdeployment/atlasdeployment_controller.go +++ b/pkg/controller/atlasdeployment/atlasdeployment_controller.go @@ -162,15 +162,6 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl. return result.ReconcileResult(), nil } - err = customresource.ApplyLastConfigApplied(context, deployment, r.Client) - if err != nil { - result = workflow.Terminate(workflow.Internal, err.Error()) - workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result) - log.Error(result.GetMessage()) - - return result.ReconcileResult(), nil - } - if deployment.IsLegacyDeployment() { if err := ConvertLegacyDeployment(&deployment.Spec); err != nil { result = workflow.Terminate(workflow.Internal, err.Error()) @@ -188,7 +179,7 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl. } handleDeployment := r.selectDeploymentHandler(deployment) - if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() { + if result, _ := handleDeployment(context, workflowCtx, project, deployment, req); !result.IsOk() { workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result) return result.ReconcileResult(), nil } @@ -199,6 +190,16 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl. return result.ReconcileResult(), nil } } + + err = customresource.ApplyLastConfigApplied(context, deployment, r.Client) + if err != nil { + result = workflow.Terminate(workflow.Internal, err.Error()) + workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result) + log.Error(result.GetMessage()) + + return result.ReconcileResult(), nil + } + return workflow.OK().ReconcileResult(), nil } @@ -358,7 +359,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At } // handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API -func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { +func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment) if c != nil && c.StateName != "" { workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName)) @@ -387,7 +388,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl } if err := r.ensureBackupScheduleAndPolicy( - context.Background(), + ctx, workflowCtx, project.ID(), deployment, backupEnabled, @@ -411,8 +412,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl } // handleServerlessInstance ensures the state of the serverless instance using the serverless API -func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { - c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec) +func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { + c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment) return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c) } @@ -579,7 +580,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont return nil } -type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) +type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) type atlasClusterType int @@ -687,7 +688,7 @@ func advancedDeploymentMatchesSpec(log *zap.SugaredLogger, atlasSpec *mongodbatl return d == "", nil } -// Parse through tags and verfiy that all keys are unique. Return error otherwise. +// Parse through tags and verify that all keys are unique. Return error otherwise. func uniqueKey(deploymentSpec *mdbv1.AtlasDeploymentSpec) error { store := make(map[string]string) var arrTags []*mdbv1.TagSpec diff --git a/pkg/controller/atlasdeployment/serverless_deployment.go b/pkg/controller/atlasdeployment/serverless_deployment.go index 45fe350c4e..82d129f4d6 100644 --- a/pkg/controller/atlasdeployment/serverless_deployment.go +++ b/pkg/controller/atlasdeployment/serverless_deployment.go @@ -13,8 +13,12 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow" ) -func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) { - atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name) +func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) { + if deployment == nil || deployment.Spec.ServerlessSpec == nil { + return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty") + } + serverlessSpec := deployment.Spec.ServerlessSpec + atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name) if err != nil { if resp == nil { return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error()) @@ -28,8 +32,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr if err != nil { return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error()) } - ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name) - atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{ + workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name) + atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{ Name: serverlessSpec.Name, ProviderSettings: &mongodbatlas.ServerlessProviderSettings{ BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName, @@ -53,7 +57,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr convertedDeployment.Tags = &[]*mongodbatlas.Tag{} } if !isTagsEqual(*(atlasDeployment.Tags), *(convertedDeployment.Tags)) { - atlasDeployment, _, err = ctx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{ + atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{ // TODO: include ServerlessBackupOptions and TerminationProtectionEnabled Tag: convertedDeployment.Tags, }) @@ -62,7 +66,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr } return atlasDeployment, workflow.InProgress(workflow.DeploymentUpdating, "deployment is updating") } - result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name) + result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, r.SubObjectDeletionProtection) return atlasDeployment, result case status.StateCREATING: diff --git a/pkg/controller/atlasdeployment/serverless_private_endpoint.go b/pkg/controller/atlasdeployment/serverless_private_endpoint.go index 784d846461..3a14d2f2e0 100644 --- a/pkg/controller/atlasdeployment/serverless_private_endpoint.go +++ b/pkg/controller/atlasdeployment/serverless_private_endpoint.go @@ -2,7 +2,9 @@ package atlasdeployment import ( "context" + "encoding/json" "fmt" + "sort" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil" @@ -14,6 +16,7 @@ import ( mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow" ) @@ -28,10 +31,30 @@ const ( SPEStatusFailed = "FAILED" //stage 2 ) -func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result { - if deploymentSpec == nil { +func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result { + if deployment == nil || deployment.Spec.ServerlessSpec == nil { return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty") } + deploymentSpec := deployment.Spec.ServerlessSpec + + canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service, protected, groupID, deployment) + if err != nil { + result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err)) + service.SetConditionFromResult(status.AlertConfigurationReadyType, result) + + return result + } + + if !canReconcile { + result := workflow.Terminate( + workflow.AtlasDeletionProtection, + "unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information", + ) + service.SetConditionFromResult(status.AlertConfigurationReadyType, result) + + return result + } + providerName := GetServerlessProvider(deploymentSpec) if providerName == provider.ProviderGCP { if len(deploymentSpec.PrivateEndpoints) == 0 { @@ -57,6 +80,82 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, return result } +func canServerlessPrivateEndpointsReconcile(ctx context.Context, service *workflow.Context, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) { + if !protected { + return true, nil + } + + latestConfig := &mdbv1.AtlasDeploymentSpec{} + latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration] + if ok { + if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil { + return false, err + } + } + + atlasClient := service.Client + existingPE, err := getAllExistingServerlessPE(ctx, atlasClient.ServerlessPrivateEndpoints, groupID, deployment.Spec.ServerlessSpec.Name) + if err != nil { + return false, err + } + + if len(existingPE) == 0 { + return true, nil + } + + logger := service.Log + prevCfg := prevPEConfig(latestConfig) + if matchingPEs(logger, deployment.Spec.ServerlessSpec.PrivateEndpoints, existingPE) || + matchingPEs(logger, prevCfg, existingPE) { + return true, nil + } + return false, nil +} + +func sortedK8sPENames(spes []mdbv1.ServerlessPrivateEndpoint) []string { + names := make([]string, 0, len(spes)) + for _, spe := range spes { + names = append(names, spe.Name) + } + sort.Strings(names) + return names +} + +func sortedAtlasPENames(atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) []string { + names := make([]string, 0, len(atlasPEs)) + for _, atlasPE := range atlasPEs { + names = append(names, atlasPE.Comment) + } + sort.Strings(names) + return names +} + +func matchingPEs(logger *zap.SugaredLogger, spes []mdbv1.ServerlessPrivateEndpoint, atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) bool { + k8sPENames := sortedK8sPENames(spes) + atlasPENames := sortedAtlasPENames(atlasPEs) + if len(k8sPENames) != len(atlasPEs) { + logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPENames, atlasPENames) + logger.Debugf("Different PE sets lengths Kubernetes wants %d but atlas has %d", len(k8sPENames), len(atlasPEs)) + return false + } + for i, k8sName := range k8sPENames { + if atlasPENames[i] != k8sName { + logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPENames, atlasPENames) + logger.Debugf("Different PE at index %d %d but atlas has %d", k8sName, atlasPENames[i]) + return false + } + } + logger.Debugf("Kubernetes PEs MATCH Atlas: k8s %v == Atlas %v", k8sPENames, atlasPENames) + return true +} + +func prevPEConfig(deploymentSpec *mdbv1.AtlasDeploymentSpec) []mdbv1.ServerlessPrivateEndpoint { + if deploymentSpec.ServerlessSpec == nil || deploymentSpec.ServerlessSpec.PrivateEndpoints == nil { + return []mdbv1.ServerlessPrivateEndpoint{} + } + return deploymentSpec.ServerlessSpec.PrivateEndpoints +} + func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName { if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless { return deploymentSpec.ProviderSettings.ProviderName diff --git a/pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go b/pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go new file mode 100644 index 0000000000..136c3bbdd7 --- /dev/null +++ b/pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go @@ -0,0 +1,34 @@ +package atlasdeployment + +import ( + "context" + + "go.mongodb.org/atlas/mongodbatlas" +) + +type ServerlessPrivateEndpointClientMock struct { + ListFn func(string, string, *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) +} + +func (spec ServerlessPrivateEndpointClientMock) List(_ context.Context, groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + if spec.ListFn == nil { + panic("ListFn not mocked for test") + } + return spec.ListFn(groupID, instanceName, opts) +} + +func (spec ServerlessPrivateEndpointClientMock) Create(ctx context.Context, groupID string, instanceName string, opts *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} + +func (spec ServerlessPrivateEndpointClientMock) Get(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} + +func (spec ServerlessPrivateEndpointClientMock) Delete(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} + +func (spec ServerlessPrivateEndpointClientMock) Update(_ context.Context, _ string, _ string, _ string, _ *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} diff --git a/pkg/controller/atlasdeployment/serverless_private_endpoint_test.go b/pkg/controller/atlasdeployment/serverless_private_endpoint_test.go new file mode 100644 index 0000000000..414e87e316 --- /dev/null +++ b/pkg/controller/atlasdeployment/serverless_private_endpoint_test.go @@ -0,0 +1,301 @@ +package atlasdeployment + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/atlas/mongodbatlas" + "go.uber.org/zap" + + v1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow" +) + +const ( + fakeInstanceName = "fake-instance-name" +) + +func TestCanReconcileServerlessPrivateEndpoints(t *testing.T) { + t.Run("when subResourceDeletionProtection is disabled", func(t *testing.T) { + protected := false + result, err := canServerlessPrivateEndpointsReconcile( + context.TODO(), + &workflow.Context{}, + protected, + "fake-project-id-wont-be-checked", + &v1.AtlasDeployment{}) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but there is no Atlas Serverless Endpoint configured", func(t *testing.T) { + ctx := context.Background() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return []mongodbatlas.ServerlessPrivateEndpointConnection{}, nil, nil + }, + }, + } + deployment := sampleServerlessDeployment() + protected := true + workflowCtx := workflow.Context{Client: client} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but configs match", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleAnnotatedServerlessDeployment(endpointsFrom(endpointsConfig)) + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but configs match, even with different order", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleAnnotatedServerlessDeployment(reverse(endpointsFrom(endpointsConfig))) + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but only old configs matches", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleAnnotatedServerlessDeployment(endpointsFrom(endpointsConfig)) + // remove all PEs in the current desired setup + deployment.Spec.ServerlessSpec.PrivateEndpoints = []v1.ServerlessPrivateEndpoint{} + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) +} + +func TestCannotReconcileServerlessPrivateEndpoints(t *testing.T) { + t.Run("when configs do not match", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + endpoints := endpointsFrom(endpointsConfig) + endpoints[0].Name = "non-matching-fake-name" + deployment := sampleAnnotatedServerlessDeployment(endpoints) + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("when ownership cannot be assured (empty prior config)", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleServerlessDeployment() + deployment.Annotations = map[string]string{ + customresource.AnnotationLastAppliedConfiguration: "{}", + } + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("when ownership cannot be assured (unset prior config)", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleServerlessDeployment() + deployment.Annotations = map[string]string{} + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.False(t, result) + }) +} + +func TestCanReconcileServerlessPrivateEndpointsFail(t *testing.T) { + t.Run("when the old config is not a proper JSON", func(t *testing.T) { + ctx := context.Background() + client := mongodbatlas.Client{} + deployment := sampleServerlessDeployment() + deployment.Annotations = map[string]string{ + customresource.AnnotationLastAppliedConfiguration: "{", + } + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.False(t, result) + var aJSONError *json.SyntaxError + assert.ErrorAs(t, err, &aJSONError) + }) + + t.Run("when list fails in Atlas", func(t *testing.T) { + ctx := context.Background() + fakeError := fmt.Errorf("fake error from Atlas") + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return nil, nil, fakeError + }, + }, + } + deployment := sampleServerlessDeployment() + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.False(t, result) + assert.ErrorIs(t, err, fakeError) + }) +} + +func sampleServerlessDeployment() *v1.AtlasDeployment { + return &v1.AtlasDeployment{ + Spec: v1.AtlasDeploymentSpec{ + ServerlessSpec: &v1.ServerlessSpec{Name: fakeInstanceName}, + }, + } +} + +func sampleAnnotatedServerlessDeployment(endpoints []v1.ServerlessPrivateEndpoint) *v1.AtlasDeployment { + deployment := &v1.AtlasDeployment{ + Spec: v1.AtlasDeploymentSpec{ServerlessSpec: &v1.ServerlessSpec{ + Name: fakeInstanceName, + PrivateEndpoints: endpoints, + }}, + } + deployment.Annotations = map[string]string{ + customresource.AnnotationLastAppliedConfiguration: jsonize(deployment.Spec), + } + return deployment +} + +func sampleAtlasSPEConfig() []mongodbatlas.ServerlessPrivateEndpointConnection { + return []mongodbatlas.ServerlessPrivateEndpointConnection{ + { + ID: "fake-id-1", + CloudProviderEndpointID: "opaque-cloud-fake-id-1", + Comment: "fake-name-1", + Status: SPEStatusAvailable, + ProviderName: "AWS", + }, + { + ID: "fake-id-2", + CloudProviderEndpointID: "opaque-cloud-fake-id-2", + Comment: "fake-name-2", + Status: SPEStatusAvailable, + ProviderName: "Azure", + PrivateEndpointIPAddress: "11.11.10.0", + }, + } +} + +func endpointsFrom(configs []mongodbatlas.ServerlessPrivateEndpointConnection) []v1.ServerlessPrivateEndpoint { + endpoints := []v1.ServerlessPrivateEndpoint{} + for _, cfg := range configs { + endpoints = append(endpoints, v1.ServerlessPrivateEndpoint{ + Name: cfg.Comment, + CloudProviderEndpointID: cfg.CloudProviderEndpointID, + PrivateEndpointIPAddress: cfg.PrivateEndpointIPAddress, + }) + } + return endpoints +} + +func reverse(endpoints []v1.ServerlessPrivateEndpoint) []v1.ServerlessPrivateEndpoint { + reversed := make([]v1.ServerlessPrivateEndpoint, 0, len(endpoints)) + for i := len(endpoints) - 1; i >= 0; i-- { + reversed = append(reversed, endpoints[i]) + } + return reversed +} + +func jsonize(obj any) string { + jsonBytes, err := json.Marshal(obj) + if err != nil { + return err.Error() + } + return string(jsonBytes) +} + +func debugLogger(t *testing.T) *zap.SugaredLogger { + t.Helper() + + logger, err := zap.NewDevelopment() + require.NoError(t, err) + return logger.Sugar() +} diff --git a/test/e2e/actions/project_flow.go b/test/e2e/actions/project_flow.go index 13ebd54546..32ada2f646 100644 --- a/test/e2e/actions/project_flow.go +++ b/test/e2e/actions/project_flow.go @@ -48,6 +48,8 @@ func PrepareOperatorConfigurations(userData *model.TestDataProvider) manager.Man Namespace: userData.Resources.Namespace, Name: config.DefaultOperatorGlobalKey, }, + ObjectDeletionProtection: userData.ObjectDeletionProtection, + SubObjectDeletionProtection: userData.SubObjectDeletionProtection, }) Expect(err).NotTo(HaveOccurred()) return mgr diff --git a/test/e2e/k8s/operator.go b/test/e2e/k8s/operator.go index 9d71536173..ba8423f5c0 100644 --- a/test/e2e/k8s/operator.go +++ b/test/e2e/k8s/operator.go @@ -103,14 +103,16 @@ func BuildManager(initCfg *Config) (manager.Manager, error) { } if err = (&atlasdeployment.AtlasDeploymentReconciler{ - Client: mgr.GetClient(), - Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), - Scheme: mgr.GetScheme(), - AtlasDomain: config.AtlasDomain, - GlobalAPISecret: config.GlobalAPISecret, - ResourceWatcher: watch.NewResourceWatcher(), - GlobalPredicates: globalPredicates, - EventRecorder: mgr.GetEventRecorderFor("AtlasDeployment"), + Client: mgr.GetClient(), + Log: logger.Named("controllers").Named("AtlasDeployment").Sugar(), + Scheme: mgr.GetScheme(), + AtlasDomain: config.AtlasDomain, + GlobalAPISecret: config.GlobalAPISecret, + ResourceWatcher: watch.NewResourceWatcher(), + GlobalPredicates: globalPredicates, + EventRecorder: mgr.GetEventRecorderFor("AtlasDeployment"), + ObjectDeletionProtection: config.ObjectDeletionProtection, + SubObjectDeletionProtection: config.SubObjectDeletionProtection, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "AtlasDeployment") return nil, err diff --git a/test/e2e/model/dataprovider.go b/test/e2e/model/dataprovider.go index e10652f487..b15cdc3cc2 100644 --- a/test/e2e/model/dataprovider.go +++ b/test/e2e/model/dataprovider.go @@ -16,23 +16,30 @@ import ( v1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1" ) +const ( + defaultE2EObjectProtectionDeletion = false + defaultE2ESubObjectProtectionDeletion = false +) + // Full Data set for the current test case type TestDataProvider struct { - ConfPaths []string // init deployments configuration - ConfUpdatePaths []string // update configuration - Resources UserInputs // struct of all user resoucers project,deployments,databaseusers - Actions []func(*TestDataProvider) // additional actions for the current data set - PortGroup int // ports for the test application starts from _ - SkipAppConnectivityCheck bool - Context context.Context - K8SClient client.Client - InitialDeployments []*v1.AtlasDeployment - Project *v1.AtlasProject - Prefix string - Users []*v1.AtlasDatabaseUser - Teams []*v1.AtlasTeam - ManagerContext context.Context - AWSResourcesGenerator *helper.AwsResourcesGenerator + ConfPaths []string // init deployments configuration + ConfUpdatePaths []string // update configuration + Resources UserInputs // struct of all user resources project, deployments, database users + Actions []func(*TestDataProvider) // additional actions for the current data set + PortGroup int // ports for the test application starts from _ + SkipAppConnectivityCheck bool + Context context.Context + K8SClient client.Client + InitialDeployments []*v1.AtlasDeployment + Project *v1.AtlasProject + Prefix string + Users []*v1.AtlasDatabaseUser + Teams []*v1.AtlasTeam + ManagerContext context.Context + AWSResourcesGenerator *helper.AwsResourcesGenerator + ObjectDeletionProtection bool + SubObjectDeletionProtection bool } func DataProviderWithResources(keyTestPrefix string, project AProject, r *AtlasKeyType, initDeploymentConfigs []string, updateDeploymentConfig []string, users []DBUser, portGroup int, actions []func(*TestDataProvider)) TestDataProvider { @@ -53,6 +60,9 @@ func DataProviderWithResources(keyTestPrefix string, project AProject, r *AtlasK data.AWSResourcesGenerator = helper.NewAwsResourcesGenerator(GinkgoT(), nil) + data.ObjectDeletionProtection = defaultE2EObjectProtectionDeletion + data.SubObjectDeletionProtection = defaultE2ESubObjectProtectionDeletion + return data } @@ -69,6 +79,9 @@ func DataProvider(keyTestPrefix string, r *AtlasKeyType, portGroup int, actions data.AWSResourcesGenerator = helper.NewAwsResourcesGenerator(GinkgoT(), nil) + data.ObjectDeletionProtection = defaultE2EObjectProtectionDeletion + data.SubObjectDeletionProtection = defaultE2ESubObjectProtectionDeletion + return &data } @@ -87,3 +100,13 @@ func (data TestDataProvider) WithUsers(users ...*v1.AtlasDatabaseUser) *TestData data.Users = append(data.Users, users...) return &data } + +func (data TestDataProvider) WithObjectDeletionProtection(protected bool) *TestDataProvider { + data.ObjectDeletionProtection = protected + return &data +} + +func (data TestDataProvider) WithSubObjectDeletionProtection(protected bool) *TestDataProvider { + data.SubObjectDeletionProtection = protected + return &data +} diff --git a/test/e2e/private_link_test.go b/test/e2e/private_link_test.go index eebf6ed0ec..e4c6d3c45e 100644 --- a/test/e2e/private_link_test.go +++ b/test/e2e/private_link_test.go @@ -148,11 +148,11 @@ var _ = Describe("UserLogin", Label("privatelink"), func() { ) }) -func privateFlow(userData *model.TestDataProvider, providerAction cloud.Provider, requstedPE []privateEndpoint) { +func privateFlow(userData *model.TestDataProvider, providerAction cloud.Provider, requestedPE []privateEndpoint) { By("Create Private Link and the rest users resources", func() { Expect(userData.K8SClient.Get(userData.Context, types.NamespacedName{Name: userData.Project.Name, Namespace: userData.Resources.Namespace}, userData.Project)).To(Succeed()) - for _, pe := range requstedPE { + for _, pe := range requestedPE { userData.Project.Spec.PrivateEndpoints = append(userData.Project.Spec.PrivateEndpoints, v1.PrivateEndpoint{ Provider: pe.provider, diff --git a/test/e2e/serverless_pe_test.go b/test/e2e/serverless_pe_test.go index 10546fb1b4..12f3ce4916 100644 --- a/test/e2e/serverless_pe_test.go +++ b/test/e2e/serverless_pe_test.go @@ -60,7 +60,7 @@ var _ = Describe("UserLogin", Label("serverless-pe"), func() { model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), 40000, []func(*model.TestDataProvider){}, - ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spetest1", "AWS", "US_EAST_1")), + ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spe-test-1", "AWS", "US_EAST_1")), []v1.ServerlessPrivateEndpoint{ { Name: newRandomName("pe"), @@ -73,7 +73,7 @@ var _ = Describe("UserLogin", Label("serverless-pe"), func() { model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), 40000, []func(*model.TestDataProvider){}, - ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spetest3", "AZURE", "US_EAST_2")), + ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spe-test-2", "AZURE", "US_EAST_2")), []v1.ServerlessPrivateEndpoint{ { Name: newRandomName("pe"), @@ -86,7 +86,7 @@ var _ = Describe("UserLogin", Label("serverless-pe"), func() { model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), 40000, []func(*model.TestDataProvider){}, - ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spetest3", "AZURE", "US_EAST_2")), + ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spe-test-3", "AZURE", "US_EAST_2")), []v1.ServerlessPrivateEndpoint{ { Name: newRandomName("pe"), @@ -98,6 +98,32 @@ var _ = Describe("UserLogin", Label("serverless-pe"), func() { }, }, ), + Entry("Test[spe-aws-1]: Serverless deployment with one AWS PE (protected)", Label("spe-aws-4-protected"), + model.DataProvider( + "spe-aws-1-protected", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spe-test4-protected", "AWS", "US_EAST_1")).WithSubObjectDeletionProtection(true), + []v1.ServerlessPrivateEndpoint{ + { + Name: newRandomName("pe"), + }, + }, + ), + Entry("Test[spe-azure-1]: Serverless deployment with one Azure PE (protected)", Label("spe-azure-5-protected"), + model.DataProvider( + "spe-azure-1-protected", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()).WithInitialDeployments(data.CreateServerlessDeployment("spe-test-5-protected", "AZURE", "US_EAST_2")).WithSubObjectDeletionProtection(true), + []v1.ServerlessPrivateEndpoint{ + { + Name: newRandomName("pe"), + }, + }, + ), ) })