Skip to content

Commit

Permalink
WIP: initial implementation of OVS monitoring
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Mar 5, 2024
1 parent 1d85464 commit 02419f3
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 28 deletions.
34 changes: 22 additions & 12 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,36 @@
#include "configs.h"
#include "utils.h"

/* Defines a packet drops statistics tracker,
which attaches at kfree_skb hook. Is optional.
*/
/*
* Defines a packet drops statistics tracker,
* which attaches at kfree_skb hook. Is optional.
*/
#include "pkt_drops.h"

/* Defines a dns tracker,
which attaches at net_dev_queue hook. Is optional.
*/
/*
* Defines a dns tracker,
* which attaches at net_dev_queue hook. Is optional.
*/
#include "dns_tracker.h"

/* Defines an rtt tracker,
which runs inside flow_monitor. Is optional.
*/
/*
* Defines an rtt tracker,
* which runs inside flow_monitor. Is optional.
*/
#include "rtt_tracker.h"

/* Defines a Packet Capture Agent (PCA) tracker,
It is enabled by setting env var ENABLE_PCA= true. Is Optional
*/
/*
* Defines a Packet Capture Agent (PCA) tracker,
* It is enabled by setting env var ENABLE_PCA= true. Is Optional
*/
#include "pca.h"

/*
* Defines an OVS monitoring tracker,
* which runs inside flow_monitor. Is optional.
*/
#include "ovs_monitoring.h"

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
Expand Down
88 changes: 88 additions & 0 deletions bpf/ovs_monitoring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* OVS monitoring trace point eBPF hook.
*/

#ifndef __OVS_MONITORING_H__
#define __OVS_MONITORING_H__
#include "utils.h"

struct sw_flow_key {
u64 key [2];
};

static inline int trace_ovs_dp(struct sk_buff *skb, struct sw_flow_key *key) {
flow_id id;
__builtin_memset(&id, 0, sizeof(id));

u8 protocol = 0;
u16 family = 0,flags = 0;

id.if_index = skb->skb_iif;
// filter out TCP sockets with unknown or loopback interface
if (id.if_index == 0 || id.if_index == 1) {
return 0;
}
// read L2 info
set_key_with_l2_info(skb, &id, &family);

// read L3 info
set_key_with_l3_info(skb, family, &id, &protocol);

// read L4 info
switch (protocol) {
case IPPROTO_TCP:
set_key_with_tcp_info(skb, &id, protocol, &flags);
break;
case IPPROTO_UDP:
set_key_with_udp_info(skb, &id, protocol);
break;
case IPPROTO_SCTP:
set_key_with_sctp_info(skb, &id, protocol);
break;
case IPPROTO_ICMP:
set_key_with_icmpv4_info(skb, &id, protocol);
break;
case IPPROTO_ICMPV6:
set_key_with_icmpv6_info(skb, &id, protocol);
break;
default:
return 0;
}

long ret = 0;
for (direction_t dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
aggregate_flow->ovs_dp_keys[0] = BPF_CORE_READ(key, key[0]);
aggregate_flow->ovs_dp_keys[1] = BPF_CORE_READ(key, key[1]);
ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (ret == 0) {
return 0;
}
}
}
// there is no matching flows so lets create new one and add the ovs datapath keys
u64 current_time = bpf_ktime_get_ns();
id.direction = INGRESS;
flow_metrics new_flow = {
.start_mono_time_ts = current_time,
.end_mono_time_ts = current_time,
.flags = flags,
.ovs_dp_keys[0] = key->key[0],
.ovs_dp_keys[1] = key->key[1],
};
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error ovs datapath creating new flow %d\n", ret);
}

return ret;
}

SEC("tracepoint/openvswitch/ovs_dp_monitor")
int ovs_dp_monitor(struct sk_buff *skb, struct sw_flow_key *key) {
return trace_ovs_dp(skb, key);
}

#endif /* __OVS_MONITORING_H__ */
1 change: 1 addition & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct flow_metrics_t {
u8 errno;
} __attribute__((packed)) dns_record;
u64 flow_rtt;
u64 ovs_dp_keys[2];
} __attribute__((packed)) flow_metrics;

// Force emitting struct pkt_drops into the ELF.
Expand Down
17 changes: 9 additions & 8 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,15 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
}

