Skip to content

Commit

Permalink
Merge pull request #473 from msherif1234/multi_filter
Browse files Browse the repository at this point in the history
NETOBSERV-2005: Support multiple flow filter rules using json fmt string
  • Loading branch information
msherif1234 authored Dec 13, 2024
2 parents 2654098 + 5c07db3 commit 7e7f8c4
Show file tree
Hide file tree
Showing 29 changed files with 245 additions and 163 deletions.
30 changes: 18 additions & 12 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@
*/
#include "pkt_translation.h"

static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len) {
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, u64 len,
u32 sampling) {
bpf_spin_lock(&aggregate_flow->lock);
aggregate_flow->packets += 1;
aggregate_flow->bytes += len;
aggregate_flow->end_mono_time_ts = pkt->current_ts;
aggregate_flow->flags |= pkt->flags;
aggregate_flow->dscp = pkt->dscp;
aggregate_flow->sampling = sampling;
bpf_spin_unlock(&aggregate_flow->lock);
}

Expand All @@ -77,15 +79,7 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
do_sampling = 1;

u16 eth_protocol = 0;

pkt_info pkt;
__builtin_memset(&pkt, 0, sizeof(pkt));

Expand All @@ -109,18 +103,29 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
id.direction = direction;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
u32 filter_sampling = 0;
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
if (skip) {
return TC_ACT_OK;
}
if (filter_sampling == 0) {
filter_sampling = sampling;
}

// If sampling is defined, will only parse 1 out of "sampling" flows
if (filter_sampling > 1 && (bpf_get_prandom_u32() % filter_sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
do_sampling = 1;

int dns_errno = 0;
if (enable_dns_tracking) {
dns_errno = track_dns_packet(skb, &pkt);
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else {
// Key does not exist in the map, and will need to create a new entry.
flow_metrics new_flow = {
Expand All @@ -131,6 +136,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.dscp = pkt.dscp,
.sampling = filter_sampling,
};
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);
Expand All @@ -144,7 +150,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, len);
update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
} else {
if (trace_messages) {
bpf_printk("failed to update an exising flow\n");
Expand Down
13 changes: 9 additions & 4 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {

static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key,
filter_action *action, u8 len, u8 offset,
u16 flags, u32 drop_reason) {
u16 flags, u32 drop_reason, u32 *sampling) {
int result = 0;

struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
Expand Down Expand Up @@ -195,6 +195,11 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
goto end;
}
}
u32 sample = rule->sample;
if (sample && sampling != NULL) {
BPF_PRINTK("sampling action is set to %d\n", sample);
*sampling = sample;
}
}
end:
BPF_PRINTK("result: %d action %d\n", result, *action);
Expand Down Expand Up @@ -233,7 +238,7 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
u32 drop_reason, u16 eth_protocol) {
u32 drop_reason, u16 eth_protocol, u32 *sampling) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
Expand All @@ -247,7 +252,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason);
result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
// we have a match so return
if (result > 0) {
return result;
Expand All @@ -259,7 +264,7 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}

return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason);
return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
}

#endif //__FLOWS_FILTER_H__
2 changes: 1 addition & 1 deletion bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pca.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
id.direction = dir;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL);
if (skip) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/pkt_translation.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) {
}

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
rtt *= 1000u;

// check if this packet need to be filtered if filtering feature is enabled
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol);
bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
if (skip) {
return 0;
}
Expand Down
4 changes: 3 additions & 1 deletion bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ typedef __u64 u64;
#define DSCP_SHIFT 2
#define DSCP_MASK 0x3F

#define MAX_FILTER_ENTRIES 1 // we have only one global filter
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4

Expand Down Expand Up @@ -99,6 +99,7 @@ typedef struct flow_metrics_t {
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
u32 sampling;
} flow_metrics;

// Force emitting enums/structs into the ELF
Expand Down Expand Up @@ -262,6 +263,7 @@ struct filter_value_t {
filter_action action;
tcp_flags tcpFlags;
u8 filter_drops;
u32 sample;
u8 ip[IP_MAX_LEN];
} __attribute__((packed));

