Skip to content

Commit

Permalink
use cached api to retrieve interface name to avoid highcpu (#251)
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 authored Jan 23, 2024
1 parent 3f14e8e commit 504add4
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
rbTracer.SendsTo(accounter)

if f.cfg.Deduper == DeduperFirstCome {
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge),
deduper := node.AsMiddle(flow.Dedupe(f.cfg.DeduperFCExpiry, f.cfg.DeduperJustMark, f.cfg.DeduperMerge, f.interfaceNamer),
node.ChannelBufferLen(f.cfg.BuffersLength))
mapTracer.SendsTo(deduper)
accounter.SendsTo(deduper)
Expand Down
11 changes: 5 additions & 6 deletions pkg/flow/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/sirupsen/logrus"

"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
)

var dlog = logrus.WithField("component", "flow/Deduper")
Expand Down Expand Up @@ -40,7 +39,7 @@ type entry struct {
// (no activity for it during the expiration time)
// The justMark argument tells that the deduper should not drop the duplicate flows but
// set their Duplicate field.
func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []*Record, out chan<- []*Record) {
func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer) func(in <-chan []*Record, out chan<- []*Record) {
cache := &deduperCache{
expire: expireTime,
entries: list.New(),
Expand All @@ -51,7 +50,7 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []
cache.removeExpired()
fwd := make([]*Record, 0, len(records))
for _, record := range records {
cache.checkDupe(record, justMark, mergeDup, &fwd)
cache.checkDupe(record, justMark, mergeDup, &fwd, ifaceNamer)
}
if len(fwd) > 0 {
out <- fwd
Expand All @@ -61,7 +60,7 @@ func Dedupe(expireTime time.Duration, justMark, mergeDup bool) func(in <-chan []
}

// checkDupe check current record if its already available nad if not added to fwd records list
func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record) {
func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Record, ifaceNamer InterfaceNamer) {
mergeEntry := make(map[string]uint8)
rk := r.Id
// zeroes fields from key that should be ignored from the flow comparison
Expand Down Expand Up @@ -95,7 +94,7 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
*fwd = append(*fwd, r)
}
if mergeDup {
ifName := utils.GetInterfaceName(r.Id.IfIndex)
ifName := ifaceNamer(int(r.Id.IfIndex))
mergeEntry[ifName] = r.Id.Direction
if dupEntryNew(*fEntry.dupList, mergeEntry) {
*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
Expand All @@ -122,7 +121,7 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec
expiryTime: timeNow().Add(c.expire),
}
if mergeDup {
ifName := utils.GetInterfaceName(r.Id.IfIndex)
ifName := ifaceNamer(int(r.Id.IfIndex))
mergeEntry[ifName] = r.Id.Direction
r.DupList = append(r.DupList, mergeEntry)
e.dupList = &r.DupList
Expand Down
20 changes: 14 additions & 6 deletions pkg/flow/deduper_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package flow

import (
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"
)

var (
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestDedupe(t *testing.T) {
input := make(chan []*Record, 100)
output := make(chan []*Record, 100)

go Dedupe(time.Minute, false, false)(input, output)
go Dedupe(time.Minute, false, false, interfaceNamer)(input, output)

input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestDedupe_EvictFlows(t *testing.T) {
input := make(chan []*Record, 100)
output := make(chan []*Record, 100)

go Dedupe(15*time.Second, false, false)(input, output)
go Dedupe(15*time.Second, false, false, interfaceNamer)(input, output)

// Should only accept records 1 and 2, at interface 1
input <- []*Record{oneIf1, twoIf1, oneIf2}
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestDedupeMerge(t *testing.T) {
input := make(chan []*Record, 100)
output := make(chan []*Record, 100)

go Dedupe(time.Minute, false, true)(input, output)
go Dedupe(time.Minute, false, true, interfaceNamer)(input, output)

input <- []*Record{
oneIf2, // record 1 at interface 2: should be accepted
Expand All @@ -155,10 +155,10 @@ func TestDedupeMerge(t *testing.T) {

expectedMap := []map[string]uint8{
{
utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction,
interfaceNamer(int(oneIf2.Id.IfIndex)): oneIf2.Id.Direction,
},
{
utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction,
interfaceNamer(int(oneIf1.Id.IfIndex)): oneIf1.Id.Direction,
},
}

Expand All @@ -174,3 +174,11 @@ type timerMock struct {
func (tm *timerMock) Now() time.Time {
return tm.now
}

func interfaceNamer(ifIndex int) string {
iface, err := net.InterfaceByIndex(ifIndex)
if err != nil {
return "unknown"
}
return iface.Name
}
8 changes: 0 additions & 8 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,3 @@ func utsnameStr[T int8 | uint8](in []T) string {
}
return string(out)
}

func GetInterfaceName(ifIndex uint32) string {
iface, err := net.InterfaceByIndex(int(ifIndex))
if err != nil {
return "unknown"
}
return iface.Name
}

0 comments on commit 504add4

Please sign in to comment.