Skip to content

Commit

Permalink
Add feature: Support AWS with Cilium
Browse files Browse the repository at this point in the history
  • Loading branch information
lou-lan committed Jul 22, 2024
1 parent a1a3804 commit df66709
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 457 deletions.
140 changes: 97 additions & 43 deletions pkg/agent/police.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (r *policeReconciler) Reconcile(ctx context.Context, req reconcile.Request)
res, err = r.reconcilePolicy(ctx, newReq, log)
case "EgressClusterInfo":
res, err = r.reconcileClusterInfo(ctx, newReq, log)
case "EgressTunnel":
res, err = r.reconcileTunnel(ctx, newReq, log)
default:
return reconcile.Result{}, nil
}
Expand All @@ -91,6 +93,7 @@ type PolicyCommon struct {
NodeName string
DestSubnet []string
IP IP
UseNodeIP bool
}

type IP struct {
Expand Down Expand Up @@ -132,10 +135,15 @@ func (r *policeReconciler) initApplyPolicy() error {
if list.Name == r.cfg.NodeName {
isEgressNode = true
for _, eip := range list.Eips {
useNodeIP := false
if eip.IPv4 == "" && eip.IPv6 == "" {
useNodeIP = true
}
for _, policy := range eip.Policies {
snatPolicies[policy] = &PolicyCommon{
NodeName: list.Name,
IP: IP{V4: eip.IPv4, V6: eip.IPv6},
NodeName: list.Name,
IP: IP{V4: eip.IPv4, V6: eip.IPv6},
UseNodeIP: useNodeIP,
}
}
}
Expand Down Expand Up @@ -197,20 +205,6 @@ func (r *policeReconciler) initApplyPolicy() error {
}
}

// add forward rules for replay packet on gateway node, which should be enabled for spiderpool
//if isEgressNode && r.cfg.FileConfig.EnableGatewayReplyRoute {
// gatewayReplyRouteMark := r.cfg.FileConfig.GatewayReplyRouteMark
// dev := r.cfg.FileConfig.VXLAN.Name
//
// for _, table := range r.mangleTables {
// table.UpdateChain(&iptables.Chain{Name: "EGRESSGATEWAY-REPLY-ROUTING"})
// chainMapRules := buildReplyRouteIptables(uint32(gatewayReplyRouteMark), dev)
// for chain, rules := range chainMapRules {
// table.InsertOrAppendRules(chain, rules)
// }
// }
//}

for _, table := range r.mangleTables {
rules := make([]iptables.Rule, 0)
for policy, val := range unSnatPolicies {
Expand Down Expand Up @@ -242,10 +236,37 @@ func (r *policeReconciler) initApplyPolicy() error {
Name: "EGRESSGATEWAY-MARK-REQUEST",
Rules: rules,
})

tunnels := new(egressv1.EgressTunnelList)
err := r.client.List(ctx, tunnels)
if err != nil {
return err
}
rules = make([]iptables.Rule, 0)
for _, tunnel := range tunnels.Items {
if tunnel.Name == r.cfg.NodeName {
continue
}
if tunnel.Status.Mark == "" || tunnel.Status.Tunnel.MAC == "" {
continue
}
rule, err := buildPreroutingReplyRouting(r.cfg.FileConfig.VXLAN.Name, baseMark, tunnel.Status.Mark, tunnel.Status.Tunnel.MAC)
if err != nil {
return err
}
rules = append(rules, rule...)
}
restore := iptables.Rule{
Match: iptables.MatchCriteria{}.CTDirectionOriginal(iptables.DirectionReply),
Action: iptables.RestoreConnMarkAction{RestoreMask: 0xffffffff},
Comment: []string{
"label for restoring connections, rule is from the EgressGateway",
},
}
rules = append(rules, restore)
table.UpdateChain(&iptables.Chain{
Name: "EGRESSGATEWAY-REPLY-ROUTING",
Rules: buildPreroutingReplyRouting(r.cfg.FileConfig.VXLAN.Name,
uint32(r.cfg.FileConfig.GatewayReplyRouteMark)),
Name: "EGRESSGATEWAY-REPLY-ROUTING",
Rules: rules,
})
}

Expand All @@ -262,7 +283,7 @@ func (r *policeReconciler) initApplyPolicy() error {
isIgnoreInternalCIDR = true
}

rule := buildEipRule(policyName, val.IP, table.IPVersion, isIgnoreInternalCIDR)
rule := buildEipRule(policyName, val.IP, table.IPVersion, isIgnoreInternalCIDR, val.UseNodeIP)
if rule != nil {
rules = append(rules, *rule)
}
Expand Down Expand Up @@ -475,11 +496,7 @@ func (r *policeReconciler) getPolicySrcIPs(policyNs, policyName string, filter f
return ipv4List, ipv6List, nil
}

func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR bool) *iptables.Rule {
if (version == 4 && eip.V4 == "") || (version == 6 && eip.V6 == "") {
return nil
}

func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR bool, useNodeIP bool) *iptables.Rule {
tmp := "v4-"
ip := eip.V4
ignoreName := EgressClusterCIDRIPv4
Expand All @@ -499,10 +516,26 @@ func buildEipRule(policyName string, eip IP, version uint8, isIgnoreInternalCIDR
CTDirectionOriginal(iptables.DirectionOriginal)
}

action := iptables.SNATAction{ToAddr: ip}
var action iptables.Action
action = iptables.SNATAction{ToAddr: ip}
if useNodeIP {
fmt.Println("use node ip rule")
fmt.Println("use node ip rule")
fmt.Println("use node ip rule")
fmt.Println("use node ip rule")
fmt.Println("use node ip rule")

action = iptables.MasqAction{}
}
rule := &iptables.Rule{Match: matchCriteria, Action: action, Comment: []string{
fmt.Sprintf("snat policy %s", policyName),
}}

if useNodeIP {
fmt.Println("use node ip rule")
fmt.Println(rule)
}

return rule
}

Expand Down Expand Up @@ -732,6 +765,15 @@ func (r *policeReconciler) reconcileGateway(ctx context.Context, req reconcile.R
return reconcile.Result{}, nil
}

func (r *policeReconciler) reconcileTunnel(ctx context.Context, req reconcile.Request, log logr.Logger) (reconcile.Result, error) {
log.V(1).Info("reconciling")
err := r.initApplyPolicy()
if err != nil {
return reconcile.Result{Requeue: true}, err
}
return reconcile.Result{}, nil
}

func buildFilterStaticRule(base uint32) map[string][]iptables.Rule {
res := map[string][]iptables.Rule{
"FORWARD": {{
Expand Down Expand Up @@ -783,13 +825,16 @@ func buildMangleStaticRule(base uint32,
},
})

if isEgressNode && enableGatewayReplyRoute {
if isEgressNode {
prerouting = append(prerouting, iptables.Rule{
Match: iptables.MatchCriteria{},
Action: iptables.JumpAction{Target: "EGRESSGATEWAY-REPLY-ROUTING"},
Comment: []string{
"egressGateway Reply datapath rule, rule is from the EgressGateway",
},
Match: iptables.MatchCriteria{},
Action: iptables.JumpAction{Target: "EGRESSGATEWAY-REPLY-ROUTING"},
Comment: []string{"EgressGateway reply datapath rule, rule is from the EgressGateway"},
})
prerouting = append(prerouting, iptables.Rule{
Match: iptables.MatchCriteria{}.MarkMatchesWithMask(base, 0xff000000),
Action: iptables.AcceptAction{},
Comment: []string{"EgressGateway reply datapath rule, rule is from the EgressGateway"},
})
postrouting = append(postrouting, iptables.Rule{
Match: iptables.MatchCriteria{}.MarkMatchesWithMask(replyMark, 0xffffffff),
Expand All @@ -808,30 +853,34 @@ func buildMangleStaticRule(base uint32,
return res
}

func buildPreroutingReplyRouting(vxlanName string, replyMark uint32) []iptables.Rule {
func buildPreroutingReplyRouting(vxlanName string, base uint32, replyMark string, mac string) ([]iptables.Rule, error) {
mark, err := parseMark(replyMark)
if err != nil {
return nil, err
}
return []iptables.Rule{
{
Match: iptables.MatchCriteria{}.InInterface(vxlanName),
Action: iptables.SetMaskedMarkAction{Mark: replyMark, Mask: 0xffffffff},
Match: iptables.MatchCriteria{}.InInterface(vxlanName).SrcMacSource(mac).CTDirectionOriginal(iptables.DirectionOriginal),
Action: iptables.SetMaskedMarkAction{Mark: mark, Mask: 0xffffffff},
Comment: []string{
"mark the traffic from the EgressGateway tunnel, rule is from the EgressGateway",
"Mark the traffic from the EgressGateway tunnel, rule is from the EgressGateway",
},
},
{
Match: iptables.MatchCriteria{}.MarkMatchesWithMask(replyMark, 0xffffffff),
Action: iptables.SaveConnMarkAction{SaveMask: replyMark},
Match: iptables.MatchCriteria{}.MarkMatchesWithMask(mark, 0xffffffff),
Action: iptables.SaveConnMarkAction{SaveMask: 0xffffffff},
Comment: []string{
"save mark to the connection, rule is from the EgressGateway",
"Save mark to the connection, rule is from the EgressGateway",
},
},
{
Match: iptables.MatchCriteria{}.ConntrackState("ESTABLISHED"),
Action: iptables.RestoreConnMarkAction{RestoreMask: 0},
Match: iptables.MatchCriteria{}.InInterface(vxlanName).SrcMacSource(mac),
Action: iptables.SetMaskedMarkAction{Mark: base, Mask: 0xffffffff},
Comment: []string{
"label for restoring connections, rule is from the EgressGateway",
"Clear Mark of the inner package, rule is from the EgressGateway",
},
},
}
}, nil
}

// reconcilePolicy reconcile egress policy
Expand Down Expand Up @@ -1157,6 +1206,11 @@ func newPolicyController(mgr manager.Manager, log logr.Logger, cfg *config.Confi
return fmt.Errorf("failed to watch EgressPolicy: %w", err)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &egressv1.EgressTunnel{}),
handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressTunnel")), egressTunnelPredicate{}); err != nil {
return fmt.Errorf("failed to watch EgressTunnel: %w", err)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &egressv1.EgressClusterPolicy{}),
handler.EnqueueRequestsFromMapFunc(utils.KindToMapFlat("EgressClusterPolicy")), policyPredicate{}); err != nil {
return fmt.Errorf("failed to watch EgressClusterPolicy: %w", err)
Expand Down
55 changes: 41 additions & 14 deletions pkg/agent/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,34 @@ import (
"github.com/spidernet-io/egressgateway/pkg/markallocator"
)

func NewRuleRoute(log logr.Logger) *RuleRoute {
return &RuleRoute{log: log}
// NewRuleRoute creates a new RuleRoute with the provided options.
func NewRuleRoute(options ...Option) *RuleRoute {
r := &RuleRoute{priority: 99}
for _, o := range options {
o(r)
}
return r
}

type Option func(*RuleRoute)

// WithPriority sets the priority of the RuleRoute.
func WithPriority(priority int) Option {
return func(r *RuleRoute) {
r.priority = priority
}
}

// WithLogger sets the logger of the RuleRoute.
func WithLogger(logger logr.Logger) Option {
return func(r *RuleRoute) {
r.log = logger
}
}

type RuleRoute struct {
log logr.Logger
log logr.Logger
priority int
}

func (r *RuleRoute) PurgeStaleRules(marks map[int]struct{}, baseMark string) error {
Expand All @@ -29,6 +51,7 @@ func (r *RuleRoute) PurgeStaleRules(marks map[int]struct{}, baseMark string) err
clean := func(rules []netlink.Rule, family int) error {
for _, rule := range rules {
rule.Family = family

if _, ok := marks[rule.Mark]; !ok {
if int(start) <= rule.Mark && int(end) >= rule.Mark {
err := netlink.RuleDel(&rule)
Expand Down Expand Up @@ -158,10 +181,14 @@ func (r *RuleRoute) EnsureRule(family int, table int, mark int, log logr.Logger)
if rule.Table != table {
del = true
}
if rule.Priority != 99 {
del = true
}
if found {
del = true
}
if del {
r.log.V(1).Info("delete rule", "rule", rule.String())
rule.Family = family
err = netlink.RuleDel(&rule)
if err != nil {
Expand All @@ -175,18 +202,18 @@ func (r *RuleRoute) EnsureRule(family int, table int, mark int, log logr.Logger)
return nil
}

if !found {
r.log.V(1).Info("rule not match, try add it")
rule := netlink.NewRule()
rule.Table = table
rule.Mark = mark
rule.Family = family
// not found
r.log.V(1).Info("rule not match, try add it")
rule := netlink.NewRule()
rule.Table = table
rule.Mark = mark
rule.Family = family
rule.Priority = 99

r.log.V(1).Info("add rule", "rule", rule.String())
err := netlink.RuleAdd(rule)
if err != nil {
return err
}
r.log.V(1).Info("add rule", "rule", rule.String())
err = netlink.RuleAdd(rule)
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit df66709

Please sign in to comment.