Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Vazquez <[email protected]>
  • Loading branch information
josvazg committed Aug 25, 2023
1 parent df7db35 commit f593f2a
Show file tree
Hide file tree
Showing 10 changed files with 544 additions and 52 deletions.
33 changes: 17 additions & 16 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
return result.ReconcileResult(), nil
}

err = customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
log.Error(result.GetMessage())

return result.ReconcileResult(), nil
}

if deployment.IsLegacyDeployment() {
if err := ConvertLegacyDeployment(&deployment.Spec); err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -188,7 +179,7 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
}

handleDeployment := r.selectDeploymentHandler(deployment)
if result, _ := handleDeployment(workflowCtx, project, deployment, req); !result.IsOk() {
if result, _ := handleDeployment(context, workflowCtx, project, deployment, req); !result.IsOk() {
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
return result.ReconcileResult(), nil
}
Expand All @@ -199,6 +190,16 @@ func (r *AtlasDeploymentReconciler) Reconcile(context context.Context, req ctrl.
return result.ReconcileResult(), nil
}
}

err = customresource.ApplyLastConfigApplied(context, deployment, r.Client)
if err != nil {
result = workflow.Terminate(workflow.Internal, err.Error())
workflowCtx.SetConditionFromResult(status.DeploymentReadyType, result)
log.Error(result.GetMessage())

return result.ReconcileResult(), nil
}

return workflow.OK().ReconcileResult(), nil
}

Expand Down Expand Up @@ -358,7 +359,7 @@ func (r *AtlasDeploymentReconciler) selectDeploymentHandler(deployment *mdbv1.At
}

