diff --git a/conjure-go-client/httpclient/internal/balanced_scorer.go b/conjure-go-client/httpclient/internal/balanced_scorer.go index cff197eb..90e251c4 100644 --- a/conjure-go-client/httpclient/internal/balanced_scorer.go +++ b/conjure-go-client/httpclient/internal/balanced_scorer.go @@ -15,18 +15,10 @@ package internal import ( - "math" "math/rand" "net/http" - "net/url" "sort" "sync/atomic" - "time" -) - -const ( - failureWeight = 10.0 - failureMemory = 30 * time.Second ) type URIScoringMiddleware interface { @@ -38,11 +30,6 @@ type balancedScorer struct { uriInfos map[string]uriInfo } -type uriInfo struct { - inflight int32 - recentFailures CourseExponentialDecayReservoir -} - // NewBalancedURIScoringMiddleware returns URI scoring middleware that tracks in-flight requests and recent failures // for each URI configured on an HTTP client. URIs are scored based on fewest in-flight requests and recent errors, // where client errors are weighted the same as 1/10 of an in-flight request, server errors are weighted as 10 @@ -50,6 +37,8 @@ type uriInfo struct { // // This implementation is based on Dialogue's BalancedScoreTracker: // https://github.com/palantir/dialogue/blob/develop/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedScoreTracker.java +// +// Deprecated: Use NewBalancedURISelector func NewBalancedURIScoringMiddleware(uris []string, nanoClock func() int64) URIScoringMiddleware { uriInfos := make(map[string]uriInfo, len(uris)) for _, uri := range uris { @@ -101,29 +90,3 @@ func (u *balancedScorer) RoundTrip(req *http.Request, next http.RoundTripper) (* } return resp, nil } - -func (i *uriInfo) computeScore() int32 { - return atomic.LoadInt32(&i.inflight) + int32(math.Round(i.recentFailures.Get())) -} - -func getBaseURI(u *url.URL) string { - uCopy := url.URL{ - Scheme: u.Scheme, - Opaque: u.Opaque, - User: u.User, - Host: u.Host, - } - return uCopy.String() -} - -func isGlobalQosStatus(statusCode int) bool { - return statusCode == StatusCodeRetryOther || statusCode == StatusCodeUnavailable -} - -func isServerErrorRange(statusCode int) bool { - return statusCode/100 == 5 -} - -func isClientError(statusCode int) bool { - return statusCode/100 == 4 -} diff --git a/conjure-go-client/httpclient/internal/balanced_selector.go b/conjure-go-client/httpclient/internal/balanced_selector.go new file mode 100644 index 00000000..463f3125 --- /dev/null +++ b/conjure-go-client/httpclient/internal/balanced_selector.go @@ -0,0 +1,169 @@ +// Copyright (c) 2022 Palantir Technologies. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "math" + "math/rand" + "net/http" + "net/url" + "sort" + "sync" + "sync/atomic" + "time" + + werror "github.com/palantir/witchcraft-go-error" +) + +const ( + failureWeight = 10.0 + failureMemory = 30 * time.Second +) + +// NewBalancedURISelector returns URI scoring middleware that tracks in-flight requests and recent failures +// for each URI configured on an HTTP client. URIs are scored based on fewest in-flight requests and recent errors, +// where client errors are weighted the same as 1/10 of an in-flight request, server errors are weighted as 10 +// in-flight requests, and errors are decayed using exponential decay with a half-life of 30 seconds. +// +// This implementation is based on Dialogue's BalancedScoreTracker: +// https://github.com/palantir/dialogue/blob/develop/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedScoreTracker.java +func NewBalancedURISelector(nanoClock func() int64) URISelector { + return &balancedSelector{ + nanoClock: nanoClock, + } +} + +type balancedSelector struct { + sync.Mutex + + nanoClock func() int64 + uriInfos map[string]uriInfo +} + +// Select implements Selector interface +func (s *balancedSelector) Select(uris []string, _ http.Header) ([]string, error) { + s.Lock() + defer s.Unlock() + + s.updateURIs(uris) + return s.next() +} + +func (s *balancedSelector) updateURIs(uris []string) { + uriInfos := make(map[string]uriInfo, len(uris)) + for _, uri := range uris { + if exisiting, ok := s.uriInfos[uri]; ok { + uriInfos[uri] = exisiting + continue + } + uriInfos[uri] = uriInfo{ + recentFailures: NewCourseExponentialDecayReservoir(s.nanoClock, failureMemory), + } + } + + s.uriInfos = uriInfos + return +} + +func (s *balancedSelector) next() ([]string, error) { + if len(s.uriInfos) == 0 { + return nil, werror.Error("no valid connections available") + } + uris := make([]string, 0, len(s.uriInfos)) + scores := make(map[string]int32, len(s.uriInfos)) + for uri, info := range s.uriInfos { + uris = append(uris, uri) + scores[uri] = info.computeScore() + } + // Pre-shuffle to avoid overloading first URI when no request are in-flight + rand.Shuffle(len(uris), func(i, j int) { + uris[i], uris[j] = uris[j], uris[i] + }) + sort.Slice(uris, func(i, j int) bool { + return scores[uris[i]] < scores[uris[j]] + }) + return uris, nil +} + +func (s *balancedSelector) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) { + baseURI := getBaseURI(req.URL) + s.updateInflight(baseURI, 1) + defer s.updateInflight(baseURI, -1) + + resp, err := next.RoundTrip(req) + errCode, ok := StatusCodeFromError(err) + // fall back to the status code from the response + if !ok && resp != nil { + errCode = resp.StatusCode + } + + if isGlobalQosStatus(errCode) || isServerErrorRange(errCode) { + s.updateRecentFailures(baseURI, failureWeight) + } else if isClientError(errCode) { + s.updateRecentFailures(baseURI, failureWeight/100) + } + return resp, err +} + +func (s *balancedSelector) updateInflight(uri string, score int32) { + s.Lock() + defer s.Unlock() + + info, ok := s.uriInfos[uri] + if ok { + atomic.AddInt32(&info.inflight, score) + } +} + +func (s *balancedSelector) updateRecentFailures(uri string, weight float64) { + s.Lock() + defer s.Unlock() + + info, ok := s.uriInfos[uri] + if ok { + info.recentFailures.Update(weight) + } +} + +type uriInfo struct { + inflight int32 + recentFailures CourseExponentialDecayReservoir +} + +func (i *uriInfo) computeScore() int32 { + return atomic.LoadInt32(&i.inflight) + int32(math.Round(i.recentFailures.Get())) +} + +func getBaseURI(u *url.URL) string { + uCopy := url.URL{ + Scheme: u.Scheme, + Opaque: u.Opaque, + User: u.User, + Host: u.Host, + } + return uCopy.String() +} + +func isGlobalQosStatus(statusCode int) bool { + return statusCode == StatusCodeRetryOther || statusCode == StatusCodeUnavailable +} + +func isServerErrorRange(statusCode int) bool { + return statusCode/100 == 5 +} + +func isClientError(statusCode int) bool { + return statusCode/100 == 4 +} diff --git a/conjure-go-client/httpclient/internal/balanced_selector_test.go b/conjure-go-client/httpclient/internal/balanced_selector_test.go new file mode 100644 index 00000000..5f78b6f3 --- /dev/null +++ b/conjure-go-client/httpclient/internal/balanced_selector_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2021 Palantir Technologies. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBalancedSelectorRandomizesWithNoneInflight(t *testing.T) { + uris := []string{"uri1", "uri2", "uri3", "uri4", "uri5"} + scorer := NewBalancedURISelector(func() int64 { return 0 }) + scoredURIs, err := scorer.Select(uris, nil) + assert.NoError(t, err) + assert.ElementsMatch(t, uris, scoredURIs) +} + +func TestBalancedSelect(t *testing.T) { + server200 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(http.StatusOK) + })) + defer server200.Close() + server429 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(http.StatusTooManyRequests) + })) + defer server429.Close() + server503 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(http.StatusServiceUnavailable) + })) + defer server503.Close() + uris := []string{server503.URL, server429.URL, server200.URL} + scorer := NewBalancedURISelector(func() int64 { return 0 }) + for _, server := range []*httptest.Server{server200, server429, server503} { + for i := 0; i < 10; i++ { + uris, err := scorer.Select(uris, nil) + assert.NoError(t, err) + req, err := http.NewRequest("GET", uris[0], nil) + assert.NoError(t, err) + + _, err = scorer.RoundTrip(req, server.Client().Transport) + assert.NoError(t, err) + } + } + + uris, err := scorer.Select(uris, nil) + assert.NoError(t, err) + expected := []string{ + server200.URL, + server429.URL, + server503.URL, + } + assert.Equal(t, expected, uris) +}