Skip to content

Commit

Permalink
Update manual resolver creation
Browse files Browse the repository at this point in the history
  • Loading branch information
bananacocodrilo committed Sep 30, 2024
1 parent b92cd78 commit 8e55004
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
20 changes: 15 additions & 5 deletions protobuf/source_reflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package protobuf
import (
"context"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -32,15 +31,14 @@ type ReflectionArgs struct {
// NewDescriptorProviderReflection returns a DescriptorProvider that reaches
// out to a reflection server to access file descriptors.
func NewDescriptorProviderReflection(args ReflectionArgs) (DescriptorProvider, error) {
r := GenerateAndRegisterManualResolver()
peers := make([]resolver.Address, len(args.Peers))
for i, p := range args.Peers {
if strings.Contains(p, "://") {
return nil, fmt.Errorf("peer contains scheme %q", p)
}
peers[i] = resolver.Address{Addr: p, Type: resolver.Backend}
}
r.InitialState(resolver.State{Addresses: peers})
r := GetOrGenerateAndRegisterManualResolver(args.Service, peers)

conn, err := grpc.DialContext(context.Background(),
r.Scheme()+":///", // minimal target to dial registered host:port pairs
Expand Down Expand Up @@ -126,9 +124,21 @@ func wrapReflectionError(err error) error {
return fmt.Errorf("error in protobuf reflection: %v", err)
}

func GenerateAndRegisterManualResolver() *manual.Resolver {
scheme := strconv.FormatInt(time.Now().UnixNano(), 36)
func GetOrGenerateAndRegisterManualResolver(service string, peers []resolver.Address) *manual.Resolver {
scheme := "dest-" + service
newState := resolver.State{Addresses: peers}

rb := resolver.Get(scheme)
if rb != nil {
if r, ok := rb.(*manual.Resolver); ok {
r.InitialState(newState)
return r
}
}

r := manual.NewBuilderWithScheme(scheme)
resolver.Register(r)
r.InitialState(newState)

return r
}
39 changes: 39 additions & 0 deletions protobuf/source_reflection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,44 @@ func TestReflectionRoutingHeaders(t *testing.T) {
}
}

func TestResolverAlreadyExists(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:0")
ln2, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
require.NoError(t, err)

defer ln.Close()
defer ln2.Close()

s := grpc.NewServer()
reflection.Register(s)

go s.Serve(ln)
go s.Serve(ln2)

// Ensure that all streams are closed by the end of the test.
defer s.GracefulStop()

provider, err := NewDescriptorProviderReflection(ReflectionArgs{
Timeout: time.Second,
Peers: []string{ln.Addr().String()},
Service: "test",
})
require.NoError(t, err, "failed to create reflection provider")
_, err = provider.FindService("grpc.reflection.v1alpha.ServerReflection")
assert.NoError(t, err, "unexpected error")

provider, err = NewDescriptorProviderReflection(ReflectionArgs{
Timeout: time.Second,
Peers: []string{ln2.Addr().String()},
Service: "test",
})
require.NoError(t, err, "failed to create reflection provider")
_, err = provider.FindService("grpc.reflection.v1alpha.ServerReflection")
assert.NoError(t, err, "unexpected error")

}

func TestE2eErrors(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
Expand All @@ -212,6 +250,7 @@ func TestE2eErrors(t *testing.T) {
source, err := NewDescriptorProviderReflection(ReflectionArgs{
Timeout: time.Second,
Peers: []string{ln.Addr().String()},
Service: "TestE2eErrors",
})
require.NoError(t, err)
defer source.Close()
Expand Down

0 comments on commit 8e55004

Please sign in to comment.