Skip to content

Commit

Permalink
optimize filter sampling logic when filtering not enabled (#488)
Browse files Browse the repository at this point in the history
* optimize filter sampling logic when filtering not enabled

Signed-off-by: Mohamed Mahmoud <[email protected]>

* Fix generating flows from additional map

It's possible that flow ids found in the additional map don't match any
id from the main flow map, for different reasons. This is especially
visible when filtering for drops, as in this case all flows are produced
only by the drop hook.

In these cases, we must be able to reconstruct the flows from the user
space. 3 things were particularly missing: end/start time and
eth_protocol. So, they are added back into the additional map.

Also, in user space, need to iterate separately over the two maps, to
not miss any orphan flow.

* Format, fix test, handle legacy mode

---------

Signed-off-by: Mohamed Mahmoud <[email protected]>
Co-authored-by: Joel Takvorian <[email protected]>
  • Loading branch information
msherif1234 and jotak authored Dec 20, 2024
1 parent ecdb840 commit 4d8f9d9
Show file tree
Hide file tree
Showing 20 changed files with 182 additions and 51 deletions.
43 changes: 29 additions & 14 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *

static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, int dns_errno) {
if (pkt->dns_id != 0) {
extra_metrics->end_mono_time_ts = pkt->current_ts;
extra_metrics->dns_record.id = pkt->dns_id;
extra_metrics->dns_record.flags = pkt->dns_flags;
extra_metrics->dns_record.latency = pkt->dns_latency;
Expand All @@ -79,6 +80,17 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
u32 filter_sampling = 0;

if (!is_filter_enabled()) {
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
filter_sampling = sampling;
do_sampling = 1;
}

u16 eth_protocol = 0;
pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));
Expand All @@ -103,21 +115,21 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
u32 filter_sampling = 0;
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
if (skip) {
return TC_ACT_OK;
}
if (filter_sampling == 0) {
filter_sampling = sampling;
}

