Skip to content

Commit

Permalink
support portName in HTTPScaledObject service scaleTargetRef
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Wozniak <[email protected]>
  • Loading branch information
wozniakjan committed Oct 23, 2024
1 parent 48a1881 commit 4731e6d
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This changelog keeps track of work items that have been completed and are ready

### New

- **General**: Support portName in HTTPScaledObject service scaleTargetRef ([#1174](https://github.com/kedacore/http-add-on/issues/1174))
- **General**: Support setting multiple TLS certs for different domains on the interceptor proxy ([#1116](https://github.com/kedacore/http-add-on/issues/1116))
- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO))

Expand Down
6 changes: 4 additions & 2 deletions interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func main() {

setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort)

if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) {
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, endpointsCache, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) {
setupLog.Error(err, "tls proxy server failed")
return err
}
Expand All @@ -188,7 +188,7 @@ func main() {
eg.Go(func() error {
setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort)

if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, endpointsCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) {
setupLog.Error(err, "proxy server failed")
return err
}
Expand Down Expand Up @@ -369,6 +369,7 @@ func runProxyServer(
q queue.Counter,
waitFunc forwardWaitFunc,
routingTable routing.Table,
endpointsCache k8s.EndpointsCache,
timeouts *config.Timeouts,
port int,
tlsEnabled bool,
Expand Down Expand Up @@ -416,6 +417,7 @@ func runProxyServer(
routingTable,
probeHandler,
upstreamHandler,
endpointsCache,
tlsEnabled,
)
rootHandler = middleware.NewLogging(
Expand Down
6 changes: 6 additions & 0 deletions interceptor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
// server
routingTable := routingtest.NewTable()
routingTable.Memory[host] = httpso
endpointsCache := k8s.NewFakeEndpointsCache()

timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
Expand All @@ -77,6 +78,7 @@ func TestRunProxyServerCountMiddleware(t *testing.T) {
q,
waitFunc,
routingTable,
endpointsCache,
timeouts,
port,
false,
Expand Down Expand Up @@ -194,6 +196,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
// server
routingTable := routingtest.NewTable()
routingTable.Memory[host] = httpso
endpointsCache := k8s.NewFakeEndpointsCache()

timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
Expand All @@ -209,6 +212,7 @@ func TestRunProxyServerWithTLSCountMiddleware(t *testing.T) {
q,
waitFunc,
routingTable,
endpointsCache,
timeouts,
port,
true,
Expand Down Expand Up @@ -339,6 +343,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
// server
routingTable := routingtest.NewTable()
routingTable.Memory[host] = httpso
endpointsCache := k8s.NewFakeEndpointsCache()

timeouts := &config.Timeouts{}
waiterCh := make(chan struct{})
Expand All @@ -354,6 +359,7 @@ func TestRunProxyServerWithMultipleCertsTLSCountMiddleware(t *testing.T) {
q,
waitFunc,
routingTable,
endpointsCache,
timeouts,
port,
true,
Expand Down
34 changes: 31 additions & 3 deletions interceptor/middleware/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/kedacore/http-add-on/interceptor/handler"
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/kedacore/http-add-on/pkg/util"
)
Expand All @@ -21,14 +22,16 @@ type Routing struct {
routingTable routing.Table
probeHandler http.Handler
upstreamHandler http.Handler
endpointsCache k8s.EndpointsCache
tlsEnabled bool
}

func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, tlsEnabled bool) *Routing {
func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, endpointsCache k8s.EndpointsCache, tlsEnabled bool) *Routing {
return &Routing{
routingTable: routingTable,
probeHandler: probeHandler,
upstreamHandler: upstreamHandler,
endpointsCache: endpointsCache,
tlsEnabled: tlsEnabled,
}
}
Expand Down Expand Up @@ -64,21 +67,46 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rm.upstreamHandler.ServeHTTP(w, r)
}

func (rm *Routing) getPort(httpso *httpv1alpha1.HTTPScaledObject) (int32, error) {
if httpso.Spec.ScaleTargetRef.Port != 0 {
return httpso.Spec.ScaleTargetRef.Port, nil
}
if httpso.Spec.ScaleTargetRef.PortName == "" {
return 0, fmt.Errorf("must specify either port or portName")
}
endpoints, err := rm.endpointsCache.Get(httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service)
if err != nil {
return 0, fmt.Errorf("failed to get Endpoints: %w", err)
}
for _, subset := range endpoints.Subsets {
for _, port := range subset.Ports {
if port.Name == httpso.Spec.ScaleTargetRef.PortName {
return port.Port, nil
}
}
}
return 0, fmt.Errorf("portName %s not found in Endpoints", httpso.Spec.ScaleTargetRef.PortName)
}

func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
port, err := rm.getPort(httpso)
if err != nil {
return nil, fmt.Errorf("failed to get port: %w", err)
}
if rm.tlsEnabled {
return url.Parse(fmt.Sprintf(
"https://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
httpso.Spec.ScaleTargetRef.Port,
port,
))
}
//goland:noinspection HttpUrlsUsage
return url.Parse(fmt.Sprintf(
"http://%s.%s:%d",
httpso.Spec.ScaleTargetRef.Service,
httpso.GetNamespace(),
httpso.Spec.ScaleTargetRef.Port,
port,
))
}

Expand Down
112 changes: 110 additions & 2 deletions interceptor/middleware/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
"github.com/kedacore/http-add-on/pkg/k8s"
routingtest "github.com/kedacore/http-add-on/pkg/routing/test"
)

Expand All @@ -22,8 +26,9 @@ var _ = Describe("RoutingMiddleware", func() {
emptyHandler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
probeHandler.Handle("/probe", emptyHandler)
upstreamHandler.Handle("/upstream", emptyHandler)
endpointsCache := k8s.NewFakeEndpointsCache()

rm := NewRouting(routingTable, probeHandler, upstreamHandler, false)
rm := NewRouting(routingTable, probeHandler, upstreamHandler, endpointsCache, false)
Expect(rm).NotTo(BeNil())
Expect(rm.routingTable).To(Equal(routingTable))
Expect(rm.probeHandler).To(Equal(probeHandler))
Expand All @@ -40,6 +45,7 @@ var _ = Describe("RoutingMiddleware", func() {
var (
upstreamHandler *http.ServeMux
probeHandler *http.ServeMux
endpointsCache *k8s.FakeEndpointsCache
routingTable *routingtest.Table
routingMiddleware *Routing
w *httptest.ResponseRecorder
Expand All @@ -50,6 +56,41 @@ var _ = Describe("RoutingMiddleware", func() {
Hosts: []string{
host,
},
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
Port: 80,
},
},
}

httpsoWithPortName = httpv1alpha1.HTTPScaledObject{
ObjectMeta: metav1.ObjectMeta{
Name: "keda",
Namespace: "default",
},
Spec: httpv1alpha1.HTTPScaledObjectSpec{
Hosts: []string{
"keda2.sh",
},
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
Service: "keda-svc",
PortName: "http",
},
},
}
endpoints = corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "keda-svc",
Namespace: "default",
},
Subsets: []corev1.EndpointSubset{
{
Ports: []corev1.EndpointPort{
{
Name: "http",
Port: 80,
},
},
},
},
}
)
Expand All @@ -58,7 +99,8 @@ var _ = Describe("RoutingMiddleware", func() {
upstreamHandler = http.NewServeMux()
probeHandler = http.NewServeMux()
routingTable = routingtest.NewTable()
routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, false)
endpointsCache = k8s.NewFakeEndpointsCache()
routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, endpointsCache, false)

w = httptest.NewRecorder()

Expand Down Expand Up @@ -91,14 +133,80 @@ var _ = Describe("RoutingMiddleware", func() {
routingTable.Memory[host] = &httpso

routingMiddleware.ServeHTTP(w, r)
Expect(uh).To(BeTrue())
Expect(ph).To(BeFalse())
Expect(w.Code).To(Equal(sc))
Expect(w.Body.String()).To(Equal(st))
})
})

