From 2407d9f5ef9528abd087a687de2d60686c5e1326 Mon Sep 17 00:00:00 2001 From: liuhaoming Date: Tue, 16 Jul 2024 15:47:55 +0800 Subject: [PATCH] feat: optimize port forwarding for apply --- pkg/cmd/apply/apply.go | 65 +++++++++---- pkg/engine/operation/port_forward.go | 135 +++++++++++++++++---------- 2 files changed, 136 insertions(+), 64 deletions(-) diff --git a/pkg/cmd/apply/apply.go b/pkg/cmd/apply/apply.go index d6c2d20a9..36033914b 100644 --- a/pkg/cmd/apply/apply.go +++ b/pkg/cmd/apply/apply.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "reflect" + "strconv" "strings" "sync" "time" @@ -83,8 +84,11 @@ var ( # Apply with the specified timeout duration for kusion apply command, measured in second(s) kusion apply --timeout=120 - # Apply with localhost port forwarding - kusion apply --port-forward=8080`) + # Apply with localhost port forwarding (local port number by default equals to k8s pod or service) + kusion apply --port-forward=8080 + + # Apply with specified localhost port forwarding (12345 at local to 8080 of k8s pod or service) + kusion apply --port-forward=12345:8080)`) ) // To handle the release phase update when panic occurs. @@ -111,7 +115,7 @@ type ApplyFlags struct { DryRun bool Watch bool Timeout int - PortForward int + PortForward string genericiooptions.IOStreams } @@ -120,12 +124,13 @@ type ApplyFlags struct { type ApplyOptions struct { *preview.PreviewOptions - SpecFile string - Yes bool - DryRun bool - Watch bool - Timeout int - PortForward int + SpecFile string + Yes bool + DryRun bool + Watch bool + Timeout int + LocalPort int + K8sPort int genericiooptions.IOStreams } @@ -171,7 +176,7 @@ func (f *ApplyFlags) AddFlags(cmd *cobra.Command) { cmd.Flags().BoolVarP(&f.DryRun, "dry-run", "", false, i18n.T("Preview the execution effect (always successful) without actually applying the changes")) cmd.Flags().BoolVarP(&f.Watch, "watch", "", true, i18n.T("After creating/updating/deleting the requested object, watch for changes")) cmd.Flags().IntVarP(&f.Timeout, "timeout", "", 0, i18n.T("The timeout duration for kusion apply command, measured in second(s)")) - cmd.Flags().IntVarP(&f.PortForward, "port-forward", "", 0, i18n.T("Forward the specified port from local to service")) + cmd.Flags().StringVarP(&f.PortForward, "port-forward", "", "", i18n.T("Forward the specified port from local machine to k8s pod or service")) } // ToOptions converts from CLI inputs to runtime inputs. @@ -182,6 +187,28 @@ func (f *ApplyFlags) ToOptions() (*ApplyOptions, error) { return nil, err } + // Convert local and K8s Pod or Service port number to be forwarded. + localPort, k8sPort := 0, 0 + if f.PortForward != "" { + ports := strings.Split(f.PortForward, ":") + + if len(ports) > 2 { + return nil, fmt.Errorf("invalid format of the port number to forward: %s, no more than 2 ports", f.PortForward) + } + + if localPort, err = strconv.Atoi(ports[0]); err != nil { + return nil, fmt.Errorf("invalid format of the port number to forward, %s cannot be changed into integer", ports[0]) + } + + if len(ports) == 1 { + k8sPort, _ = strconv.Atoi(ports[0]) + } else { + if k8sPort, err = strconv.Atoi(ports[1]); err != nil { + return nil, fmt.Errorf("invalid format of the port number to forward, %s cannot be changed into integer", ports[1]) + } + } + } + o := &ApplyOptions{ PreviewOptions: previewOptions, SpecFile: f.SpecFile, @@ -189,8 +216,9 @@ func (f *ApplyFlags) ToOptions() (*ApplyOptions, error) { DryRun: f.DryRun, Watch: f.Watch, Timeout: f.Timeout, - PortForward: f.PortForward, IOStreams: f.IOStreams, + LocalPort: localPort, + K8sPort: k8sPort, } return o, nil @@ -202,8 +230,12 @@ func (o *ApplyOptions) Validate(cmd *cobra.Command, args []string) error { return cmdutil.UsageErrorf(cmd, "Unexpected args: %v", args) } - if o.PortForward < 0 || o.PortForward > 65535 { - return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.PortForward) + if o.LocalPort < 0 || o.LocalPort > 65535 { + return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.LocalPort) + } + + if o.K8sPort < 0 || o.K8sPort > 65535 { + return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.K8sPort) } if o.SpecFile != "" { @@ -429,7 +461,7 @@ func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err err } *rel = *updatedRel - if o.PortForward > 0 { + if o.LocalPort > 0 && o.K8sPort > 0 { fmt.Printf("\nStart port-forwarding ...\n") portForwarded = true if err = PortForward(o, rel.Spec); err != nil { @@ -815,8 +847,9 @@ func PortForward( // portforward operation wo := &operation.PortForwardOperation{} if err := wo.PortForward(&operation.PortForwardRequest{ - Spec: spec, - Port: o.PortForward, + Spec: spec, + LocalPort: o.LocalPort, + K8sPort: o.K8sPort, }); err != nil { return err } diff --git a/pkg/engine/operation/port_forward.go b/pkg/engine/operation/port_forward.go index ba1da1c51..6380bb6d3 100644 --- a/pkg/engine/operation/port_forward.go +++ b/pkg/engine/operation/port_forward.go @@ -38,8 +38,10 @@ type PortForwardOperation struct { type PortForwardRequest struct { models.Request - Spec *v1.Spec - Port int + Spec *v1.Spec + Port int + LocalPort int + K8sPort int } func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { @@ -50,7 +52,9 @@ func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { return err } - // Find Kubernetes Service in the resources of Spec. + // Find Kubernetes Namespace and Service in the resources of Spec. + var nsSpec *v1.Resource + var namespace *corev1.Namespace services := make(map[*v1.Resource]*corev1.Service) for _, res := range req.Spec.Resources { // Skip non-Kubernetes resources. @@ -73,6 +77,12 @@ func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { return fmt.Errorf("failed to decode yaml manifest into unstructured object: %v", err) } + if obj.GetKind() == convertor.Namespace { + nsSpec = &res + convertedObj := convertor.ToK8s(obj) + namespace = convertedObj.(*corev1.Namespace) + } + if obj.GetKind() != convertor.Service { continue } @@ -81,45 +91,12 @@ func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { services[&res] = convertedObj.(*corev1.Service) } - if len(services) == 0 { - return ErrEmptyService - } - - filteredServices := make(map[*v1.Resource]*corev1.Service) - for res, svc := range services { - targetPortFound := false - for _, port := range svc.Spec.Ports { - if port.Port == int32(req.Port) { - targetPortFound = true - continue - } - } - - if targetPortFound { - filteredServices[res] = svc - } - } - services = filteredServices - - if len(services) != 1 { - return ErrNotOneSvcWithTargetPort - } - - // Port-forward the Service with client-go. + // Port-forward the K8s Pod or Service with client-go. failed := make(chan error) - for res, svc := range services { - namespace := svc.GetNamespace() - serviceName := svc.GetName() - - var servicePort int - if req.Port == 0 { - // We will use the first port in Service if not specified. - servicePort = int(svc.Spec.Ports[0].Port) - } else { - servicePort = req.Port - } - - cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(res)) + if len(services) == 0 { + // Port-forward directly to K8s Pod, if there is only one Pod in the Application Resources. + // Fixme: only one Pod with the target port? + cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(nsSpec)) if err != nil { return err } @@ -129,17 +106,66 @@ func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error { return err } + pods, err := clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{}) + + if len(pods.Items) != 1 { + return errors.New("only support one pod to port forward when no k8s service in application resources") + } + pod := pods.Items[0] + + fmt.Printf("Forwarding localhost port to targetPort of pod '%s' (%d:%d)\n", pod.Name, req.LocalPort, req.K8sPort) + go func() { - err = ForwardPort(ctx, cfg, clientset, namespace, serviceName, servicePort, servicePort) - failed <- err + failed <- forwardPodPort(cfg, clientset, pod.Namespace, pod.Name, req.K8sPort, req.LocalPort) }() + } else { + // Port-forward to K8s Service, if there is only one Service with the targeted port. + filteredServices := make(map[*v1.Resource]*corev1.Service) + for res, svc := range services { + targetPortFound := false + for _, port := range svc.Spec.Ports { + if port.Port == int32(req.K8sPort) { + targetPortFound = true + continue + } + } + + if targetPortFound { + filteredServices[res] = svc + } + } + services = filteredServices + + if len(services) != 1 { + return ErrNotOneSvcWithTargetPort + } + + for res, svc := range services { + serviceName := svc.GetName() + + cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(res)) + if err != nil { + return err + } + + clientset, err := kubernetes.NewForConfig(cfg) + if err != nil { + return err + } + + go func() { + err = forwardSvcPort(ctx, cfg, clientset, namespace.Name, serviceName, req.K8sPort, req.LocalPort) + failed <- err + }() + } } err := <-failed return err } -func ForwardPort( +// forwardSvcPort forwards the localhost port to K8s Service. +func forwardSvcPort( ctx context.Context, restConfig *rest.Config, clientset *kubernetes.Clientset, @@ -175,9 +201,19 @@ func ForwardPort( fmt.Printf("Forwarding localhost port to targetPort of pod '%s' selected by the service '%s' (%d:%d)\n", pod.Name, serviceName, localPort, servicePort) + return forwardPodPort(restConfig, clientset, pod.Namespace, pod.Name, servicePort, localPort) +} + +// forwardPodPort forwards the localhost port to K8s Pod. +func forwardPodPort( + restConfig *rest.Config, + clientset *kubernetes.Clientset, + namespace, podName string, + podPort, localPort int, +) error { // Build a URL for SPDY connection for port-forwarding. url := clientset.CoreV1().RESTClient().Post(). - Resource("pods").Namespace(pod.Namespace).Name(pod.Name). + Resource("pods").Namespace(namespace).Name(podName). SubResource("portforward").URL() transport, upgrader, err := spdy.RoundTripperFor(restConfig) @@ -185,7 +221,7 @@ func ForwardPort( return err } - ports := []string{fmt.Sprintf("%d:%d", localPort, servicePort)} + ports := []string{fmt.Sprintf("%d:%d", localPort, podPort)} dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) stop, ready := make(chan struct{}, 1), make(chan struct{}) @@ -206,8 +242,11 @@ func validatePortForwardRequest(req *PortForwardRequest) error { if err := release.ValidateSpec(req.Spec); err != nil { return err } - if req.Port < 0 || req.Port > 65535 { - return fmt.Errorf("invalid port %d", req.Port) + if req.LocalPort < 0 || req.LocalPort > 65535 { + return fmt.Errorf("invalid local port number: %d", req.LocalPort) + } + if req.K8sPort < 0 || req.K8sPort > 65535 { + return fmt.Errorf("invalid k8s pod or service port number: %d", req.K8sPort) } return nil }