ebpfConfig := &ebpf.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Debug: debug,
Sampling: cfg.Sampling,
CacheMaxSize: cfg.CacheMaxFlows,
PktDrops: cfg.EnablePktDrops,
DNSTracker: cfg.EnableDNSTracking,
EnableRTT: cfg.EnableRTT,
EnableIngress: ingress,
EnableEgress: egress,
Debug: debug,
Sampling: cfg.Sampling,
CacheMaxSize: cfg.CacheMaxFlows,
PktDrops: cfg.EnablePktDrops,
DNSTracker: cfg.EnableDNSTracking,
EnableRTT: cfg.EnableRTT,
EnableOVSMonitoring: cfg.EnableOVSMonitoring,
}

fetcher, err := ebpf.NewFlowFetcher(ebpfConfig)
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,6 @@ type Config struct {
MetricsPort int `env:"METRICS_SERVER_PORT" envDefault:"9090"`
// MetricsPrefix is the prefix of the metrics that are sent to the server.
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`
// EnableOVSMonitoring enables monitoring of OVS flows, default is false.
EnableOVSMonitoring bool `env:"ENABLE_OVS_MONITORING" envDefault:"false"`
}
4 changes: 4 additions & 0 deletions pkg/ebpf/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
4 changes: 4 additions & 0 deletions pkg/ebpf/bpf_powerpc_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
4 changes: 4 additions & 0 deletions pkg/ebpf/bpf_s390_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
4 changes: 4 additions & 0 deletions pkg/ebpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
32 changes: 24 additions & 8 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ type FlowFetcher struct {
pktDropsTracePoint link.Link
rttFentryLink link.Link
rttKprobeLink link.Link
ovsMonitoringLink link.Link
}

type FlowFetcherConfig struct {
EnableIngress bool
EnableEgress bool
Debug bool
Sampling int
CacheMaxSize int
PktDrops bool
DNSTracker bool
EnableRTT bool
EnableIngress bool
EnableEgress bool
Debug bool
Sampling int
CacheMaxSize int
PktDrops bool
DNSTracker bool
EnableRTT bool
EnableOVSMonitoring bool
}

func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
Expand Down Expand Up @@ -143,6 +145,14 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}
}

var ovsMonitoringLink link.Link
if cfg.EnableOVSMonitoring {
ovsMonitoringLink, err = link.Tracepoint("net", "openvswitch", objects.OvsDpMonitor, nil)
if err != nil {
return nil, fmt.Errorf("failed to attach the BPF program to openvswitch tracepoint: %w", err)
}
}

var rttFentryLink, rttKprobeLink link.Link
if cfg.EnableRTT {
rttFentryLink, err = link.AttachTracing(link.TracingOptions{
Expand Down Expand Up @@ -176,6 +186,7 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
pktDropsTracePoint: pktDropsLink,
rttFentryLink: rttFentryLink,
rttKprobeLink: rttKprobeLink,
ovsMonitoringLink: ovsMonitoringLink,
}, nil
}

Expand Down Expand Up @@ -314,6 +325,11 @@ func (m *FlowFetcher) Close() error {
errs = append(errs, err)
}
}
if m.ovsMonitoringLink != nil {
if err := m.ovsMonitoringLink.Close(); err != nil {
errs = append(errs, err)
}
}
// m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer
// from another goroutine to avoid the system not being able to exit if there
// isn't traffic in a given interface
Expand Down
6 changes: 6 additions & 0 deletions pkg/flow/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x00, // errno
// u64 flow_rtt
0xad, 0xde, 0xef, 0xbe, 0xef, 0xbe, 0xad, 0xde,
// u64 ovs_dp_keys[2]
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}))
require.NoError(t, err)

Expand Down Expand Up @@ -86,6 +88,10 @@ func TestRecordBinaryEncoding(t *testing.T) {
Errno: 0,
},
FlowRtt: 0xdeadbeefbeefdead,
OvsDpKeys: [2]uint64{
0x0000000000000000,
0x0000000000000000,
},
},
}, *fr)
// assert that IP addresses are interpreted as IPv4 addresses
Expand Down

0 comments on commit 02419f3

Please sign in to comment.