Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Vazquez <[email protected]>
  • Loading branch information
josvazg committed Aug 17, 2023
1 parent 369cf3d commit aafeb2d
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 18 deletions.
12 changes: 6 additions & 6 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
}

handleDeployment := r.selectDeploymentHandler(deployment)
if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() {
if result, _ := handleDeployment(context, workflowCtx, project, deployment, req); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At
}

// handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment)
if c != nil && c.StateName != "" {
workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName))
Expand Down Expand Up @@ -370,7 +370,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

if err := r.ensureBackupScheduleAndPolicy(
context.Background(),
ctx,
workflowCtx, project.ID(),
deployment,
backupEnabled,
Expand All @@ -394,8 +394,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

// handleServerlessInstance ensures the state of the serverless instance using the serverless API
func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec)
func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment)
return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c)
}

Expand Down Expand Up @@ -562,7 +562,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont
return nil
}

type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)
type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)

type atlasClusterType int

Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/atlasdeployment/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *AtlasDeploymentReconciler) ensureBackupSchedule(
bSchedule := &mdbv1.AtlasBackupSchedule{}
err := r.Client.Get(ctx, *backupScheduleRef, bSchedule)
if err != nil {
return nil, fmt.Errorf("%v backupschedule resource is not found. e: %w", *backupScheduleRef, err)
return nil, fmt.Errorf("%v AtlasBackupSchedule resource is not found. e: %w", *backupScheduleRef, err)
}

resourceVersionIsValid := customresource.ValidateResourceVersion(service, bSchedule, r.Log)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (r *AtlasDeploymentReconciler) ensureBackupPolicy(
bPolicy := &mdbv1.AtlasBackupPolicy{}
err := r.Client.Get(ctx, bPolicyRef, bPolicy)
if err != nil {
return nil, fmt.Errorf("unable to get backuppolicy resource %s. e: %w", bPolicyRef.String(), err)
return nil, fmt.Errorf("unable to get AtlasBackupPolicy resource %s. e: %w", bPolicyRef.String(), err)
}

resourceVersionIsValid := customresource.ValidateResourceVersion(service, bPolicy, r.Log)
Expand Down Expand Up @@ -251,13 +251,13 @@ func (r *AtlasDeploymentReconciler) updateBackupScheduleAndPolicy(
}

if equal {
r.Log.Debug("backupschedules are equal, nothing to change")
r.Log.Debug("backup schedules are equal, nothing to change")
return nil
}

r.Log.Debugf("applying backup configuration: %v", *bSchedule)
if _, _, err := service.Client.CloudProviderSnapshotBackupPolicies.Update(ctx, projectID, clusterName, apiScheduleReq); err != nil {
return fmt.Errorf("unable to create backupschedule %s. e: %w", client.ObjectKeyFromObject(bSchedule).String(), err)
return fmt.Errorf("unable to create backup schedule %s. e: %w", client.ObjectKeyFromObject(bSchedule).String(), err)
}
r.Log.Infof("successfully updated backup configuration for deployment %v", clusterName)
return nil
Expand Down
14 changes: 9 additions & 5 deletions pkg/controller/atlasdeployment/serverless_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
serverlessSpec := deployment.Spec.ServerlessSpec
atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
if err != nil {
if resp == nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -24,8 +28,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
return atlasDeployment, workflow.Terminate(workflow.DeploymentNotCreatedInAtlas, err.Error())
}

ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
Name: serverlessSpec.Name,
ProviderSettings: &mongodbatlas.ServerlessProviderSettings{
BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName,
Expand All @@ -40,7 +44,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr

switch atlasDeployment.StateName {
case status.StateIDLE:
result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name)
result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, false)
return atlasDeployment, result
case status.StateCREATING:
return atlasDeployment, workflow.InProgress(workflow.DeploymentCreating, "deployment is provisioning")
Expand Down
56 changes: 54 additions & 2 deletions pkg/controller/atlasdeployment/serverless_private_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package atlasdeployment

import (
"context"
"encoding/json"
"fmt"

"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil"
Expand All @@ -14,6 +15,7 @@ import (

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

Expand All @@ -28,10 +30,30 @@ const (
SPEStatusFailed = "FAILED" //stage 2
)

func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result {
if deploymentSpec == nil {
func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
deploymentSpec := deployment.Spec.ServerlessSpec

canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service.Client, protected, groupID, deployment)
if err != nil {
result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err))
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

if !canReconcile {
result := workflow.Terminate(
workflow.AtlasDeletionProtection,
"unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information",
)
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

providerName := GetServerlessProvider(deploymentSpec)
if providerName == provider.ProviderGCP {
if len(deploymentSpec.PrivateEndpoints) == 0 {
Expand All @@ -57,6 +79,36 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string,
return result
}

func canServerlessPrivateEndpointsReconcile(ctx context.Context, atlasClient mongodbatlas.Client, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) {
if !protected {
return true, nil
}

latestConfig := &mdbv1.AtlasDeployment{}
latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration]
if ok {
if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil {
return false, err
}
}

list, _, err := atlasClient.ServerlessPrivateEndpoints.List(
ctx,
groupID,
deployment.Spec.ServerlessSpec.Name,
&mongodbatlas.ListOptions{},
)
if err != nil {
return false, err
}

if len(list) == 0 {
return true, nil
}

return false, nil
}

func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName {
if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless {
return deploymentSpec.ProviderSettings.ProviderName
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/atlasdeployment/serverless_private_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package atlasdeployment

import "testing"

func TestCanServerlessPrivateEndpointsReconcile(t *testing.T) {
t.Run("should return true when subResourceDeletionProtection is disabled", func(t *testing.T) {
// result, err := canCloudProviderAccessReconcile(context.TODO(), mongodbatlas.Client{}, false, &mdbv1.AtlasProject{})
// require.NoError(t, err)
// require.True(t, result)
})
}
2 changes: 1 addition & 1 deletion test/int/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment"), func() {
lastGeneration++
})

By(fmt.Sprintf("Update Instance Size Margins with AutoScaling for Deployemnt %s", kube.ObjectKeyFromObject(createdDeployment)), func() {
By(fmt.Sprintf("Update Instance Size Margins with AutoScaling for Deployment %s", kube.ObjectKeyFromObject(createdDeployment)), func() {
regionConfig := createdDeployment.Spec.AdvancedDeploymentSpec.ReplicationSpecs[0].RegionConfigs[0]
regionConfig.AutoScaling.Compute.MinInstanceSize = "M20"
regionConfig.ElectableSpecs.InstanceSize = "M20"
Expand Down

0 comments on commit aafeb2d

Please sign in to comment.