From 1d49640d41b8e641e4cdc8d17316f3d05506ff6b Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian <33908564+sarabala1979@users.noreply.github.com> Date: Tue, 7 Feb 2023 17:09:31 -0800 Subject: [PATCH] fix: HttpSink with cannot assign requested address issue (#16) Signed-off-by: Saravanan Balasubramanian --- http-sink/go.mod | 4 ++++ http-sink/go.sum | 7 +++++++ http-sink/main.go | 25 +++++++++++++++++++------ http-sink/main_test.go | 26 ++++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 http-sink/main_test.go diff --git a/http-sink/go.mod b/http-sink/go.mod index fa46ff1..c42258a 100644 --- a/http-sink/go.mod +++ b/http-sink/go.mod @@ -5,14 +5,17 @@ go 1.18 require ( github.com/numaproj/numaflow v0.6.0 github.com/numaproj/numaflow-go v0.2.3 + github.com/stretchr/testify v1.8.0 go.uber.org/zap v1.23.0 k8s.io/apimachinery v0.23.3 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.2.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.8 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect @@ -21,6 +24,7 @@ require ( google.golang.org/genproto v0.0.0-20220920201722-2b89144ce006 // indirect google.golang.org/grpc v1.49.0 // indirect google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.30.0 // indirect k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect ) diff --git a/http-sink/go.sum b/http-sink/go.sum index b24390f..bd47caa 100644 --- a/http-sink/go.sum +++ b/http-sink/go.sum @@ -67,6 +67,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -78,6 +79,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/numaproj/numaflow v0.6.0 h1:W6UF9A0mE+3EDopCDvY7Ji8K8zK3rSd5m04CpGvYQs8= github.com/numaproj/numaflow v0.6.0/go.mod h1:RyerSgXaBcWObYDyv46PuTFuyArmgr7acFQr3LUmSNQ= @@ -100,10 +102,13 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -216,6 +221,7 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= @@ -230,6 +236,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/apimachinery v0.23.3 h1:7IW6jxNzrXTsP0c8yXz2E5Yx/WTzVPTsHIx/2Vm0cIk= diff --git a/http-sink/main.go b/http-sink/main.go index 9626e6f..d37d9a4 100644 --- a/http-sink/main.go +++ b/http-sink/main.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/tls" + "errors" "flag" "io" "k8s.io/apimachinery/pkg/util/wait" @@ -18,6 +19,7 @@ import ( type httpSink struct { logger *zap.SugaredLogger + httpClient *http.Client url string method string retries int @@ -38,25 +40,35 @@ func (i *arrayFlags) Set(value string) error { return nil } -func (hs *httpSink) sendHTTPRequest(data io.Reader) error { +func (hs *httpSink) createHTTPClient() { + //creating http client client := &http.Client{Timeout: time.Duration(hs.timeout) * time.Second} if hs.skipInsecure { - hs.logger.Info("Send insecure request") tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } - client = &http.Client{Timeout: 2 * time.Second, Transport: tr} + client.Transport = tr } + hs.httpClient = client +} + +func (hs *httpSink) sendHTTPRequest(data io.Reader) error { req, err := http.NewRequest(hs.method, hs.url, data) if err != nil { return err } - res, err := client.Do(req) + if hs.httpClient == nil { + return errors.New("HTTP Client is not initialized") + } + res, err := hs.httpClient.Do(req) if err != nil { return err } if res != nil { - hs.logger.Infof("Response code: %d, Response Body: %s", res.StatusCode, res.Body) + if res.Body != nil { + res.Body.Close() + } + hs.logger.Infof("Response code: %d,", res.StatusCode) } return nil } @@ -111,7 +123,8 @@ func main() { // Parse the flag flag.Parse() - + //creating http client + hs.createHTTPClient() hs.logger.Info("HTTP Sink starting successfully with args %v", hs) server.New().RegisterSinker(sinksdk.SinkFunc(hs.handle)).Start(context.Background()) } diff --git a/http-sink/main_test.go b/http-sink/main_test.go new file mode 100644 index 0000000..b853092 --- /dev/null +++ b/http-sink/main_test.go @@ -0,0 +1,26 @@ +package main + +import ( + "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/stretchr/testify/assert" + "net/http" + "net/http/httptest" + "testing" +) + +func TestHttp_client(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + + })) + hs := httpSink{} + hs.url = server.URL + hs.method = http.MethodPost + hs.logger = logging.NewLogger().Named("http-sink") + err := hs.sendHTTPRequest(nil) + assert.Error(t, err) + + hs.createHTTPClient() + err = hs.sendHTTPRequest(nil) + assert.NoError(t, err) +}