Implementation of OVS monitoring
Signed-off-by: Mohamed Mahmoud <[email protected]>
msherif1234 committed Sep 5, 2024
1 parent 6adad95 commit 2358cf4
Showing 410 changed files with 71,171 additions and 1,258 deletions.
2 changes: 2 additions & 0 deletions bpf/configs.h
@@ -10,4 +10,6 @@ volatile const u8 enable_pca = 0;
volatile const u8 enable_dns_tracking = 0;
volatile const u8 enable_flows_filtering = 0;
volatile const u16 dns_port = 0;
volatile const u8 enable_network_events_monitoring = 0;
volatile const u8 network_events_monitoring_groupid = 0;
#endif //__CONFIGS_H__
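
As a point of reference (not part of this commit), volatile const globals like these are typically overridden from userspace between opening and loading the object. Below is a minimal C sketch using a libbpf skeleton; the skeleton name flows_bpf, the header flows.skel.h, and the group id value 10 are assumptions for illustration only.

#include "flows.skel.h" /* hypothetical skeleton generated by `bpftool gen skeleton` */

static int configure_and_load(void)
{
    struct flows_bpf *skel = flows_bpf__open();
    if (!skel)
        return -1;

    /* const volatile globals live in .rodata and can be set before load */
    skel->rodata->enable_network_events_monitoring = 1;
    skel->rodata->network_events_monitoring_groupid = 10; /* example psample group id */

    if (flows_bpf__load(skel)) {
        flows_bpf__destroy(skel);
        return -1;
    }
    /* ... attach programs here, then flows_bpf__destroy(skel) on teardown ... */
    return 0;
}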
33 changes: 21 additions & 12 deletions bpf/flows.c
@@ -19,28 +19,37 @@
#include "configs.h"
#include "utils.h"

/* Defines a packet drops statistics tracker,
which attaches at kfree_skb hook. Is optional.
*/
/*
* Defines a packet drops statistics tracker,
* which attaches at kfree_skb hook. Is optional.
*/
#include "pkt_drops.h"

/* Defines a dns tracker,
which attaches at net_dev_queue hook. Is optional.
*/
/*
* Defines a dns tracker,
* which attaches at net_dev_queue hook. Is optional.
*/
#include "dns_tracker.h"

/* Defines an rtt tracker,
which runs inside flow_monitor. Is optional.
*/
/*
* Defines an rtt tracker,
* which runs inside flow_monitor. Is optional.
*/
#include "rtt_tracker.h"

/* Defines a Packet Capture Agent (PCA) tracker,
It is enabled by setting env var ENABLE_PCA= true. Is Optional
*/
/*
* Defines a Packet Capture Agent (PCA) tracker,
 * It is enabled by setting the env var ENABLE_PCA=true. Is optional.
*/
#include "pca.h"

/* Do flow filtering. Is optional. */
#include "flows_filter.h"
/*
 * Defines a network events monitoring tracker,
* which runs inside flow_monitor. Is optional.
*/
#include "network_events_monitoring.h"

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
201 changes: 201 additions & 0 deletions bpf/network_events_monitoring.h
@@ -0,0 +1,201 @@
/*
* Network events monitoring kprobe eBPF hook.
*/

#ifndef __NETWORK_EVENTS_MONITORING_H__
#define __NETWORK_EVENTS_MONITORING_H__

#include "utils.h"

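/*
 * Local mirror of the kernel's rh_psample_metadata layout; the BPF_CORE_READ
 * calls below relocate these fields against the running kernel's BTF.
 */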
struct rh_psample_metadata {
u32 trunc_size;
int in_ifindex;
int out_ifindex;
u16 out_tc;
u64 out_tc_occ; /* bytes */
u64 latency; /* nanoseconds */
u8 out_tc_valid : 1, out_tc_occ_valid : 1, latency_valid : 1, rate_as_probability : 1,
unused : 4;
const u8 *user_cookie;
u32 user_cookie_len;
};

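// Returns true if this metadata cookie is already recorded for the flow, so duplicates are skipped.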
static inline bool md_already_exists(u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD], u8 *md) {
for (u8 i = 0; i < MAX_NETWORK_EVENTS; i++) {
if (__builtin_memcmp(network_events[i], md, MAX_EVENT_MD) == 0) {
return true;
}
}
return false;
}

static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
u8 dscp = 0, protocol = 0, md_len = 0;
u16 family = 0, flags = 0;
u8 *user_cookie = NULL;
u8 cookie[MAX_EVENT_MD];
long ret = 0;
u64 len = 0;
flow_id id;

__builtin_memset(&id, 0, sizeof(id));
__builtin_memset(cookie, 0, sizeof(cookie));

md_len = BPF_CORE_READ(md, user_cookie_len);
user_cookie = (u8 *)BPF_CORE_READ(md, user_cookie);
if (md_len == 0 || md_len > MAX_EVENT_MD || user_cookie == NULL) {
return -1;
}

bpf_probe_read(cookie, md_len, user_cookie);

id.if_index = BPF_CORE_READ(md, in_ifindex);

len = BPF_CORE_READ(skb, len);

// read L2 info
core_fill_in_l2(skb, &id, &family);

// read L3 info
core_fill_in_l3(skb, &id, family, &protocol, &dscp);

// read L4 info
switch (protocol) {
case IPPROTO_TCP:
core_fill_in_tcp(skb, &id, &flags);
break;
case IPPROTO_UDP:
core_fill_in_udp(skb, &id);
break;
case IPPROTO_SCTP:
core_fill_in_sctp(skb, &id);
break;
case IPPROTO_ICMP:
core_fill_in_icmpv4(skb, &id);
break;
case IPPROTO_ICMPV6:
core_fill_in_icmpv6(skb, &id);
break;
default:
fill_in_others_protocol(&id, protocol);
}

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
u8 idx = aggregate_flow->network_events_idx;
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
// Need to check the length again here to keep the eBPF verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
__builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
}
ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (ret == 0) {
return 0;
}
} else {
return -1;
}
}
}

if (ret != 0) {
if (trace_messages) {
bpf_printk("error network events updating existing flow %d\n", ret);
}
return ret;
}

// no matching flow exists, so create a new one and add the network event metadata
u64 current_time = bpf_ktime_get_ns();
id.direction = INGRESS;
flow_metrics new_flow = {
.packets = 1,
.bytes = len,
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.flags = flags,
.network_events_idx = 0,
};
bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
new_flow.network_events_idx++;
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error network events creating new flow %d\n", ret);
}
return ret;
}

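// Increment the per-key counter in the global_counters map, creating the entry on first use.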
static inline void update_network_events_statistics(u32 *key, u32 *value) {
u32 *error_counter_p = NULL;
error_counter_p = bpf_map_lookup_elem(&global_counters, key);
if (!error_counter_p) {
bpf_map_update_elem(&global_counters, key, value, BPF_ANY);
return;
}
__sync_fetch_and_add(error_counter_p, 1);
}

// for older kernels, use `rh_psample_sample_packet` to avoid kernel API compatibility issues
SEC("kprobe/rh_psample_sample_packet")
int BPF_KPROBE(rh_network_events_monitoring, struct psample_group *group, struct sk_buff *skb,
u32 sample_rate, struct rh_psample_metadata *md) {
u32 initVal = 1, key = NETWORK_EVENTS_ERR_KEY;
if (enable_network_events_monitoring == 0 || do_sampling == 0) {
return 0;
}
if (skb == NULL || md == NULL || group == NULL) {
update_network_events_statistics(&key, &initVal);
return 0;
}
// filter out non-matching samples with a different group id
int group_id = BPF_CORE_READ(group, group_num);
if (group_id != network_events_monitoring_groupid) {
key = NETWORK_EVENTS_ERR_GROUPID_MISMATCH;
update_network_events_statistics(&key, &initVal);
return 0;
}
long ret = 0;
if ((ret = trace_network_events(skb, md)) != 0) {
key = NETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS;
update_network_events_statistics(&key, &initVal);
return 0;
}
key = NETWORK_EVENTS_GOOD;
update_network_events_statistics(&key, &initVal);
return 0;
}
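/*
 * The commented-out variant below targets the upstream psample_sample_packet
 * symbol with struct psample_metadata; it is left disabled in this commit.
 */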
/*
SEC("kprobe/psample_sample_packet")
int BPF_KPROBE(network_events_monitoring, struct psample_group *group, struct sk_buff *skb, u32 sample_rate,
struct psample_metadata *md) {
u32 initVal = 1, key = NETWORK_EVENTS_ERR_KEY;
if (enable_network_events_monitoring == 0 || do_sampling == 0) {
return 0;
}
if (skb == NULL || md == NULL || group == NULL) {
update_network_events_statistics(&key, &initVal);
return 0;
}
// filter out non-matching samples with a different group id
int group_id = BPF_CORE_READ(group, group_num);
if (group_id != network_events_monitoring_groupid) {
key = NETWORK_EVENTS_ERR_GROUPID_MISMATCH;
update_network_events_statistics(&key, &initVal);
return 0;
}
if (trace_network_events(skb, md) != 0) {
key = NETWORK_EVENTS_ERR_UPDATE_FLOWS;
update_network_events_statistics(&key, &initVal);
return 0;
}
key = NETWORK_EVENTS_GOOD;
update_network_events_statistics(&key, &initVal);
return 0;
}
*/
#endif /* __NETWORK_EVENTS_MONITORING_H__ */
76 changes: 8 additions & 68 deletions bpf/rtt_tracker.h
@@ -5,72 +5,10 @@
#ifndef __RTT_TRACKER_H__
#define __RTT_TRACKER_H__

#include <bpf_core_read.h>
#include <bpf_tracing.h>
#include "utils.h"
#include "maps_definition.h"

static inline void rtt_fill_in_l2(struct sk_buff *skb, flow_id *id) {
struct ethhdr eth;

__builtin_memset(&eth, 0, sizeof(eth));

u8 *skb_head = BPF_CORE_READ(skb, head);
u16 skb_mac_header = BPF_CORE_READ(skb, mac_header);

bpf_probe_read(&eth, sizeof(eth), (struct ethhdr *)(skb_head + skb_mac_header));
__builtin_memcpy(id->dst_mac, eth.h_dest, ETH_ALEN);
__builtin_memcpy(id->src_mac, eth.h_source, ETH_ALEN);
id->eth_protocol = bpf_ntohs(eth.h_proto);
}

static inline void rtt_fill_in_l3(struct sk_buff *skb, flow_id *id, u16 family, u8 *dscp) {
u16 skb_network_header = BPF_CORE_READ(skb, network_header);
u8 *skb_head = BPF_CORE_READ(skb, head);

switch (family) {
case AF_INET: {
struct iphdr ip;
__builtin_memset(&ip, 0, sizeof(ip));
bpf_probe_read(&ip, sizeof(ip), (struct iphdr *)(skb_head + skb_network_header));
__builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip.saddr, sizeof(ip.saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip.daddr, sizeof(ip.daddr));
*dscp = ipv4_get_dscp(&ip);
break;
}
case AF_INET6: {
struct ipv6hdr ip;
__builtin_memset(&ip, 0, sizeof(ip));
bpf_probe_read(&ip, sizeof(ip), (struct ipv6hdr *)(skb_head + skb_network_header));
__builtin_memcpy(id->src_ip, ip.saddr.in6_u.u6_addr8, IP_MAX_LEN);
__builtin_memcpy(id->dst_ip, ip.daddr.in6_u.u6_addr8, IP_MAX_LEN);
*dscp = ipv6_get_dscp(&ip);
break;
}
default:
return;
}
}

static inline void rtt_fill_in_tcp(struct sk_buff *skb, flow_id *id, u16 *flags) {
u16 skb_transport_header = BPF_CORE_READ(skb, transport_header);
u8 *skb_head = BPF_CORE_READ(skb, head);
struct tcphdr tcp;
u16 sport, dport;

__builtin_memset(&tcp, 0, sizeof(tcp));

bpf_probe_read(&tcp, sizeof(tcp), (struct tcphdr *)(skb_head + skb_transport_header));
sport = bpf_ntohs(tcp.source);
dport = bpf_ntohs(tcp.dest);
id->src_port = sport;
id->dst_port = dport;
set_flags(&tcp, flags);
id->transport_protocol = IPPROTO_TCP;
}

static inline int rtt_lookup_and_update_flow(flow_id *id, u16 flags, u64 rtt) {
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
if (aggregate_flow != NULL) {
@@ -89,12 +27,12 @@ static inline int rtt_lookup_and_update_flow(flow_id *id, u16 flags, u64 rtt) {
}

static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
u8 dscp = 0, protocol = 0;
struct tcp_sock *ts;
u16 family, flags = 0;
u64 rtt = 0, len;
int ret = 0;
flow_id id;
u8 dscp = 0;

if (!enable_rtt) {
return 0;
Expand All @@ -109,15 +47,17 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
len = BPF_CORE_READ(skb, len);

// read L2 info
rtt_fill_in_l2(skb, &id);

family = BPF_CORE_READ(sk, __sk_common.skc_family);
core_fill_in_l2(skb, &id, &family);

// read L3 info
rtt_fill_in_l3(skb, &id, family, &dscp);
core_fill_in_l3(skb, &id, family, &protocol, &dscp);

if (protocol != IPPROTO_TCP) {
return 0;
}

// read TCP info
rtt_fill_in_tcp(skb, &id, &flags);
core_fill_in_tcp(skb, &id, &flags);

// read TCP socket rtt and store it in nanoseconds
ts = (struct tcp_sock *)(sk);
(diffs for the remaining changed files are not shown here)