Skip to content

Commit

Permalink
CLOUPD-193335: Deletion Protection Serverless Private Endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
josvazg committed Sep 1, 2023
1 parent df7db35 commit f1977e4
Show file tree
Hide file tree
Showing 13 changed files with 609 additions and 82 deletions.
97 changes: 54 additions & 43 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,53 +153,67 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
// Allow users to specify M0/M2/M5 deployments without providing TENANT for Normal and Serverless deployments
r.verifyNonTenantCase(deployment)

if result := r.checkDeploymentIsManaged(workflowCtx, context, log, project, deployment); !result.IsOk() {
return result.ReconcileResult(), nil
// convertedDeployment is either serverless or advanced, deployment must be kept unchanged
// convertedDeployment is always a separate copy, to avoid changes on it to go back to k8s
convertedDeployment := deployment.DeepCopy()
if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&convertedDeployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to convert legacy deployment", "error", err)
return result.ReconcileResult(), nil
}
convertedDeployment.Spec.DeploymentSpec = nil
}

deletionRequest, result := r.handleDeletion(workflowCtx, context, log, prevResult, project, deployment)
if deletionRequest {
if result := r.checkDeploymentIsManaged(workflowCtx, context, log, project, convertedDeployment); !result.IsOk() {
return result.ReconcileResult(), nil
}

err = customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
log.Error(result.GetMessage())

deletionRequest, result := r.handleDeletion(workflowCtx, context, log, prevResult, project, deployment)
if deletionRequest {
return result.ReconcileResult(), nil
}

if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&deployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to convert legacy deployment", "error", err)
return result.ReconcileResult(), nil
}
deployment.Spec.DeploymentSpec = nil
}

if err := uniqueKey(&deployment.Spec); err != nil {
if err := uniqueKey(&convertedDeployment.Spec); err != nil {
log.Errorw("failed to validate tags", "error", err)
result := workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
}

handleDeployment := r.selectDeploymentHandler(deployment)
if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() {
handleDeployment := r.selectDeploymentHandler(convertedDeployment)
if result, _ := handleDeployment(context, workflowCtx, project, convertedDeployment, req); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
return r.registerConfigAndReturn(workflowCtx, context, log, deployment, result), nil
}

if !deployment.IsServerless() {
if result := r.handleAdvancedOptions(workflowCtx, project, deployment); !result.IsOk() {
if !convertedDeployment.IsServerless() {
if result := r.handleAdvancedOptions(workflowCtx, project, convertedDeployment); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
return r.registerConfigAndReturn(workflowCtx, context, log, deployment, result), nil
}
}

return r.registerConfigAndReturn(workflowCtx, context, log, deployment, workflow.OK()), nil
}

func (r *AtlasDeploymentReconciler) registerConfigAndReturn(
workflowCtx *workflow.Context,
context context.Context,
log *zap.SugaredLogger,
deployment *mdbv1.AtlasDeployment, // this must be the original non converted deployment
result workflow.Result) ctrl.Result {
if result.IsOk() || result.IsInProgress() {
err := customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
alternateResult := workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, alternateResult)
log.Error(result.GetMessage())

return result.ReconcileResult()
}
}
return workflow.OK().ReconcileResult(), nil
return result.ReconcileResult()
}

func (r *AtlasDeploymentReconciler) verifyNonTenantCase(deployment *mdbv1.AtlasDeployment) {
Expand Down Expand Up @@ -231,19 +245,16 @@ func (r *AtlasDeploymentReconciler) checkDeploymentIsManaged(
project *mdbv1.AtlasProject,
deployment *mdbv1.AtlasDeployment,
) workflow.Result {
advancedDeployment := deployment
if deployment.IsLegacyDeployment() {
advancedDeployment = deployment.DeepCopy()
if err := ConvertLegacyDeployment(&advancedDeployment.Spec); err != nil {
result := workflow.Terminate(workflow.Internal, err.Error())
log.Errorw("failed to temporary convert legacy deployment", "error", err)
return result
}
advancedDeployment.Spec.DeploymentSpec = nil
result := workflow.Terminate(workflow.Internal, "ownership check expected a converted deployment, not a legacy one")
workflowCtx.SetConditionFromResult(status.DatabaseUserReadyType, result)
log.Error(result.GetMessage())

return result
}

owner, err := customresource.IsOwner(
advancedDeployment,
deployment,
r.ObjectDeletionProtection,
customresource.IsResourceManagedByOperator,
managedByAtlas(context, workflowCtx.Client, project.ID(), log),
Expand Down Expand Up @@ -277,7 +288,7 @@ func (r *AtlasDeploymentReconciler) handleDeletion(
log *zap.SugaredLogger,
prevResult workflow.Result,
project *mdbv1.AtlasProject,
deployment *mdbv1.AtlasDeployment,
deployment *mdbv1.AtlasDeployment, // this must be the original non converted deployment
) (bool, workflow.Result) {
if deployment.GetDeletionTimestamp().IsZero() {
if !customresource.HaveFinalizer(deployment, customresource.FinalizerLabel) {
Expand Down Expand Up @@ -358,7 +369,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At
}

// handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment)
if c != nil && c.StateName != "" {
workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName))
Expand Down Expand Up @@ -387,7 +398,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

if err := r.ensureBackupScheduleAndPolicy(
context.Background(),
ctx,
workflowCtx, project.ID(),
deployment,
backupEnabled,
Expand All @@ -411,8 +422,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

// handleServerlessInstance ensures the state of the serverless instance using the serverless API
func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec)
func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment)
return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c)
}

Expand Down Expand Up @@ -579,7 +590,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont
return nil
}