Expand Down
4 changes: 2 additions & 2 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,11 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, pkt_info *pkt,
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
u16 eth_protocol) {
u16 eth_protocol, u32 *sampling) {
// check if this packet need to be filtered if filtering feature is enabled
if (enable_flows_filtering || enable_pca) {
filter_action action = ACCEPT;
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol) != 0 &&
if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 &&
action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
Expand Down
36 changes: 24 additions & 12 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -199,7 +200,29 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() {
debug = true
}
filterRules := make([]*tracer.FilterConfig, 0)
if cfg.EnableFlowFilter {
var flowFilters []*FlowFilter
if err := json.Unmarshal([]byte(cfg.FlowFilterRules), &flowFilters); err != nil {
return nil, err
}

for _, r := range flowFilters {
filterRules = append(filterRules, &tracer.FilterConfig{
FilterAction: r.FilterAction,
FilterDirection: r.FilterDirection,
FilterIPCIDR: r.FilterIPCIDR,
FilterProtocol: r.FilterProtocol,
FilterPeerIP: r.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(r.FilterDestinationPort, r.FilterDestinationPortRange, r.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(r.FilterSourcePort, r.FilterSourcePortRange, r.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(r.FilterPort, r.FilterPortRange, r.FilterPorts),
FilterTCPFlags: r.FilterTCPFlags,
FilterDrops: r.FilterDrops,
FilterSample: r.FilterSample,
})
}
}
ebpfConfig := &tracer.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Expand All @@ -216,18 +239,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
EnablePktTranslation: cfg.EnablePktTranslationTracking,
UseEbpfManager: cfg.EbpfProgramManagerMode,
BpfManBpfFSPath: cfg.BpfManBpfFSPath,
FilterConfig: &tracer.FilterConfig{
FilterAction: cfg.FilterAction,
FilterDirection: cfg.FilterDirection,
FilterIPCIDR: cfg.FilterIPCIDR,
FilterProtocol: cfg.FilterProtocol,
FilterPeerIP: cfg.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(cfg.FilterDestinationPort, cfg.FilterDestinationPortRange, cfg.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(cfg.FilterSourcePort, cfg.FilterSourcePortRange, cfg.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(cfg.FilterPort, cfg.FilterPortRange, cfg.FilterPorts),
FilterTCPFLags: cfg.FilterTCPFlags,
FilterDrops: cfg.FilterDrops,
},
FilterConfig: filterRules,
}

fetcher, err := tracer.NewFlowFetcher(ebpfConfig)
Expand Down
103 changes: 55 additions & 48 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,59 @@ const (
IPIfaceNamedPrefix = "name:"
)

type FlowFilter struct {
// FilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FilterDirection string `json:"direction,omitempty"`
// FilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64, default is 0.0.0.0/0
FilterIPCIDR string `json:"ip_cidr,omitempty"`
// FilterProtocol is the protocol to filter flows.
// supported protocols: TCP, UDP, SCTP, ICMP, ICMPv6
FilterProtocol string `json:"protocol,omitempty"`
// FilterSourcePort is the source port to filter flows.
FilterSourcePort int32 `json:"source_port,omitempty"`
// FilterDestinationPort is the destination port to filter flows.
FilterDestinationPort int32 `json:"destination_port,omitempty"`
// FilterPort is the port to filter flows, can be use for either source or destination port.
FilterPort int32 `json:"port,omitempty"`
// FilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FilterSourcePortRange string `json:"source_port_range,omitempty"`
// FilterSourcePorts is two source ports to filter flows.
// Example: 8000,8010
FilterSourcePorts string `json:"source_ports,omitempty"`
// FilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FilterDestinationPortRange string `json:"destination_port_range,omitempty"`
// FilterDestinationPorts is two destination ports to filter flows.
// Example: 8000,8010
FilterDestinationPorts string `json:"destination_ports,omitempty"`
// FilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FilterPortRange string `json:"port_range,omitempty"`
// FilterPorts is two ports option to filter flows, can be used for either source or destination port.
// Example: 8000,8010
FilterPorts string `json:"ports,omitempty"`
// FilterICMPType is the ICMP type to filter flows.
FilterICMPType int `json:"icmp_type,omitempty"`
// FilterICMPCode is the ICMP code to filter flows.
FilterICMPCode int `json:"icmp_code,omitempty"`
// FilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FilterPeerIP string `json:"peer_ip,omitempty"`
// FilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FilterAction string `json:"action,omitempty"`
// FilterTCPFlags is the TCP flags to filter flows.
// possible values are: SYN, SYN-ACK, ACK, FIN, RST, PSH, URG, ECE, CWR, FIN-ACK, RST-ACK
FilterTCPFlags string `json:"tcp_flags,omitempty"`
// FilterDrops allow filtering flows with packet drops, default is false.
FilterDrops bool `json:"drops,omitempty"`
// FilterSample is the sample rate this matching flow will use
FilterSample uint32 `json:"sample,omitempty"`
}

type Config struct {
// AgentIP allows overriding the reported Agent IP address on each flow.
AgentIP string `env:"AGENT_IP"`
Expand Down Expand Up @@ -183,54 +236,8 @@ type Config struct {
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`
// EnableFlowFilter enables flow filter, default is false.
EnableFlowFilter bool `env:"ENABLE_FLOW_FILTER" envDefault:"false"`
// FilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FilterDirection string `env:"FILTER_DIRECTION"`
// FilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64
FilterIPCIDR string `env:"FILTER_IP_CIDR" envDefault:"0.0.0.0/0"`
// FilterProtocol is the protocol to filter flows.
// Example: TCP, UDP, SCTP, ICMP, ICMPv6
FilterProtocol string `env:"FILTER_PROTOCOL"`
// FilterSourcePort is the source port to filter flows.
FilterSourcePort int32 `env:"FILTER_SOURCE_PORT"`
// FilterDestinationPort is the destination port to filter flows.
FilterDestinationPort int32 `env:"FILTER_DESTINATION_PORT"`
// FilterPort is the port to filter flows, can be use for either source or destination port.
FilterPort int32 `env:"FILTER_PORT"`
// FilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FilterSourcePortRange string `env:"FILTER_SOURCE_PORT_RANGE"`
// FilterSourcePorts is two source ports to filter flows.
// Example: 8000,8010
FilterSourcePorts string `env:"FILTER_SOURCE_PORTS"`
// FilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FilterDestinationPortRange string `env:"FILTER_DESTINATION_PORT_RANGE"`
// FilterDestinationPorts is two destination ports to filter flows.
// Example: 8000,8010
FilterDestinationPorts string `env:"FILTER_DESTINATION_PORTS"`
// FilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FilterPortRange string `env:"FILTER_PORT_RANGE"`
// FilterPorts is two ports option to filter flows, can be used for either source or destination port.
// Example: 8000,8010
FilterPorts string `env:"FILTER_PORTS"`
// FilterICMPType is the ICMP type to filter flows.
FilterICMPType int `env:"FILTER_ICMP_TYPE"`
// FilterICMPCode is the ICMP code to filter flows.
FilterICMPCode int `env:"FILTER_ICMP_CODE"`
// FilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FilterPeerIP string `env:"FILTER_PEER_IP"`
// FilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FilterAction string `env:"FILTER_ACTION" envDefault:"Accept"`
// FilterTCPFlags is the TCP flags to filter flows.
// possible values are: SYN, SYN-ACK, ACK, FIN, RST, PSH, URG, ECE, CWR, FIN-ACK, RST-ACK
FilterTCPFlags string `env:"FILTER_TCP_FLAGS"`
// FilterDrops allow filtering flows with packet drops, default is false.
FilterDrops bool `env:"FILTER_DROPS" envDefault:"false"`
// FlowFilterRules list of flow filter rules
FlowFilterRules string `env:"FLOW_FILTER_RULES"`
// EnableNetworkEventsMonitoring enables monitoring network plugin events, default is false.
EnableNetworkEventsMonitoring bool `env:"ENABLE_NETWORK_EVENTS_MONITORING" envDefault:"false"`
// NetworkEventsMonitoringGroupID to allow ebpf hook to process samples for specific groupID and ignore the rest
Expand Down
Loading

0 comments on commit 7e7f8c4

Please sign in to comment.