diff --git a/go.mod b/go.mod index 00ff6df..ca4e783 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,14 @@ module github.com/talos-systems/go-loadbalancer go 1.14 +replace inet.af/tcpproxy => github.com/smira/tcpproxy v0.0.0-20201015133617-de5f7797b95b + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.1.0 // indirect github.com/stretchr/testify v1.6.1 github.com/talos-systems/go-retry v0.1.0 + golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect inet.af/tcpproxy v0.0.0-20200125044825-b6bb9b5b8252 ) diff --git a/go.sum b/go.sum index bea74de..593a6c8 100644 --- a/go.sum +++ b/go.sum @@ -8,17 +8,17 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/smira/tcpproxy v0.0.0-20201015133617-de5f7797b95b h1:95WXQlM2dDPgIXTlnpwiJPa0l0ipl1RwMvdy8KLUyH8= +github.com/smira/tcpproxy v0.0.0-20201015133617-de5f7797b95b/go.mod h1:yDIWrelwlTRXdKvQqqQ+8lCwCjbSRtkat49REnui7hk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/talos-systems/go-retry v0.0.0-20200902131929-073067bd95a7 h1:KL8Nz3tlwA1UhTsoGQXwg1gDoYPldiwfTtcipcuALXM= -github.com/talos-systems/go-retry v0.0.0-20200902131929-073067bd95a7/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= github.com/talos-systems/go-retry v0.1.0 h1:O+OeZR54CQ1+ch99p/81Pqi5GqJH6LIu1MTN/N0vd78= github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= +golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211 h1:9UQO31fZ+0aKQOFldThf7BKPMJTiBfWycGh/u3UoO88= +golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -inet.af/tcpproxy v0.0.0-20200125044825-b6bb9b5b8252 h1:gmJCKidOfjKDUHF1jjke+I+2iQIyE3HNNxu2OKO/FUI= -inet.af/tcpproxy v0.0.0-20200125044825-b6bb9b5b8252/go.mod h1:zq+R+tLcdHugi7Jt+FtIQY6m6wtX34lr2CdQVH2fhW0= diff --git a/loadbalancer/loadbalancer.go b/loadbalancer/loadbalancer.go index 1799203..e7ae655 100644 --- a/loadbalancer/loadbalancer.go +++ b/loadbalancer/loadbalancer.go @@ -10,6 +10,7 @@ import ( "fmt" "log" "net" + "time" "inet.af/tcpproxy" @@ -27,6 +28,10 @@ import ( type TCP struct { tcpproxy.Proxy + DialTimeout time.Duration + KeepAlivePeriod time.Duration + TCPUserTimeout time.Duration + Logger *log.Logger routes map[string]*upstream.List @@ -51,8 +56,11 @@ func (upstream lbUpstream) HealthCheck(ctx context.Context) error { } type lbTarget struct { - list *upstream.List - logger *log.Logger + list *upstream.List + logger *log.Logger + dialTimeout time.Duration + keepAlivePeriod time.Duration + tcpUserTimeout time.Duration } func (target *lbTarget) HandleConn(conn net.Conn) { @@ -69,6 +77,9 @@ func (target *lbTarget) HandleConn(conn net.Conn) { target.logger.Printf("proxying connection %s -> %s", conn.RemoteAddr(), upstream.upstream) upstreamTarget := tcpproxy.To(upstream.upstream) + upstreamTarget.DialTimeout = target.dialTimeout + upstreamTarget.KeepAlivePeriod = target.keepAlivePeriod + upstreamTarget.TCPUserTimeout = target.tcpUserTimeout upstreamTarget.OnDialError = func(src net.Conn, dstDialErr error) { src.Close() //nolint: errcheck @@ -78,6 +89,8 @@ func (target *lbTarget) HandleConn(conn net.Conn) { } upstreamTarget.HandleConn(conn) + + target.logger.Printf("closing connection %s -> %s", conn.RemoteAddr(), upstream.upstream) } // AddRoute installs load balancer route from listen address ipAddr to list of upstreams. @@ -109,8 +122,11 @@ func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstrea t.routes[ipPort] = list t.Proxy.AddRoute(ipPort, &lbTarget{ - list: list, - logger: t.Logger, + list: list, + logger: t.Logger, + dialTimeout: t.DialTimeout, + keepAlivePeriod: t.KeepAlivePeriod, + tcpUserTimeout: t.TCPUserTimeout, }) return nil