diff --git a/pkg/api/v1/zz_generated.deepcopy.go b/pkg/api/v1/zz_generated.deepcopy.go index 855f9791fa..12b7eae21e 100644 --- a/pkg/api/v1/zz_generated.deepcopy.go +++ b/pkg/api/v1/zz_generated.deepcopy.go @@ -14,9 +14,10 @@ a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 package v1 import ( + "k8s.io/apimachinery/pkg/runtime" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/common" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/project" - "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. diff --git a/pkg/controller/atlasdeployment/atlasdeployment_controller.go b/pkg/controller/atlasdeployment/atlasdeployment_controller.go index 79b4665a1e..91c4f9935e 100644 --- a/pkg/controller/atlasdeployment/atlasdeployment_controller.go +++ b/pkg/controller/atlasdeployment/atlasdeployment_controller.go @@ -181,7 +181,7 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl. } handleDeployment := r.selectDeploymentHandler(deployment) - if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() { + if result, _ := handleDeployment(context, workflowCtx, project, deployment, req); !result.IsOk() { workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result) return result.ReconcileResult(), nil } @@ -352,7 +352,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At } // handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API -func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { +func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment) if c != nil && c.StateName != "" { workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName)) @@ -381,7 +381,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl } if err := r.ensureBackupScheduleAndPolicy( - context.Background(), + ctx, workflowCtx, project.ID(), deployment, backupEnabled, @@ -405,8 +405,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl } // handleServerlessInstance ensures the state of the serverless instance using the serverless API -func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { - c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec) +func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) { + c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment) return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c) } @@ -573,7 +573,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont return nil } -type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) +type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) type atlasClusterType int diff --git a/pkg/controller/atlasdeployment/serverless_deployment.go b/pkg/controller/atlasdeployment/serverless_deployment.go index 42dbc6bfaf..7c50570e40 100644 --- a/pkg/controller/atlasdeployment/serverless_deployment.go +++ b/pkg/controller/atlasdeployment/serverless_deployment.go @@ -13,8 +13,12 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow" ) -func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) { - atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name) +func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) { + if deployment == nil || deployment.Spec.ServerlessSpec == nil { + return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty") + } + serverlessSpec := deployment.Spec.ServerlessSpec + atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name) if err != nil { if resp == nil { return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error()) @@ -24,8 +28,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr return atlasDeployment, workflow.Terminate(workflow.DeploymentNotCreatedInAtlas, err.Error()) } - ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name) - atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{ + workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name) + atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{ Name: serverlessSpec.Name, ProviderSettings: &mongodbatlas.ServerlessProviderSettings{ BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName, @@ -40,7 +44,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr switch atlasDeployment.StateName { case status.StateIDLE: - result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name) + result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, false) return atlasDeployment, result case status.StateCREATING: return atlasDeployment, workflow.InProgress(workflow.DeploymentCreating, "deployment is provisioning") diff --git a/pkg/controller/atlasdeployment/serverless_private_endpoint.go b/pkg/controller/atlasdeployment/serverless_private_endpoint.go index 784d846461..c0341226d1 100644 --- a/pkg/controller/atlasdeployment/serverless_private_endpoint.go +++ b/pkg/controller/atlasdeployment/serverless_private_endpoint.go @@ -2,6 +2,7 @@ package atlasdeployment import ( "context" + "encoding/json" "fmt" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil" @@ -14,6 +15,7 @@ import ( mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource" "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow" ) @@ -28,10 +30,30 @@ const ( SPEStatusFailed = "FAILED" //stage 2 ) -func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result { - if deploymentSpec == nil { +func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result { + if deployment == nil || deployment.Spec.ServerlessSpec == nil { return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty") } + deploymentSpec := deployment.Spec.ServerlessSpec + + canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service, protected, groupID, deployment) + if err != nil { + result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err)) + service.SetConditionFromResult(status.AlertConfigurationReadyType, result) + + return result + } + + if !canReconcile { + result := workflow.Terminate( + workflow.AtlasDeletionProtection, + "unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information", + ) + service.SetConditionFromResult(status.AlertConfigurationReadyType, result) + + return result + } + providerName := GetServerlessProvider(deploymentSpec) if providerName == provider.ProviderGCP { if len(deploymentSpec.PrivateEndpoints) == 0 { @@ -57,6 +79,38 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, return result } +func canServerlessPrivateEndpointsReconcile(ctx context.Context, service *workflow.Context, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) { + if !protected { + return true, nil + } + + latestConfig := &mdbv1.AtlasDeployment{} + latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration] + if ok { + if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil { + return false, err + } + } + + atlasClient := service.Client + existingPE, err := getAllExistingServerlessPE(ctx, atlasClient.ServerlessPrivateEndpoints, groupID, deployment.Spec.ServerlessSpec.Name) + if err != nil { + return false, err + } + + if len(existingPE) == 0 { + return true, nil + } + + logger := service.Log + if setsMatch(logger, existingPE, deployment.Spec.ServerlessSpec.PrivateEndpoints) || + setsMatch(logger, existingPE, latestConfig.Spec.ServerlessSpec.PrivateEndpoints) { + return true, nil + } + + return false, nil +} + func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName { if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless { return deploymentSpec.ProviderSettings.ProviderName @@ -174,6 +228,14 @@ type SPEDiff struct { DuplicateToCreate []mdbv1.ServerlessPrivateEndpoint } +func setsMatch(logger *zap.SugaredLogger, existedPE []mongodbatlas.ServerlessPrivateEndpointConnection, desiredPE []mdbv1.ServerlessPrivateEndpoint) bool { + d := sortServerlessPE(logger, existedPE, desiredPE) + return len(d.PEToCreate) == 0 && + len(d.PEToConnect) == 0 && + len(d.PEToDelete) == 0 && + len(d.DuplicateToCreate) == 0 +} + func (d *SPEDiff) appendToCreate(pe mdbv1.ServerlessPrivateEndpoint) { for _, p := range d.PEToCreate { if p.Name == pe.Name { diff --git a/pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go b/pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go new file mode 100644 index 0000000000..136c3bbdd7 --- /dev/null +++ b/pkg/controller/atlasdeployment/serverless_private_endpoint_mock_test.go @@ -0,0 +1,34 @@ +package atlasdeployment + +import ( + "context" + + "go.mongodb.org/atlas/mongodbatlas" +) + +type ServerlessPrivateEndpointClientMock struct { + ListFn func(string, string, *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) +} + +func (spec ServerlessPrivateEndpointClientMock) List(_ context.Context, groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + if spec.ListFn == nil { + panic("ListFn not mocked for test") + } + return spec.ListFn(groupID, instanceName, opts) +} + +func (spec ServerlessPrivateEndpointClientMock) Create(ctx context.Context, groupID string, instanceName string, opts *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} + +func (spec ServerlessPrivateEndpointClientMock) Get(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} + +func (spec ServerlessPrivateEndpointClientMock) Delete(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} + +func (spec ServerlessPrivateEndpointClientMock) Update(_ context.Context, _ string, _ string, _ string, _ *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + panic("not implemented") // TODO: Implement +} diff --git a/pkg/controller/atlasdeployment/serverless_private_endpoint_test.go b/pkg/controller/atlasdeployment/serverless_private_endpoint_test.go new file mode 100644 index 0000000000..d457d43fc6 --- /dev/null +++ b/pkg/controller/atlasdeployment/serverless_private_endpoint_test.go @@ -0,0 +1,255 @@ +package atlasdeployment + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/atlas/mongodbatlas" + "go.uber.org/zap" + + v1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow" +) + +const ( + fakeInstanceName = "fake-instance-name" +) + +func TestCanReconcileServerlessPrivateEndpoints(t *testing.T) { + t.Run("when subResourceDeletionProtection is disabled", func(t *testing.T) { + protected := false + result, err := canServerlessPrivateEndpointsReconcile( + context.TODO(), + &workflow.Context{}, + protected, + "fake-project-id-wont-be-checked", + &v1.AtlasDeployment{}) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but there is no Atlas Serverless Endpoint configured", func(t *testing.T) { + ctx := context.Background() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return []mongodbatlas.ServerlessPrivateEndpointConnection{}, nil, nil + }, + }, + } + deployment := sampleServerlessDeployment() + protected := true + workflowCtx := workflow.Context{Client: client} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but configs match", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleAnnotatedServerlessDeployment(endpointsFrom(endpointsConfig)) + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but configs match, even with different order", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleAnnotatedServerlessDeployment(reverse(endpointsFrom(endpointsConfig))) + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("when protected but only old configs matches", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + deployment := sampleAnnotatedServerlessDeployment(endpointsFrom(endpointsConfig)) + // remove all PEs in the current desired setup + deployment.Spec.ServerlessSpec.PrivateEndpoints = []v1.ServerlessPrivateEndpoint{} + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.True(t, result) + }) +} + +func TestCannotReconcileServerlessPrivateEndpoints(t *testing.T) { + t.Run("when configs do not match", func(t *testing.T) { + ctx := context.Background() + endpointsConfig := sampleAtlasSPEConfig() + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return endpointsConfig, nil, nil + }, + }, + } + endpoints := endpointsFrom(endpointsConfig) + endpoints[0].CloudProviderEndpointID = "non-matching-fake-id" + deployment := sampleAnnotatedServerlessDeployment(endpoints) + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.NoError(t, err) + assert.False(t, result) + }) +} + +func TestCanReconcileServerlessPrivateEndpointsFail(t *testing.T) { + t.Run("when the old config is not a proper JSON", func(t *testing.T) { + ctx := context.Background() + client := mongodbatlas.Client{} + deployment := sampleServerlessDeployment() + deployment.Annotations = map[string]string{ + customresource.AnnotationLastAppliedConfiguration: "{", + } + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.False(t, result) + var aJSONError *json.SyntaxError + assert.ErrorAs(t, err, &aJSONError) + }) + + t.Run("when list fails in Atlas", func(t *testing.T) { + ctx := context.Background() + fakeError := fmt.Errorf("fake error from Atlas") + client := mongodbatlas.Client{ + ServerlessPrivateEndpoints: ServerlessPrivateEndpointClientMock{ + ListFn: func(groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) { + return nil, nil, fakeError + }, + }, + } + deployment := sampleServerlessDeployment() + protected := true + workflowCtx := workflow.Context{Client: client, Log: debugLogger(t)} + + result, err := canServerlessPrivateEndpointsReconcile(ctx, &workflowCtx, protected, fakeProjectID, deployment) + + require.False(t, result) + assert.ErrorIs(t, err, fakeError) + }) +} + +func sampleServerlessDeployment() *v1.AtlasDeployment { + return &v1.AtlasDeployment{ + Spec: v1.AtlasDeploymentSpec{ + ServerlessSpec: &v1.ServerlessSpec{Name: fakeInstanceName}, + }, + } +} + +func sampleAnnotatedServerlessDeployment(endpoints []v1.ServerlessPrivateEndpoint) *v1.AtlasDeployment { + deployment := &v1.AtlasDeployment{ + Spec: v1.AtlasDeploymentSpec{ServerlessSpec: &v1.ServerlessSpec{ + Name: fakeInstanceName, + PrivateEndpoints: endpoints, + }}, + } + deployment.Annotations = map[string]string{ + customresource.AnnotationLastAppliedConfiguration: jsonize(deployment), + } + return deployment +} + +func sampleAtlasSPEConfig() []mongodbatlas.ServerlessPrivateEndpointConnection { + return []mongodbatlas.ServerlessPrivateEndpointConnection{ + { + ID: "fake-id-1", + CloudProviderEndpointID: "fake-id-1", + Comment: "fake-name-1", + Status: SPEStatusAvailable, + ProviderName: "AWS", + }, + { + ID: "fake-id-2", + CloudProviderEndpointID: "fake-id-2", + Comment: "fake-name-2", + Status: SPEStatusAvailable, + ProviderName: "Azure", + PrivateEndpointIPAddress: "11.11.10.0", + }, + } +} + +func endpointsFrom(configs []mongodbatlas.ServerlessPrivateEndpointConnection) []v1.ServerlessPrivateEndpoint { + endpoints := []v1.ServerlessPrivateEndpoint{} + for _, cfg := range configs { + endpoints = append(endpoints, v1.ServerlessPrivateEndpoint{ + Name: cfg.Comment, + CloudProviderEndpointID: cfg.CloudProviderEndpointID, + PrivateEndpointIPAddress: cfg.PrivateEndpointIPAddress, + }) + } + return endpoints +} + +func reverse(endpoints []v1.ServerlessPrivateEndpoint) []v1.ServerlessPrivateEndpoint { + slices.Reverse(endpoints) + return endpoints +} + +func jsonize(obj any) string { + jsonBytes, err := json.Marshal(obj) + if err != nil { + return err.Error() + } + return string(jsonBytes) +} + +func debugLogger(t *testing.T) *zap.SugaredLogger { + t.Helper() + + logger, err := zap.NewDevelopment() + require.NoError(t, err) + return logger.Sugar() +}