// If sampling is defined, will only parse 1 out of "sampling" flows
if (filter_sampling > 1 && (bpf_get_prandom_u32() % filter_sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
if (is_filter_enabled()) {
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
if (filter_sampling == 0) {
filter_sampling = sampling;
}
// If sampling is defined, will only parse 1 out of "sampling" flows
if (filter_sampling > 1 && (bpf_get_prandom_u32() % filter_sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
do_sampling = 1;
if (skip) {
return TC_ACT_OK;
}
}
do_sampling = 1;

int dns_errno = 0;
if (enable_dns_tracking) {
Expand Down Expand Up @@ -191,6 +203,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
update_dns(extra_metrics, &pkt, dns_errno);
} else {
additional_metrics new_metrics = {
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.eth_protocol = eth_protocol,
.dns_record.id = pkt.dns_id,
.dns_record.flags = pkt.dns_flags,
.dns_record.latency = pkt.dns_latency,
Expand Down
13 changes: 6 additions & 7 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);

if (rule) {
BPF_PRINTK("rule found\n");
BPF_PRINTK("rule found drop_reason %d flags %d\n", drop_reason, flags);
result++;
if (rule->action != MAX_FILTER_ACTIONS) {
BPF_PRINTK("action matched: %d\n", rule->action);
*action = rule->action;
result++;
}

if (rule->sample && sampling != NULL) {
BPF_PRINTK("sampling action is set to %d\n", rule->sample);
*sampling = rule->sample;
result++;
}
// match specific rule protocol or use wildcard protocol
if (rule->protocol == id->transport_protocol || rule->protocol == 0) {
switch (id->transport_protocol) {
Expand Down Expand Up @@ -195,11 +199,6 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
goto end;
}
}
u32 sample = rule->sample;
if (sample && sampling != NULL) {
BPF_PRINTK("sampling action is set to %d\n", sample);
*sampling = sample;
}
}
end:
BPF_PRINTK("result: %d action %d\n", result, *action);
Expand Down
5 changes: 5 additions & 0 deletions bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8
additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
u8 idx = extra_metrics->network_events_idx;
extra_metrics->end_mono_time_ts = bpf_ktime_get_ns();
// Needed to check length here again to keep JIT verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(extra_metrics->network_events, (u8 *)cookie)) {
Expand Down Expand Up @@ -107,7 +108,11 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}

// there is no matching flows so lets create new one and add the network event metadata
u64 current_time = bpf_ktime_get_ns();
additional_metrics new_flow = {
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.eth_protocol = eth_protocol,
.network_events_idx = 0,
};
bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
Expand Down
5 changes: 5 additions & 0 deletions bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ static inline long pkt_drop_lookup_and_update_flow(flow_id *id, u8 state, u16 fl
enum skb_drop_reason reason, u64 len) {
additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
extra_metrics->end_mono_time_ts = bpf_ktime_get_ns();
extra_metrics->pkt_drops.packets += 1;
extra_metrics->pkt_drops.bytes += len;
extra_metrics->pkt_drops.latest_state = state;
Expand Down Expand Up @@ -73,7 +74,11 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
return 0;
}
// there is no matching flows so lets create new one and add the drops
u64 current_time = bpf_ktime_get_ns();
additional_metrics new_flow = {
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.eth_protocol = eth_protocol,
.pkt_drops.packets = 1,
.pkt_drops.bytes = len,
.pkt_drops.latest_state = state,
Expand Down
14 changes: 11 additions & 3 deletions bpf/pkt_translation.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static inline void parse_tuple(struct nf_conntrack_tuple *t, struct translated_f
static inline long translate_lookup_and_update_flow(flow_id *id, u16 flags,
struct nf_conntrack_tuple *orig_t,
struct nf_conntrack_tuple *reply_t, u16 zone_id,
u16 family) {
u16 family, u16 eth_protocol) {
long ret = 0;
struct translated_flow_t orig;

Expand All @@ -80,15 +80,21 @@ static inline long translate_lookup_and_update_flow(flow_id *id, u16 flags,
__builtin_memcpy(id->dst_ip, orig.daddr, IP_MAX_LEN);
id->src_port = orig.sport;
id->dst_port = orig.dport;
u64 current_time = bpf_ktime_get_ns();

additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
extra_metrics->end_mono_time_ts = current_time;
parse_tuple(reply_t, &extra_metrics->translated_flow, zone_id, family, true);
return ret;
}

// there is no matching flows so lets create new one and add the xlation
additional_metrics new_extra_metrics = {};
additional_metrics new_extra_metrics = {
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.eth_protocol = eth_protocol,
};
parse_tuple(reply_t, &new_extra_metrics.translated_flow, zone_id, family, true);
ret = bpf_map_update_elem(&additional_flow_metrics, id, &new_extra_metrics, BPF_NOEXIST);
if (ret != 0) {
Expand All @@ -98,6 +104,7 @@ static inline long translate_lookup_and_update_flow(flow_id *id, u16 flags,
if (ret == -EEXIST) {
additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
extra_metrics->end_mono_time_ts = current_time;
parse_tuple(reply_t, &extra_metrics->translated_flow, zone_id, family, true);
return 0;
}
Expand Down Expand Up @@ -164,7 +171,8 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) {
BPF_PRINTK("Xlat: protocol %d flags 0x%x family %d dscp %d\n", protocol, flags, family, dscp);

bpf_probe_read(&zone_id, sizeof(zone_id), &ct->zone.id);
ret = translate_lookup_and_update_flow(&id, flags, orig_tuple, reply_tuple, zone_id, family);
ret = translate_lookup_and_update_flow(&id, flags, orig_tuple, reply_tuple, zone_id, family,
eth_protocol);

return ret;
}
Expand Down
5 changes: 5 additions & 0 deletions bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
static inline int rtt_lookup_and_update_flow(flow_id *id, u64 rtt) {
additional_metrics *extra_metrics = bpf_map_lookup_elem(&additional_flow_metrics, id);
if (extra_metrics != NULL) {
extra_metrics->end_mono_time_ts = bpf_ktime_get_ns();
if (extra_metrics->flow_rtt < rtt) {
extra_metrics->flow_rtt = rtt;
}
Expand Down Expand Up @@ -69,7 +70,11 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
return 0;
}

u64 current_time = bpf_ktime_get_ns();
additional_metrics new_flow = {
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.eth_protocol = eth_protocol,
.flow_rtt = rtt,
};
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_NOEXIST);
Expand Down
3 changes: 3 additions & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ typedef struct flow_metrics_t {
const struct flow_metrics_t *unused2 __attribute__((unused));

typedef struct additional_metrics_t {
u64 start_mono_time_ts;
u64 end_mono_time_ts;
struct dns_record_t {
u16 id;
u16 flags;
Expand All @@ -122,6 +124,7 @@ typedef struct additional_metrics_t {
u64 flow_rtt;
u8 network_events_idx;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
u16 eth_protocol;
struct translated_flow_t {
u8 saddr[IP_MAX_LEN];
u8 daddr[IP_MAX_LEN];
Expand Down
12 changes: 10 additions & 2 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,20 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt,
return SUBMIT;
}

static inline bool is_filter_enabled() {
if (enable_flows_filtering || enable_pca) {
return true;
}
return false;
}

/*
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol, u32 *sampling) {
// check if this packet need to be filtered if filtering feature is enabled
if (enable_flows_filtering || enable_pca) {
if (is_filter_enabled()) {
filter_action action = ACCEPT;
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 &&
action != MAX_FILTER_ACTIONS) {
Expand Down Expand Up @@ -210,7 +217,8 @@ static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_
} else {
// we have no matching rules so we update global counter for flows that are not matched by any rule
increase_counter(FILTER_NOMATCH);
// we have accept rule but no match so we can't let mismatched flows in the hashmap table.
// we have accept rule but no match so we can't let mismatched flows in the hashmap table or
// we have no match at all and the action is the default value MAX_FILTER_ACTIONS.
if (action == ACCEPT || action == MAX_FILTER_ACTIONS) {
return true;
} else {
Expand Down
5 changes: 4 additions & 1 deletion pkg/ebpf/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
5 changes: 4 additions & 1 deletion pkg/ebpf/bpf_powerpc_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
5 changes: 4 additions & 1 deletion pkg/ebpf/bpf_s390_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
5 changes: 4 additions & 1 deletion pkg/ebpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
25 changes: 25 additions & 0 deletions pkg/flow/tracer_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,31 @@ func TestPacketAggregation(t *testing.T) {
{1, 4, 0, 0, 0, 0, 0, 0},
},
}},
}, {
name: "accumulate no base",
input: []model.BpfFlowContent{
{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{DnsRecord: ebpf.BpfDnsRecordT{Id: 5}, StartMonoTimeTs: 15, EndMonoTimeTs: 25},
},
{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{FlowRtt: 500},
},
{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{PktDrops: ebpf.BpfPktDropsT{Packets: 5, Bytes: 1000, LatestFlags: 1}},
},
},
expected: model.BpfFlowContent{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{StartMonoTimeTs: 15, EndMonoTimeTs: 25, Flags: 1},
AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
StartMonoTimeTs: 15,
EndMonoTimeTs: 25,
DnsRecord: ebpf.BpfDnsRecordT{Id: 5},
FlowRtt: 500,
PktDrops: ebpf.BpfPktDropsT{Packets: 5, Bytes: 1000, LatestFlags: 1},
}},
},
}
for i, tc := range tcs {
Expand Down
Loading

0 comments on commit 4d8f9d9

Please sign in to comment.