Skip to content

Commit

Permalink
flow filter feature userspace code
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Mar 29, 2024
1 parent a248f8a commit 40d2fe5
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 38 deletions.
33 changes: 25 additions & 8 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"k8s.io/apimachinery/pkg/util/intstr"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -177,14 +178,23 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
}

ebpfConfig := &ebpf.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Debug: debug,
Sampling: cfg.Sampling,
CacheMaxSize: cfg.CacheMaxFlows,
PktDrops: cfg.EnablePktDrops,
DNSTracker: cfg.EnableDNSTracking,
EnableRTT: cfg.EnableRTT,
EnableIngress: ingress,
EnableEgress: egress,
Debug: debug,
Sampling: cfg.Sampling,
CacheMaxSize: cfg.CacheMaxFlows,
PktDrops: cfg.EnablePktDrops,
DNSTracker: cfg.EnableDNSTracking,
EnableRTT: cfg.EnableRTT,
EnableFlowFilter: cfg.EnableFlowFilter,
FlowFilterConfig: &ebpf.FlowFilterConfig{
FlowFilterDirection: cfg.FlowFilterDirection,
FlowFilterIPCIDR: cfg.FlowFilterIPCIDR,
FlowFilterProtocol: cfg.FlowFilterProtocol,
FlowFilterIP: cfg.FlowFilterIP,
FlowFilterDestinationPort: convertFilterPortsToInstr(cfg.FlowFilterDestinationPort, cfg.FlowFilterDestinationPortRange),
FlowFilterSourcePort: convertFilterPortsToInstr(cfg.FlowFilterSourcePort, cfg.FlowFilterSourcePortRange),
},
}

fetcher, err := ebpf.NewFlowFetcher(ebpfConfig)
Expand All @@ -195,6 +205,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return flowsAgent(cfg, m, informer, fetcher, exportFunc, agentIP)
}

func convertFilterPortsToInstr(intPort int, rangePorts string) intstr.IntOrString {
if rangePorts == "" {
return intstr.FromInt(intPort)
}
return intstr.FromString(rangePorts)
}

