From 03bb6a38b3f08c4aec7f20097b5aea7c8fae59ae Mon Sep 17 00:00:00 2001 From: "Mohamed S. Mahmoud" Date: Wed, 17 Jan 2024 09:42:39 -0500 Subject: [PATCH] Interface name wasn't populated yet so add func to find ifname (#233) Signed-off-by: Mohamed Mahmoud --- pkg/decode/decode_protobuf.go | 14 +++++++++---- pkg/decode/decode_protobuf_test.go | 3 ++- pkg/exporter/proto.go | 32 +++++++++++++++++++----------- pkg/flow/deduper.go | 31 +++++++++++++++++++++++++---- pkg/flow/deduper_test.go | 14 +++++++++++++ pkg/utils/utils.go | 8 ++++++++ 6 files changed, 81 insertions(+), 21 deletions(-) diff --git a/pkg/decode/decode_protobuf.go b/pkg/decode/decode_protobuf.go index b3b05b576..4d0ea2bbb 100644 --- a/pkg/decode/decode_protobuf.go +++ b/pkg/decode/decode_protobuf.go @@ -66,11 +66,17 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap { out["Packets"] = flow.Packets } var interfaces []interface{} - var directions []interface{} - for _, entry := range flow.GetDupList() { - out["Interfaces"] = append([]interface{}{entry.Interface}, interfaces...) - out["FlowDirections"] = append([]interface{}{int(entry.Direction.Number())}, directions...) + var flowDirections []interface{} + + if len(flow.GetDupList()) != 0 { + for _, entry := range flow.GetDupList() { + interfaces = append(interfaces, entry.Interface) + flowDirections = append(flowDirections, entry.Direction) + } + out["Interfaces"] = interfaces + out["FlowDirections"] = flowDirections } + ethType := ethernet.EtherType(flow.EthProtocol) if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 { out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr()) diff --git a/pkg/decode/decode_protobuf_test.go b/pkg/decode/decode_protobuf_test.go index 8867d03fa..1bca74c50 100644 --- a/pkg/decode/decode_protobuf_test.go +++ b/pkg/decode/decode_protobuf_test.go @@ -6,6 +6,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" + "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -70,7 +71,7 @@ func TestPBFlowToMap(t *testing.T) { delete(out, "TimeReceived") assert.Equal(t, config.GenericMap{ "FlowDirection": 1, - "FlowDirections": []interface{}{1}, + "FlowDirections": []interface{}{pbflow.Direction(1)}, "Bytes": uint64(456), "SrcAddr": "1.2.3.4", "DstAddr": "5.6.7.8", diff --git a/pkg/exporter/proto.go b/pkg/exporter/proto.go index 4da92eab1..35708c296 100644 --- a/pkg/exporter/proto.go +++ b/pkg/exporter/proto.go @@ -85,12 +85,16 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record { if fr.Metrics.DnsRecord.Latency != 0 { pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) } - pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0) - for _, m := range fr.DupList { - pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{ - Interface: fr.Interface, - Direction: pbflow.Direction(m[fr.Interface]), - }) + if len(fr.DupList) != 0 { + pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0) + for _, m := range fr.DupList { + for key, value := range m { + pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{ + Interface: key, + Direction: pbflow.Direction(value), + }) + } + } } return &pbflowRecord } @@ -142,12 +146,16 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record { if fr.Metrics.DnsRecord.Latency != 0 { pbflowRecord.DnsLatency = durationpb.New(fr.DNSLatency) } - pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0) - for _, m := range fr.DupList { - pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{ - Interface: fr.Interface, - Direction: pbflow.Direction(m[fr.Interface]), - }) + if len(fr.DupList) != 0 { + pbflowRecord.DupList = make([]*pbflow.DupMapEntry, 0) + for _, m := range fr.DupList { + for key, value := range m { + pbflowRecord.DupList = append(pbflowRecord.DupList, &pbflow.DupMapEntry{ + Interface: key, + Direction: pbflow.Direction(value), + }) + } + } } return &pbflowRecord } diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go index f4d1c72a6..85d6d91fd 100644 --- a/pkg/flow/deduper.go +++ b/pkg/flow/deduper.go @@ -2,11 +2,13 @@ package flow import ( "container/list" + "reflect" "time" "github.com/sirupsen/logrus" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" + "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" ) var dlog = logrus.WithField("component", "flow/Deduper") @@ -93,8 +95,17 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec *fwd = append(*fwd, r) } if mergeDup { - mergeEntry[r.Interface] = r.Id.Direction - *fEntry.dupList = append(*fEntry.dupList, mergeEntry) + ifName := utils.GetInterfaceName(r.Id.IfIndex) + mergeEntry[ifName] = r.Id.Direction + if dupEntryNew(*fEntry.dupList, mergeEntry) { + *fEntry.dupList = append(*fEntry.dupList, mergeEntry) + dlog.Debugf("merge list entries dump:") + for _, entry := range *fEntry.dupList { + for k, v := range entry { + dlog.Debugf("interface %s dir %d", k, v) + } + } + } } return } @@ -111,7 +122,8 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec expiryTime: timeNow().Add(c.expire), } if mergeDup { - mergeEntry[r.Interface] = r.Id.Direction + ifName := utils.GetInterfaceName(r.Id.IfIndex) + mergeEntry[ifName] = r.Id.Direction r.DupList = append(r.DupList, mergeEntry) e.dupList = &r.DupList } @@ -119,6 +131,15 @@ func (c *deduperCache) checkDupe(r *Record, justMark, mergeDup bool, fwd *[]*Rec *fwd = append(*fwd, r) } +func dupEntryNew(dupList []map[string]uint8, mergeEntry map[string]uint8) bool { + for _, entry := range dupList { + if reflect.DeepEqual(entry, mergeEntry) { + return false + } + } + return true +} + func (c *deduperCache) removeExpired() { now := timeNow() ele := c.entries.Back() @@ -126,7 +147,9 @@ func (c *deduperCache) removeExpired() { for ele != nil && now.After(ele.Value.(*entry).expiryTime) { evicted++ c.entries.Remove(ele) - delete(c.ifaces, *ele.Value.(*entry).key) + fEntry := ele.Value.(*entry) + fEntry.dupList = nil + delete(c.ifaces, *fEntry.key) ele = c.entries.Back() } if evicted > 0 { diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go index e91dbe4af..1bd9cb907 100644 --- a/pkg/flow/deduper_test.go +++ b/pkg/flow/deduper_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" + "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" ) var ( @@ -151,6 +152,19 @@ func TestDedupeMerge(t *testing.T) { deduped := receiveTimeout(t, output) assert.Equal(t, []*Record{oneIf2}, deduped) assert.Equal(t, 2, len(oneIf2.DupList)) + + expectedMap := []map[string]uint8{ + { + utils.GetInterfaceName(oneIf2.Id.IfIndex): oneIf2.Id.Direction, + }, + { + utils.GetInterfaceName(oneIf1.Id.IfIndex): oneIf1.Id.Direction, + }, + } + + for k, v := range oneIf2.DupList { + assert.Equal(t, expectedMap[k], v) + } } type timerMock struct { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index f813fd9ca..c2a4c80ea 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -91,3 +91,11 @@ func utsnameStr[T int8 | uint8](in []T) string { } return string(out) } + +func GetInterfaceName(ifIndex uint32) string { + iface, err := net.InterfaceByIndex(int(ifIndex)) + if err != nil { + return "" + } + return iface.Name +}