Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement: Implement balanced scorer as URISelector #350

Draft
wants to merge 1 commit into
base: dt/uri-retry-middleware
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 2 additions & 39 deletions conjure-go-client/httpclient/internal/balanced_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,10 @@
package internal

import (
"math"
"math/rand"
"net/http"
"net/url"
"sort"
"sync/atomic"
"time"
)

const (
// failureWeight is the base weight recorded for a failed request. Per the scoring
// description in this file, a server error counts the same as 10 in-flight requests.
failureWeight = 10.0
// failureMemory is the decay window for recorded failures (documented in this file
// as an exponential decay with a 30 second half-life).
failureMemory = 30 * time.Second
)

type URIScoringMiddleware interface {
Expand All @@ -38,18 +30,15 @@ type balancedScorer struct {
uriInfos map[string]uriInfo
}

// uriInfo holds the per-URI state that feeds into a URI's score.
type uriInfo struct {
// inflight is a counter that computeScore reads with atomic.LoadInt32.
inflight int32
// recentFailures supplies the failure component of the score via its Get method.
recentFailures CourseExponentialDecayReservoir
}

// NewBalancedURIScoringMiddleware returns URI scoring middleware that tracks in-flight requests and recent failures
// for each URI configured on an HTTP client. URIs are scored based on fewest in-flight requests and recent errors,
// where client errors are weighted the same as 1/10 of an in-flight request, server errors are weighted as 10
// in-flight requests, and errors are decayed using exponential decay with a half-life of 30 seconds.
//
// This implementation is based on Dialogue's BalancedScoreTracker:
// https://github.com/palantir/dialogue/blob/develop/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedScoreTracker.java
//
// Deprecated: Use NewBalancedURISelector
func NewBalancedURIScoringMiddleware(uris []string, nanoClock func() int64) URIScoringMiddleware {
uriInfos := make(map[string]uriInfo, len(uris))
for _, uri := range uris {
Expand Down Expand Up @@ -101,29 +90,3 @@ func (u *balancedScorer) RoundTrip(req *http.Request, next http.RoundTripper) (*
}
return resp, nil
}

// computeScore returns the URI's current score: the atomically loaded in-flight
// counter plus the recent-failure reading rounded to the nearest integer.
func (i *uriInfo) computeScore() int32 {
	inflight := atomic.LoadInt32(&i.inflight)
	failures := int32(math.Round(i.recentFailures.Get()))
	return inflight + failures
}

// getBaseURI renders only the scheme, opaque data, user info and host of u as a
// string, leaving out the path, query and fragment.
func getBaseURI(u *url.URL) string {
	base := &url.URL{}
	base.Scheme, base.Opaque = u.Scheme, u.Opaque
	base.User, base.Host = u.User, u.Host
	return base.String()
}

// isGlobalQosStatus reports whether statusCode equals StatusCodeRetryOther or
// StatusCodeUnavailable.
func isGlobalQosStatus(statusCode int) bool {
	if statusCode == StatusCodeRetryOther {
		return true
	}
	return statusCode == StatusCodeUnavailable
}

// isServerErrorRange reports whether statusCode lies in the 5xx range (500-599).
func isServerErrorRange(statusCode int) bool {
	return statusCode >= 500 && statusCode <= 599
}

// isClientError reports whether statusCode lies in the 4xx range (400-499).
func isClientError(statusCode int) bool {
	return statusCode >= 400 && statusCode <= 499
}
169 changes: 169 additions & 0 deletions conjure-go-client/httpclient/internal/balanced_selector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"math"
"math/rand"
"net/http"
"net/url"
"sort"
"sync"
"sync/atomic"
"time"

werror "github.com/palantir/witchcraft-go-error"
)

const (
// failureWeight is the weight RoundTrip records for a server-error or global QoS
// status; client errors are recorded at failureWeight/100.
failureWeight = 10.0
// failureMemory is the duration handed to NewCourseExponentialDecayReservoir for
// each URI's failure reservoir (documented in this file as a 30 second half-life).
failureMemory = 30 * time.Second
)

// NewBalancedURISelector returns a URISelector that tracks in-flight requests and recent failures
// for each URI it is asked to select from. URIs are ordered by fewest in-flight requests and recent
// errors: a client error weighs the same as 1/10 of an in-flight request, a server error weighs the
// same as 10 in-flight requests, and errors are decayed using exponential decay with a half-life of
// 30 seconds.
//
// nanoClock is the time source handed to each URI's failure reservoir.
//
// This implementation is based on Dialogue's BalancedScoreTracker:
// https://github.com/palantir/dialogue/blob/develop/dialogue-core/src/main/java/com/palantir/dialogue/core/BalancedScoreTracker.java
func NewBalancedURISelector(nanoClock func() int64) URISelector {
	selector := new(balancedSelector)
	selector.nanoClock = nanoClock
	return selector
}

// balancedSelector orders URIs by a per-URI score built from in-flight requests
// and recent failures.
type balancedSelector struct {
// The embedded mutex is held by Select, updateInflight and updateRecentFailures
// while they access uriInfos.
sync.Mutex

// nanoClock is the time source passed to each URI's failure reservoir.
nanoClock func() int64
// uriInfos maps a URI to its scoring state. Entries are stored by value, so a
// lookup yields a copy; it is nil until the first Select call populates it.
uriInfos map[string]uriInfo
}

// Select implements the Selector interface. Under the selector's lock it first
// reconciles the tracked URIs with the supplied list and then returns those URIs
// ordered by ascending score. The header argument is ignored.
func (s *balancedSelector) Select(uris []string, _ http.Header) ([]string, error) {
	s.Lock()
	defer s.Unlock()

	s.updateURIs(uris)
	ordered, err := s.next()
	return ordered, err
}

// updateURIs replaces the tracked URI set with exactly the given uris. State for
// URIs that were already tracked is carried over; previously unseen URIs start
// with a fresh failure reservoir; URIs absent from the list are dropped.
// Callers must hold the selector's lock.
func (s *balancedSelector) updateURIs(uris []string) {
	refreshed := make(map[string]uriInfo, len(uris))
	for _, uri := range uris {
		info, known := s.uriInfos[uri]
		if !known {
			info = uriInfo{
				recentFailures: NewCourseExponentialDecayReservoir(s.nanoClock, failureMemory),
			}
		}
		refreshed[uri] = info
	}
	s.uriInfos = refreshed
}

// next returns every tracked URI ordered by ascending score, or an error when no
// URIs are tracked. Callers must hold the selector's lock.
func (s *balancedSelector) next() ([]string, error) {
	count := len(s.uriInfos)
	if count == 0 {
		return nil, werror.Error("no valid connections available")
	}

	candidates := make([]string, 0, count)
	scoreByURI := make(map[string]int32, count)
	for uri := range s.uriInfos {
		info := s.uriInfos[uri]
		candidates = append(candidates, uri)
		scoreByURI[uri] = info.computeScore()
	}

	// Shuffle before sorting so that equally scored URIs (for example when no
	// requests are in flight) do not always favour the same URI.
	swap := func(a, b int) {
		candidates[a], candidates[b] = candidates[b], candidates[a]
	}
	rand.Shuffle(count, swap)

	byScore := func(a, b int) bool {
		return scoreByURI[candidates[a]] < scoreByURI[candidates[b]]
	}
	sort.Slice(candidates, byScore)
	return candidates, nil
}

// RoundTrip forwards req to next while accounting for it against the request's
// base URI: the in-flight counter is incremented before the call and decremented
// when RoundTrip returns. The resulting status code (taken from the error when
// StatusCodeFromError can extract one, otherwise from the response) decides
// whether a failure is recorded: global QoS and 5xx statuses record
// failureWeight, 4xx statuses record failureWeight/100.
//
// NOTE(review): when err carries no status code and resp is nil, the status stays
// at whatever StatusCodeFromError returned and presumably no failure is
// recorded — confirm this is intended for transport-level errors.
func (s *balancedSelector) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
	target := getBaseURI(req.URL)
	s.updateInflight(target, 1)
	defer s.updateInflight(target, -1)

	resp, err := next.RoundTrip(req)

	status, fromErr := StatusCodeFromError(err)
	if !fromErr && resp != nil {
		// No status on the error: use the one from the response.
		status = resp.StatusCode
	}

	switch {
	case isGlobalQosStatus(status), isServerErrorRange(status):
		s.updateRecentFailures(target, failureWeight)
	case isClientError(status):
		s.updateRecentFailures(target, failureWeight/100)
	}
	return resp, err
}

