Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize port forwarding for apply #1224

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 49 additions & 16 deletions pkg/cmd/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -83,8 +84,11 @@ var (
# Apply with the specified timeout duration for kusion apply command, measured in second(s)
kusion apply --timeout=120

# Apply with localhost port forwarding
kusion apply --port-forward=8080`)
# Apply with localhost port forwarding (local port number by default equals to k8s pod or service)
kusion apply --port-forward=8080

# Apply with specified localhost port forwarding (12345 at local to 8080 of k8s pod or service)
kusion apply --port-forward=12345:8080)`)
)

// To handle the release phase update when panic occurs.
Expand All @@ -111,7 +115,7 @@ type ApplyFlags struct {
DryRun bool
Watch bool
Timeout int
PortForward int
PortForward string

genericiooptions.IOStreams
}
Expand All @@ -120,12 +124,13 @@ type ApplyFlags struct {
type ApplyOptions struct {
*preview.PreviewOptions

SpecFile string
Yes bool
DryRun bool
Watch bool
Timeout int
PortForward int
SpecFile string
Yes bool
DryRun bool
Watch bool
Timeout int
LocalPort int
K8sPort int

genericiooptions.IOStreams
}
Expand Down Expand Up @@ -171,7 +176,7 @@ func (f *ApplyFlags) AddFlags(cmd *cobra.Command) {
cmd.Flags().BoolVarP(&f.DryRun, "dry-run", "", false, i18n.T("Preview the execution effect (always successful) without actually applying the changes"))
cmd.Flags().BoolVarP(&f.Watch, "watch", "", true, i18n.T("After creating/updating/deleting the requested object, watch for changes"))
cmd.Flags().IntVarP(&f.Timeout, "timeout", "", 0, i18n.T("The timeout duration for kusion apply command, measured in second(s)"))
cmd.Flags().IntVarP(&f.PortForward, "port-forward", "", 0, i18n.T("Forward the specified port from local to service"))
cmd.Flags().StringVarP(&f.PortForward, "port-forward", "", "", i18n.T("Forward the specified port from local machine to k8s pod or service"))
}

// ToOptions converts from CLI inputs to runtime inputs.
Expand All @@ -182,15 +187,38 @@ func (f *ApplyFlags) ToOptions() (*ApplyOptions, error) {
return nil, err
}

// Convert local and K8s Pod or Service port number to be forwarded.
localPort, k8sPort := 0, 0
if f.PortForward != "" {
ports := strings.Split(f.PortForward, ":")

if len(ports) > 2 {
return nil, fmt.Errorf("invalid format of the port number to forward: %s, no more than 2 ports", f.PortForward)
}

if localPort, err = strconv.Atoi(ports[0]); err != nil {
return nil, fmt.Errorf("invalid format of the port number to forward, %s cannot be changed into integer", ports[0])
}

if len(ports) == 1 {
k8sPort, _ = strconv.Atoi(ports[0])
} else {
if k8sPort, err = strconv.Atoi(ports[1]); err != nil {
return nil, fmt.Errorf("invalid format of the port number to forward, %s cannot be changed into integer", ports[1])
}
}
}

o := &ApplyOptions{
PreviewOptions: previewOptions,
SpecFile: f.SpecFile,
Yes: f.Yes,
DryRun: f.DryRun,
Watch: f.Watch,
Timeout: f.Timeout,
PortForward: f.PortForward,
IOStreams: f.IOStreams,
LocalPort: localPort,
K8sPort: k8sPort,
}

return o, nil
Expand All @@ -202,8 +230,12 @@ func (o *ApplyOptions) Validate(cmd *cobra.Command, args []string) error {
return cmdutil.UsageErrorf(cmd, "Unexpected args: %v", args)
}

if o.PortForward < 0 || o.PortForward > 65535 {
return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.PortForward)
if o.LocalPort < 0 || o.LocalPort > 65535 {
return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.LocalPort)
}

if o.K8sPort < 0 || o.K8sPort > 65535 {
return cmdutil.UsageErrorf(cmd, "Invalid port number to forward: %d, must be between 1 and 65535", o.K8sPort)
}

if o.SpecFile != "" {
Expand Down Expand Up @@ -429,7 +461,7 @@ func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err err
}
*rel = *updatedRel

