NETOBSERV-1996: in-kernel de-duplication (#470)
* NETOBSERV-2031: fix agent verifier issue and userspace crash with no features

Signed-off-by: Mohamed Mahmoud <[email protected]>

* NETOBSERV-1996: in-kernel de-duplication

- Remove the interface from the flow key; instead, store it as part of the
  flow value. The first interface+direction seen for a given flow is the one
  taken into account for the counters. Other interface+direction pairs are
  stored in a separate map for this flow. This algorithm is more or less the
  deduper algorithm that we previously had in userspace (see the sketch after
  this list).
- Remove the user-space deduper
- Adapt the user-space model to the new interfaces+directions array provided
  directly by the eBPF structs
- Remove the "decorator" (which was doing the interface naming). This is just
  a simplification: this enrichment is now done in a more straightforward way,
  when creating the Record objects
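For readers skimming the raw diff, here is a minimal userspace-style C sketch of the per-packet decision described above. The actual implementation is the `flow_monitor`/`add_observed_intf` code in the `bpf/flows.c` diff below; names such as `flow_value` and `on_packet` are illustrative only.

```c
// Sketch of the in-kernel dedup decision (simplified, not the real eBPF code):
// the interface that first saw the flow owns the counters; every other
// interface+direction pair is only recorded in a small observed-interfaces list.
#include <stdint.h>
#include <stdio.h>

#define MAX_OBSERVED_INTERFACES 4

struct observed_intf {
    uint8_t direction;
    uint32_t if_index;
};

struct flow_value {
    uint32_t if_index_first_seen; // interface that owns the counters
    uint8_t direction_first_seen;
    uint64_t bytes;
    uint32_t packets;
    struct observed_intf observed[MAX_OBSERVED_INTERFACES];
    uint8_t nb_observed;
};

// Called for every packet of an already-known flow.
static void on_packet(struct flow_value *flow, uint32_t if_index, uint8_t dir, uint64_t len) {
    if (flow->if_index_first_seen == if_index) {
        // First-seen interface: count normally.
        flow->packets++;
        flow->bytes += len;
        return;
    }
    // Other interfaces: remember we saw the flow there, but don't count it again.
    for (uint8_t i = 0; i < flow->nb_observed; i++) {
        if (flow->observed[i].if_index == if_index && flow->observed[i].direction == dir) {
            return; // already recorded
        }
    }
    if (flow->nb_observed < MAX_OBSERVED_INTERFACES) {
        flow->observed[flow->nb_observed].if_index = if_index;
        flow->observed[flow->nb_observed].direction = dir;
        flow->nb_observed++;
    } // else: the real code increases a "missed" counter instead
}

int main(void) {
    struct flow_value flow = {.if_index_first_seen = 3, .direction_first_seen = 0, .packets = 1, .bytes = 1500};
    on_packet(&flow, 3, 0, 1500); // same interface: counted
    on_packet(&flow, 7, 1, 1500); // other interface: only recorded as observed
    printf("packets=%u bytes=%llu observed=%u\n", (unsigned)flow.packets,
           (unsigned long long)flow.bytes, (unsigned)flow.nb_observed);
    return 0;
}
```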

try optimizing alignment

* Fix correctness: copy metrics instead of pointer

* reformat

* Fix test

* remove packed, handle error

* move accumulate test to model

* Increase counter and log when an interface can't be observed

* Fix rebase issues

* Remove obsolete comment + update arch chart

* Add timestamp/ethproto on observed interfaces

---------

Signed-off-by: Mohamed Mahmoud <[email protected]>
Co-authored-by: Mohamed Mahmoud <[email protected]>
jotak and msherif1234 authored Jan 8, 2025
1 parent 7d44bea commit 7c29f2a
Showing 44 changed files with 747 additions and 1,307 deletions.
58 changes: 49 additions & 9 deletions bpf/flows.c
@@ -79,6 +79,23 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}
}

static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) {
if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) {
for (u8 i = 0; i < value->nb_observed_intf; i++) {
if (value->observed_intf[i].if_index == if_index &&
value->observed_intf[i].direction == direction) {
return;
}
}
value->observed_intf[value->nb_observed_intf].if_index = if_index;
value->observed_intf[value->nb_observed_intf].direction = direction;
value->nb_observed_intf++;
} else {
increase_counter(OBSERVED_INTF_MISSED);
BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index);
}
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
u32 filter_sampling = 0;

@@ -110,13 +127,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling,
direction);
if (filter_sampling == 0) {
filter_sampling = sampling;
}
@@ -137,11 +151,40 @@
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
if (aggregate_flow->if_index_first_seen == skb->ifindex) {
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else if (skb->ifindex != 0) {
// Only add info that we've seen this interface
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
} else {
additional_metrics new_metrics = {
.eth_protocol = eth_protocol,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
};
add_observed_intf(&new_metrics, skb->ifindex, direction);
long ret =
bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
if (ret == -EEXIST) {
extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
add_observed_intf(extra_metrics, skb->ifindex, direction);
}
} else if (ret != 0 && trace_messages) {
bpf_printk("error creating new observed_intf: %d\n", ret);
}
}
}
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow;
__builtin_memset(&new_flow, 0, sizeof(new_flow));
new_flow.if_index_first_seen = skb->ifindex;
new_flow.direction_first_seen = direction;
new_flow.packets = 1;
new_flow.bytes = len;
new_flow.eth_protocol = eth_protocol;
@@ -193,9 +236,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {

// Update additional metrics (per-CPU map)
if (pkt.dns_id != 0 || dns_errno != 0) {
// hack on id will be removed with dedup-in-kernel work
id.direction = 0;
id.if_index = 0;
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
16 changes: 10 additions & 6 deletions bpf/flows_filter.h
@@ -33,7 +33,8 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {

static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key,
filter_action *action, u8 len, u8 offset,
u16 flags, u32 drop_reason, u32 *sampling) {
u16 flags, u32 drop_reason, u32 *sampling,
u8 direction) {
int result = 0;

struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
@@ -161,7 +162,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_

if (!is_zero_ip(rule->ip, len)) {
// for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP
if (id->direction == INGRESS) {
if (direction == INGRESS) {
if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) {
BPF_PRINTK("dstIP matched\n");
result++;
@@ -181,7 +182,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
}

if (rule->direction != MAX_DIRECTION) {
if (rule->direction == id->direction) {
if (rule->direction == direction) {
BPF_PRINTK("direction matched\n");
result++;
} else {
@@ -237,7 +238,8 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason, u16 eth_protocol, u32 *sampling) {
u32 drop_reason, u16 eth_protocol, u32 *sampling,
u8 direction) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
@@ -251,7 +253,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling,
direction);
// we have a match so return
if (result > 0) {
return result;
@@ -263,7 +266,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling,
direction);
}

#endif //__FLOWS_FILTER_H__
11 changes: 4 additions & 7 deletions bpf/network_events_monitoring.h
@@ -94,17 +94,14 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}

// there is no matching flows so lets create new one and add the network event metadata
6 changes: 1 addition & 5 deletions bpf/pca.h
@@ -53,12 +53,8 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
return false;
}

//Set extra fields
id.if_index = skb->ifindex;
id.direction = dir;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL, dir);
if (skip) {
return false;
}
2 changes: 1 addition & 1 deletion bpf/pkt_drops.h
@@ -64,7 +64,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
2 changes: 1 addition & 1 deletion bpf/pkt_translation.h
@@ -163,7 +163,7 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) {
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
2 changes: 1 addition & 1 deletion bpf/rtt_tracker.h
@@ -59,7 +59,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
rtt *= 1000u;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
46 changes: 24 additions & 22 deletions bpf/types.h
@@ -66,6 +66,7 @@ typedef __u64 u64;
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4
#define MAX_OBSERVED_INTERFACES 4

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
@@ -80,26 +81,29 @@ const enum direction_t *unused1 __attribute__((unused));
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};

typedef struct flow_metrics_t {
struct bpf_spin_lock lock;
u16 eth_protocol;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
u32 packets;
u64 bytes;
// Flow start and end times as monotomic timestamps in nanoseconds
// as output from bpf_ktime_get_ns()
u64 start_mono_time_ts;
u64 end_mono_time_ts;
u64 bytes;
u32 packets;
u16 eth_protocol;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
// L2 data link layer
u8 src_mac[ETH_ALEN];
u8 dst_mac[ETH_ALEN];
// OS interface index
u32 if_index_first_seen;
struct bpf_spin_lock lock;
u32 sampling;
u8 direction_first_seen;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
u32 sampling;
} flow_metrics;

// Force emitting enums/structs into the ELF
@@ -109,22 +113,20 @@ typedef struct additional_metrics_t {
u64 start_mono_time_ts;
u64 end_mono_time_ts;
struct dns_record_t {
u64 latency;
u16 id;
u16 flags;
u64 latency;
u8 errno;
} dns_record;
struct pkt_drops_t {
u32 packets;
u64 bytes;
u32 packets;
u32 latest_drop_cause;
u16 latest_flags;
u8 latest_state;
u32 latest_drop_cause;
} pkt_drops;
u64 flow_rtt;
u8 network_events_idx;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
u16 eth_protocol;
struct translated_flow_t {
u8 saddr[IP_MAX_LEN];
u8 daddr[IP_MAX_LEN];
@@ -133,23 +135,24 @@ typedef struct additional_metrics_t {
u16 zone_id;
u8 icmp_id;
} translated_flow;
struct observed_intf_t {
u8 direction;
u32 if_index;
} observed_intf[MAX_OBSERVED_INTERFACES];
u16 eth_protocol;
u8 network_events_idx;
u8 nb_observed_intf;
} additional_metrics;

// Force emitting enums/structs into the ELF
const struct additional_metrics_t *unused3 __attribute__((unused));

// Force emitting enums/structs into the ELF
const struct dns_record_t *unused4 __attribute__((unused));

// Force emitting enums/structs into the ELF
const struct pkt_drops_t *unused5 __attribute__((unused));

// Force emitting struct translated_flow_t into the ELF.
const struct translated_flow_t *unused6 __attribute__((unused));
const struct observed_intf_t *unused13 __attribute__((unused));

// Attributes that uniquely identify a flow
typedef struct flow_id_t {
u8 direction;
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
@@ -162,8 +165,6 @@ typedef struct flow_id_t {
// ICMP protocol
u8 icmp_type;
u8 icmp_code;
// OS interface index
u32 if_index;
} flow_id;

// Force emitting enums/structs into the ELF
@@ -220,6 +221,7 @@ typedef enum global_counters_key_t {
NETWORK_EVENTS_ERR_GROUPID_MISMATCH,
NETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS,
NETWORK_EVENTS_GOOD,
OBSERVED_INTF_MISSED,
MAX_COUNTERS,
} global_counters_key;

8 changes: 5 additions & 3 deletions bpf/utils.h
@@ -184,12 +184,14 @@ static inline bool is_filter_enabled() {
/*
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol, u32 *sampling) {
static __always_inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol, u32 *sampling,
u8 direction) {
// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
filter_action action = ACCEPT;
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 &&
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling, direction) !=
0 &&
action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
3 changes: 0 additions & 3 deletions cmd/netobserv-ebpf-agent.go
@@ -51,9 +51,6 @@ func main() {
Error("PProf HTTP listener stopped working")
}()
}
if config.DeduperFCExpiry == 0 {
config.DeduperFCExpiry = 2 * config.CacheActiveTimeout
}

logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")

18 changes: 6 additions & 12 deletions docs/architecture.md
@@ -6,6 +6,8 @@ The following graph provides a birds' eye view on how the different components a

For more info on each component, please check their corresponding Go docs.

<!-- When editing, you can use an online editor for a live preview, e.g. https://mermaid.live/ -->

### Kernel space

```mermaid
@@ -33,21 +35,13 @@ flowchart TD
```mermaid
flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
style E fill:#990
style E fill:#7CA
E --> |"polls<br/>HashMap"| M(flow.MapTracer)
RB --> |chan *model.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*model.Record"| DD(flow.Deduper)
M --> |"chan []*model.Record"| DD
subgraph Optional
DD
end
DD --> |"chan []*model.Record"| CL(flow.CapacityLimiter)
ACC --> |"chan []*model.Record"| CL(flow.CapacityLimiter)
M --> |"chan []*model.Record"| CL
CL --> |"chan []*model.Record"| DC(flow.Decorator)
DC --> |"chan []*model.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto<br/>or<br/>export.DirectFLP")
CL --> |"chan []*model.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto<br/>or<br/>export.DirectFLP")
```
