Skip to content

Commit

Permalink
fix: HttpSink with cannot assign requested address issue (#16)
Browse files Browse the repository at this point in the history
Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 authored Feb 8, 2023
1 parent cbe2eec commit 1d49640
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
4 changes: 4 additions & 0 deletions http-sink/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ go 1.18
require (
github.com/numaproj/numaflow v0.6.0
github.com/numaproj/numaflow-go v0.2.3
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.23.0
k8s.io/apimachinery v0.23.3
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/net v0.0.0-20220909164309-bea034e7d591 // indirect
Expand All @@ -21,6 +24,7 @@ require (
google.golang.org/genproto v0.0.0-20220920201722-2b89144ce006 // indirect
google.golang.org/grpc v1.49.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
)
7 changes: 7 additions & 0 deletions http-sink/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand All @@ -78,6 +79,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow v0.6.0 h1:W6UF9A0mE+3EDopCDvY7Ji8K8zK3rSd5m04CpGvYQs8=
github.com/numaproj/numaflow v0.6.0/go.mod h1:RyerSgXaBcWObYDyv46PuTFuyArmgr7acFQr3LUmSNQ=
Expand All @@ -100,10 +102,13 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand Down Expand Up @@ -216,6 +221,7 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
Expand All @@ -230,6 +236,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/apimachinery v0.23.3 h1:7IW6jxNzrXTsP0c8yXz2E5Yx/WTzVPTsHIx/2Vm0cIk=
Expand Down
25 changes: 19 additions & 6 deletions http-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"flag"
"io"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -18,6 +19,7 @@ import (

type httpSink struct {
logger *zap.SugaredLogger
httpClient *http.Client
url string
method string
retries int
Expand All @@ -38,25 +40,35 @@ func (i *arrayFlags) Set(value string) error {
return nil
}

func (hs *httpSink) sendHTTPRequest(data io.Reader) error {
func (hs *httpSink) createHTTPClient() {
//creating http client
client := &http.Client{Timeout: time.Duration(hs.timeout) * time.Second}
if hs.skipInsecure {
hs.logger.Info("Send insecure request")
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client = &http.Client{Timeout: 2 * time.Second, Transport: tr}
client.Transport = tr
}
hs.httpClient = client
}

func (hs *httpSink) sendHTTPRequest(data io.Reader) error {
req, err := http.NewRequest(hs.method, hs.url, data)
if err != nil {
return err
}
res, err := client.Do(req)
if hs.httpClient == nil {
return errors.New("HTTP Client is not initialized")
}
res, err := hs.httpClient.Do(req)
if err != nil {
return err
}
if res != nil {
hs.logger.Infof("Response code: %d, Response Body: %s", res.StatusCode, res.Body)
if res.Body != nil {
res.Body.Close()
}
hs.logger.Infof("Response code: %d,", res.StatusCode)
}
return nil
}
Expand Down Expand Up @@ -111,7 +123,8 @@ func main() {

// Parse the flag
flag.Parse()

//creating http client
hs.createHTTPClient()
hs.logger.Info("HTTP Sink starting successfully with args %v", hs)
server.New().RegisterSinker(sinksdk.SinkFunc(hs.handle)).Start(context.Background())
}
26 changes: 26 additions & 0 deletions http-sink/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
)

func TestHttp_client(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)

}))
hs := httpSink{}
hs.url = server.URL
hs.method = http.MethodPost
hs.logger = logging.NewLogger().Named("http-sink")
err := hs.sendHTTPRequest(nil)
assert.Error(t, err)

hs.createHTTPClient()
err = hs.sendHTTPRequest(nil)
assert.NoError(t, err)
}

0 comments on commit 1d49640

Please sign in to comment.