type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)
type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)

type atlasClusterType int

Expand Down Expand Up @@ -687,7 +698,7 @@ func advancedDeploymentMatchesSpec(log *zap.SugaredLogger, atlasSpec *mongodbatl
return d == "", nil
}

// Parse through tags and verfiy that all keys are unique. Return error otherwise.
// Parse through tags and verify that all keys are unique. Return error otherwise.
func uniqueKey(deploymentSpec *mdbv1.AtlasDeploymentSpec) error {
store := make(map[string]string)
var arrTags []*mdbv1.TagSpec
Expand Down
25 changes: 23 additions & 2 deletions pkg/controller/atlasdeployment/atlasdeployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDeploymentManaged(t *testing.T) {
},
}
project := testProject(fakeNamespace)
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
deployment := asAdvanced(v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment))
te := newTestDeploymentEnv(t, tc.protected, atlasClient, testK8sClient(), project, deployment)
if tc.managedTag {
customresource.SetAnnotation(te.deployment, customresource.AnnotationLastAppliedConfiguration, "")
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestProtectedAdvancedDeploymentManagedInAtlas(t *testing.T) {
},
},
}
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
deployment := asAdvanced(v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment))
te := newTestDeploymentEnv(t, protected, atlasClient, testK8sClient(), project, deployment)

result := te.reconciler.checkDeploymentIsManaged(te.workflowCtx, te.context, te.log, te.project, te.deployment)
Expand All @@ -146,6 +146,27 @@ func TestProtectedAdvancedDeploymentManagedInAtlas(t *testing.T) {
}
}

func TestLegacyIsManagedInAtlasMustFail(t *testing.T) {
t.Run("Legacy deployment must fail to check if it is managed in Atlas", func(t *testing.T) {
protected := true
project := testProject(fakeNamespace)
inAtlas := differentAdvancedDeployment(fakeNamespace)
atlasClient := mongodbatlas.Client{
AdvancedClusters: &advancedClustersClientMock{
GetFn: func(groupID string, clusterName string) (*mongodbatlas.AdvancedCluster, *mongodbatlas.Response, error) {
return inAtlas, nil, nil
},
},
}
deployment := v1.NewDeployment(project.Namespace, fakeDeployment, fakeDeployment)
te := newTestDeploymentEnv(t, protected, atlasClient, testK8sClient(), project, deployment)

result := te.reconciler.checkDeploymentIsManaged(te.workflowCtx, te.context, te.log, te.project, te.deployment)

assert.Regexp(t, regexp.MustCompile("ownership check expected a converted deployment"), result.GetMessage())
})
}

func TestProtectedServerlessManagedInAtlas(t *testing.T) {
testCases := []struct {
title string
Expand Down
16 changes: 10 additions & 6 deletions pkg/controller/atlasdeployment/serverless_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
serverlessSpec := deployment.Spec.ServerlessSpec
atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
if err != nil {
if resp == nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -28,8 +32,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
if err != nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
}
ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
Name: serverlessSpec.Name,
ProviderSettings: &mongodbatlas.ServerlessProviderSettings{
BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName,
Expand All @@ -53,7 +57,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
convertedDeployment.Tags = &[]*mongodbatlas.Tag{}
}
if !isTagsEqual(*(atlasDeployment.Tags), *(convertedDeployment.Tags)) {
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
// TODO: include ServerlessBackupOptions and TerminationProtectionEnabled
Tag: convertedDeployment.Tags,
})
Expand All @@ -62,7 +66,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
}
return atlasDeployment, workflow.InProgress(workflow.DeploymentUpdating, "deployment is updating")
}
result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name)
result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, r.SubObjectDeletionProtection)
return atlasDeployment, result

case status.StateCREATING:
Expand Down
Loading

0 comments on commit f1977e4

Please sign in to comment.