Skip to content

Commit

Permalink
CLOUPD-193335: Deletion Protection Serverless Private Endpoints
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Vazquez <[email protected]>
  • Loading branch information
josvazg committed Aug 31, 2023
1 parent 338e39b commit a3dee69
Show file tree
Hide file tree
Showing 13 changed files with 616 additions and 81 deletions.
95 changes: 53 additions & 42 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,53 +153,67 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
// Allow users to specify M0/M2/M5 deployments without providing TENANT for Normal and Serverless deployments
r.verifyNonTenantCase(deployment)

if result := r.checkDeploymentIsManaged(workflowCtx, context, log, project, deployment); !result.IsOk() {
return result.ReconcileResult(), nil
// convertedDeployment is either serverless or advanced, deployment must be kept unchanged
// convertedDeployment is always a separate copy, to avoid changes on it to go back to k8s
convertedDeployment := deployment.DeepCopy()
if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&convertedDeployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to convert legacy deployment", "error", err)
return result.ReconcileResult(), nil
}
convertedDeployment.Spec.DeploymentSpec = nil
}

deletionRequest, result := r.handleDeletion(workflowCtx, context, log, prevResult, project, deployment)
if deletionRequest {
if result := r.checkDeploymentIsManaged(workflowCtx, context, log, project, convertedDeployment); !result.IsOk() {
return result.ReconcileResult(), nil
}

err = customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
log.Error(result.GetMessage())

deletionRequest, result := r.handleDeletion(workflowCtx, context, log, prevResult, project, deployment)
if deletionRequest {
return result.ReconcileResult(), nil
}

if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&deployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to convert legacy deployment", "error", err)
return result.ReconcileResult(), nil
}
deployment.Spec.DeploymentSpec = nil
}

if err := uniqueKey(&deployment.Spec); err != nil {
if err := uniqueKey(&convertedDeployment.Spec); err != nil {
log.Errorw("failed to validate tags", "error", err)
result := workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
}

handleDeployment := r.selectDeploymentHandler(deployment)
if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() {
handleDeployment := r.selectDeploymentHandler(convertedDeployment)
if result, _ := handleDeployment(context, workflowCtx, project, convertedDeployment, req); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
return r.registerConfigAndReturn(workflowCtx, context, log, deployment, result), nil
}

if !deployment.IsServerless() {
if result := r.handleAdvancedOptions(workflowCtx, project, deployment); !result.IsOk() {
if !convertedDeployment.IsServerless() {
if result := r.handleAdvancedOptions(workflowCtx, project, convertedDeployment); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
return r.registerConfigAndReturn(workflowCtx, context, log, deployment, result), nil
}
}

return r.registerConfigAndReturn(workflowCtx, context, log, deployment, workflow.OK()), nil
}

func (r *AtlasDeploymentReconciler) registerConfigAndReturn(
workflowCtx *workflow.Context,
context context.Context,
log *zap.SugaredLogger,
deployment *mdbv1.AtlasDeployment, // this must be the original non converted deployment
result workflow.Result) ctrl.Result {
if result.IsOk() || result.IsInProgress() {
err := customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
alternateResult := workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, alternateResult)
log.Error(result.GetMessage())

return result.ReconcileResult()
}
}
return workflow.OK().ReconcileResult(), nil
return result.ReconcileResult()
}

