Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Vazquez <[email protected]>
  • Loading branch information
josvazg committed Aug 23, 2023
1 parent d35c49a commit da2ed29
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 14 deletions.
3 changes: 2 additions & 1 deletion pkg/api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
}

handleDeployment := r.selectDeploymentHandler(deployment)
if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() {
if result, _ := handleDeployment(context, workflowCtx, project, deployment, req); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
}
Expand Down Expand Up @@ -352,7 +352,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At
}

// handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment)
if c != nil && c.StateName != "" {
workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName))
Expand Down Expand Up @@ -381,7 +381,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

if err := r.ensureBackupScheduleAndPolicy(
context.Background(),
ctx,
workflowCtx, project.ID(),
deployment,
backupEnabled,
Expand All @@ -405,8 +405,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

// handleServerlessInstance ensures the state of the serverless instance using the serverless API
func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec)
func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment)
return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c)
}

Expand Down Expand Up @@ -573,7 +573,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont
return nil
}

type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)
type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)

type atlasClusterType int

Expand Down
14 changes: 9 additions & 5 deletions pkg/controller/atlasdeployment/serverless_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
serverlessSpec := deployment.Spec.ServerlessSpec
atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
if err != nil {
if resp == nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -24,8 +28,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
return atlasDeployment, workflow.Terminate(workflow.DeploymentNotCreatedInAtlas, err.Error())
}

ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
Name: serverlessSpec.Name,
ProviderSettings: &mongodbatlas.ServerlessProviderSettings{
BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName,
Expand All @@ -40,7 +44,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr

switch atlasDeployment.StateName {
case status.StateIDLE:
result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name)
result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, false)
return atlasDeployment, result
case status.StateCREATING:
return atlasDeployment, workflow.InProgress(workflow.DeploymentCreating, "deployment is provisioning")
Expand Down
66 changes: 64 additions & 2 deletions pkg/controller/atlasdeployment/serverless_private_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package atlasdeployment

import (
"context"
"encoding/json"
"fmt"

"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil"
Expand All @@ -14,6 +15,7 @@ import (

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

Expand All @@ -28,10 +30,30 @@ const (
SPEStatusFailed = "FAILED" //stage 2
)

func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result {
if deploymentSpec == nil {
func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
deploymentSpec := deployment.Spec.ServerlessSpec

canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service, protected, groupID, deployment)
if err != nil {
result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err))
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

if !canReconcile {
result := workflow.Terminate(
workflow.AtlasDeletionProtection,
"unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information",
)
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

providerName := GetServerlessProvider(deploymentSpec)
if providerName == provider.ProviderGCP {
if len(deploymentSpec.PrivateEndpoints) == 0 {
Expand All @@ -57,6 +79,38 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string,
return result
}

func canServerlessPrivateEndpointsReconcile(ctx context.Context, service *workflow.Context, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) {
if !protected {
return true, nil
}

latestConfig := &mdbv1.AtlasDeployment{}
latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration]
if ok {
if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil {
return false, err
}
}

atlasClient := service.Client
existingPE, err := getAllExistingServerlessPE(ctx, atlasClient.ServerlessPrivateEndpoints, groupID, deployment.Spec.ServerlessSpec.Name)
if err != nil {
return false, err
}

if len(existingPE) == 0 {
return true, nil
}

logger := service.Log
if setsMatch(logger, existingPE, deployment.Spec.ServerlessSpec.PrivateEndpoints) ||
setsMatch(logger, existingPE, latestConfig.Spec.ServerlessSpec.PrivateEndpoints) {
return true, nil
}

return false, nil
}

func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName {
if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless {
return deploymentSpec.ProviderSettings.ProviderName
Expand Down Expand Up @@ -174,6 +228,14 @@ type SPEDiff struct {
DuplicateToCreate []mdbv1.ServerlessPrivateEndpoint
}

func setsMatch(logger *zap.SugaredLogger, existedPE []mongodbatlas.ServerlessPrivateEndpointConnection, desiredPE []mdbv1.ServerlessPrivateEndpoint) bool {
d := sortServerlessPE(logger, existedPE, desiredPE)
return len(d.PEToCreate) == 0 &&
len(d.PEToConnect) == 0 &&
len(d.PEToDelete) == 0 &&
len(d.DuplicateToCreate) == 0
}

func (d *SPEDiff) appendToCreate(pe mdbv1.ServerlessPrivateEndpoint) {
for _, p := range d.PEToCreate {
if p.Name == pe.Name {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package atlasdeployment

import (
"context"

"go.mongodb.org/atlas/mongodbatlas"
)

type ServerlessPrivateEndpointClientMock struct {
ListFn func(string, string, *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error)
}

func (spec ServerlessPrivateEndpointClientMock) List(_ context.Context, groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
if spec.ListFn == nil {
panic("ListFn not mocked for test")
}
return spec.ListFn(groupID, instanceName, opts)
}

func (spec ServerlessPrivateEndpointClientMock) Create(ctx context.Context, groupID string, instanceName string, opts *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}

func (spec ServerlessPrivateEndpointClientMock) Get(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}

func (spec ServerlessPrivateEndpointClientMock) Delete(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}

func (spec ServerlessPrivateEndpointClientMock) Update(_ context.Context, _ string, _ string, _ string, _ *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}
Loading

0 comments on commit da2ed29

Please sign in to comment.