diff --git a/charts/values.yaml b/charts/values.yaml
index f3f39c503..ba163a2c1 100644
--- a/charts/values.yaml
+++ b/charts/values.yaml
@@ -34,6 +34,10 @@ feature:
   tunnelIpv6Subnet: "fd11::/112"
   ## @param feature.tunnelDetectMethod Tunnel base on which interface [`defaultRouteInterface`, `interface=eth0`]
   tunnelDetectMethod: "defaultRouteInterface"
+  ## @param feature.replyRouteTable Routing table number used for reply packets on the gateway node
+  replyRouteTable: 600
+  ## @param feature.replyRouteMark iptables mark used for reply packets on the gateway node
+  replyRouteMark: 39
   iptables:
     ## @param feature.iptables.backendMode Iptables mode can be specified as `nft` or `legacy`, with `auto` meaning automatic detection. The default value is `auto`.
     backendMode: "auto"
diff --git a/pkg/agent/police.go b/pkg/agent/police.go
index bef574a72..a14654f77 100644
--- a/pkg/agent/police.go
+++ b/pkg/agent/police.go
@@ -275,9 +275,11 @@ func (r *policeReconciler) initApplyPolicy() error {
 
 		if _, ok := r.ipsetMap.Load(name); !ok {
 			err = r.ipset.DestroySet(name)
 			if err != nil {
+				// Destroying a stale set is best-effort: report the failure
+				// instead of silently dropping it, without aborting here.
 				r.log.Error(err, "clean ipset", "ipset", name)
 			}
 		}
 	}
 
@@ -449,6 +451,58 @@ func (r *policeReconciler) getPolicySrcIPs(policyNs, policyName string, filter f
 	return ipv4List, ipv6List, nil
 }
 
