From 29bca179aa28d27f5a5cc1b704a677ca52d33577 Mon Sep 17 00:00:00 2001 From: Jan Wozniak Date: Wed, 23 Oct 2024 15:27:17 +0200 Subject: [PATCH] support portName in HTTPScaledObject service scaleTargetRef Signed-off-by: Jan Wozniak --- CHANGELOG.md | 1 + interceptor/main.go | 6 +- interceptor/middleware/routing.go | 34 +++++- interceptor/middleware/routing_test.go | 112 +++++++++++++++++- .../http/v1alpha1/httpscaledobject_types.go | 6 +- 5 files changed, 151 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64d57dcb9..065954515 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ This changelog keeps track of work items that have been completed and are ready ### New +- **General**: Support portName in HTTPScaledObject service scaleTargetRef ([#1174](https://github.com/kedacore/http-add-on/issues/1174)) - **General**: Support setting multiple TLS certs for different domains on the interceptor proxy ([#1116](https://github.com/kedacore/http-add-on/issues/1116)) - **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) diff --git a/interceptor/main.go b/interceptor/main.go index 56f451165..cb0cf634a 100644 --- a/interceptor/main.go +++ b/interceptor/main.go @@ -176,7 +176,7 @@ func main() { setupLog.Info("starting the proxy server with TLS enabled", "port", proxyTLSPort) - if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) { + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, endpointsCache, timeoutCfg, proxyTLSPort, proxyTLSEnabled, proxyTLSConfig); !util.IsIgnoredErr(err) { setupLog.Error(err, "tls proxy server failed") return err } @@ -188,7 +188,7 @@ func main() { eg.Go(func() error { setupLog.Info("starting the proxy server with TLS disabled", "port", proxyPort) - if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) { + if err := runProxyServer(ctx, ctrl.Log, queues, waitFunc, routingTable, endpointsCache, timeoutCfg, proxyPort, false, nil); !util.IsIgnoredErr(err) { setupLog.Error(err, "proxy server failed") return err } @@ -369,6 +369,7 @@ func runProxyServer( q queue.Counter, waitFunc forwardWaitFunc, routingTable routing.Table, + endpointsCache k8s.EndpointsCache, timeouts *config.Timeouts, port int, tlsEnabled bool, @@ -416,6 +417,7 @@ func runProxyServer( routingTable, probeHandler, upstreamHandler, + endpointsCache, tlsEnabled, ) rootHandler = middleware.NewLogging( diff --git a/interceptor/middleware/routing.go b/interceptor/middleware/routing.go index 197f5c507..339ed2d9a 100644 --- a/interceptor/middleware/routing.go +++ b/interceptor/middleware/routing.go @@ -8,6 +8,7 @@ import ( "github.com/kedacore/http-add-on/interceptor/handler" httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1" + "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/routing" "github.com/kedacore/http-add-on/pkg/util" ) @@ -21,14 +22,16 @@ type Routing struct { routingTable routing.Table probeHandler http.Handler upstreamHandler http.Handler + endpointsCache k8s.EndpointsCache tlsEnabled bool } -func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, tlsEnabled bool) *Routing { +func NewRouting(routingTable routing.Table, probeHandler http.Handler, upstreamHandler http.Handler, endpointsCache k8s.EndpointsCache, tlsEnabled bool) *Routing { return &Routing{ routingTable: routingTable, probeHandler: probeHandler, upstreamHandler: upstreamHandler, + endpointsCache: endpointsCache, tlsEnabled: tlsEnabled, } } @@ -64,13 +67,38 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) { rm.upstreamHandler.ServeHTTP(w, r) } +func (rm *Routing) getPort(httpso *httpv1alpha1.HTTPScaledObject) (int32, error) { + if httpso.Spec.ScaleTargetRef.Port != 0 { + return httpso.Spec.ScaleTargetRef.Port, nil + } + if httpso.Spec.ScaleTargetRef.PortName == "" { + return 0, fmt.Errorf("must specify either port or portName") + } + endpoints, err := rm.endpointsCache.Get(httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service) + if err != nil { + return 0, fmt.Errorf("failed to get Endpoints: %w", err) + } + for _, subset := range endpoints.Subsets { + for _, port := range subset.Ports { + if port.Name == httpso.Spec.ScaleTargetRef.PortName { + return port.Port, nil + } + } + } + return 0, fmt.Errorf("portName %s not found in Endpoints", httpso.Spec.ScaleTargetRef.PortName) +} + func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) { + port, err := rm.getPort(httpso) + if err != nil { + return nil, fmt.Errorf("failed to get port: %w", err) + } if rm.tlsEnabled { return url.Parse(fmt.Sprintf( "https://%s.%s:%d", httpso.Spec.ScaleTargetRef.Service, httpso.GetNamespace(), - httpso.Spec.ScaleTargetRef.Port, + port, )) } //goland:noinspection HttpUrlsUsage @@ -78,7 +106,7 @@ func (rm *Routing) streamFromHTTPSO(httpso *httpv1alpha1.HTTPScaledObject) (*url "http://%s.%s:%d", httpso.Spec.ScaleTargetRef.Service, httpso.GetNamespace(), - httpso.Spec.ScaleTargetRef.Port, + port, )) } diff --git a/interceptor/middleware/routing_test.go b/interceptor/middleware/routing_test.go index b26f8086c..b5b16a84a 100644 --- a/interceptor/middleware/routing_test.go +++ b/interceptor/middleware/routing_test.go @@ -7,7 +7,11 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1" + "github.com/kedacore/http-add-on/pkg/k8s" routingtest "github.com/kedacore/http-add-on/pkg/routing/test" ) @@ -22,8 +26,9 @@ var _ = Describe("RoutingMiddleware", func() { emptyHandler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}) probeHandler.Handle("/probe", emptyHandler) upstreamHandler.Handle("/upstream", emptyHandler) + endpointsCache := k8s.NewFakeEndpointsCache() - rm := NewRouting(routingTable, probeHandler, upstreamHandler, false) + rm := NewRouting(routingTable, probeHandler, upstreamHandler, endpointsCache, false) Expect(rm).NotTo(BeNil()) Expect(rm.routingTable).To(Equal(routingTable)) Expect(rm.probeHandler).To(Equal(probeHandler)) @@ -40,6 +45,7 @@ var _ = Describe("RoutingMiddleware", func() { var ( upstreamHandler *http.ServeMux probeHandler *http.ServeMux + endpointsCache *k8s.FakeEndpointsCache routingTable *routingtest.Table routingMiddleware *Routing w *httptest.ResponseRecorder @@ -50,6 +56,41 @@ var _ = Describe("RoutingMiddleware", func() { Hosts: []string{ host, }, + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Port: 80, + }, + }, + } + + httpsoWithPortName = httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Name: "keda", + Namespace: "default", + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + Hosts: []string{ + "keda2.sh", + }, + ScaleTargetRef: httpv1alpha1.ScaleTargetRef{ + Service: "keda-svc", + PortName: "http", + }, + }, + } + endpoints = corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "keda-svc", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Ports: []corev1.EndpointPort{ + { + Name: "http", + Port: 80, + }, + }, + }, }, } ) @@ -58,7 +99,8 @@ var _ = Describe("RoutingMiddleware", func() { upstreamHandler = http.NewServeMux() probeHandler = http.NewServeMux() routingTable = routingtest.NewTable() - routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, false) + endpointsCache = k8s.NewFakeEndpointsCache() + routingMiddleware = NewRouting(routingTable, probeHandler, upstreamHandler, endpointsCache, false) w = httptest.NewRecorder() @@ -91,7 +133,40 @@ var _ = Describe("RoutingMiddleware", func() { routingTable.Memory[host] = &httpso routingMiddleware.ServeHTTP(w, r) + Expect(uh).To(BeTrue()) + Expect(ph).To(BeFalse()) + Expect(w.Code).To(Equal(sc)) + Expect(w.Body.String()).To(Equal(st)) + }) + }) + + When("route is found with portName", func() { + It("routes to the upstream handler", func() { + endpointsCache.Set(endpoints) + var ( + sc = http.StatusTeapot + st = http.StatusText(sc) + ) + + var uh bool + upstreamHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + + _, err := w.Write([]byte(st)) + Expect(err).NotTo(HaveOccurred()) + + uh = true + })) + var ph bool + probeHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ph = true + })) + + routingTable.Memory["keda2.sh"] = &httpsoWithPortName + + r.Host = "keda2.sh" + routingMiddleware.ServeHTTP(w, r) Expect(uh).To(BeTrue()) Expect(ph).To(BeFalse()) Expect(w.Code).To(Equal(sc)) @@ -99,6 +174,39 @@ var _ = Describe("RoutingMiddleware", func() { }) }) + When("route is found with portName but endpoints are mismatched", func() { + It("errors to route to upstream handler", func() { + var ( + sc = http.StatusTeapot + st = http.StatusText(sc) + ) + + var uh bool + upstreamHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) + + _, err := w.Write([]byte(st)) + Expect(err).NotTo(HaveOccurred()) + + uh = true + })) + + var ph bool + probeHandler.Handle(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ph = true + })) + + routingTable.Memory["keda2.sh"] = &httpsoWithPortName + + r.Host = "keda2.sh" + routingMiddleware.ServeHTTP(w, r) + Expect(uh).To(BeFalse()) + Expect(ph).To(BeFalse()) + Expect(w.Code).To(Equal(http.StatusInternalServerError)) + Expect(w.Body.String()).To(Equal("Internal Server Error")) + }) + }) + When("route is not found", func() { It("routes to the probe handler", func() { const ( diff --git a/operator/apis/http/v1alpha1/httpscaledobject_types.go b/operator/apis/http/v1alpha1/httpscaledobject_types.go index 0b2b039b0..a76040eeb 100644 --- a/operator/apis/http/v1alpha1/httpscaledobject_types.go +++ b/operator/apis/http/v1alpha1/httpscaledobject_types.go @@ -31,7 +31,11 @@ type ScaleTargetRef struct { // The name of the service to route to Service string `json:"service"` // The port to route to - Port int32 `json:"port"` + // +optional + Port int32 `json:"port,omitempty"` + // The port to route to referenced by name + // +optional + PortName string `json:"portName,omitempty"` } // ReplicaStruct contains the minimum and maximum amount of replicas to have in the deployment