// flowsAgent is a private constructor with injectable dependencies, usable for tests
func flowsAgent(cfg *Config, m *metrics.Metrics,
informer ifaces.Informer,
Expand Down
29 changes: 29 additions & 0 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,35 @@ type Config struct {
// MetricsPrefix is the prefix of the metrics that are sent to the server.
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`

// EnableFlowFilter enables flow filter, default is false.
EnableFlowFilter bool `env:"ENABLE_FLOW_FILTER" envDefault:"false"`
// FlowFilterDirection is the direction of the flow filter.
// Possible values are "ingress" and "egress".
FlowFilterDirection string `env:"FLOW_FILTER_DIRECTION" envDefault:"ingress"`
// FlowFilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64
FlowFilterIPCIDR string `env:"FLOW_FILTER_IP_CIDR"`
// FlowFilterProtocol is the protocol to filter flows.
// Example: tcp,udp,sctp,icmpv4,icmpv6
FlowFilterProtocol string `env:"FLOW_FILTER_PROTOCOL"`
// FlowFilterSourcePort is the source port to filter flows.
FlowFilterSourcePort int `env:"FLOW_FILTER_SOURCE_PORT"`
// FlowFilterDestinationPort is the destination port to filter flows.
FlowFilterDestinationPort int `env:"FLOW_FILTER_DESTINATION_PORT"`
// FlowFilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FlowFilterSourcePortRange string `env:"FLOW_FILTER_SOURCE_PORT_RANGE"`
// FlowFilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FlowFilterDestinationPortRange string `env:"FLOW_FILTER_DESTINATION_PORT_RANGE"`
// FlowFilterICMPType is the ICMP type to filter flows.
FlowFilterICMPType int `env:"FLOW_FILTER_ICMP_TYPE"`
// FlowFilterICMPCode is the ICMP code to filter flows.
FlowFilterICMPCode int `env:"FLOW_FILTER_ICMP_CODE"`
// FlowFilterIP is the IP to filter flows.
// Example: 10.10.10.10
FlowFilterIP string `env:"FLOW_FILTER_IP"`

/* Deprecated configs are listed below this line
* See manageDeprecatedConfigs function for details
*/
Expand Down
167 changes: 167 additions & 0 deletions pkg/ebpf/flow_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package ebpf

import (
"fmt"
"net"
"strconv"
"strings"
"syscall"

"github.com/cilium/ebpf"
"k8s.io/apimachinery/pkg/util/intstr"
)

type FlowFilterConfig struct {
FlowFilterDirection string
FlowFilterIPCIDR string
FlowFilterProtocol string
FlowFilterSourcePort intstr.IntOrString
FlowFilterDestinationPort intstr.IntOrString
FlowFilterIcmpType int
FlowFilterIcmpCode int
FlowFilterIP string
}

type FlowFilter struct {
// eBPF objs to create/update eBPF maps
objects *BpfObjects
config *FlowFilterConfig
}

func NewFlowFilter(objects *BpfObjects, cfg *FlowFilterConfig) *FlowFilter {
return &FlowFilter{
objects: objects,
config: cfg,
}
}

func (f *FlowFilter) ProgramFlowFilter() error {
log.Infof("Flow filter config: %v", f.config)
key, err := f.getFlowFilterKey(f.config)
if err != nil {
return fmt.Errorf("failed to get flow filter key: %w", err)
}

val, err := f.getFlowFilterValue(f.config)
if err != nil {
return fmt.Errorf("failed to get flow filter value: %w", err)
}

err = f.objects.FilterMap.Update(key, val, ebpf.UpdateAny)
if err != nil {
return fmt.Errorf("failed to update flow filter map: %w", err)
}

log.Infof("Programmed flow filter with key: %v, value: %v", key, val)

return nil
}

func (f *FlowFilter) getFlowFilterKey(config *FlowFilterConfig) (BpfFilterKeyT, error) {
key := BpfFilterKeyT{}

ip, ipNet, err := net.ParseCIDR(config.FlowFilterIPCIDR)
if err != nil {
return key, fmt.Errorf("failed to parse FlowFilterIPCIDR: %w", err)
}
if ip.To4() != nil {
copy(key.IpData[:], ip.To4())
} else {
copy(key.IpData[:], ip.To16())
}
pfLen, _ := ipNet.Mask.Size()
key.PrefixLen = uint32(pfLen)

return key, nil
}

func (f *FlowFilter) getFlowFilterValue(config *FlowFilterConfig) (BpfFilterValueT, error) {
val := BpfFilterValueT{}

switch config.FlowFilterDirection {
case "ingress":
val.Direction = BpfDirectionTINGRESS
case "egress":
val.Direction = BpfDirectionTEGRESS
}

switch config.FlowFilterProtocol {
case "tcp":
val.Protocol = syscall.IPPROTO_TCP
val.DstPortStart, val.DstPortEnd = getDstPorts(config)
val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config)
case "udp":
val.Protocol = syscall.IPPROTO_UDP
val.DstPortStart, val.DstPortEnd = getDstPorts(config)
val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config)
case "sctp":
val.Protocol = syscall.IPPROTO_SCTP
val.DstPortStart, val.DstPortEnd = getDstPorts(config)
val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config)
case "icmpv4":
val.Protocol = syscall.IPPROTO_ICMP
val.IcmpType = uint8(config.FlowFilterIcmpType)
val.IcmpCode = uint8(config.FlowFilterIcmpCode)
case "icmpv6":
val.Protocol = syscall.IPPROTO_ICMPV6
val.IcmpType = uint8(config.FlowFilterIcmpType)
val.IcmpCode = uint8(config.FlowFilterIcmpCode)
}

if config.FlowFilterIP != "" {
ip := net.ParseIP(config.FlowFilterIP)
if ip.To4() != nil {
copy(val.Ip[:], ip.To4())
} else {
copy(val.Ip[:], ip.To16())
}
}
return val, nil
}

func getSrcPorts(config *FlowFilterConfig) (uint16, uint16) {
if config.FlowFilterSourcePort.Type == intstr.Int {
return uint16(config.FlowFilterSourcePort.IntVal), 0
}
start, end, err := getPortsFromString(config.FlowFilterSourcePort.String())
if err != nil {
return 0, 0
}
return start, end
}

func getDstPorts(config *FlowFilterConfig) (uint16, uint16) {
if config.FlowFilterDestinationPort.Type == intstr.Int {
return uint16(config.FlowFilterDestinationPort.IntVal), 0
}
start, end, err := getPortsFromString(config.FlowFilterDestinationPort.String())
if err != nil {
return 0, 0
}
return start, end
}

func getPortsFromString(s string) (uint16, uint16, error) {
ps := strings.SplitN(s, "-", 2)
if len(ps) != 2 {
return 0, 0, fmt.Errorf("invalid ports range. Expected two integers separated by hyphen but found %s", s)
}
startPort, err := strconv.ParseUint(ps[0], 10, 16)
if err != nil {
return 0, 0, fmt.Errorf("invalid start port number %w", err)
}
endPort, err := strconv.ParseUint(ps[1], 10, 16)
if err != nil {
return 0, 0, fmt.Errorf("invalid end port number %w", err)
}
if startPort > endPort {
return 0, 0, fmt.Errorf("invalid port range. Start port is greater than end port")
}
if startPort == endPort {
return 0, 0, fmt.Errorf("invalid port range. Start and end port are equal. Remove the hyphen and enter a single port")
}
if startPort == 0 {
return 0, 0, fmt.Errorf("invalid start port 0")
}
return uint16(startPort), uint16(endPort), nil
}
82 changes: 52 additions & 30 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,23 @@ import (
)

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t Bpf ../../bpf/flows.c -- -I../../bpf/headers

const (
qdiscType = "clsact"
// ebpf map names as defined in bpf/maps_definition.h
aggregatedFlowsMap = "aggregated_flows"
dnsLatencyMap = "dns_flows"
// constants defined in flows.c as "volatile const"
constSampling = "sampling"
constTraceMessages = "trace_messages"
constEnableRtt = "enable_rtt"
constEnableDNSTracking = "enable_dns_tracking"
pktDropHook = "kfree_skb"
constPcaPort = "pca_port"
constPcaProto = "pca_proto"
pcaRecordsMap = "packet_record"
constSampling = "sampling"
constTraceMessages = "trace_messages"
constEnableRtt = "enable_rtt"
constEnableDNSTracking = "enable_dns_tracking"
constEnableFlowFiltering = "enable_flows_filtering"
pktDropHook = "kfree_skb"
constPcaPort = "pca_port"
constPcaProto = "pca_proto"
pcaRecordsMap = "packet_record"
)

var log = logrus.WithField("component", "ebpf.FlowFetcher")
Expand All @@ -68,14 +69,16 @@ type FlowFetcher struct {
}

type FlowFetcherConfig struct {
EnableIngress bool
EnableEgress bool
Debug bool
Sampling int
CacheMaxSize int
PktDrops bool
DNSTracker bool
EnableRTT bool
EnableIngress bool
EnableEgress bool
Debug bool
Sampling int
CacheMaxSize int
PktDrops bool
DNSTracker bool
EnableRTT bool
EnableFlowFilter bool
FlowFilterConfig *FlowFilterConfig
}

func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
Expand Down Expand Up @@ -111,11 +114,17 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
spec.Maps[dnsLatencyMap].MaxEntries = 1
}

enableFlowFiltering := 0
if cfg.EnableFlowFilter {
enableFlowFiltering = 1
}

if err := spec.RewriteConstants(map[string]interface{}{
constSampling: uint32(cfg.Sampling),
constTraceMessages: uint8(traceMsgs),
constEnableRtt: uint8(enableRtt),
constEnableDNSTracking: uint8(enableDNSTracking),
constSampling: uint32(cfg.Sampling),
constTraceMessages: uint8(traceMsgs),
constEnableRtt: uint8(enableRtt),
constEnableDNSTracking: uint8(enableDNSTracking),
constEnableFlowFiltering: uint8(enableFlowFiltering),
}); err != nil {
return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
}
Expand All @@ -129,6 +138,13 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
return nil, err
}

if cfg.EnableFlowFilter {
f := NewFlowFilter(&objects, cfg.FlowFilterConfig)
if err := f.ProgramFlowFilter(); err != nil {
return nil, fmt.Errorf("programming flow filter: %w", err)
}
}

log.Debugf("Deleting specs for PCA")
// Deleting specs for PCA
// Always set pcaRecordsMap to the minimum in FlowFetcher - PCA and Flow Fetcher are mutually exclusive.
Expand Down Expand Up @@ -346,6 +362,9 @@ func (m *FlowFetcher) Close() error {
if err := m.objects.GlobalCounters.Close(); err != nil {
errs = append(errs, err)
}
if err := m.objects.FilterMap.Close(); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
m.objects = nil
}
Expand Down Expand Up @@ -455,18 +474,20 @@ func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]B
return flows
}

// ReadGlobalCounter reads the global counter and updates hashmap update error counter metrics
// ReadGlobalCounter reads the global counter and updates drop flows counter metrics
func (m *FlowFetcher) ReadGlobalCounter(met *metrics.Metrics) {
var allCPUValue []uint32
key := BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY
reasons := []string{"CannotUpdateHashMapCounter", "FlowFilterDropCounter"}

if err := m.objects.GlobalCounters.Lookup(key, &allCPUValue); err != nil {
log.WithError(err).Warnf("couldn't read global counter")
return
}
// aggregate all the counters
for _, counter := range allCPUValue {
met.DroppedFlowsCounter.WithSourceAndReason("flow-fetcher", "CannotUpdateHashMapCounter").Add(float64(counter))
for key := BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY; key < BpfGlobalCountersKeyTMAX_DROPPED_FLOWS_KEY; key++ {
if err := m.objects.GlobalCounters.Lookup(key, &allCPUValue); err != nil {
log.WithError(err).Warnf("couldn't read global counter")
return
}
// aggregate all the counters
for _, counter := range allCPUValue {
met.DroppedFlowsCounter.WithSourceAndReason("flow-fetcher", reasons[key]).Add(float64(counter))
}
}
}

Expand Down Expand Up @@ -601,6 +622,7 @@ func NewPacketFetcher(
delete(spec.Programs, constSampling)
delete(spec.Programs, constTraceMessages)
delete(spec.Programs, constEnableDNSTracking)
delete(spec.Programs, constEnableFlowFiltering)

pcaPort := 0
pcaProto := 0
Expand Down

0 comments on commit 40d2fe5

Please sign in to comment.