diff --git a/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go b/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go index 91f46b11f6..a4865abe8a 100644 --- a/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go +++ b/pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go @@ -222,13 +222,7 @@ func (r *AtlasDatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager, skipNam ). Watches( &corev1.Secret{}, - handler.EnqueueRequestsFromMapFunc(indexer.CredentialsIndexMapperFunc( - indexer.AtlasDatabaseUserCredentialsIndex, - &akov2.AtlasDatabaseUserList{}, - indexer.DatabaseUserRequests, - r.Client, - r.Log, - )), + handler.EnqueueRequestsFromMapFunc(r.databaseUsersForCredentialMapFunc()), builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{SkipNameValidation: pointer.MakePtr(skipNameValidation)}). @@ -272,6 +266,16 @@ func (r *AtlasDatabaseUserReconciler) findAtlasDatabaseUserForSecret(ctx context return requests } +func (r *AtlasDatabaseUserReconciler) databaseUsersForCredentialMapFunc() handler.MapFunc { + return indexer.CredentialsIndexMapperFunc( + indexer.AtlasDatabaseUserCredentialsIndex, + func() *akov2.AtlasDatabaseUserList { return &akov2.AtlasDatabaseUserList{} }, + indexer.DatabaseUserRequests, + r.Client, + r.Log, + ) +} + func NewAtlasDatabaseUserReconciler( mgr manager.Manager, predicates []predicate.Predicate, diff --git a/pkg/controller/atlasdeployment/atlasdeployment_controller.go b/pkg/controller/atlasdeployment/atlasdeployment_controller.go index a1f0a8da37..80014e2b10 100644 --- a/pkg/controller/atlasdeployment/atlasdeployment_controller.go +++ b/pkg/controller/atlasdeployment/atlasdeployment_controller.go @@ -451,13 +451,7 @@ func (r *AtlasDeploymentReconciler) SetupWithManager(mgr ctrl.Manager, skipNameV ). Watches( &corev1.Secret{}, - handler.EnqueueRequestsFromMapFunc(indexer.CredentialsIndexMapperFunc( - indexer.AtlasDeploymentCredentialsIndex, - &akov2.AtlasDeploymentList{}, - indexer.DeploymentRequests, - r.Client, - r.Log, - )), + handler.EnqueueRequestsFromMapFunc(r.deploymentsForCredentialMapFunc()), builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}), ). WithOptions(controller.TypedOptions[reconcile.Request]{SkipNameValidation: pointer.MakePtr(skipNameValidation)}). @@ -595,3 +589,13 @@ func (r *AtlasDeploymentReconciler) findDeploymentsForSearchIndexConfig(ctx cont return requests } + +func (r *AtlasDeploymentReconciler) deploymentsForCredentialMapFunc() handler.MapFunc { + return indexer.CredentialsIndexMapperFunc( + indexer.AtlasDeploymentCredentialsIndex, + func() *akov2.AtlasDeploymentList { return &akov2.AtlasDeploymentList{} }, + indexer.DeploymentRequests, + r.Client, + r.Log, + ) +} diff --git a/pkg/indexer/localcredentials.go b/pkg/indexer/localcredentials.go index f91bc1d5e0..d7d051e423 100644 --- a/pkg/indexer/localcredentials.go +++ b/pkg/indexer/localcredentials.go @@ -57,7 +57,7 @@ func (lc *LocalCredentialIndexer) Keys(object client.Object) []string { type requestsFunc[L client.ObjectList] func(L) []reconcile.Request -func CredentialsIndexMapperFunc[L client.ObjectList](indexerName string, list L, reqsFn requestsFunc[L], kubeClient client.Client, logger *zap.SugaredLogger) handler.MapFunc { +func CredentialsIndexMapperFunc[L client.ObjectList](indexerName string, listGenFn func() L, reqsFn requestsFunc[L], kubeClient client.Client, logger *zap.SugaredLogger) handler.MapFunc { return func(ctx context.Context, obj client.Object) []reconcile.Request { secret, ok := obj.(*corev1.Secret) if !ok { @@ -71,6 +71,7 @@ func CredentialsIndexMapperFunc[L client.ObjectList](indexerName string, list L, client.ObjectKeyFromObject(secret).String(), ), } + list := listGenFn() err := kubeClient.List(ctx, list, listOpts) if err != nil { logger.Errorf("failed to list from indexer %s: %v", indexerName, err) diff --git a/pkg/indexer/localcredentials_test.go b/pkg/indexer/localcredentials_test.go index 7f1e95fd87..cf4784205c 100644 --- a/pkg/indexer/localcredentials_test.go +++ b/pkg/indexer/localcredentials_test.go @@ -2,7 +2,9 @@ package indexer import ( "context" + "fmt" "sort" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -21,6 +23,10 @@ import ( akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" ) +const ( + testUsername = "matching-user" +) + func TestAtlasDatabaseUserLocalCredentialsIndexer(t *testing.T) { for _, tc := range []struct { name string @@ -106,27 +112,8 @@ func TestCredentialsIndexMapperFunc(t *testing.T) { { name: "matching input credentials renders matching user", mapperFn: dbUserMapperFunc, - input: &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "secret-ref", - Namespace: "ns", - }, - }, - objects: []client.Object{ - &akov2.AtlasDatabaseUser{ - ObjectMeta: metav1.ObjectMeta{ - Name: "matching-user", - Namespace: "ns", - }, - Spec: akov2.AtlasDatabaseUserSpec{ - LocalCredentialHolder: api.LocalCredentialHolder{ - ConnectionSecret: &api.LocalObjectReference{ - Name: "secret-ref", - }, - }, - }, - }, - }, + input: newTestSecret("matching-user-secret-ref"), + objects: []client.Object{newTestUser("matching-user")}, want: []reconcile.Request{ {NamespacedName: types.NamespacedName{ Name: "matching-user", @@ -161,10 +148,77 @@ func TestCredentialsIndexMapperFunc(t *testing.T) { } } +func TestCredentialsIndexMapperFuncRace(t *testing.T) { + scheme := runtime.NewScheme() + assert.NoError(t, corev1.AddToScheme(scheme)) + assert.NoError(t, akov2.AddToScheme(scheme)) + indexer := NewLocalCredentialsIndexer( + AtlasDatabaseUserCredentialsIndex, + &akov2.AtlasDatabaseUser{}, + zaptest.NewLogger(t), + ) + objs := make([]client.Object, 10) + for i := range objs { + objs[i] = newTestUser(fmt.Sprintf("%s-%d", testUsername, i)) + } + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objs...). + WithIndex( + &akov2.AtlasDatabaseUser{}, + AtlasDatabaseUserCredentialsIndex, + func(obj client.Object) []string { + return indexer.Keys(obj) + }). + Build() + fn := dbUserMapperFunc(fakeClient, zaptest.NewLogger(t).Sugar()) + ctx := context.Background() + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + input := newTestSecret(fmt.Sprintf("%s-%d-secret-ref", testUsername, i)) + result := fn(ctx, input) + if i < len(objs) { + assert.NotEmpty(t, result, "failed to find for index %d", i) + } else { + assert.Empty(t, result, "failed not to find for index %d", i) + } + }(i) + } + wg.Wait() +} + +func newTestUser(username string) *akov2.AtlasDatabaseUser { + return &akov2.AtlasDatabaseUser{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + Namespace: "ns", + }, + Spec: akov2.AtlasDatabaseUserSpec{ + LocalCredentialHolder: api.LocalCredentialHolder{ + ConnectionSecret: &api.LocalObjectReference{ + Name: fmt.Sprintf("%s-secret-ref", username), + }, + }, + }, + } +} + +func newTestSecret(name string) *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "ns", + }, + } +} + func dbUserMapperFunc(kubeClient client.Client, logger *zap.SugaredLogger) handler.MapFunc { - return CredentialsIndexMapperFunc( + return CredentialsIndexMapperFunc[*akov2.AtlasDatabaseUserList]( AtlasDatabaseUserCredentialsIndex, - &akov2.AtlasDatabaseUserList{}, + func() *akov2.AtlasDatabaseUserList { return &akov2.AtlasDatabaseUserList{} }, DatabaseUserRequests, kubeClient, logger,