diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 637203dbe..5b22a5168 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "k8s.io/apimachinery/pkg/util/intstr" "net" "net/http" "time" @@ -177,14 +178,23 @@ func FlowsAgent(cfg *Config) (*Flows, error) { } ebpfConfig := &ebpf.FlowFetcherConfig{ - EnableIngress: ingress, - EnableEgress: egress, - Debug: debug, - Sampling: cfg.Sampling, - CacheMaxSize: cfg.CacheMaxFlows, - PktDrops: cfg.EnablePktDrops, - DNSTracker: cfg.EnableDNSTracking, - EnableRTT: cfg.EnableRTT, + EnableIngress: ingress, + EnableEgress: egress, + Debug: debug, + Sampling: cfg.Sampling, + CacheMaxSize: cfg.CacheMaxFlows, + PktDrops: cfg.EnablePktDrops, + DNSTracker: cfg.EnableDNSTracking, + EnableRTT: cfg.EnableRTT, + EnableFlowFilter: cfg.EnableFlowFilter, + FlowFilterConfig: &ebpf.FlowFilterConfig{ + FlowFilterDirection: cfg.FlowFilterDirection, + FlowFilterIPCIDR: cfg.FlowFilterIPCIDR, + FlowFilterProtocol: cfg.FlowFilterProtocol, + FlowFilterIP: cfg.FlowFilterIP, + FlowFilterDestinationPort: convertFilterPortsToInstr(cfg.FlowFilterDestinationPort, cfg.FlowFilterDestinationPortRange), + FlowFilterSourcePort: convertFilterPortsToInstr(cfg.FlowFilterSourcePort, cfg.FlowFilterSourcePortRange), + }, } fetcher, err := ebpf.NewFlowFetcher(ebpfConfig) @@ -195,6 +205,13 @@ func FlowsAgent(cfg *Config) (*Flows, error) { return flowsAgent(cfg, m, informer, fetcher, exportFunc, agentIP) } +func convertFilterPortsToInstr(intPort int, rangePorts string) intstr.IntOrString { + if rangePorts == "" { + return intstr.FromInt(intPort) + } + return intstr.FromString(rangePorts) +} + // flowsAgent is a private constructor with injectable dependencies, usable for tests func flowsAgent(cfg *Config, m *metrics.Metrics, informer ifaces.Informer, diff --git a/pkg/agent/config.go b/pkg/agent/config.go index ec6e99d2a..1c347ab4d 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -182,6 +182,35 @@ type Config struct { // MetricsPrefix is the prefix of the metrics that are sent to the server. MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"` + // EnableFlowFilter enables flow filter, default is false. + EnableFlowFilter bool `env:"ENABLE_FLOW_FILTER" envDefault:"false"` + // FlowFilterDirection is the direction of the flow filter. + // Possible values are "ingress" and "egress". + FlowFilterDirection string `env:"FLOW_FILTER_DIRECTION" envDefault:"ingress"` + // FlowFilterIPCIDR is the IP CIDR to filter flows. + // Example: 10.10.10.0/24 or 100:100:100:100::/64 + FlowFilterIPCIDR string `env:"FLOW_FILTER_IP_CIDR"` + // FlowFilterProtocol is the protocol to filter flows. + // Example: tcp,udp,sctp,icmpv4,icmpv6 + FlowFilterProtocol string `env:"FLOW_FILTER_PROTOCOL"` + // FlowFilterSourcePort is the source port to filter flows. + FlowFilterSourcePort int `env:"FLOW_FILTER_SOURCE_PORT"` + // FlowFilterDestinationPort is the destination port to filter flows. + FlowFilterDestinationPort int `env:"FLOW_FILTER_DESTINATION_PORT"` + // FlowFilterSourcePortRange is the source port range to filter flows. + // Example: 8000-8010 + FlowFilterSourcePortRange string `env:"FLOW_FILTER_SOURCE_PORT_RANGE"` + // FlowFilterDestinationPortRange is the destination port range to filter flows. + // Example: 8000-8010 + FlowFilterDestinationPortRange string `env:"FLOW_FILTER_DESTINATION_PORT_RANGE"` + // FlowFilterICMPType is the ICMP type to filter flows. + FlowFilterICMPType int `env:"FLOW_FILTER_ICMP_TYPE"` + // FlowFilterICMPCode is the ICMP code to filter flows. + FlowFilterICMPCode int `env:"FLOW_FILTER_ICMP_CODE"` + // FlowFilterIP is the IP to filter flows. + // Example: 10.10.10.10 + FlowFilterIP string `env:"FLOW_FILTER_IP"` + /* Deprecated configs are listed below this line * See manageDeprecatedConfigs function for details */ diff --git a/pkg/ebpf/flow_filter.go b/pkg/ebpf/flow_filter.go new file mode 100644 index 000000000..6d4c9ab2a --- /dev/null +++ b/pkg/ebpf/flow_filter.go @@ -0,0 +1,167 @@ +package ebpf + +import ( + "fmt" + "net" + "strconv" + "strings" + "syscall" + + "github.com/cilium/ebpf" + "k8s.io/apimachinery/pkg/util/intstr" +) + +type FlowFilterConfig struct { + FlowFilterDirection string + FlowFilterIPCIDR string + FlowFilterProtocol string + FlowFilterSourcePort intstr.IntOrString + FlowFilterDestinationPort intstr.IntOrString + FlowFilterIcmpType int + FlowFilterIcmpCode int + FlowFilterIP string +} + +type FlowFilter struct { + // eBPF objs to create/update eBPF maps + objects *BpfObjects + config *FlowFilterConfig +} + +func NewFlowFilter(objects *BpfObjects, cfg *FlowFilterConfig) *FlowFilter { + return &FlowFilter{ + objects: objects, + config: cfg, + } +} + +func (f *FlowFilter) ProgramFlowFilter() error { + log.Infof("Flow filter config: %v", f.config) + key, err := f.getFlowFilterKey(f.config) + if err != nil { + return fmt.Errorf("failed to get flow filter key: %w", err) + } + + val, err := f.getFlowFilterValue(f.config) + if err != nil { + return fmt.Errorf("failed to get flow filter value: %w", err) + } + + err = f.objects.FilterMap.Update(key, val, ebpf.UpdateAny) + if err != nil { + return fmt.Errorf("failed to update flow filter map: %w", err) + } + + log.Infof("Programmed flow filter with key: %v, value: %v", key, val) + + return nil +} + +func (f *FlowFilter) getFlowFilterKey(config *FlowFilterConfig) (BpfFilterKeyT, error) { + key := BpfFilterKeyT{} + + ip, ipNet, err := net.ParseCIDR(config.FlowFilterIPCIDR) + if err != nil { + return key, fmt.Errorf("failed to parse FlowFilterIPCIDR: %w", err) + } + if ip.To4() != nil { + copy(key.IpData[:], ip.To4()) + } else { + copy(key.IpData[:], ip.To16()) + } + pfLen, _ := ipNet.Mask.Size() + key.PrefixLen = uint32(pfLen) + + return key, nil +} + +func (f *FlowFilter) getFlowFilterValue(config *FlowFilterConfig) (BpfFilterValueT, error) { + val := BpfFilterValueT{} + + switch config.FlowFilterDirection { + case "ingress": + val.Direction = BpfDirectionTINGRESS + case "egress": + val.Direction = BpfDirectionTEGRESS + } + + switch config.FlowFilterProtocol { + case "tcp": + val.Protocol = syscall.IPPROTO_TCP + val.DstPortStart, val.DstPortEnd = getDstPorts(config) + val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config) + case "udp": + val.Protocol = syscall.IPPROTO_UDP + val.DstPortStart, val.DstPortEnd = getDstPorts(config) + val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config) + case "sctp": + val.Protocol = syscall.IPPROTO_SCTP + val.DstPortStart, val.DstPortEnd = getDstPorts(config) + val.SrcPortStart, val.SrcPortEnd = getSrcPorts(config) + case "icmpv4": + val.Protocol = syscall.IPPROTO_ICMP + val.IcmpType = uint8(config.FlowFilterIcmpType) + val.IcmpCode = uint8(config.FlowFilterIcmpCode) + case "icmpv6": + val.Protocol = syscall.IPPROTO_ICMPV6 + val.IcmpType = uint8(config.FlowFilterIcmpType) + val.IcmpCode = uint8(config.FlowFilterIcmpCode) + } + + if config.FlowFilterIP != "" { + ip := net.ParseIP(config.FlowFilterIP) + if ip.To4() != nil { + copy(val.Ip[:], ip.To4()) + } else { + copy(val.Ip[:], ip.To16()) + } + } + return val, nil +} + +func getSrcPorts(config *FlowFilterConfig) (uint16, uint16) { + if config.FlowFilterSourcePort.Type == intstr.Int { + return uint16(config.FlowFilterSourcePort.IntVal), 0 + } + start, end, err := getPortsFromString(config.FlowFilterSourcePort.String()) + if err != nil { + return 0, 0 + } + return start, end +} + +func getDstPorts(config *FlowFilterConfig) (uint16, uint16) { + if config.FlowFilterDestinationPort.Type == intstr.Int { + return uint16(config.FlowFilterDestinationPort.IntVal), 0 + } + start, end, err := getPortsFromString(config.FlowFilterDestinationPort.String()) + if err != nil { + return 0, 0 + } + return start, end +} + +func getPortsFromString(s string) (uint16, uint16, error) { + ps := strings.SplitN(s, "-", 2) + if len(ps) != 2 { + return 0, 0, fmt.Errorf("invalid ports range. Expected two integers separated by hyphen but found %s", s) + } + startPort, err := strconv.ParseUint(ps[0], 10, 16) + if err != nil { + return 0, 0, fmt.Errorf("invalid start port number %w", err) + } + endPort, err := strconv.ParseUint(ps[1], 10, 16) + if err != nil { + return 0, 0, fmt.Errorf("invalid end port number %w", err) + } + if startPort > endPort { + return 0, 0, fmt.Errorf("invalid port range. Start port is greater than end port") + } + if startPort == endPort { + return 0, 0, fmt.Errorf("invalid port range. Start and end port are equal. Remove the hyphen and enter a single port") + } + if startPort == 0 { + return 0, 0, fmt.Errorf("invalid start port 0") + } + return uint16(startPort), uint16(endPort), nil +} diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 99683f063..ecef730f4 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -27,7 +27,7 @@ import ( ) // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. -//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t Bpf ../../bpf/flows.c -- -I../../bpf/headers +//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t Bpf ../../bpf/flows.c -- -I../../bpf/headers const ( qdiscType = "clsact" @@ -35,14 +35,15 @@ const ( aggregatedFlowsMap = "aggregated_flows" dnsLatencyMap = "dns_flows" // constants defined in flows.c as "volatile const" - constSampling = "sampling" - constTraceMessages = "trace_messages" - constEnableRtt = "enable_rtt" - constEnableDNSTracking = "enable_dns_tracking" - pktDropHook = "kfree_skb" - constPcaPort = "pca_port" - constPcaProto = "pca_proto" - pcaRecordsMap = "packet_record" + constSampling = "sampling" + constTraceMessages = "trace_messages" + constEnableRtt = "enable_rtt" + constEnableDNSTracking = "enable_dns_tracking" + constEnableFlowFiltering = "enable_flows_filtering" + pktDropHook = "kfree_skb" + constPcaPort = "pca_port" + constPcaProto = "pca_proto" + pcaRecordsMap = "packet_record" ) var log = logrus.WithField("component", "ebpf.FlowFetcher") @@ -68,14 +69,16 @@ type FlowFetcher struct { } type FlowFetcherConfig struct { - EnableIngress bool - EnableEgress bool - Debug bool - Sampling int - CacheMaxSize int - PktDrops bool - DNSTracker bool - EnableRTT bool + EnableIngress bool + EnableEgress bool + Debug bool + Sampling int + CacheMaxSize int + PktDrops bool + DNSTracker bool + EnableRTT bool + EnableFlowFilter bool + FlowFilterConfig *FlowFilterConfig } func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) { @@ -111,11 +114,17 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) { spec.Maps[dnsLatencyMap].MaxEntries = 1 } + enableFlowFiltering := 0 + if cfg.EnableFlowFilter { + enableFlowFiltering = 1 + } + if err := spec.RewriteConstants(map[string]interface{}{ - constSampling: uint32(cfg.Sampling), - constTraceMessages: uint8(traceMsgs), - constEnableRtt: uint8(enableRtt), - constEnableDNSTracking: uint8(enableDNSTracking), + constSampling: uint32(cfg.Sampling), + constTraceMessages: uint8(traceMsgs), + constEnableRtt: uint8(enableRtt), + constEnableDNSTracking: uint8(enableDNSTracking), + constEnableFlowFiltering: uint8(enableFlowFiltering), }); err != nil { return nil, fmt.Errorf("rewriting BPF constants definition: %w", err) } @@ -129,6 +138,13 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) { return nil, err } + if cfg.EnableFlowFilter { + f := NewFlowFilter(&objects, cfg.FlowFilterConfig) + if err := f.ProgramFlowFilter(); err != nil { + return nil, fmt.Errorf("programming flow filter: %w", err) + } + } + log.Debugf("Deleting specs for PCA") // Deleting specs for PCA // Always set pcaRecordsMap to the minimum in FlowFetcher - PCA and Flow Fetcher are mutually exclusive. @@ -346,6 +362,9 @@ func (m *FlowFetcher) Close() error { if err := m.objects.GlobalCounters.Close(); err != nil { errs = append(errs, err) } + if err := m.objects.FilterMap.Close(); err != nil { + errs = append(errs, err) + } if len(errs) == 0 { m.objects = nil } @@ -455,18 +474,20 @@ func (m *FlowFetcher) LookupAndDeleteMap(met *metrics.Metrics) map[BpfFlowId][]B return flows } -// ReadGlobalCounter reads the global counter and updates hashmap update error counter metrics +// ReadGlobalCounter reads the global counter and updates drop flows counter metrics func (m *FlowFetcher) ReadGlobalCounter(met *metrics.Metrics) { var allCPUValue []uint32 - key := BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY + reasons := []string{"CannotUpdateHashMapCounter", "FlowFilterDropCounter"} - if err := m.objects.GlobalCounters.Lookup(key, &allCPUValue); err != nil { - log.WithError(err).Warnf("couldn't read global counter") - return - } - // aggregate all the counters - for _, counter := range allCPUValue { - met.DroppedFlowsCounter.WithSourceAndReason("flow-fetcher", "CannotUpdateHashMapCounter").Add(float64(counter)) + for key := BpfGlobalCountersKeyTHASHMAP_FLOWS_DROPPED_KEY; key < BpfGlobalCountersKeyTMAX_DROPPED_FLOWS_KEY; key++ { + if err := m.objects.GlobalCounters.Lookup(key, &allCPUValue); err != nil { + log.WithError(err).Warnf("couldn't read global counter") + return + } + // aggregate all the counters + for _, counter := range allCPUValue { + met.DroppedFlowsCounter.WithSourceAndReason("flow-fetcher", reasons[key]).Add(float64(counter)) + } } } @@ -601,6 +622,7 @@ func NewPacketFetcher( delete(spec.Programs, constSampling) delete(spec.Programs, constTraceMessages) delete(spec.Programs, constEnableDNSTracking) + delete(spec.Programs, constEnableFlowFiltering) pcaPort := 0 pcaProto := 0