func (r *AtlasDeploymentReconciler) verifyNonTenantCase(deployment *mdbv1.AtlasDeployment) {
Expand Down Expand Up @@ -231,19 +245,16 @@ func (r *AtlasDeploymentReconciler) checkDeploymentIsManaged(
project *mdbv1.AtlasProject,
deployment *mdbv1.AtlasDeployment,
) workflow.Result {
advancedDeployment := deployment
if deployment.IsLegacyDeployment() {
advancedDeployment = deployment.DeepCopy()
if err := ConvertLegacyDeployment(&advancedDeployment.Spec); err != nil {
result := workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to temporary convert legacy deployment", "error", err)
return result
}
advancedDeployment.Spec.DeploymentSpec = nil
result := workflow.Terminate(workflow.Internal, "ownership check expected a converted deployment, not a legacy one")
workflowCtx.SetConditionFromResult(status.DatabaseUserReadyType, result)
log.Error(result.GetMessage())

return result
}

owner, err := customresource.IsOwner(
advancedDeployment,
deployment,
r.ObjectDeletionProtection,
customresource.IsResourceManagedByOperator,
managedByAtlas(context, workflowCtx.Client, project.ID(), log),
Expand Down Expand Up @@ -277,7 +288,7 @@ func (r *AtlasDeploymentReconciler) handleDeletion(
log *zap.SugaredLogger,
prevResult workflow.Result,
project *mdbv1.AtlasProject,
deployment *mdbv1.AtlasDeployment,
deployment *mdbv1.AtlasDeployment, // this must be the original non converted deployment
) (bool, workflow.Result) {
if deployment.GetDeletionTimestamp().IsZero() {
if !customresource.HaveFinalizer(deployment, customresource.FinalizerLabel) {
Expand Down Expand Up @@ -358,7 +369,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At
}

// handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment)
if c != nil && c.StateName != "" {
workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName))
Expand Down Expand Up @@ -387,7 +398,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

if err := r.ensureBackupScheduleAndPolicy(
context.Background(),
ctx,
workflowCtx, project.ID(),
deployment,
backupEnabled,
Expand All @@ -411,8 +422,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

// handleServerlessInstance ensures the state of the serverless instance using the serverless API
func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec)
func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment)
return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c)
}

Expand Down Expand Up @@ -579,7 +590,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont
return nil
}

type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)
type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)

type atlasClusterType int

Expand Down
25 changes: 23 additions & 2 deletions pkg/controller/atlasdeployment/atlasdeployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDeploymentManaged(t *testing.T) {
},
}
project := testProject(fakeNamespace)
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
deployment := asAdvanced(v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment))
te := newTestDeploymentEnv(t, tc.protected, atlasClient, testK8sClient(), project, deployment)
if tc.managedTag {
customresource.SetAnnotation(te.deployment, customresource.AnnotationLastAppliedConfiguration, "")
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestProtectedAdvancedDeploymentManagedInAtlas(t *testing.T) {
},
},
}
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
deployment := asAdvanced(v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment))
te := newTestDeploymentEnv(t, protected, atlasClient, testK8sClient(), project, deployment)

result := te.reconciler.checkDeploymentIsManaged(te.workflowCtx, te.context, te.log, te.project, te.deployment)
Expand All @@ -146,6 +146,27 @@ func TestProtectedAdvancedDeploymentManagedInAtlas(t *testing.T) {
}
}

func TestLegacyIsManagedInAtlasMustFail(t *testing.T) {
t.Run("Legacy deployment must fail to check if it is managed in Atlas", func(t *testing.T) {
protected := true
project := testProject(fakeNamespace)
inAtlas := differentAdvancedDeployment(fakeNamespace)
atlasClient := mongodbatlas.Client{
AdvancedClusters: &advancedClustersClientMock{
GetFn: func(groupID string, clusterName string) (*mongodbatlas.AdvancedCluster, *mongodbatlas.Response, error) {
return inAtlas, nil, nil
},
},
}
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
te := newTestDeploymentEnv(t, protected, atlasClient, testK8sClient(), project, deployment)

result := te.reconciler.checkDeploymentIsManaged(te.workflowCtx, te.context, te.log, te.project, te.deployment)

assert.Regexp(t, regexp.MustCompile("ownership check expected a converted deployment"), result.GetMessage())
})
}

func TestProtectedServerlessManagedInAtlas(t *testing.T) {
testCases := []struct {
title string
Expand Down
16 changes: 10 additions & 6 deletions pkg/controller/atlasdeployment/serverless_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
serverlessSpec := deployment.Spec.ServerlessSpec
atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
if err != nil {
if resp == nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -28,8 +32,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
if err != nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
}
ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
Name: serverlessSpec.Name,
ProviderSettings: &mongodbatlas.ServerlessProviderSettings{
BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName,
Expand All @@ -53,7 +57,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
convertedDeployment.Tags = &[]*mongodbatlas.Tag{}
}
if !isTagsEqual(*(atlasDeployment.Tags), *(convertedDeployment.Tags)) {
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
Tag: convertedDeployment.Tags,
ServerlessBackupOptions: &mongodbatlas.ServerlessBackupOptions{
ServerlessContinuousBackupEnabled: &serverlessSpec.BackupOptions.ServerlessContinuousBackupEnabled,
Expand All @@ -65,7 +69,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
}
return atlasDeployment, workflow.InProgress(workflow.DeploymentUpdating, "deployment is updating")
}
result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name)
result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, r.SubObjectDeletionProtection)
return atlasDeployment, result

