Skip to content

Commit

Permalink
CLOUDP-279474: Use generator to avoid sharing list value (#1871)
Browse files Browse the repository at this point in the history
* Add data race detection unit test

* CLOUDP-279474: Use generator to avoid sharing list value
  • Loading branch information
josvazg authored Oct 18, 2024
1 parent 756aba9 commit ad40d6f
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 38 deletions.
18 changes: 11 additions & 7 deletions pkg/controller/atlasdatabaseuser/atlasdatabaseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,7 @@ func (r *AtlasDatabaseUserReconciler) SetupWithManager(mgr ctrl.Manager, skipNam
).
Watches(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(indexer.CredentialsIndexMapperFunc(
indexer.AtlasDatabaseUserCredentialsIndex,
&akov2.AtlasDatabaseUserList{},
indexer.DatabaseUserRequests,
r.Client,
r.Log,
)),
handler.EnqueueRequestsFromMapFunc(r.databaseUsersForCredentialMapFunc()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithOptions(controller.TypedOptions[reconcile.Request]{SkipNameValidation: pointer.MakePtr(skipNameValidation)}).
Expand Down Expand Up @@ -272,6 +266,16 @@ func (r *AtlasDatabaseUserReconciler) findAtlasDatabaseUserForSecret(ctx context
return requests
}

func (r *AtlasDatabaseUserReconciler) databaseUsersForCredentialMapFunc() handler.MapFunc {
return indexer.CredentialsIndexMapperFunc(
indexer.AtlasDatabaseUserCredentialsIndex,
func() *akov2.AtlasDatabaseUserList { return &akov2.AtlasDatabaseUserList{} },
indexer.DatabaseUserRequests,
r.Client,
r.Log,
)
}

func NewAtlasDatabaseUserReconciler(
mgr manager.Manager,
predicates []predicate.Predicate,
Expand Down
18 changes: 11 additions & 7 deletions pkg/controller/atlasdeployment/atlasdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,7 @@ func (r *AtlasDeploymentReconciler) SetupWithManager(mgr ctrl.Manager, skipNameV
).
Watches(
&corev1.Secret{},
handler.EnqueueRequestsFromMapFunc(indexer.CredentialsIndexMapperFunc(
indexer.AtlasDeploymentCredentialsIndex,
&akov2.AtlasDeploymentList{},
indexer.DeploymentRequests,
r.Client,
r.Log,
)),
handler.EnqueueRequestsFromMapFunc(r.deploymentsForCredentialMapFunc()),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithOptions(controller.TypedOptions[reconcile.Request]{SkipNameValidation: pointer.MakePtr(skipNameValidation)}).
Expand Down Expand Up @@ -595,3 +589,13 @@ func (r *AtlasDeploymentReconciler) findDeploymentsForSearchIndexConfig(ctx cont

return requests
}

func (r *AtlasDeploymentReconciler) deploymentsForCredentialMapFunc() handler.MapFunc {
return indexer.CredentialsIndexMapperFunc(
indexer.AtlasDeploymentCredentialsIndex,
func() *akov2.AtlasDeploymentList { return &akov2.AtlasDeploymentList{} },
indexer.DeploymentRequests,
r.Client,
r.Log,
)
}
3 changes: 2 additions & 1 deletion pkg/indexer/localcredentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (lc *LocalCredentialIndexer) Keys(object client.Object) []string {

type requestsFunc[L client.ObjectList] func(L) []reconcile.Request

func CredentialsIndexMapperFunc[L client.ObjectList](indexerName string, list L, reqsFn requestsFunc[L], kubeClient client.Client, logger *zap.SugaredLogger) handler.MapFunc {
func CredentialsIndexMapperFunc[L client.ObjectList](indexerName string, listGenFn func() L, reqsFn requestsFunc[L], kubeClient client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return func(ctx context.Context, obj client.Object) []reconcile.Request {
secret, ok := obj.(*corev1.Secret)
if !ok {
Expand All @@ -71,6 +71,7 @@ func CredentialsIndexMapperFunc[L client.ObjectList](indexerName string, list L,
client.ObjectKeyFromObject(secret).String(),
),
}
list := listGenFn()
err := kubeClient.List(ctx, list, listOpts)
if err != nil {
logger.Errorf("failed to list from indexer %s: %v", indexerName, err)
Expand Down
100 changes: 77 additions & 23 deletions pkg/indexer/localcredentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package indexer

import (
"context"
"fmt"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -21,6 +23,10 @@ import (
akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1"
)

const (
testUsername = "matching-user"
)

func TestAtlasDatabaseUserLocalCredentialsIndexer(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down Expand Up @@ -106,27 +112,8 @@ func TestCredentialsIndexMapperFunc(t *testing.T) {
{
name: "matching input credentials renders matching user",
mapperFn: dbUserMapperFunc,
input: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "secret-ref",
Namespace: "ns",
},
},
objects: []client.Object{
&akov2.AtlasDatabaseUser{
ObjectMeta: metav1.ObjectMeta{
Name: "matching-user",
Namespace: "ns",
},
Spec: akov2.AtlasDatabaseUserSpec{
LocalCredentialHolder: api.LocalCredentialHolder{
ConnectionSecret: &api.LocalObjectReference{
Name: "secret-ref",
},
},
},
},
},
input: newTestSecret("matching-user-secret-ref"),
objects: []client.Object{newTestUser("matching-user")},
want: []reconcile.Request{
{NamespacedName: types.NamespacedName{
Name: "matching-user",
Expand Down Expand Up @@ -161,10 +148,77 @@ func TestCredentialsIndexMapperFunc(t *testing.T) {
}
}

func TestCredentialsIndexMapperFuncRace(t *testing.T) {
scheme := runtime.NewScheme()
assert.NoError(t, corev1.AddToScheme(scheme))
assert.NoError(t, akov2.AddToScheme(scheme))
indexer := NewLocalCredentialsIndexer(
AtlasDatabaseUserCredentialsIndex,
&akov2.AtlasDatabaseUser{},
zaptest.NewLogger(t),
)
objs := make([]client.Object, 10)
for i := range objs {
objs[i] = newTestUser(fmt.Sprintf("%s-%d", testUsername, i))
}
fakeClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(objs...).
WithIndex(
&akov2.AtlasDatabaseUser{},
AtlasDatabaseUserCredentialsIndex,
func(obj client.Object) []string {
return indexer.Keys(obj)
}).
Build()
fn := dbUserMapperFunc(fakeClient, zaptest.NewLogger(t).Sugar())
ctx := context.Background()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
input := newTestSecret(fmt.Sprintf("%s-%d-secret-ref", testUsername, i))
result := fn(ctx, input)
if i < len(objs) {
assert.NotEmpty(t, result, "failed to find for index %d", i)
} else {
assert.Empty(t, result, "failed not to find for index %d", i)
}
}(i)
}
wg.Wait()
}

func newTestUser(username string) *akov2.AtlasDatabaseUser {
return &akov2.AtlasDatabaseUser{
ObjectMeta: metav1.ObjectMeta{
Name: username,
Namespace: "ns",
},
Spec: akov2.AtlasDatabaseUserSpec{
LocalCredentialHolder: api.LocalCredentialHolder{
ConnectionSecret: &api.LocalObjectReference{
Name: fmt.Sprintf("%s-secret-ref", username),
},
},
},
}
}

func newTestSecret(name string) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
},
}
}

func dbUserMapperFunc(kubeClient client.Client, logger *zap.SugaredLogger) handler.MapFunc {
return CredentialsIndexMapperFunc(
return CredentialsIndexMapperFunc[*akov2.AtlasDatabaseUserList](
AtlasDatabaseUserCredentialsIndex,
&akov2.AtlasDatabaseUserList{},
func() *akov2.AtlasDatabaseUserList { return &akov2.AtlasDatabaseUserList{} },
DatabaseUserRequests,
kubeClient,
logger,
Expand Down

0 comments on commit ad40d6f

Please sign in to comment.