if o.PortForward > 0 {
if o.LocalPort > 0 && o.K8sPort > 0 {
fmt.Printf("\nStart port-forwarding ...\n")
portForwarded = true
if err = PortForward(o, rel.Spec); err != nil {
Expand Down Expand Up @@ -815,8 +847,9 @@ func PortForward(
// portforward operation
wo := &operation.PortForwardOperation{}
if err := wo.PortForward(&operation.PortForwardRequest{
Spec: spec,
Port: o.PortForward,
Spec: spec,
LocalPort: o.LocalPort,
K8sPort: o.K8sPort,
}); err != nil {
return err
}
Expand Down
135 changes: 87 additions & 48 deletions pkg/engine/operation/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@

type PortForwardRequest struct {
models.Request
Spec *v1.Spec
Port int
Spec *v1.Spec
Port int
LocalPort int
K8sPort int
}

func (bpo *PortForwardOperation) PortForward(req *PortForwardRequest) error {
Expand All @@ -50,7 +52,9 @@
return err
}

// Find Kubernetes Service in the resources of Spec.
// Find Kubernetes Namespace and Service in the resources of Spec.
var nsSpec *v1.Resource
var namespace *corev1.Namespace
services := make(map[*v1.Resource]*corev1.Service)
for _, res := range req.Spec.Resources {
// Skip non-Kubernetes resources.
Expand All @@ -73,6 +77,12 @@
return fmt.Errorf("failed to decode yaml manifest into unstructured object: %v", err)
}

if obj.GetKind() == convertor.Namespace {
nsSpec = &res

Check failure on line 81 in pkg/engine/operation/port_forward.go

View workflow job for this annotation

GitHub Actions / Golang Lint

exporting a pointer for the loop variable res (exportloopref)
convertedObj := convertor.ToK8s(obj)
namespace = convertedObj.(*corev1.Namespace)
}

if obj.GetKind() != convertor.Service {
continue
}
Expand All @@ -81,45 +91,12 @@
services[&res] = convertedObj.(*corev1.Service)
}

if len(services) == 0 {
return ErrEmptyService
}

filteredServices := make(map[*v1.Resource]*corev1.Service)
for res, svc := range services {
targetPortFound := false
for _, port := range svc.Spec.Ports {
if port.Port == int32(req.Port) {
targetPortFound = true
continue
}
}

if targetPortFound {
filteredServices[res] = svc
}
}
services = filteredServices

if len(services) != 1 {
return ErrNotOneSvcWithTargetPort
}

// Port-forward the Service with client-go.
// Port-forward the K8s Pod or Service with client-go.
failed := make(chan error)
for res, svc := range services {
namespace := svc.GetNamespace()
serviceName := svc.GetName()

var servicePort int
if req.Port == 0 {
// We will use the first port in Service if not specified.
servicePort = int(svc.Spec.Ports[0].Port)
} else {
servicePort = req.Port
}

cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(res))
if len(services) == 0 {
// Port-forward directly to K8s Pod, if there is only one Pod in the Application Resources.
// Fixme: only one Pod with the target port?
cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(nsSpec))
if err != nil {
return err
}
Expand All @@ -129,17 +106,66 @@
return err
}

pods, err := clientset.CoreV1().Pods(namespace.Name).List(ctx, metav1.ListOptions{})

Check failure on line 109 in pkg/engine/operation/port_forward.go

View workflow job for this annotation

GitHub Actions / Golang Lint

ineffectual assignment to err (ineffassign)

if len(pods.Items) != 1 {
return errors.New("only support one pod to port forward when no k8s service in application resources")
}
pod := pods.Items[0]

fmt.Printf("Forwarding localhost port to targetPort of pod '%s' (%d:%d)\n", pod.Name, req.LocalPort, req.K8sPort)

go func() {
err = ForwardPort(ctx, cfg, clientset, namespace, serviceName, servicePort, servicePort)
failed <- err
failed <- forwardPodPort(cfg, clientset, pod.Namespace, pod.Name, req.K8sPort, req.LocalPort)
}()
} else {
// Port-forward to K8s Service, if there is only one Service with the targeted port.
filteredServices := make(map[*v1.Resource]*corev1.Service)
for res, svc := range services {
targetPortFound := false
for _, port := range svc.Spec.Ports {
if port.Port == int32(req.K8sPort) {
targetPortFound = true
continue
}
}

if targetPortFound {
filteredServices[res] = svc
}
}
services = filteredServices

if len(services) != 1 {
return ErrNotOneSvcWithTargetPort
}

for res, svc := range services {
serviceName := svc.GetName()

cfg, err := clientcmd.BuildConfigFromFlags("", kubeops.GetKubeConfig(res))
if err != nil {
return err
}

clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
}

go func() {
err = forwardSvcPort(ctx, cfg, clientset, namespace.Name, serviceName, req.K8sPort, req.LocalPort)
failed <- err
}()
}
}

err := <-failed
return err
}

func ForwardPort(
// forwardSvcPort forwards the localhost port to K8s Service.
func forwardSvcPort(
ctx context.Context,
restConfig *rest.Config,
clientset *kubernetes.Clientset,
Expand Down Expand Up @@ -175,17 +201,27 @@
fmt.Printf("Forwarding localhost port to targetPort of pod '%s' selected by the service '%s' (%d:%d)\n",
pod.Name, serviceName, localPort, servicePort)

return forwardPodPort(restConfig, clientset, pod.Namespace, pod.Name, servicePort, localPort)
}

// forwardPodPort forwards the localhost port to K8s Pod.
func forwardPodPort(
restConfig *rest.Config,
clientset *kubernetes.Clientset,
namespace, podName string,
podPort, localPort int,
) error {
// Build a URL for SPDY connection for port-forwarding.
url := clientset.CoreV1().RESTClient().Post().
Resource("pods").Namespace(pod.Namespace).Name(pod.Name).
Resource("pods").Namespace(namespace).Name(podName).
SubResource("portforward").URL()

transport, upgrader, err := spdy.RoundTripperFor(restConfig)
if err != nil {
return err
}

ports := []string{fmt.Sprintf("%d:%d", localPort, servicePort)}
ports := []string{fmt.Sprintf("%d:%d", localPort, podPort)}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)

stop, ready := make(chan struct{}, 1), make(chan struct{})
Expand All @@ -206,8 +242,11 @@
if err := release.ValidateSpec(req.Spec); err != nil {
return err
}
if req.Port < 0 || req.Port > 65535 {
return fmt.Errorf("invalid port %d", req.Port)
if req.LocalPort < 0 || req.LocalPort > 65535 {
return fmt.Errorf("invalid local port number: %d", req.LocalPort)
}
if req.K8sPort < 0 || req.K8sPort > 65535 {
return fmt.Errorf("invalid k8s pod or service port number: %d", req.K8sPort)
}
return nil
}
Loading