case status.StateCREATING:
Expand Down
111 changes: 109 additions & 2 deletions pkg/controller/atlasdeployment/serverless_private_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package atlasdeployment

import (
"context"
"encoding/json"
"fmt"
"sort"

"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil"

Expand All @@ -14,6 +16,7 @@ import (

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

Expand All @@ -28,10 +31,30 @@ const (
SPEStatusFailed = "FAILED" //stage 2
)

func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result {
if deploymentSpec == nil {
func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
deploymentSpec := deployment.Spec.ServerlessSpec

canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service, protected, groupID, deployment)
if err != nil {
result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err))
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

if !canReconcile {
result := workflow.Terminate(
workflow.AtlasDeletionProtection,
"unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information",
)
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

providerName := GetServerlessProvider(deploymentSpec)
if providerName == provider.ProviderGCP {
if len(deploymentSpec.PrivateEndpoints) == 0 {
Expand All @@ -57,6 +80,90 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string,
return result
}

func canServerlessPrivateEndpointsReconcile(ctx context.Context, service *workflow.Context, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) {
if !protected {
return true, nil
}

latestConfig := &mdbv1.AtlasDeploymentSpec{}
latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration]
if ok {
if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil {
return false, err
}
}

atlasClient := service.Client
existingPE, err := getAllExistingServerlessPE(ctx, atlasClient.ServerlessPrivateEndpoints, groupID, deployment.Spec.ServerlessSpec.Name)
if err != nil {
return false, err
}

if len(existingPE) == 0 {
return true, nil
}

logger := service.Log
prevCfg := prevPEConfig(latestConfig)
if matchingPEs(logger, deployment.Spec.ServerlessSpec.PrivateEndpoints, existingPE) ||
matchingPEs(logger, prevCfg, existingPE) {
return true, nil
}
return false, nil
}

func sortedK8sPENames(spes []mdbv1.ServerlessPrivateEndpoint) []string {
names := make([]string, 0, len(spes))
for _, spe := range spes {
names = append(names, summarizeSPE(spe))
}
sort.Strings(names)
return names
}

func summarizeSPE(spe mdbv1.ServerlessPrivateEndpoint) string {
return fmt.Sprintf("(%q,%q,%q)", spe.Name, spe.CloudProviderEndpointID, spe.PrivateEndpointIPAddress)
}

func sortedAtlasPENames(atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) []string {
names := make([]string, 0, len(atlasPEs))
for _, atlasPE := range atlasPEs {
names = append(names, summarizeAtlasPE(atlasPE))
}
sort.Strings(names)
return names
}

func summarizeAtlasPE(atlasPE mongodbatlas.ServerlessPrivateEndpointConnection) string {
return fmt.Sprintf("(%q,%q,%q)", atlasPE.Comment, atlasPE.CloudProviderEndpointID, atlasPE.PrivateEndpointIPAddress)
}

func matchingPEs(logger *zap.SugaredLogger, spes []mdbv1.ServerlessPrivateEndpoint, atlasPECs []mongodbatlas.ServerlessPrivateEndpointConnection) bool {
k8sPEs := sortedK8sPENames(spes)
atlasPEs := sortedAtlasPENames(atlasPECs)
if len(k8sPEs) != len(atlasPEs) {
logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPEs, atlasPEs)
logger.Debugf("Different PE sets lengths Kubernetes wants %d but atlas has %d", len(k8sPEs), len(atlasPEs))
return false
}
for i, k8sPE := range k8sPEs {
if atlasPEs[i] != k8sPE {
logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", spes, atlasPEs)
logger.Debugf("Different PE at index %d %v but atlas has %v", i, k8sPE, atlasPEs[i])
return false
}
}
logger.Debugf("Kubernetes PEs MATCH Atlas: k8s %v == Atlas %v", k8sPEs, atlasPEs)
return true
}

func prevPEConfig(deploymentSpec *mdbv1.AtlasDeploymentSpec) []mdbv1.ServerlessPrivateEndpoint {
if deploymentSpec.ServerlessSpec == nil || deploymentSpec.ServerlessSpec.PrivateEndpoints == nil {
return []mdbv1.ServerlessPrivateEndpoint{}
}
return deploymentSpec.ServerlessSpec.PrivateEndpoints
}

func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName {
if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless {
return deploymentSpec.ProviderSettings.ProviderName
Expand Down
Loading

0 comments on commit a3dee69

Please sign in to comment.