From b92cd78e848a3bf4b165bb31c2ae677641e8253f Mon Sep 17 00:00:00 2001 From: bananacocodrilo Date: Tue, 24 Sep 2024 12:25:43 +0200 Subject: [PATCH 1/3] remove resolver.UnregisterForTesting usage --- protobuf/source_reflection.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/protobuf/source_reflection.go b/protobuf/source_reflection.go index 5e71092..287a25f 100644 --- a/protobuf/source_reflection.go +++ b/protobuf/source_reflection.go @@ -32,8 +32,7 @@ type ReflectionArgs struct { // NewDescriptorProviderReflection returns a DescriptorProvider that reaches // out to a reflection server to access file descriptors. func NewDescriptorProviderReflection(args ReflectionArgs) (DescriptorProvider, error) { - r, deregisterScheme := GenerateAndRegisterManualResolver() - defer deregisterScheme() + r := GenerateAndRegisterManualResolver() peers := make([]resolver.Address, len(args.Peers)) for i, p := range args.Peers { if strings.Contains(p, "://") { @@ -127,9 +126,9 @@ func wrapReflectionError(err error) error { return fmt.Errorf("error in protobuf reflection: %v", err) } -func GenerateAndRegisterManualResolver() (*manual.Resolver, func()) { +func GenerateAndRegisterManualResolver() *manual.Resolver { scheme := strconv.FormatInt(time.Now().UnixNano(), 36) r := manual.NewBuilderWithScheme(scheme) resolver.Register(r) - return r, func() { resolver.UnregisterForTesting(scheme) } + return r } From 8e5500490f6ceb826437cd1f3865894bd253cd84 Mon Sep 17 00:00:00 2001 From: bananacocodrilo Date: Mon, 30 Sep 2024 17:35:00 +0200 Subject: [PATCH 2/3] Update manual resolver creation --- protobuf/source_reflection.go | 20 +++++++++++---- protobuf/source_reflection_test.go | 39 ++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/protobuf/source_reflection.go b/protobuf/source_reflection.go index 287a25f..020bf4a 100644 --- a/protobuf/source_reflection.go +++ b/protobuf/source_reflection.go @@ -3,7 +3,6 @@ package protobuf import ( "context" "fmt" - "strconv" "strings" "time" @@ -32,7 +31,6 @@ type ReflectionArgs struct { // NewDescriptorProviderReflection returns a DescriptorProvider that reaches // out to a reflection server to access file descriptors. func NewDescriptorProviderReflection(args ReflectionArgs) (DescriptorProvider, error) { - r := GenerateAndRegisterManualResolver() peers := make([]resolver.Address, len(args.Peers)) for i, p := range args.Peers { if strings.Contains(p, "://") { @@ -40,7 +38,7 @@ func NewDescriptorProviderReflection(args ReflectionArgs) (DescriptorProvider, e } peers[i] = resolver.Address{Addr: p, Type: resolver.Backend} } - r.InitialState(resolver.State{Addresses: peers}) + r := GetOrGenerateAndRegisterManualResolver(args.Service, peers) conn, err := grpc.DialContext(context.Background(), r.Scheme()+":///", // minimal target to dial registered host:port pairs @@ -126,9 +124,21 @@ func wrapReflectionError(err error) error { return fmt.Errorf("error in protobuf reflection: %v", err) } -func GenerateAndRegisterManualResolver() *manual.Resolver { - scheme := strconv.FormatInt(time.Now().UnixNano(), 36) +func GetOrGenerateAndRegisterManualResolver(service string, peers []resolver.Address) *manual.Resolver { + scheme := "dest-" + service + newState := resolver.State{Addresses: peers} + + rb := resolver.Get(scheme) + if rb != nil { + if r, ok := rb.(*manual.Resolver); ok { + r.InitialState(newState) + return r + } + } + r := manual.NewBuilderWithScheme(scheme) resolver.Register(r) + r.InitialState(newState) + return r } diff --git a/protobuf/source_reflection_test.go b/protobuf/source_reflection_test.go index 141be5d..59d6c33 100644 --- a/protobuf/source_reflection_test.go +++ b/protobuf/source_reflection_test.go @@ -196,6 +196,44 @@ func TestReflectionRoutingHeaders(t *testing.T) { } } +func TestResolverAlreadyExists(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:0") + ln2, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + require.NoError(t, err) + + defer ln.Close() + defer ln2.Close() + + s := grpc.NewServer() + reflection.Register(s) + + go s.Serve(ln) + go s.Serve(ln2) + + // Ensure that all streams are closed by the end of the test. + defer s.GracefulStop() + + provider, err := NewDescriptorProviderReflection(ReflectionArgs{ + Timeout: time.Second, + Peers: []string{ln.Addr().String()}, + Service: "test", + }) + require.NoError(t, err, "failed to create reflection provider") + _, err = provider.FindService("grpc.reflection.v1alpha.ServerReflection") + assert.NoError(t, err, "unexpected error") + + provider, err = NewDescriptorProviderReflection(ReflectionArgs{ + Timeout: time.Second, + Peers: []string{ln2.Addr().String()}, + Service: "test", + }) + require.NoError(t, err, "failed to create reflection provider") + _, err = provider.FindService("grpc.reflection.v1alpha.ServerReflection") + assert.NoError(t, err, "unexpected error") + +} + func TestE2eErrors(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -212,6 +250,7 @@ func TestE2eErrors(t *testing.T) { source, err := NewDescriptorProviderReflection(ReflectionArgs{ Timeout: time.Second, Peers: []string{ln.Addr().String()}, + Service: "TestE2eErrors", }) require.NoError(t, err) defer source.Close() From 2a6e6399020a47df34c8686142050136176e5ac6 Mon Sep 17 00:00:00 2001 From: bananacocodrilo Date: Tue, 1 Oct 2024 13:52:15 +0200 Subject: [PATCH 3/3] add mutex for resolver --- protobuf/source_reflection.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/protobuf/source_reflection.go b/protobuf/source_reflection.go index 020bf4a..9ae0e4a 100644 --- a/protobuf/source_reflection.go +++ b/protobuf/source_reflection.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/jhump/protoreflect/desc" @@ -18,6 +19,8 @@ import ( "google.golang.org/grpc/resolver/manual" ) +var resolverMutex sync.Mutex + // ReflectionArgs are args for constructing a DescriptorProvider that reaches out to a reflection server. type ReflectionArgs struct { Caller string @@ -38,6 +41,7 @@ func NewDescriptorProviderReflection(args ReflectionArgs) (DescriptorProvider, e } peers[i] = resolver.Address{Addr: p, Type: resolver.Backend} } + resolverMutex.Lock() r := GetOrGenerateAndRegisterManualResolver(args.Service, peers) conn, err := grpc.DialContext(context.Background(), @@ -45,6 +49,9 @@ func NewDescriptorProviderReflection(args ReflectionArgs) (DescriptorProvider, e grpc.WithTimeout(args.Timeout), grpc.WithBlock(), grpc.WithInsecure()) + + resolverMutex.Unlock() + if err != nil { return nil, fmt.Errorf("could not reach reflection server: %s", err) }