// handleAdvancedDeployment ensures the state of the deployment using the Advanced Deployment API
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureAdvancedDeploymentState(workflowCtx, project, deployment)
if c != nil && c.StateName != "" {
workflowCtx.EnsureStatusOption(status.AtlasDeploymentStateNameOption(c.StateName))
Expand Down Expand Up @@ -387,7 +388,7 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

if err := r.ensureBackupScheduleAndPolicy(
context.Background(),
ctx,
workflowCtx, project.ID(),
deployment,
backupEnabled,
Expand All @@ -411,8 +412,8 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(workflowCtx *workfl
}

// handleServerlessInstance ensures the state of the serverless instance using the serverless API
func (r *AtlasDeploymentReconciler) handleServerlessInstance(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := ensureServerlessInstanceState(workflowCtx, project, deployment.Spec.ServerlessSpec)
func (r *AtlasDeploymentReconciler) handleServerlessInstance(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error) {
c, result := r.ensureServerlessInstanceState(ctx, workflowCtx, project, deployment)
return r.ensureConnectionSecretsAndSetStatusOptions(workflowCtx, project, deployment, result, c)
}

Expand Down Expand Up @@ -579,7 +580,7 @@ func (r *AtlasDeploymentReconciler) removeDeletionFinalizer(context context.Cont
return nil
}

type deploymentHandlerFunc func(workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)
type deploymentHandlerFunc func(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment, req reconcile.Request) (workflow.Result, error)

type atlasClusterType int

Expand Down Expand Up @@ -687,7 +688,7 @@ func advancedDeploymentMatchesSpec(log *zap.SugaredLogger, atlasSpec *mongodbatl
return d == "", nil
}

// Parse through tags and verfiy that all keys are unique. Return error otherwise.
// Parse through tags and verify that all keys are unique. Return error otherwise.
func uniqueKey(deploymentSpec *mdbv1.AtlasDeploymentSpec) error {
store := make(map[string]string)
var arrTags []*mdbv1.TagSpec
Expand Down
16 changes: 10 additions & 6 deletions pkg/controller/atlasdeployment/serverless_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasProject, serverlessSpec *mdbv1.ServerlessSpec) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
atlasDeployment, resp, err := ctx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
func (r *AtlasDeploymentReconciler) ensureServerlessInstanceState(ctx context.Context, workflowCtx *workflow.Context, project *mdbv1.AtlasProject, deployment *mdbv1.AtlasDeployment) (atlasDeployment *mongodbatlas.Cluster, _ workflow.Result) {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return nil, workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
serverlessSpec := deployment.Spec.ServerlessSpec
atlasDeployment, resp, err := workflowCtx.Client.ServerlessInstances.Get(context.Background(), project.Status.ID, serverlessSpec.Name)
if err != nil {
if resp == nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
Expand All @@ -28,8 +32,8 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
if err != nil {
return atlasDeployment, workflow.Terminate(workflow.Internal, err.Error())
}
ctx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
workflowCtx.Log.Infof("Serverless Instance %s doesn't exist in Atlas - creating", serverlessSpec.Name)
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Create(context.Background(), project.Status.ID, &mongodbatlas.ServerlessCreateRequestParams{
Name: serverlessSpec.Name,
ProviderSettings: &mongodbatlas.ServerlessProviderSettings{
BackingProviderName: serverlessSpec.ProviderSettings.BackingProviderName,
Expand All @@ -53,7 +57,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
convertedDeployment.Tags = &[]*mongodbatlas.Tag{}
}
if !isTagsEqual(*(atlasDeployment.Tags), *(convertedDeployment.Tags)) {
atlasDeployment, _, err = ctx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
atlasDeployment, _, err = workflowCtx.Client.ServerlessInstances.Update(context.Background(), project.Status.ID, serverlessSpec.Name, &mongodbatlas.ServerlessUpdateRequestParams{
// TODO: include ServerlessBackupOptions and TerminationProtectionEnabled
Tag: convertedDeployment.Tags,
})
Expand All @@ -62,7 +66,7 @@ func ensureServerlessInstanceState(ctx *workflow.Context, project *mdbv1.AtlasPr
}
return atlasDeployment, workflow.InProgress(workflow.DeploymentUpdating, "deployment is updating")
}
result := ensureServerlessPrivateEndpoints(ctx, project.ID(), serverlessSpec, atlasDeployment.Name)
result := ensureServerlessPrivateEndpoints(ctx, workflowCtx, project.ID(), deployment, atlasDeployment.Name, r.SubObjectDeletionProtection)
return atlasDeployment, result

case status.StateCREATING:
Expand Down
103 changes: 101 additions & 2 deletions pkg/controller/atlasdeployment/serverless_private_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package atlasdeployment

import (
"context"
"encoding/json"
"fmt"
"sort"

"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil"

Expand All @@ -14,6 +16,7 @@ import (

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
)

Expand All @@ -28,10 +31,30 @@ const (
SPEStatusFailed = "FAILED" //stage 2
)

func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string, deploymentSpec *mdbv1.ServerlessSpec, deploymentName string) workflow.Result {
if deploymentSpec == nil {
func ensureServerlessPrivateEndpoints(ctx context.Context, service *workflow.Context, groupID string, deployment *mdbv1.AtlasDeployment, deploymentName string, protected bool) workflow.Result {
if deployment == nil || deployment.Spec.ServerlessSpec == nil {
return workflow.Terminate(workflow.ServerlessPrivateEndpointReady, "deployment spec is empty")
}
deploymentSpec := deployment.Spec.ServerlessSpec

canReconcile, err := canServerlessPrivateEndpointsReconcile(ctx, service, protected, groupID, deployment)
if err != nil {
result := workflow.Terminate(workflow.Internal, fmt.Sprintf("unable to resolve ownership for deletion protection: %s", err))
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

if !canReconcile {
result := workflow.Terminate(
workflow.AtlasDeletionProtection,
"unable to reconcile Serverless Private Endpoints due to deletion protection being enabled. see https://dochub.mongodb.org/core/ako-deletion-protection for further information",
)
service.SetConditionFromResult(status.AlertConfigurationReadyType, result)

return result
}

providerName := GetServerlessProvider(deploymentSpec)
if providerName == provider.ProviderGCP {
if len(deploymentSpec.PrivateEndpoints) == 0 {
Expand All @@ -57,6 +80,82 @@ func ensureServerlessPrivateEndpoints(service *workflow.Context, groupID string,
return result
}

func canServerlessPrivateEndpointsReconcile(ctx context.Context, service *workflow.Context, protected bool, groupID string, deployment *mdbv1.AtlasDeployment) (bool, error) {
if !protected {
return true, nil
}

latestConfig := &mdbv1.AtlasDeploymentSpec{}
latestConfigString, ok := deployment.Annotations[customresource.AnnotationLastAppliedConfiguration]
if ok {
if err := json.Unmarshal([]byte(latestConfigString), latestConfig); err != nil {
return false, err
}
}

atlasClient := service.Client
existingPE, err := getAllExistingServerlessPE(ctx, atlasClient.ServerlessPrivateEndpoints, groupID, deployment.Spec.ServerlessSpec.Name)
if err != nil {
return false, err
}

if len(existingPE) == 0 {
return true, nil
}

logger := service.Log
prevCfg := prevPEConfig(latestConfig)
if matchingPEs(logger, deployment.Spec.ServerlessSpec.PrivateEndpoints, existingPE) ||
matchingPEs(logger, prevCfg, existingPE) {
return true, nil
}
return false, nil
}

func sortedK8sPENames(spes []mdbv1.ServerlessPrivateEndpoint) []string {
names := make([]string, 0, len(spes))
for _, spe := range spes {
names = append(names, spe.Name)
}
sort.Strings(names)
return names
}

func sortedAtlasPENames(atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) []string {
names := make([]string, 0, len(atlasPEs))
for _, atlasPE := range atlasPEs {
names = append(names, atlasPE.Comment)
}
sort.Strings(names)
return names
}

func matchingPEs(logger *zap.SugaredLogger, spes []mdbv1.ServerlessPrivateEndpoint, atlasPEs []mongodbatlas.ServerlessPrivateEndpointConnection) bool {
k8sPENames := sortedK8sPENames(spes)
atlasPENames := sortedAtlasPENames(atlasPEs)
if len(k8sPENames) != len(atlasPEs) {
logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPENames, atlasPENames)
logger.Debugf("Different PE sets lengths Kubernetes wants %d but atlas has %d", len(k8sPENames), len(atlasPEs))
return false
}
for i, k8sName := range k8sPENames {
if atlasPENames[i] != k8sName {
logger.Debugf("Kubernetes PEs do not match Atlas: k8s %v != Atlas %v", k8sPENames, atlasPENames)
logger.Debugf("Different PE at index %d %d but atlas has %d", k8sName, atlasPENames[i])
return false
}
}
logger.Debugf("Kubernetes PEs MATCH Atlas: k8s %v == Atlas %v", k8sPENames, atlasPENames)
return true
}

func prevPEConfig(deploymentSpec *mdbv1.AtlasDeploymentSpec) []mdbv1.ServerlessPrivateEndpoint {
if deploymentSpec.ServerlessSpec == nil || deploymentSpec.ServerlessSpec.PrivateEndpoints == nil {
return []mdbv1.ServerlessPrivateEndpoint{}
}
return deploymentSpec.ServerlessSpec.PrivateEndpoints
}

func GetServerlessProvider(deploymentSpec *mdbv1.ServerlessSpec) provider.ProviderName {
if deploymentSpec.ProviderSettings.ProviderName != provider.ProviderServerless {
return deploymentSpec.ProviderSettings.ProviderName
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package atlasdeployment

import (
"context"

"go.mongodb.org/atlas/mongodbatlas"
)

type ServerlessPrivateEndpointClientMock struct {
ListFn func(string, string, *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error)
}

func (spec ServerlessPrivateEndpointClientMock) List(_ context.Context, groupID string, instanceName string, opts *mongodbatlas.ListOptions) ([]mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
if spec.ListFn == nil {
panic("ListFn not mocked for test")
}
return spec.ListFn(groupID, instanceName, opts)
}

func (spec ServerlessPrivateEndpointClientMock) Create(ctx context.Context, groupID string, instanceName string, opts *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}

func (spec ServerlessPrivateEndpointClientMock) Get(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}

func (spec ServerlessPrivateEndpointClientMock) Delete(ctx context.Context, groupID string, instanceName string, opts string) (*mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}

func (spec ServerlessPrivateEndpointClientMock) Update(_ context.Context, _ string, _ string, _ string, _ *mongodbatlas.ServerlessPrivateEndpointConnection) (*mongodbatlas.ServerlessPrivateEndpointConnection, *mongodbatlas.Response, error) {
panic("not implemented") // TODO: Implement
}
Loading

0 comments on commit f593f2a

Please sign in to comment.