Skip to content

Commit

Permalink
Add the sampled value to flows
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Dec 13, 2024
1 parent cd873a3 commit 5c07db3
Show file tree
Hide file tree
Showing 17 changed files with 78 additions and 50 deletions.
9 changes: 6 additions & 3 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@
*/
#include "pkt_translation.h"

static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len) {
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len,
u32 sampling) {
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->sampling = sampling;
bpf_spin_unlock(&aggregate_flow->lock);
}

Expand Down Expand Up @@ -123,7 +125,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
Expand All @@ -134,6 +136,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.sampling = filter_sampling,
};
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);
Expand All @@ -147,7 +150,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
Expand Down
1 change: 1 addition & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ typedef struct flow_metrics_t {
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
u32 sampling;
} flow_metrics;

// Force emitting enums/structs into the ELF
Expand Down
3 changes: 3 additions & 0 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func RecordToMap(fr *model.Record) config.GenericMap {
out["Packets"] = fr.Metrics.Packets
}

if fr.Metrics.Sampling != 0 {
out["Sampling"] = fr.Metrics.Sampling
}
var interfaces []string
var directions []int
if len(fr.DupList) != 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ebpf/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
2 changes: 1 addition & 1 deletion pkg/ebpf/bpf_powerpc_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
2 changes: 1 addition & 1 deletion pkg/ebpf/bpf_s390_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
2 changes: 1 addition & 1 deletion pkg/ebpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
4 changes: 4 additions & 0 deletions pkg/exporter/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestConversions(t *testing.T) {
Packets: 123,
Flags: 0x100,
Dscp: 64,
Sampling: 1,
},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
DnsRecord: ebpf.BpfDnsRecordT{
Expand All @@ -74,6 +75,7 @@ func TestConversions(t *testing.T) {
"SrcPort": 23000,
"DstPort": 443,
"Flags": 0x100,
"Sampling": 1,
"TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(),
"Interfaces": []string{"eth0"},
Expand All @@ -99,6 +101,7 @@ func TestConversions(t *testing.T) {
Bytes: 456,
Packets: 123,
Dscp: 64,
Sampling: 2,
},
},
Interface: "eth0",
Expand All @@ -117,6 +120,7 @@ func TestConversions(t *testing.T) {
"Etype": 2048,
"Packets": 123,
"Proto": 17,
"Sampling": 2,
"SrcPort": 23000,
"DstPort": 443,
"TimeFlowStartMs": someTime.UnixMilli(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/model/flow_content.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func AccumulateBase(p *ebpf.BpfFlowMetrics, other *ebpf.BpfFlowMetrics) *ebpf.Bp
if other.Dscp != 0 {
p.Dscp = other.Dscp
}
if other.Sampling != 0 {
p.Sampling = other.Sampling
}
return p
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/model/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x13, 0x14, // flags
0x33, // u8 errno
0x60, // u8 dscp
0x00, 0x00, 0x00, 0x00, // 4 bytes padding
0x00, 0x00, 0x00, 0x00, // 4 bytes sampling
}))
require.NoError(t, err)

Expand All @@ -67,6 +67,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
Flags: 0x1413,
Errno: 0x33,
Dscp: 0x60,
Sampling: 0x00,
},
}, *fr)
// assert that IP addresses are interpreted as IPv4 addresses
Expand Down
94 changes: 52 additions & 42 deletions pkg/pbflow/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/pbflow/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func FlowToPB(fr *model.Record, s *ovnobserv.SampleDecoder) *Record {
Flags: uint32(fr.Metrics.Flags),
Interface: fr.Interface,
TimeFlowRtt: durationpb.New(fr.TimeFlowRtt),
Sampling: fr.Metrics.Sampling,
}
if fr.Metrics.AdditionalMetrics != nil {
pbflowRecord.PktDropBytes = fr.Metrics.AdditionalMetrics.PktDrops.Bytes
Expand Down Expand Up @@ -181,6 +182,7 @@ func PBToFlow(pb *Record) *model.Record {
Packets: uint32(pb.Packets),
Flags: uint16(pb.Flags),
Dscp: uint8(pb.Network.Dscp),
Sampling: pb.Sampling,
},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
PktDrops: ebpf.BpfPktDropsT{
Expand Down
1 change: 1 addition & 0 deletions proto/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ message Record {
repeated DupMapEntry dup_list = 26;
repeated NetworkEvent network_events_metadata = 27;
Xlat xlat = 28;
uint32 sampling = 29;
}

message DataLink {
Expand Down

0 comments on commit 5c07db3

Please sign in to comment.