From 3c8f3471d14e37866c65f73170ef83c038ae5a8c Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 18 Sep 2020 17:39:25 +0300 Subject: [PATCH] feat: provide a way to configure logger for the loadbalancer It was using `log.Printf` by default which was spamming stdout in Sfyra. Default is still to use same writer as `log` defaults. Signed-off-by: Andrey Smirnov --- loadbalancer/loadbalancer.go | 43 ++++++++++++++++++++++--------- loadbalancer/loadbalancer_test.go | 2 ++ upstream/upstream.go | 8 +++++- 3 files changed, 40 insertions(+), 13 deletions(-) diff --git a/loadbalancer/loadbalancer.go b/loadbalancer/loadbalancer.go index 6f6b7ba..1799203 100644 --- a/loadbalancer/loadbalancer.go +++ b/loadbalancer/loadbalancer.go @@ -27,17 +27,22 @@ import ( type TCP struct { tcpproxy.Proxy + Logger *log.Logger + routes map[string]*upstream.List } -type lbUpstream string +type lbUpstream struct { + upstream string + logger *log.Logger +} func (upstream lbUpstream) HealthCheck(ctx context.Context) error { d := net.Dialer{} - c, err := d.DialContext(ctx, "tcp", string(upstream)) + c, err := d.DialContext(ctx, "tcp", upstream.upstream) if err != nil { - log.Printf("healthcheck failed for %q: %s", string(upstream), err) + upstream.logger.Printf("healthcheck failed for %q: %s", upstream.upstream, err) return err } @@ -46,27 +51,28 @@ func (upstream lbUpstream) HealthCheck(ctx context.Context) error { } type lbTarget struct { - list *upstream.List + list *upstream.List + logger *log.Logger } func (target *lbTarget) HandleConn(conn net.Conn) { upstreamBackend, err := target.list.Pick() if err != nil { - log.Printf("no upstreams available, closing connection from %s", conn.RemoteAddr()) + target.logger.Printf("no upstreams available, closing connection from %s", conn.RemoteAddr()) conn.Close() //nolint: errcheck return } - upstreamAddr := upstreamBackend.(lbUpstream) //nolint: errcheck + upstream := upstreamBackend.(lbUpstream) //nolint: errcheck - log.Printf("proxying connection %s -> %s", conn.RemoteAddr(), string(upstreamAddr)) + target.logger.Printf("proxying connection %s -> %s", conn.RemoteAddr(), upstream.upstream) - upstreamTarget := tcpproxy.To(string(upstreamAddr)) + upstreamTarget := tcpproxy.To(upstream.upstream) upstreamTarget.OnDialError = func(src net.Conn, dstDialErr error) { src.Close() //nolint: errcheck - log.Printf("error dialing upstream %s: %s", string(upstreamAddr), dstDialErr) + target.logger.Printf("error dialing upstream %s: %s", upstream.upstream, dstDialErr) target.list.Down(upstreamBackend) } @@ -79,13 +85,20 @@ func (target *lbTarget) HandleConn(conn net.Conn) { // TCP automatically does background health checks for the upstreams and picks only healthy // ones. Healthcheck is simple Dial attempt. func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstream.ListOption) error { + if t.Logger == nil { + t.Logger = log.New(log.Writer(), "", log.Flags()) + } + if t.routes == nil { t.routes = make(map[string]*upstream.List) } upstreams := make([]upstream.Backend, len(upstreamAddrs)) for i := range upstreams { - upstreams[i] = lbUpstream(upstreamAddrs[i]) + upstreams[i] = lbUpstream{ + upstream: upstreamAddrs[i], + logger: t.Logger, + } } list, err := upstream.NewList(upstreams, options...) @@ -95,7 +108,10 @@ func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstrea t.routes[ipPort] = list - t.Proxy.AddRoute(ipPort, &lbTarget{list: list}) + t.Proxy.AddRoute(ipPort, &lbTarget{ + list: list, + logger: t.Logger, + }) return nil } @@ -113,7 +129,10 @@ func (t *TCP) ReconcileRoute(ipPort string, upstreamAddrs []string) error { upstreams := make([]upstream.Backend, len(upstreamAddrs)) for i := range upstreams { - upstreams[i] = lbUpstream(upstreamAddrs[i]) + upstreams[i] = lbUpstream{ + upstream: upstreamAddrs[i], + logger: t.Logger, + } } list.Reconcile(upstreams) diff --git a/loadbalancer/loadbalancer_test.go b/loadbalancer/loadbalancer_test.go index 606c7e3..67aa775 100644 --- a/loadbalancer/loadbalancer_test.go +++ b/loadbalancer/loadbalancer_test.go @@ -149,6 +149,8 @@ func (suite *TCPSuite) TestReconcile() { no, err := strconv.ParseInt(string(id), 10, 32) suite.Require().NoError(err) + suite.Assert().EqualValues(no, pivot+(i+pivot)%(upstreamCount-pivot)) + suite.Assert().Less(no, int64(upstreamCount)) suite.Assert().GreaterOrEqual(no, int64(pivot)) upstreamsUsed[no]++ diff --git a/upstream/upstream.go b/upstream/upstream.go index 7d8e773..8b2531e 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -184,7 +184,13 @@ func (list *List) Reconcile(upstreams []Backend) { i-- } - for upstream := range newUpstreams { + // make insert order predictable by going over the list once again, + // as iterating over the map might lead to unpredictable order + for _, upstream := range upstreams { + if _, ok := newUpstreams[upstream]; !ok { + continue + } + list.nodes = append(list.nodes, node{ backend: upstream, score: list.initialScore,