Skip to content

Commit

Permalink
CLOUDP-278387: Add index for dbUser by project and improve connection…
Browse files Browse the repository at this point in the history
… secret generation (#1860)
  • Loading branch information
helderjs authored Oct 14, 2024
1 parent 94ecc3e commit 39a3e3e
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 40 deletions.
41 changes: 8 additions & 33 deletions pkg/controller/atlasdeployment/advanced_deployment.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package atlasdeployment

import (
"context"
"fmt"
"reflect"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -19,6 +19,7 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/connectionsecret"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer"
)

const FreeTier = "M0"
Expand Down Expand Up @@ -112,13 +113,17 @@ func (r *AtlasDeploymentReconciler) handleAdvancedDeployment(ctx *workflow.Conte
}

func (r *AtlasDeploymentReconciler) ensureConnectionSecrets(ctx *workflow.Context, deploymentInAKO deployment.Deployment, connection *status.ConnectionStrings) error {
dbUsers, err := r.getProjectDatabaseUsers(ctx.Context, deploymentInAKO.GetProjectID())
databaseUsers := &akov2.AtlasDatabaseUserList{}
listOpts := &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector(indexer.AtlasDatabaseUserByProject, deploymentInAKO.GetProjectID()),
}
err := r.Client.List(ctx.Context, databaseUsers, listOpts)
if err != nil {
return err
}

secrets := make([]string, 0)
for _, dbUser := range dbUsers {
for _, dbUser := range databaseUsers.Items {
found := false
for _, c := range dbUser.Status.Conditions {
if c.Type == api.ReadyType && c.Status == v1.ConditionTrue {
Expand Down Expand Up @@ -204,33 +209,3 @@ func (r *AtlasDeploymentReconciler) ensureAdvancedOptions(ctx *workflow.Context,

return nil
}

func (r *AtlasDeploymentReconciler) getProjectDatabaseUsers(ctx context.Context, projectID string) ([]*akov2.AtlasDatabaseUser, error) {
databaseUsers := &akov2.AtlasDatabaseUserList{}
err := r.Client.List(ctx, databaseUsers, &client.ListOptions{})
if err != nil {
return nil, err
}

result := make([]*akov2.AtlasDatabaseUser, 0, len(databaseUsers.Items))
for _, dbUser := range databaseUsers.Items {
switch {
// if external reference and refer to same project as deployment, append
case dbUser.Spec.ExternalProjectRef != nil && dbUser.Spec.ExternalProjectRef.ID == projectID:
result = append(result, &dbUser)
// if internal reference and project resource is the same of the deployment, append
case dbUser.Spec.Project != nil:
atlasProject := &akov2.AtlasProject{}
err = r.Client.Get(ctx, *dbUser.Spec.Project.GetObject(dbUser.Namespace), atlasProject)
if err != nil {
return nil, err
}

if atlasProject.ID() == projectID {
result = append(result, &dbUser)
}
}
}

return result, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,17 +366,20 @@ func TestRegularClusterReconciliation(t *testing.T) {
}
d.Spec.DeploymentSpec.SearchNodes = searchNodes

ctx := context.Background()
logger := zaptest.NewLogger(t)

sch := runtime.NewScheme()
require.NoError(t, akov2.AddToScheme(sch))
require.NoError(t, corev1.AddToScheme(sch))
dbUserProjectIndexer := indexer.NewAtlasDatabaseUserByProjectIndexer(ctx, nil, logger)
// Subresources need to be explicitly set now since controller-runtime 1.15
// https://github.com/kubernetes-sigs/controller-runtime/issues/2362#issuecomment-1698194188
k8sClient := fake.NewClientBuilder().
WithScheme(sch).
WithObjects(secret, project, bPolicy, bSchedule, d).
WithStatusSubresource(bPolicy, bSchedule).
WithIndex(dbUserProjectIndexer.Object(), dbUserProjectIndexer.Name(), dbUserProjectIndexer.Keys).
Build()

orgID := "0987654321"
Expand Down Expand Up @@ -535,7 +538,7 @@ func TestRegularClusterReconciliation(t *testing.T) {

t.Run("should reconcile with existing cluster", func(t *testing.T) {
result, err := reconciler.Reconcile(
context.Background(),
ctx,
ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: d.Namespace,
Expand All @@ -549,6 +552,7 @@ func TestRegularClusterReconciliation(t *testing.T) {
}

func TestServerlessInstanceReconciliation(t *testing.T) {
ctx := context.Background()
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "api-secret",
Expand Down Expand Up @@ -585,11 +589,13 @@ func TestServerlessInstanceReconciliation(t *testing.T) {
sch := runtime.NewScheme()
require.NoError(t, akov2.AddToScheme(sch))
require.NoError(t, corev1.AddToScheme(sch))
dbUserProjectIndexer := indexer.NewAtlasDatabaseUserByProjectIndexer(ctx, nil, logger)
// Subresources need to be explicitly set now since controller-runtime 1.15
// https://github.com/kubernetes-sigs/controller-runtime/issues/2362#issuecomment-1698194188
k8sClient := fake.NewClientBuilder().
WithScheme(sch).
WithObjects(secret, project, d).
WithIndex(dbUserProjectIndexer.Object(), dbUserProjectIndexer.Name(), dbUserProjectIndexer.Keys).
Build()

orgID := "0987654321"
Expand Down Expand Up @@ -660,7 +666,7 @@ func TestServerlessInstanceReconciliation(t *testing.T) {

t.Run("should reconcile with existing serverless instance", func(t *testing.T) {
result, err := reconciler.Reconcile(
context.Background(),
ctx,
ctrl.Request{
NamespacedName: types.NamespacedName{
Namespace: d.Namespace,
Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/atlasdeployment/serverless_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow"
"github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/indexer"
)

func TestHandleServerlessInstance(t *testing.T) {
Expand Down Expand Up @@ -779,33 +780,36 @@ func TestHandleServerlessInstance(t *testing.T) {

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
logger := zaptest.NewLogger(t)
testScheme := runtime.NewScheme()
require.NoError(t, akov2.AddToScheme(testScheme))
dbUserProjectIndexer := indexer.NewAtlasDatabaseUserByProjectIndexer(ctx, nil, logger)
k8sClient := fake.NewClientBuilder().
WithScheme(testScheme).
WithObjects(tt.atlasDeployment).
WithIndex(dbUserProjectIndexer.Object(), dbUserProjectIndexer.Name(), dbUserProjectIndexer.Keys).
Build()
reconciler := &AtlasDeploymentReconciler{
Client: k8sClient,
Log: logger.Sugar(),
deploymentService: tt.deploymentService(),
}
ctx := &workflow.Context{
Context: context.Background(),
workflowCtx := &workflow.Context{
Context: ctx,
Log: logger.Sugar(),
SdkClient: tt.sdkMock(),
}

deploymentInAKO := deployment.NewDeployment("project-id", tt.atlasDeployment).(*deployment.Serverless)
result, err := reconciler.handleServerlessInstance(ctx, deploymentInAKO, tt.deploymentInAtlas)
result, err := reconciler.handleServerlessInstance(workflowCtx, deploymentInAKO, tt.deploymentInAtlas)
require.NoError(t, err)
assert.Equal(t, tt.expectedResult, result)
assert.True(
t,
cmp.Equal(
tt.expectedConditions,
ctx.Conditions(),
workflowCtx.Conditions(),
cmpopts.IgnoreFields(api.Condition{}, "LastTransitionTime"),
),
)
Expand Down
64 changes: 64 additions & 0 deletions pkg/indexer/atlasdatabaseuserprojects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package indexer

import (
"context"

"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client"

akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1"
)

const (
AtlasDatabaseUserByProject = "atlasdatabaseuser.spec.projectRef,externalProjectID"
)

type AtlasDatabaseUserByProjectIndexer struct {
ctx context.Context
client client.Client
logger *zap.SugaredLogger
}

func NewAtlasDatabaseUserByProjectIndexer(ctx context.Context, client client.Client, logger *zap.Logger) *AtlasDatabaseUserByProjectIndexer {
return &AtlasDatabaseUserByProjectIndexer{
ctx: ctx,
client: client,
logger: logger.Named(AtlasDatabaseUserByProject).Sugar(),
}
}

func (*AtlasDatabaseUserByProjectIndexer) Object() client.Object {
return &akov2.AtlasDatabaseUser{}
}

func (*AtlasDatabaseUserByProjectIndexer) Name() string {
return AtlasDatabaseUserByProject
}

func (a *AtlasDatabaseUserByProjectIndexer) Keys(object client.Object) []string {
user, ok := object.(*akov2.AtlasDatabaseUser)
if !ok {
a.logger.Errorf("expected *v1.AtlasDatabaseUser but got %T", object)
return nil
}

if user.Spec.ExternalProjectRef != nil && user.Spec.ExternalProjectRef.ID != "" {
return []string{user.Spec.ExternalProjectRef.ID}
}

if user.Spec.Project != nil && user.Spec.Project.Name != "" {
project := &akov2.AtlasProject{}
err := a.client.Get(a.ctx, *user.Spec.Project.GetObject(user.Namespace), project)
if err != nil {
a.logger.Errorf("unable to find project to index: %s", err)

return nil
}

if project.ID() != "" {
return []string{project.ID()}
}
}

return nil
}
Loading

0 comments on commit 39a3e3e

Please sign in to comment.