// updateInflight adds score (typically +1 or -1) to the in-flight counter of the
// tracked uri. URIs that are not currently tracked are ignored.
//
// uriInfos stores uriInfo by value, so the map lookup yields a copy; the updated
// entry must be written back to the map, otherwise the change is lost and
// in-flight requests never influence scoring.
func (s *balancedSelector) updateInflight(uri string, score int32) {
	s.Lock()
	defer s.Unlock()

	info, ok := s.uriInfos[uri]
	if !ok {
		return
	}
	// The selector's lock is held, so a plain add on the copy is sufficient.
	info.inflight += score
	s.uriInfos[uri] = info
}

// updateRecentFailures records a failure of the given weight against the tracked
// uri. URIs that are not currently tracked are ignored.
//
// NOTE(review): the looked-up uriInfo is a copy of the map entry, so this relies
// on CourseExponentialDecayReservoir having reference semantics (e.g. being an
// interface backed by a pointer) — confirm.
func (s *balancedSelector) updateRecentFailures(uri string, weight float64) {
	s.Lock()
	defer s.Unlock()

	if info, tracked := s.uriInfos[uri]; tracked {
		info.recentFailures.Update(weight)
	}
}

// uriInfo holds the per-URI state that feeds into a URI's score.
type uriInfo struct {
// inflight is the in-flight request counter adjusted by updateInflight and read
// with atomic.LoadInt32 by computeScore.
inflight int32
// recentFailures receives weighted failures through Update and supplies the
// failure component of the score through Get.
recentFailures CourseExponentialDecayReservoir
}

