diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index cfab9ffce..ceada0ae2 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -422,7 +422,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl rbTracer.SendsTo(accounter) if f.cfg.Deduper == DeduperFirstCome { - deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge), + deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge, f.interfaceNamer), node.ChannelBufferLen(f.cfg.BuffersLength)) mapTracer.SendsTo(deduper) accounter.SendsTo(deduper) diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go index 85d6d91fd..d8cb8ea4b 100644 --- a/pkg/flow/deduper.go +++ b/pkg/flow/deduper.go @@ -8,7 +8,6 @@ import ( "github.com/sirupsen/logrus" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" - "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" ) var dlog = logrus.WithField("component", "flow/Deduper") @@ -40,7 +39,7 @@ type entry struct { // (no activity for it during the expiration time) // The justMark argument tells that the deduper should not drop the duplicate flows but // set their Duplicate field. -func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []*Record, out chan<- []*Record) { +func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record) { cache := &deduperCache{ expire: expireTime, entries: list.New(), @@ -51,7 +50,7 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan [] cache.removeExpired() fwd := make([]*Record, 0, len(records)) for _, record := range records { - cache.checkDupe(record, justMark, mergeDup, &fwd) + cache.checkDupe(record, justMark, mergeDup, &fwd, ifaceNamer) } if len(fwd) > 0 { out <- fwd @@ -61,7 +60,7 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan [] } // checkDupe check current record if its already available nad if not added to fwd records list -func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record) { +func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record, ifaceNamer InterfaceNamer) { mergeEntry := make(map[string]uint8) rk := r.Id // zeroes fields from key that should be ignored from the flow comparison @@ -95,7 +94,7 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec *fwd = append(*fwd, r) } if mergeDup { - ifName := utils.GetInterfaceName(r.Id.IfIndex) + ifName := ifaceNamer(int(r.Id.IfIndex)) mergeEntry[ifName] = r.Id.Direction if dupEntryNew(*fEntry.dupList, mergeEntry) { *fEntry.dupList = append(*fEntry.dupList, mergeEntry) @@ -122,7 +121,7 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec expiryTime: timeNow().Add(c.expire), } if mergeDup { - ifName := utils.GetInterfaceName(r.Id.IfIndex) + ifName := ifaceNamer(int(r.Id.IfIndex)) mergeEntry[ifName] = r.Id.Direction r.DupList = append(r.DupList, mergeEntry) e.dupList = &r.DupList diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go index 1bd9cb907..e054191b0 100644 --- a/pkg/flow/deduper_test.go +++ b/pkg/flow/deduper_test.go @@ -1,13 +1,13 @@ package flow import ( + "net" "testing" "time" "github.com/stretchr/testify/assert" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" - "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" ) var ( @@ -70,7 +70,7 @@ func TestDedupe(t *testing.T) { input := make(chan []*Record, 100) output := make(chan []*Record, 100) - go Dedupe(time.Minute, false, false)(input, output) + go Dedupe(time.Minute, false, false, interfaceNamer)(input, output) input <- []*Record{ oneIf2, // record 1 at interface 2: should be accepted @@ -108,7 +108,7 @@ func TestDedupe_EvictFlows(t *testing.T) { input := make(chan []*Record, 100) output := make(chan []*Record, 100) - go Dedupe(15*time.Second, false, false)(input, output) + go Dedupe(15*time.Second, false, false, interfaceNamer)(input, output) // Should only accept records 1 and 2, at interface 1 input <- []*Record{oneIf1, twoIf1, oneIf2} @@ -143,7 +143,7 @@ func TestDedupeMerge(t *testing.T) { input := make(chan []*Record, 100) output := make(chan []*Record, 100) - go Dedupe(time.Minute, false, true)(input, output) + go Dedupe(time.Minute, false, true, interfaceNamer)(input, output) input <- []*Record{ oneIf2, // record 1 at interface 2: should be accepted @@ -155,10 +155,10 @@ func TestDedupeMerge(t *testing.T) { expectedMap := []map[string]uint8{ { - utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction, + interfaceNamer(int(oneIf2.Id.IfIndex)): oneIf2.Id.Direction, }, { - utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction, + interfaceNamer(int(oneIf1.Id.IfIndex)): oneIf1.Id.Direction, }, } @@ -174,3 +174,11 @@ type timerMock struct { func (tm *timerMock) Now() time.Time { return tm.now } + +func interfaceNamer(ifIndex int) string { + iface, err := net.InterfaceByIndex(ifIndex) + if err != nil { + return "unknown" + } + return iface.Name +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index a5eedfba6..f813fd9ca 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -91,11 +91,3 @@ func utsnameStr[T int8 | uint8](in []T) string { } return string(out) } - -func GetInterfaceName(ifIndex uint32) string { - iface, err := net.InterfaceByIndex(int(ifIndex)) - if err != nil { - return "unknown" - } - return iface.Name -}