+// buildReplyRules returns the endpoints of the given policy that are located
+// on nodes other than this one. It lists EgressClusterEndpointSlices when
+// policyNs is empty and EgressEndpointSlices otherwise, selecting them by the
+// policy-name label and skipping slices that are being deleted.
+//
+// NOTE(review): the namespaced list is not restricted to policyNs, so equally
+// named policies in different namespaces would share endpoints — confirm.
+func (r *policeReconciler) buildReplyRules(policyNs, policyName string) ([]egressv1.EgressEndpoint, error) {
+	ctx := context.Background()
+	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
+		MatchLabels: map[string]string{egressv1.LabelPolicyName: policyName},
+	})
+	if err != nil {
+		return nil, err
+	}
+	opt := &client.ListOptions{LabelSelector: selector}
+
+	notEgressNodeEPs := make([]egressv1.EgressEndpoint, 0)
+	// collect keeps the endpoints that do not run on the local node.
+	collect := func(endpoints []egressv1.EgressEndpoint) {
+		for _, e := range endpoints {
+			if e.Node != r.cfg.EnvConfig.NodeName {
+				notEgressNodeEPs = append(notEgressNodeEPs, e)
+			}
+		}
+	}
+
+	if policyNs == "" {
+		eps := new(egressv1.EgressClusterEndpointSliceList)
+		if err := r.client.List(ctx, eps, opt); err != nil {
+			return nil, err
+		}
+		for _, ep := range eps.Items {
+			if ep.DeletionTimestamp.IsZero() {
+				collect(ep.Endpoints)
+			}
+		}
+		return notEgressNodeEPs, nil
+	}
+
+	eps := new(egressv1.EgressEndpointSliceList)
+	if err := r.client.List(ctx, eps, opt); err != nil {
+		return nil, err
+	}
+	for _, ep := range eps.Items {
+		if ep.DeletionTimestamp.IsZero() {
+			collect(ep.Endpoints)
+		}
+	}
+	return notEgressNodeEPs, nil
+}
+
 func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR bool) *iptables.Rule {
 	if eip.V4 == "" && eip.V6 == "" {
 		return nil
diff --git a/pkg/agent/route/route.go b/pkg/agent/route/route.go
index b48569e82..b3d4a8f75 100644
--- a/pkg/agent/route/route.go
+++ b/pkg/agent/route/route.go
@@ -68,14 +68,14 @@ func (r *RuleRoute) Ensure(linkName string, ipv4, ipv6 *net.IP, table int, mark
 	log := r.log.WithValues("linkName", linkName, "table", table, "mark", mark)
 
 	if ipv4 != nil {
-		err := r.ensureRule(netlink.FAMILY_V4, table, mark, log)
+		err := r.EnsureRule(netlink.FAMILY_V4, table, mark, log)
 		if err != nil {
 			return err
 		}
 	}
 
 	if ipv6 != nil {
-		err := r.ensureRule(netlink.FAMILY_V6, table, mark, log)
+		err := r.EnsureRule(netlink.FAMILY_V6, table, mark, log)
 		if err != nil {
 			return err
 		}
@@ -139,7 +139,9 @@ func (r *RuleRoute) ensureRoute(link netlink.Link, ip *net.IP, family int, table
 	return nil
 }
 
-func (r *RuleRoute) ensureRule(family int, table int, mark int, log logr.Logger) error {
+// EnsureRule ensures the routing rule for the given address family, table and
+// mark. It is exported so that other packages can call it directly.
+func (r *RuleRoute) EnsureRule(family int, table int, mark int, log logr.Logger) error {
 	log = log.WithValues("family", family)
 	log.V(1).Info("ensure rule")
 
@@ -190,3 +192,93 @@ func (r *RuleRoute) ensureRule(family int, table int, mark int, log logr.Logger)
 	}
 	return nil
 }
+
+// ensureReplyRoute makes sure table holds a host route to ip via the gateway
+// via on the given link. Host routes to ip that use another gateway are
+// deleted first. A nil ip is a no-op.
+func (r *RuleRoute) ensureReplyRoute(link netlink.Link, ip, via net.IP, family int, table int, log logr.Logger) error {
+	log = log.WithValues("family", family, "ip", ip)
+	log.V(1).Info("ensure route")
+
+	if ip == nil {
+		return nil
+	}
+
+	// Reply routes are host routes: /32 for IPv4, /128 for IPv6.
+	bits := 32
+	if family == netlink.FAMILY_V6 {
+		bits = 128
+	}
+
+	routeFilter := &netlink.Route{Table: table}
+	routes, err := netlink.RouteListFiltered(family, routeFilter, netlink.RT_FILTER_TABLE)
+	if err != nil {
+		return err
+	}
+
+	var find bool
+	for i := range routes {
+		route := routes[i]
+		// Only host routes to ip in this table matter; routes without a
+		// destination (e.g. a default route) are skipped.
+		if route.Table != table || route.Dst == nil || !route.Dst.IP.Equal(ip) {
+			continue
+		}
+		if ones, _ := route.Dst.Mask.Size(); ones != bits {
+			continue
+		}
+		// Gw holds the next hop, the field that RouteAdd sets below.
+		if route.Gw.Equal(via) {
+			find = true
+			continue
+		}
+		log.Info("delete route", "route", route.String())
+		if err := netlink.RouteDel(&route); err != nil {
+			return err
+		}
+	}
+	if find {
+		return nil
+	}
+
+	return netlink.RouteAdd(&netlink.Route{
+		LinkIndex: link.Attrs().Index,
+		Dst:       &net.IPNet{IP: ip, Mask: net.CIDRMask(bits, bits)},
+		Gw:        via,
+		Table:     table,
+	})
+}
+
+// EnsureReplyRoute ensures the routing rule for (table, mark) and, on the link
+// named linkName, a host route in table to ipv4 via viaIpv4 and to ipv6 via
+// viaIpv6. Address families whose destination is nil are skipped. A zero mark
+// makes the call a no-op.
+func (r *RuleRoute) EnsureReplyRoute(linkName string, ipv4, ipv6, viaIpv4, viaIpv6 net.IP, table int, mark int) error {
+	if mark == 0 {
+		return nil
+	}
+
+	log := r.log.WithValues("linkName", linkName, "table", table, "mark", mark)
+
+	if ipv4 != nil {
+		if err := r.EnsureRule(netlink.FAMILY_V4, table, mark, log); err != nil {
+			return err
+		}
+	}
+	if ipv6 != nil {
+		if err := r.EnsureRule(netlink.FAMILY_V6, table, mark, log); err != nil {
+			return err
+		}
+	}
+
+	link, err := netlink.LinkByName(linkName)
+	if err != nil {
+		return err
+	}
+	log.V(1).Info("get link")
+
+	if err := r.ensureReplyRoute(link, ipv4, viaIpv4, netlink.FAMILY_V4, table, log); err != nil {
+		return err
+	}
+	return r.ensureReplyRoute(link, ipv6, viaIpv6, netlink.FAMILY_V6, table, log)
+}
diff --git a/pkg/agent/vxlan.go b/pkg/agent/vxlan.go
index f3cf0389b..4129b8ffd 100644
--- a/pkg/agent/vxlan.go
+++ b/pkg/agent/vxlan.go
@@ -14,6 +14,7 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/vishvananda/netlink"
 	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -49,6 +50,13 @@ type VTEP struct {
 	MAC net.HardwareAddr
 }
 
+// replyRoute describes one route of the reply routing table: the gateway
+// (tunnel IP of the peer node) and the index of the link the route uses.
+type replyRoute struct {
+	tunnelIP  net.IP
+	linkIndex int
+}
+
 func (r *vxlanReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
 	kind, newReq, err := utils.ParseKindWithReq(req)
 	if err != nil {
@@ -61,11 +69,259 @@ func (r *vxlanReconciler) Reconcile(ctx context.Context, req reconcile.Request)
 		return r.reconcileEgressTunnel(ctx, newReq, log)
 	case "EgressGateway":
 		return r.reconcileEgressGateway(ctx, newReq, log)
+	case "EgressEndpointSlice":
+		return r.reconcileEgressEndpointSlice(ctx, newReq, log)
+	case "EgressClusterEndpointSlice":
+		return r.reconcileEgressClusterEndpointSlice(ctx, newReq, log)
 	default:
 		return reconcile.Result{}, nil
 	}
 }
 
+// reconcileEgressEndpointSlice resynchronizes the reply routes of this node
+// after an EgressEndpointSlice event. The request itself is not inspected.
+func (r *vxlanReconciler) reconcileEgressEndpointSlice(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) {
+	// Returning the error is enough to be requeued with backoff, so Requeue is
+	// not set alongside it.
+	if err := r.keepReplayRoute(log); err != nil {
+		return reconcile.Result{}, err
+	}
+
+	return reconcile.Result{}, nil
+}
+
+// reconcileEgressClusterEndpointSlice resynchronizes the reply routes of this
+// node after an EgressClusterEndpointSlice event. The request is not inspected.
+func (r *vxlanReconciler) reconcileEgressClusterEndpointSlice(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) {
+	// Returning the error is enough to be requeued with backoff, so Requeue is
+	// not set alongside it.
+	if err := r.keepReplayRoute(log); err != nil {
+		return reconcile.Result{}, err
+	}
+
+	return reconcile.Result{}, nil
+}
+
+// keepReplayRoute reconciles the reply routing table of this node.
+//
+// It ensures the routing rule for (ReplyRouteTable, ReplyRouteMark), builds
+// the desired routes from the endpoints of every EgressPolicy and
+// EgressClusterPolicy whose Status.Node is this node, and then adds, replaces
+// or deletes routes in the table until it matches. A failure to list objects
+// aborts before the table is touched, so that a transient API error does not
+// remove valid routes.
+func (r *vxlanReconciler) keepReplayRoute(log logr.Logger) error {
+	table := r.cfg.FileConfig.ReplyRouteTable
+	mark := r.cfg.FileConfig.ReplyRouteMark
+	ctx := context.Background()
+
+	link, err := netlink.LinkByName(r.cfg.FileConfig.VXLAN.Name)
+	if err != nil {
+		return err
+	}
+	index := link.Attrs().Index
+
+	// Make sure the routing rule exists for every enabled address family.
+	if r.cfg.FileConfig.EnableIPv4 {
+		if err := r.ruleRoute.EnsureRule(netlink.FAMILY_V4, table, mark, log); err != nil {
+			log.Error(err, "failed to set the routing rule")
+			return err
+		}
+	}
+	if r.cfg.FileConfig.EnableIPv6 {
+		if err := r.ruleRoute.EnsureRule(netlink.FAMILY_V6, table, mark, log); err != nil {
+			log.Error(err, "failed to set the routing rule")
+			return err
+		}
+	}
+
+	// Desired routes, keyed by the canonical string form of the endpoint IP.
+	ipv4RouteMap := make(map[string]replyRoute)
+	ipv6RouteMap := make(map[string]replyRoute)
+
+	egpList := new(egressv1.EgressPolicyList)
+	if err := r.client.List(ctx, egpList); err != nil {
+		log.Error(err, "list EgressPolicyList failed")
+		return err
+	}
+	for i := range egpList.Items {
+		egp := &egpList.Items[i]
+		if egp.Status.Node != r.cfg.EnvConfig.NodeName {
+			continue
+		}
+		opt, err := replyRouteListOptions(egp.Name)
+		if err != nil {
+			log.Error(err, "failed to build LabelSelector; ", "key: ", egressv1.LabelPolicyName, " value: ", egp.Name)
+			continue
+		}
+		epSlices := new(egressv1.EgressEndpointSliceList)
+		if err := r.client.List(ctx, epSlices, opt); err != nil {
+			log.Error(err, "list EgressEndpointSliceList failed;", " egpName=", egp.Name)
+			return err
+		}
+		for j := range epSlices.Items {
+			if epSlices.Items[j].DeletionTimestamp.IsZero() {
+				r.collectReplyRoutes(log, epSlices.Items[j].Endpoints, index, ipv4RouteMap, ipv6RouteMap)
+			}
+		}
+	}
+
+	egcpList := new(egressv1.EgressClusterPolicyList)
+	if err := r.client.List(ctx, egcpList); err != nil {
+		log.Error(err, "list EgressClusterPolicyList failed")
+		return err
+	}
+	for i := range egcpList.Items {
+		egcp := &egcpList.Items[i]
+		if egcp.Status.Node != r.cfg.EnvConfig.NodeName {
+			continue
+		}
+		opt, err := replyRouteListOptions(egcp.Name)
+		if err != nil {
+			log.Error(err, "failed to build LabelSelector; ", "key: ", egressv1.LabelPolicyName, "value: ", egcp.Name)
+			continue
+		}
+		epSlices := new(egressv1.EgressClusterEndpointSliceList)
+		if err := r.client.List(ctx, epSlices, opt); err != nil {
+			log.Error(err, "list EgressClusterEndpointSliceList failed;", " egcpName=", egcp.Name)
+			return err
+		}
+		for j := range epSlices.Items {
+			if epSlices.Items[j].DeletionTimestamp.IsZero() {
+				r.collectReplyRoutes(log, epSlices.Items[j].Endpoints, index, ipv4RouteMap, ipv6RouteMap)
+			}
+		}
+	}
+
+	// Apply the desired routes per enabled family; report the first failure.
+	var firstErr error
+	if r.cfg.FileConfig.EnableIPv4 {
+		if err := syncReplyRoutes(log, netlink.FAMILY_V4, table, ipv4RouteMap); err != nil {
+			firstErr = err
+		}
+	}
+	if r.cfg.FileConfig.EnableIPv6 {
+		if err := syncReplyRoutes(log, netlink.FAMILY_V6, table, ipv6RouteMap); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return firstErr
+}
+
+// replyRouteListOptions returns list options that select the endpoint slices
+// carrying the policy-name label of the given policy.
+func replyRouteListOptions(policyName string) (*client.ListOptions, error) {
+	selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
+		MatchLabels: map[string]string{egressv1.LabelPolicyName: policyName},
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &client.ListOptions{LabelSelector: selector}, nil
+}
+
+// collectReplyRoutes adds one desired route per IP of every endpoint located
+// on another node. The gateway is the tunnel IP that peerMap holds for the
+// endpoint's node; endpoints whose node has no peer entry, or no tunnel IP of
+// the address family, are skipped instead of dereferencing a nil pointer.
+// Unparsable addresses are skipped as well.
+func (r *vxlanReconciler) collectReplyRoutes(log logr.Logger, endpoints []egressv1.EgressEndpoint, linkIndex int, ipv4Routes, ipv6Routes map[string]replyRoute) {
+	for _, ep := range endpoints {
+		if ep.Node == r.cfg.EnvConfig.NodeName {
+			continue
+		}
+		peer, ok := r.peerMap.Load(ep.Node)
+		if !ok {
+			log.V(1).Info("no tunnel peer for endpoint node", "node", ep.Node)
+			continue
+		}
+		if r.cfg.FileConfig.EnableIPv4 && peer.IPv4 != nil {
+			for _, raw := range ep.IPv4 {
+				if ip := net.ParseIP(raw); ip != nil && ip.To4() != nil {
+					ipv4Routes[ip.String()] = replyRoute{tunnelIP: *peer.IPv4, linkIndex: linkIndex}
+				}
+			}
+		}
+		if r.cfg.FileConfig.EnableIPv6 && peer.IPv6 != nil {
+			for _, raw := range ep.IPv6 {
+				if ip := net.ParseIP(raw); ip != nil && ip.To4() == nil {
+					ipv6Routes[ip.String()] = replyRoute{tunnelIP: *peer.IPv6, linkIndex: linkIndex}
+				}
+			}
+		}
+	}
+}
+
+// syncReplyRoutes makes the host routes of one address family in table match
+// desired, which maps a destination IP to its gateway and link. Routes that
+// are not desired, or whose prefix length, gateway or link differ, are deleted
+// and missing routes are added. It keeps going after a failure and returns
+// the first error so that the caller is retried.
+func syncReplyRoutes(log logr.Logger, family, table int, desired map[string]replyRoute) error {
+	// Reply routes are host routes: /32 for IPv4, /128 for IPv6.
+	bits := 32
+	if family == netlink.FAMILY_V6 {
+		bits = 128
+	}
+
+	hostRoutes, err := netlink.RouteListFiltered(family, &netlink.Route{Table: table}, netlink.RT_FILTER_TABLE)
+	if err != nil {
+		log.Error(err, "failed to obtain the routes of the host", "family", family)
+		return err
+	}
+
+	var firstErr error
+	upToDate := make(map[string]struct{}, len(desired))
+	for i := range hostRoutes {
+		route := &hostRoutes[i]
+		// Routes without a destination (e.g. a default route) are left alone.
+		if route.Table != table || route.Dst == nil {
+			continue
+		}
+		key := route.Dst.IP.String()
+		ones, _ := route.Dst.Mask.Size()
+		want, ok := desired[key]
+		if ok && ones == bits && route.LinkIndex == want.linkIndex && route.Gw.Equal(want.tunnelIP) {
+			upToDate[key] = struct{}{}
+			continue
+		}
+		// Delete the route as reported by the kernel so that its actual link
+		// and gateway are matched.
+		if err := netlink.RouteDel(route); err != nil {
+			log.Error(err, "failed to delete route; ", "route=", route)
+			if firstErr == nil {
+				firstErr = err
+			}
+		}
+	}
+
+	for key, want := range desired {
+		if _, ok := upToDate[key]; ok {
+			continue
+		}
+		ip := net.ParseIP(key)
+		if family == netlink.FAMILY_V4 {
+			ip = ip.To4()
+		}
+		route := &netlink.Route{
+			LinkIndex: want.linkIndex,
+			Dst:       &net.IPNet{IP: ip, Mask: net.CIDRMask(bits, bits)},
+			Gw:        want.tunnelIP,
+			Table:     table,
+		}
+		if err := netlink.RouteAdd(route); err != nil {
+			log.Error(err, "failed to add route; ", "route=", route)
+			if firstErr == nil {
+				firstErr = err
+			}
+			continue
+		}
+		log.V(1).Info("add route", "route", route.String())
+	}
+
+	return firstErr
+}
+
 func (r *vxlanReconciler) reconcileEgressGateway(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) {
 	egressTunnelMap, err := r.getEgressTunnelByEgressGateway(ctx, req.Name)
 	if err != nil {
@@ -486,6 +742,38 @@ func (r *vxlanReconciler) ensureRoute() error {
 	return nil
 }
 
+// initTunnelPeerMap seeds peerMap with the VTEP of every EgressTunnel that is
+// already in the Ready phase, keyed by the name of the tunnel.
+//
+// NOTE(review): lookups use the endpoint's node name as key, which assumes an
+// EgressTunnel is named after its node — confirm.
+func (r *vxlanReconciler) initTunnelPeerMap() error {
+	list := &egressv1.EgressTunnelList{}
+	ctx := context.Background()
+	err := r.client.List(ctx, list)
+	if err != nil {
+		return err
+	}
+
+	for _, item := range list.Items {
+		// NOTE(review): the local node is presumably not its own peer, so its
+		// tunnel is skipped — confirm against reconcileEgressTunnel.
+		if item.Name == r.cfg.EnvConfig.NodeName {
+			continue
+		}
+		if item.Status.Phase != egressv1.EgressTunnelReady {
+			continue
+		}
+		if vtep := r.parseVTEP(item.Status); vtep != nil {
+			// Key each peer by its own tunnel name; keying by the local node
+			// name would make every tunnel overwrite the same single entry.
+			r.peerMap.Store(item.Name, *vtep)
+		}
+	}
+
+	return nil
+}
+
 func parseMarkToInt(mark string) (int, error) {
 	tmp := strings.ReplaceAll(mark, "0x", "")
 	i64, err := strconv.ParseInt(tmp, 16, 32)
@@ -521,6 +809,13 @@ func newEgressTunnelController(mgr manager.Manager, cfg *config.Config, log logr
 		r.getParent = vxlan.GetParentByDefaultRoute(netLink)
 	}
 
+	// NOTE(review): this List presumably runs before the manager is started; if
+	// r.client is the manager's cache-backed client the call may fail — confirm.
+	err := r.initTunnelPeerMap()
+	if err != nil {
+		return fmt.Errorf("failed to get EgressTunnel: %w", err)
+	}
+
 	c, err := controller.New("vxlan", mgr, controller.Options{Reconciler: r})
 	if err != nil {
 		return err
@@ -536,7 +831,24 @@ func newEgressTunnelController(mgr manager.Manager, cfg *config.Config, log logr
 		return fmt.Errorf("failed to watch EgressGateway: %w", err)
 	}
 
+	// Endpoint slice events trigger a resync of the reply routes.
+	if err := c.Watch(
+		source.Kind(mgr.GetCache(), &egressv1.EgressEndpointSlice{}),
+		handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressEndpointSlice")),
+		epSlicePredicate{},
+	); err != nil {
+		return fmt.Errorf("failed to watch EgressEndpointSlice: %w", err)
+	}
+
+	if err := c.Watch(
+		source.Kind(mgr.GetCache(), &egressv1.EgressClusterEndpointSlice{}),
+		handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressClusterEndpointSlice")),
+		epSlicePredicate{},
+	); err != nil {
+		return fmt.Errorf("failed to watch EgressClusterEndpointSlice: %w", err)
+	}
+
 	go r.keepVXLAN()
 
 	return nil
 }
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 8c8d33819..d454bc5a0 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -80,6 +80,8 @@ type FileConfig struct {
 	Mark                         string         `yaml:"mark"`
 	AnnouncedInterfacesToExclude []string       `yaml:"announcedInterfacesToExclude"`
 	AnnounceExcludeRegexp        *regexp.Regexp `json:"-"`
+	ReplyRouteTable              int            `yaml:"replyRouteTable"`
+	ReplyRouteMark               int            `yaml:"replyRouteMark"`
 }
 
 const TunnelInterfaceDefaultRoute = "defaultRouteInterface"