When("route is found with portName", func() {
It("routes to the upstream handler", func() {
endpointsCache.Set(endpoints)
var (
sc = http.StatusTeapot
st = http.StatusText(sc)
)

var uh bool
upstreamHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)

_, err := w.Write([]byte(st))
Expect(err).NotTo(HaveOccurred())

uh = true
}))

var ph bool
probeHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ph = true
}))

routingTable.Memory["keda2.sh"] = &httpsoWithPortName

r.Host = "keda2.sh"
routingMiddleware.ServeHTTP(w, r)
Expect(uh).To(BeTrue())
Expect(ph).To(BeFalse())
Expect(w.Code).To(Equal(sc))
Expect(w.Body.String()).To(Equal(st))
})
})

When("route is found with portName but endpoints are mismatched", func() {
It("errors to route to upstream handler", func() {
var (
sc = http.StatusTeapot
st = http.StatusText(sc)
)

var uh bool
upstreamHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)

_, err := w.Write([]byte(st))
Expect(err).NotTo(HaveOccurred())

uh = true
}))

var ph bool
probeHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ph = true
}))

routingTable.Memory["keda2.sh"] = &httpsoWithPortName

r.Host = "keda2.sh"
routingMiddleware.ServeHTTP(w, r)
Expect(uh).To(BeFalse())
Expect(ph).To(BeFalse())
Expect(w.Code).To(Equal(http.StatusInternalServerError))
Expect(w.Body.String()).To(Equal("Internal Server Error"))
})
})

When("route is not found", func() {
It("routes to the probe handler", func() {
const (
Expand Down
1 change: 1 addition & 0 deletions interceptor/proxy_handlers_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ func newHarness(
respHeaderTimeout: time.Second,
},
&tls.Config{}),
endpCache,
false,
)

Expand Down
6 changes: 5 additions & 1 deletion operator/apis/http/v1alpha1/httpscaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ type ScaleTargetRef struct {
// The name of the service to route to
Service string `json:"service"`
// The port to route to
Port int32 `json:"port"`
// +optional
Port int32 `json:"port,omitempty"`
// The port to route to referenced by name
// +optional
PortName string `json:"portName,omitempty"`
}

// ReplicaStruct contains the minimum and maximum amount of replicas to have in the deployment
Expand Down

0 comments on commit 4731e6d

Please sign in to comment.