// computeScore returns the URI's current score: the atomically loaded in-flight
// counter plus the recent-failure reading rounded to the nearest integer.
func (i *uriInfo) computeScore() int32 {
	inflight := atomic.LoadInt32(&i.inflight)
	failures := int32(math.Round(i.recentFailures.Get()))
	return inflight + failures
}

// getBaseURI renders only the scheme, opaque data, user info and host of u as a
// string, leaving out the path, query and fragment. RoundTrip uses the result as
// the lookup key for per-URI state.
func getBaseURI(u *url.URL) string {
	base := &url.URL{}
	base.Scheme, base.Opaque = u.Scheme, u.Opaque
	base.User, base.Host = u.User, u.Host
	return base.String()
}

// isGlobalQosStatus reports whether statusCode equals StatusCodeRetryOther or
// StatusCodeUnavailable.
func isGlobalQosStatus(statusCode int) bool {
	if statusCode == StatusCodeRetryOther {
		return true
	}
	return statusCode == StatusCodeUnavailable
}

// isServerErrorRange reports whether statusCode lies in the 5xx range (500-599).
func isServerErrorRange(statusCode int) bool {
	return statusCode >= 500 && statusCode <= 599
}

// isClientError reports whether statusCode lies in the 4xx range (400-499).
func isClientError(statusCode int) bool {
	return statusCode >= 400 && statusCode <= 499
}
68 changes: 68 additions & 0 deletions conjure-go-client/httpclient/internal/balanced_selector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2021 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

// TestBalancedSelectorRandomizesWithNoneInflight checks that, with no requests
// recorded, Select succeeds and returns each configured URI (in any order).
func TestBalancedSelectorRandomizesWithNoneInflight(t *testing.T) {
	configured := []string{"uri1", "uri2", "uri3", "uri4", "uri5"}
	frozenClock := func() int64 { return 0 }
	selector := NewBalancedURISelector(frozenClock)

	ordered, err := selector.Select(configured, nil)

	assert.NoError(t, err)
	assert.ElementsMatch(t, configured, ordered)
}

// TestBalancedSelect drives 30 requests through the selector against three test
// servers answering 200, 429 and 503, always targeting the first URI returned by
// Select, and then expects the final ordering to be 200, 429, 503.
func TestBalancedSelect(t *testing.T) {
	// newStatusServer starts a test server that answers every request with status.
	newStatusServer := func(status int) *httptest.Server {
		handler := func(rw http.ResponseWriter, _ *http.Request) {
			rw.WriteHeader(status)
		}
		return httptest.NewServer(http.HandlerFunc(handler))
	}

	server200 := newStatusServer(http.StatusOK)
	defer server200.Close()
	server429 := newStatusServer(http.StatusTooManyRequests)
	defer server429.Close()
	server503 := newStatusServer(http.StatusServiceUnavailable)
	defer server503.Close()

	configured := []string{server503.URL, server429.URL, server200.URL}
	selector := NewBalancedURISelector(func() int64 { return 0 })

	// The request always goes to the top-ranked URI; the server in the outer loop
	// only supplies the transport used to send it.
	for _, srv := range []*httptest.Server{server200, server429, server503} {
		for attempt := 0; attempt < 10; attempt++ {
			ranked, err := selector.Select(configured, nil)
			assert.NoError(t, err)

			req, err := http.NewRequest("GET", ranked[0], nil)
			assert.NoError(t, err)

			_, err = selector.RoundTrip(req, srv.Client().Transport)
			assert.NoError(t, err)
		}
	}

	final, err := selector.Select(configured, nil)
	assert.NoError(t, err)

	want := []string{server200.URL, server429.URL, server503.URL}
	assert.Equal(t, want, final)
}