Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement: Introduce new URIPool and URISelector interfaces. #341

Draft
wants to merge 9 commits into
base: develop
Choose a base branch
from
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-341.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Introduce new URIPool and URISelector interfaces.
links:
- https://github.com/palantir/conjure-go-runtime/pull/341
55 changes: 27 additions & 28 deletions conjure-go-client/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type clientImpl struct {
errorDecoderMiddleware Middleware
recoveryMiddleware Middleware

uriScorer internal.RefreshableURIScoringMiddleware
uriPool internal.URIPool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

URIPool is new functionality. I think the behavior can be captured behind a URISelector implementation so prefer to just use the refreshable slice of URIs directly for now.

uriSelector internal.URISelector
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These interfaces exist for the lifetime of the client - we don't believe we need to manage state on a request-specific basis, so not optimizing the interface for that. This works well for state managed over the liftime of the client and URI strategies that are stateless (i.e. rendezvous hashing).

maxAttempts refreshable.IntPtr // 0 means no limit. If nil, uses 2*len(uris).
backoffOptions refreshingclient.RefreshableRetryParams
bufferPool bytesbuffers.Pool
Expand All @@ -82,45 +83,34 @@ func (c *clientImpl) Delete(ctx context.Context, params ...RequestParam) (*http.
}

func (c *clientImpl) Do(ctx context.Context, params ...RequestParam) (*http.Response, error) {
uris := c.uriScorer.CurrentURIScoringMiddleware().GetURIsInOrderOfIncreasingScore()
if len(uris) == 0 {
return nil, werror.ErrorWithContextParams(ctx, "no base URIs are configured")
}

attempts := 2 * len(uris)
attempts := 2 * c.uriPool.NumURIs()
if c.maxAttempts != nil {
if confMaxAttempts := c.maxAttempts.CurrentIntPtr(); confMaxAttempts != nil {
attempts = *confMaxAttempts
}
}

var err error
var resp *http.Response

retrier := internal.NewRequestRetrier(uris, c.backoffOptions.CurrentRetryParams().Start(ctx), attempts)
var err error
retrier := internal.NewRequestRetrier(c.backoffOptions.CurrentRetryParams().Start(ctx), attempts)
for {
uri, isRelocated := retrier.GetNextURI(resp, err)
if uri == "" {
shouldRetry, retryURL := retrier.Next(resp, err)
if !shouldRetry {
break
}
resp, err = c.doOnce(ctx, retryURL, params...)
if err != nil {
svc1log.FromContext(ctx).Debug("Retrying request", svc1log.Stacktrace(err))
}
resp, err = c.doOnce(ctx, uri, isRelocated, params...)
}
if err != nil {
return nil, err
}
return resp, nil
return resp, err
}

func (c *clientImpl) doOnce(
ctx context.Context,
baseURI string,
useBaseURIOnly bool,
retryURL *url.URL,
params ...RequestParam,
) (*http.Response, error) {

// 1. create the request
b := &requestBuilder{
headers: make(http.Header),
Expand All @@ -136,9 +126,6 @@ func (c *clientImpl) doOnce(
return nil, err
}
}
if useBaseURIOnly {
b.path = ""
}

for _, c := range b.configureCtx {
ctx = c(ctx)
Expand All @@ -147,12 +134,23 @@ func (c *clientImpl) doOnce(
if b.method == "" {
return nil, werror.ErrorWithContextParams(ctx, "httpclient: use WithRequestMethod() to specify HTTP method")
}
reqURI := joinURIAndPath(baseURI, b.path)
req, err := http.NewRequest(b.method, reqURI, nil)
var uri string
if retryURL == nil {
var err error
uri, err = c.uriSelector.Select(c.uriPool.URIs(), b.headers)
if err != nil {
return nil, werror.WrapWithContextParams(ctx, err, "failed to select uri")
}
uri = joinURIAndPath(uri, b.path)
} else {
b.path = ""
uri = retryURL.String()
}
req, err := http.NewRequestWithContext(ctx, b.method, uri, nil)
if err != nil {
return nil, werror.WrapWithContextParams(ctx, err, "failed to build new HTTP request")
return nil, werror.WrapWithContextParams(ctx, err, "failed to build request")
}
req = req.WithContext(ctx)

req.Header = b.headers
if q := b.query.Encode(); q != "" {
req.URL.RawQuery = q
Expand All @@ -164,7 +162,8 @@ func (c *clientImpl) doOnce(
transport := clientCopy.Transport // start with the client's transport configured with default middleware

// must precede the error decoders to read the status code of the raw response.
transport = wrapTransport(transport, c.uriScorer.CurrentURIScoringMiddleware())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scoring URI middleware was moved into clientImpl.middleware instantiated when the client is built here:

// append uriSelector and uriPool middlewares
middleware = append(middleware, uriPool, b.URISelector)

transport = wrapTransport(transport, c.uriSelector)
transport = wrapTransport(transport, c.uriPool)
// request decoder must precede the client decoder
// must precede the body middleware to read the response body
transport = wrapTransport(transport, b.errorDecoderMiddleware, c.errorDecoderMiddleware)
Expand Down
17 changes: 8 additions & 9 deletions conjure-go-client/httpclient/client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
type clientBuilder struct {
HTTP *httpClientBuilder

URIs refreshable.StringSlice
URIScorerBuilder func([]string) internal.URIScoringMiddleware
URIs refreshable.StringSlice
URISelector internal.URISelector

ErrorDecoder ErrorDecoder

Expand Down Expand Up @@ -153,15 +153,14 @@ func newClient(ctx context.Context, b *clientBuilder, params ...ClientParam) (Cl
if !b.HTTP.DisableRecovery {
recovery = recoveryMiddleware{}
}
uriScorer := internal.NewRefreshableURIScoringMiddleware(b.URIs, func(uris []string) internal.URIScoringMiddleware {
if b.URIScorerBuilder == nil {
return internal.NewRandomURIScoringMiddleware(uris, func() int64 { return time.Now().UnixNano() })
}
return b.URIScorerBuilder(uris)
})
uriPool := internal.NewStatefulURIPool(b.URIs)
if b.URISelector == nil {
b.URISelector = internal.NewRoundRobinURISelector(func() int64 { return time.Now().UnixNano() })
}
return &clientImpl{
client: httpClient,
uriScorer: uriScorer,
uriPool: uriPool,
uriSelector: b.URISelector,
maxAttempts: b.MaxAttempts,
backoffOptions: b.RetryParams,
middlewares: middleware,
Expand Down
6 changes: 1 addition & 5 deletions conjure-go-client/httpclient/client_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,7 @@ func WithBasicAuth(username, password string) ClientParam {
// and least recent errors.
func WithBalancedURIScoring() ClientParam {
return clientParamFunc(func(b *clientBuilder) error {
b.URIScorerBuilder = func(uris []string) internal.URIScoringMiddleware {
return internal.NewBalancedURIScoringMiddleware(uris, func() int64 {
return time.Now().UnixNano()
})
}
b.URISelector = internal.NewBalancedURISelector(func() int64 { return time.Now().UnixNano() })
return nil
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021 Palantir Technologies. All rights reserved.
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,50 +20,70 @@ import (
"net/http"
"net/url"
"sort"
"sync"
"sync/atomic"
"time"

werror "github.com/palantir/witchcraft-go-error"
)

const (
failureWeight = 10.0
failureMemory = 30 * time.Second
)

type URIScoringMiddleware interface {
GetURIsInOrderOfIncreasingScore() []string
RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error)
}

type balancedScorer struct {
uriInfos map[string]uriInfo
}

type uriInfo struct {
inflight int32
recentFailures CourseExponentialDecayReservoir
}

// NewBalancedURIScoringMiddleware returns URI scoring middleware that tracks in-flight requests and recent failures
// NewBalancedURISelector returns URI scoring middleware that tracks in-flight requests and recent failures
// for each URI configured on an HTTP client. URIs are scored based on fewest in-flight requests and recent errors,
// where client errors are weighted the same as 1/10 of an in-flight request, server errors are weighted as 10
// in-flight requests, and errors are decayed using exponential decay with a half-life of 30 seconds.
//
// This implementation is based on Dialogue's BalancedScoreTracker:
// https://github.com/palantir/dialogue/blob/develop/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedScoreTracker.java
func NewBalancedURIScoringMiddleware(uris []string, nanoClock func() int64) URIScoringMiddleware {
func NewBalancedURISelector(nanoClock func() int64) URISelector {
return &balancedSelector{
nanoClock: nanoClock,
}
}

type balancedSelector struct {
sync.Mutex

nanoClock func() int64
uriInfos map[string]uriInfo
}

// Select implements Selector interface
func (s *balancedSelector) Select(uris []string, _ http.Header) (string, error) {
s.Lock()
defer s.Unlock()

s.updateURIs(uris)
return s.next()
}

func (s *balancedSelector) updateURIs(uris []string) {
uriInfos := make(map[string]uriInfo, len(uris))
for _, uri := range uris {
if exisiting, ok := s.uriInfos[uri]; ok {
uriInfos[uri] = exisiting
continue
}
uriInfos[uri] = uriInfo{
recentFailures: NewCourseExponentialDecayReservoir(nanoClock, failureMemory),
recentFailures: NewCourseExponentialDecayReservoir(s.nanoClock, failureMemory),
}
}
return &balancedScorer{uriInfos}

s.uriInfos = uriInfos
return
}

func (u *balancedScorer) GetURIsInOrderOfIncreasingScore() []string {
uris := make([]string, 0, len(u.uriInfos))
scores := make(map[string]int32, len(u.uriInfos))
for uri, info := range u.uriInfos {
func (s *balancedSelector) next() (string, error) {
if len(s.uriInfos) == 0 {
return "", werror.Error("no valid connections available")
}
uris := make([]string, 0, len(s.uriInfos))
scores := make(map[string]int32, len(s.uriInfos))
for uri, info := range s.uriInfos {
uris = append(uris, uri)
scores[uri] = info.computeScore()
}
Expand All @@ -74,34 +94,53 @@ func (u *balancedScorer) GetURIsInOrderOfIncreasingScore() []string {
sort.Slice(uris, func(i, j int) bool {
return scores[uris[i]] < scores[uris[j]]
})
return uris
return uris[0], nil
}

func (u *balancedScorer) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
func (s *balancedSelector) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
baseURI := getBaseURI(req.URL)
info, foundInfo := u.uriInfos[baseURI]
if foundInfo {
atomic.AddInt32(&info.inflight, 1)
defer atomic.AddInt32(&info.inflight, -1)
}
s.updateInflight(baseURI, 1)
defer s.updateInflight(baseURI, -1)

resp, err := next.RoundTrip(req)
if resp == nil || err != nil {
if foundInfo {
info.recentFailures.Update(failureWeight)
}
s.updateRecentFailures(baseURI, failureWeight)
return nil, err
}
if foundInfo {
statusCode := resp.StatusCode
if isGlobalQosStatus(statusCode) || isServerErrorRange(statusCode) {
info.recentFailures.Update(failureWeight)
} else if isClientError(statusCode) {
info.recentFailures.Update(failureWeight / 100)
}
statusCode := resp.StatusCode
if isGlobalQosStatus(statusCode) || isServerErrorRange(statusCode) {
s.updateRecentFailures(baseURI, failureWeight)
} else if isClientError(statusCode) {
s.updateRecentFailures(baseURI, failureWeight/100)
}
return resp, nil
}

func (s *balancedSelector) updateInflight(uri string, score int32) {
s.Lock()
defer s.Unlock()

info, ok := s.uriInfos[uri]
if ok {
atomic.AddInt32(&info.inflight, score)
}
}

func (s *balancedSelector) updateRecentFailures(uri string, weight float64) {
s.Lock()
defer s.Unlock()

info, ok := s.uriInfos[uri]
if ok {
info.recentFailures.Update(weight)
}
}

type uriInfo struct {
inflight int32
recentFailures CourseExponentialDecayReservoir
}

func (i *uriInfo) computeScore() int32 {
return atomic.LoadInt32(&i.inflight) + int32(math.Round(i.recentFailures.Get()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ package internal
import (
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBalancedScorerRandomizesWithNoneInflight(t *testing.T) {
func TestBalancedSelectorRandomizesWithNoneInflight(t *testing.T) {
uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
scorer := NewBalancedURIScoringMiddleware(uris, func() int64 { return 0 })
scoredUris := scorer.GetURIsInOrderOfIncreasingScore()
assert.ElementsMatch(t, scoredUris, uris)
assert.NotEqual(t, scoredUris, uris)
scorer := NewBalancedURISelector(func() int64 { return 0 })
scoredURI, err := scorer.Select(uris, nil)
assert.NoError(t, err)
assert.Contains(t, uris, scoredURI)
}

func TestBalancedScoring(t *testing.T) {
func TestBalancedSelect(t *testing.T) {
server200 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
}))
Expand All @@ -44,15 +45,23 @@ func TestBalancedScoring(t *testing.T) {
}))
defer server503.Close()
uris := []string{server503.URL, server429.URL, server200.URL}
scorer := NewBalancedURIScoringMiddleware(uris, func() int64 { return 0 })
scorer := NewBalancedURISelector(func() int64 { return 0 })
for _, server := range []*httptest.Server{server200, server429, server503} {
for i := 0; i < 10; i++ {
req, err := http.NewRequest("GET", server.URL, nil)
uri, err := scorer.Select(uris, nil)
assert.NoError(t, err)
req, err := http.NewRequest("GET", uri, nil)
assert.NoError(t, err)

url, err := url.Parse(uri)
assert.NoError(t, err)
req.URL = url
_, err = scorer.RoundTrip(req, server.Client().Transport)
assert.NoError(t, err)
}
}
scoredUris := scorer.GetURIsInOrderOfIncreasingScore()
assert.Equal(t, []string{server200.URL, server429.URL, server503.URL}, scoredUris)

uri, err := scorer.Select(uris, nil)
assert.NoError(t, err)
assert.Equal(t, server200.URL, uri)
}
Loading