diff --git a/pkg/agent/police.go b/pkg/agent/police.go index c6e0cf5e9..b4a335f5c 100644 --- a/pkg/agent/police.go +++ b/pkg/agent/police.go @@ -81,6 +81,8 @@ func (r *policeReconciler) Reconcile(ctx context.Context, req reconcile.Request) res, err = r.reconcilePolicy(ctx, newReq, log) case "EgressClusterInfo": res, err = r.reconcileClusterInfo(ctx, newReq, log) + case "EgressTunnel": + res, err = r.reconcileTunnel(ctx, newReq, log) default: return reconcile.Result{}, nil } @@ -91,6 +93,7 @@ type PolicyCommon struct { NodeName string DestSubnet []string IP IP + UseNodeIP bool } type IP struct { @@ -132,10 +135,15 @@ func (r *policeReconciler) initApplyPolicy() error { if list.Name == r.cfg.NodeName { isEgressNode = true for _, eip := range list.Eips { + useNodeIP := false + if eip.IPv4 == "" && eip.IPv6 == "" { + useNodeIP = true + } for _, policy := range eip.Policies { snatPolicies[policy] = &PolicyCommon{ - NodeName: list.Name, - IP: IP{V4: eip.IPv4, V6: eip.IPv6}, + NodeName: list.Name, + IP: IP{V4: eip.IPv4, V6: eip.IPv6}, + UseNodeIP: useNodeIP, } } } @@ -197,20 +205,6 @@ func (r *policeReconciler) initApplyPolicy() error { } } - // add forward rules for replay packet on gateway node, which should be enabled for spiderpool - //if isEgressNode && r.cfg.FileConfig.EnableGatewayReplyRoute { - // gatewayReplyRouteMark := r.cfg.FileConfig.GatewayReplyRouteMark - // dev := r.cfg.FileConfig.VXLAN.Name - // - // for _, table := range r.mangleTables { - // table.UpdateChain(&iptables.Chain{Name: "EGRESSGATEWAY-REPLY-ROUTING"}) - // chainMapRules := buildReplyRouteIptables(uint32(gatewayReplyRouteMark), dev) - // for chain, rules := range chainMapRules { - // table.InsertOrAppendRules(chain, rules) - // } - // } - //} - for _, table := range r.mangleTables { rules := make([]iptables.Rule, 0) for policy, val := range unSnatPolicies { @@ -242,10 +236,37 @@ func (r *policeReconciler) initApplyPolicy() error { Name: "EGRESSGATEWAY-MARK-REQUEST", Rules: rules, }) + + tunnels := new(egressv1.EgressTunnelList) + err := r.client.List(ctx, tunnels) + if err != nil { + return err + } + rules = make([]iptables.Rule, 0) + for _, tunnel := range tunnels.Items { + if tunnel.Name == r.cfg.NodeName { + continue + } + if tunnel.Status.Mark == "" || tunnel.Status.Tunnel.MAC == "" { + continue + } + rule, err := buildPreroutingReplyRouting(r.cfg.FileConfig.VXLAN.Name, baseMark, tunnel.Status.Mark, tunnel.Status.Tunnel.MAC) + if err != nil { + return err + } + rules = append(rules, rule...) + } + restore := iptables.Rule{ + Match: iptables.MatchCriteria{}.CTDirectionOriginal(iptables.DirectionReply), + Action: iptables.RestoreConnMarkAction{RestoreMask: 0xffffffff}, + Comment: []string{ + "label for restoring connections, rule is from the EgressGateway", + }, + } + rules = append(rules, restore) table.UpdateChain(&iptables.Chain{ - Name: "EGRESSGATEWAY-REPLY-ROUTING", - Rules: buildPreroutingReplyRouting(r.cfg.FileConfig.VXLAN.Name, - uint32(r.cfg.FileConfig.GatewayReplyRouteMark)), + Name: "EGRESSGATEWAY-REPLY-ROUTING", + Rules: rules, }) } @@ -262,7 +283,7 @@ func (r *policeReconciler) initApplyPolicy() error { isIgnoreInternalCIDR = true } - rule := buildEipRule(policyName, val.IP, table.IPVersion, isIgnoreInternalCIDR) + rule := buildEipRule(policyName, val.IP, table.IPVersion, isIgnoreInternalCIDR, val.UseNodeIP) if rule != nil { rules = append(rules, *rule) } @@ -475,11 +496,7 @@ func (r *policeReconciler) getPolicySrcIPs(policyNs, policyName string, filter f return ipv4List, ipv6List, nil } -func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR bool) *iptables.Rule { - if (version == 4 && eip.V4 == "") || (version == 6 && eip.V6 == "") { - return nil - } - +func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR bool, useNodeIP bool) *iptables.Rule { tmp := "v4-" ip := eip.V4 ignoreName := EgressClusterCIDRIPv4 @@ -499,10 +516,26 @@ func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR CTDirectionOriginal(iptables.DirectionOriginal) } - action := iptables.SNATAction{ToAddr: ip} + var action iptables.Action + action = iptables.SNATAction{ToAddr: ip} + if useNodeIP { + fmt.Println("use node ip rule") + fmt.Println("use node ip rule") + fmt.Println("use node ip rule") + fmt.Println("use node ip rule") + fmt.Println("use node ip rule") + + action = iptables.MasqAction{} + } rule := &iptables.Rule{Match: matchCriteria, Action: action, Comment: []string{ fmt.Sprintf("snat policy %s", policyName), }} + + if useNodeIP { + fmt.Println("use node ip rule") + fmt.Println(rule) + } + return rule } @@ -732,6 +765,15 @@ func (r *policeReconciler) reconcileGateway(ctx context.Context, req reconcile.R return reconcile.Result{}, nil } +func (r *policeReconciler) reconcileTunnel(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { + log.V(1).Info("reconciling") + err := r.initApplyPolicy() + if err != nil { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{}, nil +} + func buildFilterStaticRule(base uint32) map[string][]iptables.Rule { res := map[string][]iptables.Rule{ "FORWARD": {{ @@ -783,13 +825,16 @@ func buildMangleStaticRule(base uint32, }, }) - if isEgressNode && enableGatewayReplyRoute { + if isEgressNode { prerouting = append(prerouting, iptables.Rule{ - Match: iptables.MatchCriteria{}, - Action: iptables.JumpAction{Target: "EGRESSGATEWAY-REPLY-ROUTING"}, - Comment: []string{ - "egressGateway Reply datapath rule, rule is from the EgressGateway", - }, + Match: iptables.MatchCriteria{}, + Action: iptables.JumpAction{Target: "EGRESSGATEWAY-REPLY-ROUTING"}, + Comment: []string{"EgressGateway reply datapath rule, rule is from the EgressGateway"}, + }) + prerouting = append(prerouting, iptables.Rule{ + Match: iptables.MatchCriteria{}.MarkMatchesWithMask(base, 0xff000000), + Action: iptables.AcceptAction{}, + Comment: []string{"EgressGateway reply datapath rule, rule is from the EgressGateway"}, }) postrouting = append(postrouting, iptables.Rule{ Match: iptables.MatchCriteria{}.MarkMatchesWithMask(replyMark, 0xffffffff), @@ -808,30 +853,34 @@ func buildMangleStaticRule(base uint32, return res } -func buildPreroutingReplyRouting(vxlanName string, replyMark uint32) []iptables.Rule { +func buildPreroutingReplyRouting(vxlanName string, base uint32, replyMark string, mac string) ([]iptables.Rule, error) { + mark, err := parseMark(replyMark) + if err != nil { + return nil, err + } return []iptables.Rule{ { - Match: iptables.MatchCriteria{}.InInterface(vxlanName), - Action: iptables.SetMaskedMarkAction{Mark: replyMark, Mask: 0xffffffff}, + Match: iptables.MatchCriteria{}.InInterface(vxlanName).SrcMacSource(mac).CTDirectionOriginal(iptables.DirectionOriginal), + Action: iptables.SetMaskedMarkAction{Mark: mark, Mask: 0xffffffff}, Comment: []string{ - "mark the traffic from the EgressGateway tunnel, rule is from the EgressGateway", + "Mark the traffic from the EgressGateway tunnel, rule is from the EgressGateway", }, }, { - Match: iptables.MatchCriteria{}.MarkMatchesWithMask(replyMark, 0xffffffff), - Action: iptables.SaveConnMarkAction{SaveMask: replyMark}, + Match: iptables.MatchCriteria{}.MarkMatchesWithMask(mark, 0xffffffff), + Action: iptables.SaveConnMarkAction{SaveMask: 0xffffffff}, Comment: []string{ - "save mark to the connection, rule is from the EgressGateway", + "Save mark to the connection, rule is from the EgressGateway", }, }, { - Match: iptables.MatchCriteria{}.ConntrackState("ESTABLISHED"), - Action: iptables.RestoreConnMarkAction{RestoreMask: 0}, + Match: iptables.MatchCriteria{}.InInterface(vxlanName).SrcMacSource(mac), + Action: iptables.SetMaskedMarkAction{Mark: base, Mask: 0xffffffff}, Comment: []string{ - "label for restoring connections, rule is from the EgressGateway", + "Clear Mark of the inner package, rule is from the EgressGateway", }, }, - } + }, nil } // reconcilePolicy reconcile egress policy @@ -1157,6 +1206,11 @@ func newPolicyController(mgr manager.Manager, log logr.Logger, cfg *config.Confi return fmt.Errorf("failed to watch EgressPolicy: %w", err) } + if err := c.Watch(source.Kind(mgr.GetCache(), &egressv1.EgressTunnel{}), + handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressTunnel")), egressTunnelPredicate{}); err != nil { + return fmt.Errorf("failed to watch EgressTunnel: %w", err) + } + if err := c.Watch(source.Kind(mgr.GetCache(), &egressv1.EgressClusterPolicy{}), handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressClusterPolicy")), policyPredicate{}); err != nil { return fmt.Errorf("failed to watch EgressClusterPolicy: %w", err) diff --git a/pkg/agent/route/route.go b/pkg/agent/route/route.go index c8cfed4cb..61a7549c1 100644 --- a/pkg/agent/route/route.go +++ b/pkg/agent/route/route.go @@ -12,12 +12,34 @@ import ( "github.com/spidernet-io/egressgateway/pkg/markallocator" ) -func NewRuleRoute(log logr.Logger) *RuleRoute { - return &RuleRoute{log: log} +// NewRuleRoute creates a new RuleRoute with the provided options. +func NewRuleRoute(options ...Option) *RuleRoute { + r := &RuleRoute{priority: 99} + for _, o := range options { + o(r) + } + return r +} + +type Option func(*RuleRoute) + +// WithPriority sets the priority of the RuleRoute. +func WithPriority(priority int) Option { + return func(r *RuleRoute) { + r.priority = priority + } +} + +// WithLogger sets the logger of the RuleRoute. +func WithLogger(logger logr.Logger) Option { + return func(r *RuleRoute) { + r.log = logger + } } type RuleRoute struct { - log logr.Logger + log logr.Logger + priority int } func (r *RuleRoute) PurgeStaleRules(marks map[int]struct{}, baseMark string) error { @@ -29,6 +51,7 @@ func (r *RuleRoute) PurgeStaleRules(marks map[int]struct{}, baseMark string) err clean := func(rules []netlink.Rule, family int) error { for _, rule := range rules { rule.Family = family + if _, ok := marks[rule.Mark]; !ok { if int(start) <= rule.Mark && int(end) >= rule.Mark { err := netlink.RuleDel(&rule) @@ -158,10 +181,14 @@ func (r *RuleRoute) EnsureRule(family int, table int, mark int, log logr.Logger) if rule.Table != table { del = true } + if rule.Priority != 99 { + del = true + } if found { del = true } if del { + r.log.V(1).Info("delete rule", "rule", rule.String()) rule.Family = family err = netlink.RuleDel(&rule) if err != nil { @@ -175,18 +202,18 @@ func (r *RuleRoute) EnsureRule(family int, table int, mark int, log logr.Logger) return nil } - if !found { - r.log.V(1).Info("rule not match, try add it") - rule := netlink.NewRule() - rule.Table = table - rule.Mark = mark - rule.Family = family + // not found + r.log.V(1).Info("rule not match, try add it") + rule := netlink.NewRule() + rule.Table = table + rule.Mark = mark + rule.Family = family + rule.Priority = 99 - r.log.V(1).Info("add rule", "rule", rule.String()) - err := netlink.RuleAdd(rule) - if err != nil { - return err - } + r.log.V(1).Info("add rule", "rule", rule.String()) + err = netlink.RuleAdd(rule) + if err != nil { + return err } return nil } diff --git a/pkg/agent/route/route_test.go b/pkg/agent/route/route_test.go index d077fea69..3fab091a8 100644 --- a/pkg/agent/route/route_test.go +++ b/pkg/agent/route/route_test.go @@ -10,12 +10,11 @@ import ( "github.com/agiledragon/gomonkey/v2" "github.com/go-logr/logr" - "github.com/spidernet-io/egressgateway/pkg/markallocator" "github.com/stretchr/testify/assert" "github.com/vishvananda/netlink" -) -var mockLogger = logr.Logger{} + "github.com/spidernet-io/egressgateway/pkg/markallocator" +) func TestPurgeStaleRules(t *testing.T) { cases := map[string]struct { @@ -23,28 +22,28 @@ func TestPurgeStaleRules(t *testing.T) { expErr bool }{ "failed RangeSize": { - prepare: err_PurgeStaleRules_RangeSize, + prepare: errPurgeStaleRulesRangeSize, expErr: true, }, "failed RuleListFiltered v4": { - prepare: err_PurgeStaleRules_RuleListFilteredV4, + prepare: errPurgeStaleRulesRuleListFilteredV4, expErr: true, }, "failed RuleListFiltered v6": { - prepare: err_PurgeStaleRules_RuleListFilteredV6, + prepare: errPurgeStaleRulesRuleListFilteredV6, expErr: true, }, "failed RuleDel v4": { - prepare: err_PurgeStaleRules_RuleDelV4, + prepare: errPurgeStaleRulesRuleDelV4, expErr: true, }, "failed RuleDel v6": { - prepare: err_PurgeStaleRules_RuleDelV6, + prepare: errPurgeStaleRulesRuleDelV6, expErr: true, }, "succeed": {}, } - ruleRoute := NewRuleRoute(mockLogger) + ruleRoute := NewRuleRoute() marks := map[int]struct{}{ 1: {}, @@ -55,17 +54,14 @@ func TestPurgeStaleRules(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { - var err error - var patches = make([]gomonkey.Patches, 0) + patches := make([]gomonkey.Patches, 0) if tc.prepare != nil { - patchess := tc.prepare() - patches = append(patches, patchess...) + patches = tc.prepare() } + err := ruleRoute.PurgeStaleRules(marks, baseMark) if tc.expErr { - err = ruleRoute.PurgeStaleRules(marks, baseMark) assert.Error(t, err) } else { - err = ruleRoute.PurgeStaleRules(marks, baseMark) assert.NoError(t, err) } for _, p := range patches { @@ -82,46 +78,45 @@ func TestEnsure(t *testing.T) { expErr bool }{ "zero mark": { - prepare: mock_Ensure__zeroMark, + prepare: mockEnsureZeroMark, }, "failed EnsureRule v4": { - makePatch: err_Ensure_EnsureRuleV4, - prepare: mock_Ensure_params, + makePatch: errEnsureEnsureRuleV4, + prepare: mockEnsureParams, expErr: true, }, "failed EnsureRule v6": { - makePatch: err_Ensure_EnsureRuleV6, - prepare: mock_Ensure_params, + makePatch: errEnsureEnsureRuleV6, + prepare: mockEnsureParams, expErr: true, }, "failed LinkByName": { - prepare: mock_Ensure_params, + prepare: mockEnsureParams, expErr: true, }, "failed EnsureRoute v4": { - makePatch: err_Ensure_EnsureRouteV4, - prepare: mock_Ensure_params, + makePatch: errEnsureEnsureRouteV4, + prepare: mockEnsureParams, expErr: true, }, "failed EnsureRoute v6": { - makePatch: err_Ensure_EnsureRouteV6, - prepare: mock_Ensure_params, + makePatch: errEnsureEnsureRouteV6, + prepare: mockEnsureParams, expErr: true, }, "succeeded Ensure": { - makePatch: succ_Ensure, - prepare: mock_Ensure_params, + prepare: mockEnsureParams, + makePatch: successEnsure, }, } - ruleRoute := NewRuleRoute(mockLogger) + ruleRoute := NewRuleRoute() for name, tc := range cases { t.Run(name, func(t *testing.T) { var err error var patches = make([]gomonkey.Patches, 0) if tc.makePatch != nil { - patchess := tc.makePatch(ruleRoute) - patches = append(patches, patchess...) + patches = tc.makePatch(ruleRoute) } name, ipv4, ipv6, table, mark := tc.prepare() @@ -147,39 +142,38 @@ func TestEnsureRoute(t *testing.T) { expErr bool }{ "failed RouteListFiltered v4": { - prepare: mock_EnsureRoute_params, - makePatch: err_EnsureRoute_RouteListFiltered, + prepare: mockEnsureRouteParams, + makePatch: errEnsureRouteRouteListFiltered, expErr: true, }, "failed RouteDel v4": { - prepare: mock_EnsureRoute_params, - makePatch: err_EnsureRoute_RouteDel, + prepare: mockEnsureRouteParams, + makePatch: errEnsureRouteRouteDel, expErr: true, }, "succeeded EnsureRoute": { - prepare: mock_EnsureRoute_params, - makePatch: succ_EnsureRoute, + prepare: mockEnsureRouteParams, + makePatch: successEnsureRoute, }, "nil ip": { - prepare: mock_EnsureRoute_empty_ip, - makePatch: err_EnsureRoute_empty_ip, + prepare: mockEnsureRouteEmptyIP, + makePatch: errEnsureRouteEmptyIP, }, "failed RouteAdd": { - prepare: mock_EnsureRoute_params, - makePatch: err_EnsureRoute_RouteAdd, + prepare: mockEnsureRouteParams, + makePatch: errEnsureRouteRouteAdd, expErr: true, }, } - ruleRoute := NewRuleRoute(mockLogger) + ruleRoute := NewRuleRoute() for name, tc := range cases { t.Run(name, func(t *testing.T) { var err error var patches = make([]gomonkey.Patches, 0) if tc.makePatch != nil { - patchess := tc.makePatch() - patches = append(patches, patchess...) + patches = tc.makePatch() } name, ip, family, table, log := tc.prepare() @@ -205,45 +199,43 @@ func TestEnsureRule(t *testing.T) { expErr bool }{ "failed RuleListFiltered v4": { - prepare: mock_EnsureRule_params, - makePatch: err_EnsureRule_RuleListFiltered, + prepare: mockEnsureRuleParams, + makePatch: errEnsureRuleRuleListFiltered, expErr: true, }, "failed RuleDel": { - prepare: mock_EnsureRule_params, - makePatch: err_EnsureRule_RuleDel, + prepare: mockEnsureRuleParams, + makePatch: errEnsureRuleRuleDel, expErr: true, }, "succeeded found": { - prepare: mock_EnsureRule_params, - makePatch: succ_EnsureRule_found, + prepare: mockEnsureRuleParams, + makePatch: successEnsureRuleFound, }, "failed RuleAdd": { - prepare: mock_EnsureRule_params, - makePatch: err_EnsureRule_RuleAdd, + prepare: mockEnsureRuleParams, + makePatch: errEnsureRuleRuleAdd, expErr: true, }, "succeeded RuleAdd": { - prepare: mock_EnsureRule_params, - makePatch: succ_EnsureRule_RuleAdd, + prepare: mockEnsureRuleParams, + makePatch: successEnsureRuleRuleAdd, }, - "succeeded multi-RuleDel": { - prepare: mock_EnsureRule_params, - makePatch: succ_EnsureRule_multi_RuleDel, + "succeeded multi RuleDel": { + prepare: mockEnsureRuleParams, + makePatch: successEnsureRuleMultiRuleDel, }, } - ruleRoute := NewRuleRoute(mockLogger) + ruleRoute := NewRuleRoute() for name, tc := range cases { t.Run(name, func(t *testing.T) { var err error var patches = make([]gomonkey.Patches, 0) if tc.makePatch != nil { - patchess := tc.makePatch() - patches = append(patches, patchess...) + patches = tc.makePatch() } - family, table, mark, log := tc.prepare() if tc.expErr { err = ruleRoute.EnsureRule(family, table, mark, log) @@ -259,17 +251,17 @@ func TestEnsureRule(t *testing.T) { } } -func err_PurgeStaleRules_RangeSize() []gomonkey.Patches { +func errPurgeStaleRulesRangeSize() []gomonkey.Patches { patch := gomonkey.ApplyFuncReturn(markallocator.RangeSize, uint64(0), uint64(0), errors.New("some error")) return []gomonkey.Patches{*patch} } -func err_PurgeStaleRules_RuleListFilteredV4() []gomonkey.Patches { +func errPurgeStaleRulesRuleListFilteredV4() []gomonkey.Patches { patch := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, nil, errors.New("some error")) return []gomonkey.Patches{*patch} } -func err_PurgeStaleRules_RuleListFilteredV6() []gomonkey.Patches { +func errPurgeStaleRulesRuleListFilteredV6() []gomonkey.Patches { patch := gomonkey.ApplyFuncSeq(netlink.RuleListFiltered, []gomonkey.OutputCell{ {Values: gomonkey.Params{nil, nil}, Times: 1}, {Values: gomonkey.Params{nil, errors.New("some error")}, Times: 1}, @@ -277,14 +269,14 @@ func err_PurgeStaleRules_RuleListFilteredV6() []gomonkey.Patches { return []gomonkey.Patches{*patch} } -func err_PurgeStaleRules_RuleDelV4() []gomonkey.Patches { - patch := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{{Mark: 5000}}, nil) +func errPurgeStaleRulesRuleDelV4() []gomonkey.Patches { + patch := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{{Mark: 5000, Priority: 99}}, nil) patch2 := gomonkey.ApplyFuncReturn(netlink.RuleDel, errors.New("some error")) return []gomonkey.Patches{*patch, *patch2} } -func err_PurgeStaleRules_RuleDelV6() []gomonkey.Patches { - patch := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{{Mark: 5000}}, nil) +func errPurgeStaleRulesRuleDelV6() []gomonkey.Patches { + patch := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{{Mark: 5000, Priority: 99}}, nil) patch2 := gomonkey.ApplyFuncSeq(netlink.RuleDel, []gomonkey.OutputCell{ {Values: gomonkey.Params{nil}, Times: 1}, {Values: gomonkey.Params{errors.New("some error")}, Times: 1}, @@ -292,22 +284,22 @@ func err_PurgeStaleRules_RuleDelV6() []gomonkey.Patches { return []gomonkey.Patches{*patch, *patch2} } -func mock_Ensure__zeroMark() (string, *net.IP, *net.IP, int, int) { - return "testlink", nil, nil, 0, 0 +func mockEnsureZeroMark() (string, *net.IP, *net.IP, int, int) { + return "test-link", nil, nil, 0, 0 } -func mock_Ensure_params() (string, *net.IP, *net.IP, int, int) { +func mockEnsureParams() (string, *net.IP, *net.IP, int, int) { ipv4 := net.ParseIP("192.168.0.1") ipv6 := net.ParseIP("2001:db8::1") - return "testlink", &ipv4, &ipv6, 1000, 1234 + return "test-link", &ipv4, &ipv6, 1000, 1234 } -func err_Ensure_EnsureRuleV4(r *RuleRoute) []gomonkey.Patches { +func errEnsureEnsureRuleV4(r *RuleRoute) []gomonkey.Patches { patch := gomonkey.ApplyMethodReturn(r, "EnsureRule", errors.New("some err")) return []gomonkey.Patches{*patch} } -func err_Ensure_EnsureRuleV6(r *RuleRoute) []gomonkey.Patches { +func errEnsureEnsureRuleV6(r *RuleRoute) []gomonkey.Patches { patch := gomonkey.ApplyMethodSeq(r, "EnsureRule", []gomonkey.OutputCell{ {Values: gomonkey.Params{nil}, Times: 1}, {Values: gomonkey.Params{errors.New("some err")}, Times: 1}, @@ -315,13 +307,13 @@ func err_Ensure_EnsureRuleV6(r *RuleRoute) []gomonkey.Patches { return []gomonkey.Patches{*patch} } -func err_Ensure_EnsureRouteV4(r *RuleRoute) []gomonkey.Patches { +func errEnsureEnsureRouteV4(r *RuleRoute) []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.LinkByName, &netlink.Dummy{}, nil) patch := gomonkey.ApplyMethodReturn(r, "EnsureRoute", errors.New("some err")) return []gomonkey.Patches{*patch1, *patch} } -func err_Ensure_EnsureRouteV6(r *RuleRoute) []gomonkey.Patches { +func errEnsureEnsureRouteV6(r *RuleRoute) []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.LinkByName, &netlink.Dummy{}, nil) patch := gomonkey.ApplyMethodSeq(r, "EnsureRoute", []gomonkey.OutputCell{ {Values: gomonkey.Params{nil}, Times: 1}, @@ -330,45 +322,45 @@ func err_Ensure_EnsureRouteV6(r *RuleRoute) []gomonkey.Patches { return []gomonkey.Patches{*patch1, *patch} } -func succ_Ensure(r *RuleRoute) []gomonkey.Patches { +func successEnsure(r *RuleRoute) []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.LinkByName, &netlink.Dummy{}, nil) - patch := gomonkey.ApplyMethodReturn(r, "EnsureRoute", nil) - return []gomonkey.Patches{*patch1, *patch} + patch2 := gomonkey.ApplyMethodReturn(r, "EnsureRoute", nil) + return []gomonkey.Patches{*patch1, *patch2} } -func mock_EnsureRoute_params() (netlink.Link, *net.IP, int, int, logr.Logger) { +func mockEnsureRouteParams() (netlink.Link, *net.IP, int, int, logr.Logger) { link := &netlink.Dummy{ LinkAttrs: netlink.LinkAttrs{ Index: 1, - Name: "testlink", + Name: "test-link", }, } ipv4 := net.ParseIP("192.168.0.1") - family := 4 + family := netlink.FAMILY_V4 table := 1000 log := logr.Logger{} return link, &ipv4, family, table, log } -func mock_EnsureRoute_empty_ip() (netlink.Link, *net.IP, int, int, logr.Logger) { +func mockEnsureRouteEmptyIP() (netlink.Link, *net.IP, int, int, logr.Logger) { link := &netlink.Dummy{ LinkAttrs: netlink.LinkAttrs{ Index: 1, - Name: "testlink", + Name: "test-link", }, } - family := 4 + family := netlink.FAMILY_V4 table := 1000 log := logr.Logger{} return link, nil, family, table, log } -func err_EnsureRoute_RouteListFiltered() []gomonkey.Patches { +func errEnsureRouteRouteListFiltered() []gomonkey.Patches { patch := gomonkey.ApplyFuncReturn(netlink.RouteListFiltered, nil, errors.New("some error")) return []gomonkey.Patches{*patch} } -func err_EnsureRoute_RouteDel() []gomonkey.Patches { +func errEnsureRouteRouteDel() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RouteListFiltered, []netlink.Route{ {Table: 1000}, }, nil) @@ -376,7 +368,7 @@ func err_EnsureRoute_RouteDel() []gomonkey.Patches { return []gomonkey.Patches{*patch, *patch1} } -func succ_EnsureRoute() []gomonkey.Patches { +func successEnsureRoute() []gomonkey.Patches { gw := net.ParseIP("192.168.0.1") patch1 := gomonkey.ApplyFuncReturn(netlink.RouteListFiltered, []netlink.Route{ {Table: 1000, Gw: gw}, @@ -385,7 +377,7 @@ func succ_EnsureRoute() []gomonkey.Patches { return []gomonkey.Patches{*patch, *patch1} } -func err_EnsureRoute_empty_ip() []gomonkey.Patches { +func errEnsureRouteEmptyIP() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RouteListFiltered, []netlink.Route{ {Table: 1000}, }, nil) @@ -393,7 +385,7 @@ func err_EnsureRoute_empty_ip() []gomonkey.Patches { return []gomonkey.Patches{*patch, *patch1} } -func err_EnsureRoute_RouteAdd() []gomonkey.Patches { +func errEnsureRouteRouteAdd() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RouteListFiltered, []netlink.Route{ {Table: 1000}, }, nil) @@ -403,20 +395,20 @@ func err_EnsureRoute_RouteAdd() []gomonkey.Patches { return []gomonkey.Patches{*patch2, *patch1, *patch3} } -func mock_EnsureRule_params() (int, int, int, logr.Logger) { - family := 4 +func mockEnsureRuleParams() (int, int, int, logr.Logger) { + family := netlink.FAMILY_V4 table := 1000 mark := 1234 log := logr.Logger{} return family, table, mark, log } -func err_EnsureRule_RuleListFiltered() []gomonkey.Patches { +func errEnsureRuleRuleListFiltered() []gomonkey.Patches { patch := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, nil, errors.New("some error")) return []gomonkey.Patches{*patch} } -func err_EnsureRule_RuleDel() []gomonkey.Patches { +func errEnsureRuleRuleDel() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{ {Table: 100}, }, nil) @@ -425,32 +417,32 @@ func err_EnsureRule_RuleDel() []gomonkey.Patches { return []gomonkey.Patches{*patch, *patch1} } -func succ_EnsureRule_found() []gomonkey.Patches { +func successEnsureRuleFound() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{ - {Table: 1000}, + {Table: 1000, Priority: 99}, }, nil) return []gomonkey.Patches{*patch1} } -func err_EnsureRule_RuleAdd() []gomonkey.Patches { +func errEnsureRuleRuleAdd() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{}, nil) patch := gomonkey.ApplyFuncReturn(netlink.RuleAdd, errors.New("some error")) return []gomonkey.Patches{*patch, *patch1} } -func succ_EnsureRule_RuleAdd() []gomonkey.Patches { +func successEnsureRuleRuleAdd() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{}, nil) patch := gomonkey.ApplyFuncReturn(netlink.RuleAdd, nil) return []gomonkey.Patches{*patch, *patch1} } -func succ_EnsureRule_multi_RuleDel() []gomonkey.Patches { +func successEnsureRuleMultiRuleDel() []gomonkey.Patches { patch1 := gomonkey.ApplyFuncReturn(netlink.RuleListFiltered, []netlink.Rule{ - {Table: 1000}, - {Table: 1000}, + {Table: 1000, Priority: 99}, + {Table: 1000, Priority: 99}, }, nil) patch := gomonkey.ApplyFuncSeq(netlink.RuleDel, []gomonkey.OutputCell{ diff --git a/pkg/agent/vxlan.go b/pkg/agent/vxlan.go index bee8b6b7a..78343a32d 100644 --- a/pkg/agent/vxlan.go +++ b/pkg/agent/vxlan.go @@ -59,11 +59,6 @@ type VTEP struct { MAC net.HardwareAddr } -type replyRoute struct { - tunnelIP net.IP - linkIndex int -} - func (r *vxlanReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { kind, newReq, err := utils.ParseKindWithReq(req) if err != nil { @@ -88,279 +83,16 @@ func (r *vxlanReconciler) Reconcile(ctx context.Context, req reconcile.Request) case "EgressGateway": return r.reconcileEgressGateway(ctx, newReq, log) case "EgressEndpointSlice": - return r.reconcileEgressEndpointSlice(ctx, newReq, log) + // return r.reconcileEgressEndpointSlice(ctx, newReq, log) + return reconcile.Result{}, nil case "EgressClusterEndpointSlice": - return r.reconcileEgressClusterEndpointSlice(ctx, newReq, log) + // return r.reconcileEgressClusterEndpointSlice(ctx, newReq, log) + return reconcile.Result{}, nil default: return reconcile.Result{}, nil } } -func (r *vxlanReconciler) reconcileEgressEndpointSlice(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { - if err := r.syncReplayRoute(log); err != nil { - return reconcile.Result{Requeue: true}, err - } - - return reconcile.Result{}, nil -} - -func (r *vxlanReconciler) reconcileEgressClusterEndpointSlice(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { - if err := r.syncReplayRoute(log); err != nil { - return reconcile.Result{Requeue: true}, err - } - - return reconcile.Result{}, nil -} - -func (r *vxlanReconciler) syncReplayRoute(log logr.Logger) error { - - if !r.cfg.FileConfig.EnableGatewayReplyRoute { - log.Info("EnableGatewayReplyRoute=false") - return nil - } - - table := r.cfg.FileConfig.GatewayReplyRouteTable - mark := r.cfg.FileConfig.GatewayReplyRouteMark - ipv4RouteMap := make(map[string]replyRoute, 0) - ipv6RouteMap := make(map[string]replyRoute, 0) - hostIPV4RouteMap := make(map[string]replyRoute, 0) - hostIPV6RouteMap := make(map[string]replyRoute, 0) - ctx := context.Background() - link, err := netlink.LinkByName(r.cfg.FileConfig.VXLAN.Name) - if err != nil { - return err - } - index := link.Attrs().Index - - // Ensure policy routing - if r.cfg.FileConfig.EnableIPv4 { - err := r.ruleRoute.EnsureRule(netlink.FAMILY_V4, table, mark, r.log) - if err != nil { - log.Error(err, "failed to set the routing rule") - return err - } - } - - if r.cfg.FileConfig.EnableIPv6 { - err := r.ruleRoute.EnsureRule(netlink.FAMILY_V6, table, mark, r.log) - if err != nil { - log.Error(err, "failed to set the routing rule") - return err - } - } - - // get the latest routing info - egpList := new(egressv1.EgressPolicyList) - if err := r.client.List(ctx, egpList); err != nil { - log.Error(err, "list EgressPolicyList failed") - return err - } - for _, egp := range egpList.Items { - if egp.Status.Node == r.cfg.EnvConfig.NodeName { - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: map[string]string{egressv1.LabelPolicyName: egp.Name}, - }) - if err != nil { - log.Error(err, "failed to build LabelSelector; ", "key: ", egressv1.LabelPolicyName, " value: ", egp.Name) - continue - } - - opt := &client.ListOptions{LabelSelector: selector} - egenps := new(egressv1.EgressEndpointSliceList) - err = r.client.List(ctx, egenps, opt) - if err != nil { - log.Error(err, "list EgressEndpointSliceList failed;", " egpName=", egp.Name) - continue - } - - for _, egep := range egenps.Items { - for _, ep := range egep.Endpoints { - if ep.Node != r.cfg.EnvConfig.NodeName { - if r.cfg.FileConfig.EnableIPv4 { - if tunnelIP, ok := r.peerMap.Load(ep.Node); ok { - for _, v := range ep.IPv4 { - ipv4RouteMap[v] = replyRoute{tunnelIP: *tunnelIP.IPv4} - } - } else { - log.Info("r.peerMap.Load(", ep.Node, ") = nil") - } - } - - if r.cfg.FileConfig.EnableIPv6 { - if tunnelIP, ok := r.peerMap.Load(ep.Node); ok { - for _, v := range ep.IPv6 { - ipv6RouteMap[v] = replyRoute{tunnelIP: *tunnelIP.IPv6} - } - } - } - } - } - - } - } - } - - egcpList := new(egressv1.EgressClusterPolicyList) - if err := r.client.List(ctx, egcpList); err != nil { - log.Error(err, "list EgressClusterPolicyList failed") - return err - } - for _, egcp := range egcpList.Items { - if egcp.Status.Node == r.cfg.EnvConfig.NodeName { - selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ - MatchLabels: map[string]string{egressv1.LabelPolicyName: egcp.Name}, - }) - if err != nil { - log.Error(err, "failed to build LabelSelector; ", "key: ", egressv1.LabelPolicyName, "value: ", egcp.Name) - continue - } - - opt := &client.ListOptions{LabelSelector: selector} - egcenps := new(egressv1.EgressClusterEndpointSliceList) - err = r.client.List(ctx, egcenps, opt) - if err != nil { - log.Error(err, "list EgressClusterEndpointSliceList failed;", " egcpName=", egcp.Name) - continue - } - - for _, egcep := range egcenps.Items { - for _, ep := range egcep.Endpoints { - if ep.Node != r.cfg.EnvConfig.NodeName { - if r.cfg.FileConfig.EnableIPv4 { - if tunnelIP, ok := r.peerMap.Load(ep.Node); ok { - for _, v := range ep.IPv4 { - ipv4RouteMap[v] = replyRoute{tunnelIP: *tunnelIP.IPv4} - } - } - } - - if r.cfg.FileConfig.EnableIPv6 { - if tunnelIP, ok := r.peerMap.Load(ep.Node); ok { - for _, v := range ep.IPv6 { - ipv6RouteMap[v] = replyRoute{tunnelIP: *tunnelIP.IPv6} - } - } - } - } - } - } - } - } - - // get info about the routing table on the host - routeFilter := &netlink.Route{Table: table} - ipV4Routes, err := netlink.RouteListFiltered(netlink.FAMILY_V4, routeFilter, netlink.RT_FILTER_TABLE) - if err != nil { - log.Error(err, "Failed to obtain the IPv4 route of the host") - return err - } - ipV6Routes, err := netlink.RouteListFiltered(netlink.FAMILY_V4, routeFilter, netlink.RT_FILTER_TABLE) - if err != nil { - log.Error(err, "Failed to obtain the IPv6 route of the host") - return err - } - - for _, route := range ipV4Routes { - if route.Table == table { - hostIPV4RouteMap[route.Dst.IP.String()] = replyRoute{tunnelIP: route.Gw, linkIndex: route.LinkIndex} - } - } - for _, route := range ipV6Routes { - if route.Table == table { - hostIPV6RouteMap[route.Dst.IP.String()] = replyRoute{tunnelIP: route.Gw, linkIndex: route.LinkIndex} - } - } - - // IPV4 - if r.cfg.FileConfig.EnableIPv4 { - // delete unnecessary or incorrect routes from the host - for k, v := range hostIPV4RouteMap { - route := &netlink.Route{LinkIndex: index, Dst: &net.IPNet{IP: net.ParseIP(k).To4(), Mask: net.CIDRMask(32, 32)}, Gw: v.tunnelIP, Table: table} - if _, ok := ipv4RouteMap[k]; !ok { - err = netlink.RouteDel(route) - if err != nil { - log.Error(err, "failed to delete route; ", "route=", route) - continue - } - } else { - if v.tunnelIP.String() != ipv4RouteMap[k].tunnelIP.String() || index != v.linkIndex { - err = netlink.RouteDel(route) - if err != nil { - log.Error(err, "failed to delete route; ", "route=", route) - continue - } - - route.ILinkIndex = index - route.Dst = &net.IPNet{IP: net.ParseIP(k).To4(), Mask: net.CIDRMask(32, 32)} - route.Gw = ipv4RouteMap[k].tunnelIP - err = netlink.RouteAdd(route) - if err != nil { - log.Error(err, "failed to add route; ", "route=", route) - continue - } - } - } - } - - // add a missing route from the host - for k, v := range ipv4RouteMap { - if _, ok := hostIPV4RouteMap[k]; !ok { - route := &netlink.Route{LinkIndex: index, Dst: &net.IPNet{IP: net.ParseIP(k).To4(), Mask: net.CIDRMask(32, 32)}, Gw: v.tunnelIP, Table: table} - err = netlink.RouteAdd(route) - log.Info("add ", "route=", route) - if err != nil { - log.Error(err, "failed to add route; ", "route=", route) - continue - } - } - } - } - - // IPV6 - if r.cfg.FileConfig.EnableIPv6 { - for k, v := range hostIPV6RouteMap { - route := &netlink.Route{LinkIndex: index, Dst: &net.IPNet{IP: net.ParseIP(k).To16(), Mask: net.CIDRMask(128, 128)}, Gw: v.tunnelIP, Table: table} - if _, ok := ipv6RouteMap[k]; !ok { - err = netlink.RouteDel(route) - if err != nil { - log.Error(err, "failed to delete route; ", "route=", route) - continue - } - } else { - if v.tunnelIP.String() != ipv6RouteMap[k].tunnelIP.String() || index != v.linkIndex { - err = netlink.RouteDel(route) - if err != nil { - log.Error(err, "failed to delete route; ", "route=", route) - continue - } - - route.ILinkIndex = index - route.Dst = &net.IPNet{IP: net.ParseIP(k).To16(), Mask: net.CIDRMask(128, 128)} - route.Gw = ipv6RouteMap[k].tunnelIP - err = netlink.RouteAdd(route) - if err != nil { - log.Error(err, "failed to add route; ", "route=", route) - continue - } - } - } - } - - for k, v := range ipv6RouteMap { - if _, ok := hostIPV6RouteMap[k]; !ok { - route := &netlink.Route{LinkIndex: index, Dst: &net.IPNet{IP: net.ParseIP(k).To16(), Mask: net.CIDRMask(1, 128)}, Gw: v.tunnelIP, Table: table} - err = netlink.RouteAdd(route) - if err != nil { - log.Error(err, "failed to add route; ", "route=", route) - continue - } - } - } - } - - return nil -} - func (r *vxlanReconciler) reconcileEgressGateway(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) { egressTunnelMap, err := r.getEgressTunnelByEgressGateway(ctx, req.Name) if err != nil { @@ -451,18 +183,10 @@ func (r *vxlanReconciler) reconcileEgressTunnel(ctx context.Context, req reconci log.Error(err, "add egress tunnel, ensure route with error") } - egressTunnelMap, err := r.listEgressTunnel(ctx) + err = r.ruleRoute.Ensure(r.cfg.FileConfig.VXLAN.Name, peer.IPv4, peer.IPv6, peer.Mark, peer.Mark) if err != nil { - return reconcile.Result{}, err - } - if _, ok := egressTunnelMap[node.Name]; ok { - // if it is egresstunnel - err = r.ruleRoute.Ensure(r.cfg.FileConfig.VXLAN.Name, peer.IPv4, peer.IPv6, peer.Mark, peer.Mark) - if err != nil { - r.log.Error(err, "ensure vxlan link") - } + r.log.Error(err, "ensure vxlan link") } - return reconcile.Result{}, nil } @@ -735,12 +459,7 @@ func (r *vxlanReconciler) keepVXLAN() { markMap := make(map[int]struct{}) r.peerMap.Range(func(key string, val vxlan.Peer) bool { - egressTunnelMap, err := r.listEgressTunnel(context.Background()) - if err != nil { - r.log.Error(err, "ensure vxlan list EgressTunnel with error") - return false - } - if _, ok := egressTunnelMap[key]; ok && val.Mark != 0 { + if val.Mark != 0 { markMap[val.Mark] = struct{}{} err = r.ruleRoute.Ensure(r.cfg.FileConfig.VXLAN.Name, val.IPv4, val.IPv6, val.Mark, val.Mark) if err != nil { @@ -854,14 +573,6 @@ func (r *vxlanReconciler) keepReplayRoute() { return } - for { - err := r.syncReplayRoute(log) - if err != nil { - log.Error(err, "failed to keep replay route") - } - - time.Sleep(time.Second * 10) - } } func (r *vxlanReconciler) Start(ctx context.Context) error { @@ -882,7 +593,7 @@ func parseMarkToInt(mark string) (int, error) { } func newEgressTunnelController(mgr manager.Manager, cfg *config.Config, log logr.Logger) error { - ruleRoute := route.NewRuleRoute(log) + ruleRoute := route.NewRuleRoute(route.WithLogger(log)) r := &vxlanReconciler{ client: mgr.GetClient(), @@ -947,7 +658,6 @@ func newEgressTunnelController(mgr manager.Manager, cfg *config.Config, log logr } go r.keepVXLAN() - go r.keepReplayRoute() return nil } @@ -976,4 +686,6 @@ func (p egressTunnelPredicate) Update(updateEvent event.UpdateEvent) bool { } return false } -func (p egressTunnelPredicate) Generic(_ event.GenericEvent) bool { return false } +func (p egressTunnelPredicate) Generic(_ event.GenericEvent) bool { + return false +} diff --git a/pkg/agent/vxlan/vxlan.go b/pkg/agent/vxlan/vxlan.go index 146bb0105..11353ada3 100644 --- a/pkg/agent/vxlan/vxlan.go +++ b/pkg/agent/vxlan/vxlan.go @@ -149,7 +149,7 @@ func (dev *Device) ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) { func (dev *Device) ensureFilter(ipv4, ipv6 *net.IPNet) error { name := "all" if ipv4 != nil { - err := writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/rp_filter", name), "2") + err := writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/rp_filter", name), "0") if err != nil { return err } diff --git a/pkg/iptables/match_builder.go b/pkg/iptables/match_builder.go index 0591d7cd5..1cf108d65 100644 --- a/pkg/iptables/match_builder.go +++ b/pkg/iptables/match_builder.go @@ -85,6 +85,10 @@ func (m MatchCriteria) InInterface(ifaceMatch string) MatchCriteria { return append(m, fmt.Sprintf("--in-interface %s", ifaceMatch)) } +func (m MatchCriteria) SrcMacSource(mac string) MatchCriteria { + return append(m, fmt.Sprintf("-m mac --mac-source %s", mac)) +} + func (m MatchCriteria) OutInterface(ifaceMatch string) MatchCriteria { return append(m, fmt.Sprintf("--out-interface %s", ifaceMatch)) }