diff --git a/bpf/flows.c b/bpf/flows.c index a226af3c8..70c7a9ee6 100644 --- a/bpf/flows.c +++ b/bpf/flows.c @@ -79,6 +79,23 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt, } } +static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) { + if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) { + for (u8 i = 0; i < value->nb_observed_intf; i++) { + if (value->observed_intf[i].if_index == if_index && + value->observed_intf[i].direction == direction) { + return; + } + } + value->observed_intf[value->nb_observed_intf].if_index = if_index; + value->observed_intf[value->nb_observed_intf].direction = direction; + value->nb_observed_intf++; + } else { + increase_counter(OBSERVED_INTF_MISSED); + BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index); + } +} + static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { u32 filter_sampling = 0; @@ -110,13 +127,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { return TC_ACT_OK; } - //Set extra fields - id.if_index = skb->ifindex; - id.direction = direction; - // check if this packet need to be filtered if filtering feature is enabled if (is_filter_enabled()) { - bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling); + bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling, + direction); if (filter_sampling == 0) { filter_sampling = sampling; } @@ -137,19 +151,47 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { } flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id); if (aggregate_flow != NULL) { - update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); + if (aggregate_flow->if_index_first_seen == skb->ifindex) { + update_existing_flow(aggregate_flow, &pkt, len, filter_sampling); + } else if (skb->ifindex != 0) { + // Only add info that we've seen this interface + additional_metrics *extra_metrics = + (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); + if (extra_metrics != NULL) { + add_observed_intf(extra_metrics, skb->ifindex, direction); + } else { + additional_metrics new_metrics = { + .eth_protocol = eth_protocol, + .start_mono_time_ts = pkt.current_ts, + .end_mono_time_ts = pkt.current_ts, + }; + add_observed_intf(&new_metrics, skb->ifindex, direction); + long ret = + bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST); + if (ret == -EEXIST) { + extra_metrics = + (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); + if (extra_metrics != NULL) { + add_observed_intf(extra_metrics, skb->ifindex, direction); + } + } else if (ret != 0 && trace_messages) { + bpf_printk("error creating new observed_intf: %d\n", ret); + } + } + } } else { // Key does not exist in the map, and will need to create a new entry. 
- flow_metrics new_flow = { - .packets = 1, - .bytes = len, - .eth_protocol = eth_protocol, - .start_mono_time_ts = pkt.current_ts, - .end_mono_time_ts = pkt.current_ts, - .flags = pkt.flags, - .dscp = pkt.dscp, - .sampling = filter_sampling, - }; + flow_metrics new_flow; + __builtin_memset(&new_flow, 0, sizeof(new_flow)); + new_flow.if_index_first_seen = skb->ifindex; + new_flow.direction_first_seen = direction; + new_flow.packets = 1; + new_flow.bytes = len; + new_flow.eth_protocol = eth_protocol; + new_flow.start_mono_time_ts = pkt.current_ts; + new_flow.end_mono_time_ts = pkt.current_ts; + new_flow.dscp = pkt.dscp; + new_flow.sampling = filter_sampling; __builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN); __builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN); @@ -194,9 +236,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) { // Update additional metrics (per-CPU map) if (pkt.dns_id != 0 || dns_errno != 0) { - // hack on id will be removed with dedup-in-kernel work - id.direction = 0; - id.if_index = 0; additional_metrics *extra_metrics = (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id); if (extra_metrics != NULL) { diff --git a/bpf/flows_filter.h b/bpf/flows_filter.h index 74e8553aa..43b211bfb 100644 --- a/bpf/flows_filter.h +++ b/bpf/flows_filter.h @@ -33,7 +33,8 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) { static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key, filter_action *action, u8 len, u8 offset, - u16 flags, u32 drop_reason, u32 *sampling) { + u16 flags, u32 drop_reason, u32 *sampling, + u8 direction) { int result = 0; struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key); @@ -161,7 +162,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_ if (!is_zero_ip(rule->ip, len)) { // for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP - if (id->direction == INGRESS) { + if (direction == INGRESS) { if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) { BPF_PRINTK("dstIP matched\n"); result++; @@ -181,7 +182,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_ } if (rule->direction != MAX_DIRECTION) { - if (rule->direction == id->direction) { + if (rule->direction == direction) { BPF_PRINTK("direction matched\n"); result++; } else { @@ -237,7 +238,8 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt * check if the flow match filter rule and return >= 1 if the flow is to be dropped */ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags, - u32 drop_reason, u16 eth_protocol, u32 *sampling) { + u32 drop_reason, u16 eth_protocol, u32 *sampling, + u8 direction) { struct filter_key_t key; u8 len, offset; int result = 0; @@ -251,7 +253,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, return result; } - result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling); + result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling, + direction); // we have a match so return if (result > 0) { return result; @@ -263,7 +266,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, return result; } - return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling); + return do_flow_filter_lookup(id, &key, action, len, offset, flags, 
drop_reason, sampling, + direction); } #endif //__FLOWS_FILTER_H__ diff --git a/bpf/network_events_monitoring.h b/bpf/network_events_monitoring.h index ef1839f87..96bac5a1f 100644 --- a/bpf/network_events_monitoring.h +++ b/bpf/network_events_monitoring.h @@ -94,17 +94,14 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me } // check if this packet need to be filtered if filtering feature is enabled - bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL); + bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0); if (skip) { return 0; } - for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) { - id.direction = dir; - ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie); - if (ret == 0) { - return ret; - } + ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie); + if (ret == 0) { + return ret; } // there is no matching flows so lets create new one and add the network event metadata diff --git a/bpf/pca.h b/bpf/pca.h index 7f4acd563..8c77d08f8 100644 --- a/bpf/pca.h +++ b/bpf/pca.h @@ -53,12 +53,8 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) { return false; } - //Set extra fields - id.if_index = skb->ifindex; - id.direction = dir; - // check if this packet need to be filtered if filtering feature is enabled - bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL); + bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL, dir); if (skip) { return false; } diff --git a/bpf/pkt_drops.h b/bpf/pkt_drops.h index 5684caba6..a0733197c 100644 --- a/bpf/pkt_drops.h +++ b/bpf/pkt_drops.h @@ -64,7 +64,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb, } // check if this packet need to be filtered if filtering feature is enabled - bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL); + bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL, 0); if (skip) { return 0; } diff --git a/bpf/pkt_translation.h b/bpf/pkt_translation.h index 00c0397da..1602665c3 100644 --- a/bpf/pkt_translation.h +++ b/bpf/pkt_translation.h @@ -163,7 +163,7 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) { } // check if this packet need to be filtered if filtering feature is enabled - bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL); + bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0); if (skip) { return 0; } diff --git a/bpf/rtt_tracker.h b/bpf/rtt_tracker.h index f3ac87478..c102557af 100644 --- a/bpf/rtt_tracker.h +++ b/bpf/rtt_tracker.h @@ -59,7 +59,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) { rtt *= 1000u; // check if this packet need to be filtered if filtering feature is enabled - bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL); + bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0); if (skip) { return 0; } diff --git a/bpf/types.h b/bpf/types.h index 558ba2f17..ca876393e 100644 --- a/bpf/types.h +++ b/bpf/types.h @@ -66,6 +66,7 @@ typedef __u64 u64; #define MAX_FILTER_ENTRIES 16 #define MAX_EVENT_MD 8 #define MAX_NETWORK_EVENTS 4 +#define MAX_OBSERVED_INTERFACES 4 // according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml typedef enum direction_t { @@ -80,26 +81,29 @@ const enum direction_t *unused1 __attribute__((unused)); const u8 
ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; typedef struct flow_metrics_t { - struct bpf_spin_lock lock; - u16 eth_protocol; - // L2 data link layer - u8 src_mac[ETH_ALEN]; - u8 dst_mac[ETH_ALEN]; - u32 packets; - u64 bytes; // Flow start and end times as monotomic timestamps in nanoseconds // as output from bpf_ktime_get_ns() u64 start_mono_time_ts; u64 end_mono_time_ts; + u64 bytes; + u32 packets; + u16 eth_protocol; // TCP Flags from https://www.ietf.org/rfc/rfc793.txt u16 flags; + // L2 data link layer + u8 src_mac[ETH_ALEN]; + u8 dst_mac[ETH_ALEN]; + // OS interface index + u32 if_index_first_seen; + struct bpf_spin_lock lock; + u32 sampling; + u8 direction_first_seen; // The positive errno of a failed map insertion that caused a flow // to be sent via ringbuffer. // 0 otherwise // https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md u8 errno; u8 dscp; - u32 sampling; } flow_metrics; // Force emitting enums/structs into the ELF @@ -109,22 +113,20 @@ typedef struct additional_metrics_t { u64 start_mono_time_ts; u64 end_mono_time_ts; struct dns_record_t { + u64 latency; u16 id; u16 flags; - u64 latency; u8 errno; } dns_record; struct pkt_drops_t { - u32 packets; u64 bytes; + u32 packets; + u32 latest_drop_cause; u16 latest_flags; u8 latest_state; - u32 latest_drop_cause; } pkt_drops; u64 flow_rtt; - u8 network_events_idx; u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD]; - u16 eth_protocol; struct translated_flow_t { u8 saddr[IP_MAX_LEN]; u8 daddr[IP_MAX_LEN]; @@ -133,23 +135,24 @@ typedef struct additional_metrics_t { u16 zone_id; u8 icmp_id; } translated_flow; + struct observed_intf_t { + u8 direction; + u32 if_index; + } observed_intf[MAX_OBSERVED_INTERFACES]; + u16 eth_protocol; + u8 network_events_idx; + u8 nb_observed_intf; } additional_metrics; // Force emitting enums/structs into the ELF const struct additional_metrics_t *unused3 __attribute__((unused)); - -// Force emitting enums/structs into the ELF const struct dns_record_t *unused4 __attribute__((unused)); - -// Force emitting enums/structs into the ELF const struct pkt_drops_t *unused5 __attribute__((unused)); - -// Force emitting struct translated_flow_t into the ELF. 
const struct translated_flow_t *unused6 __attribute__((unused)); +const struct observed_intf_t *unused13 __attribute__((unused)); // Attributes that uniquely identify a flow typedef struct flow_id_t { - u8 direction; // L3 network layer // IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96 // as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2 @@ -162,8 +165,6 @@ typedef struct flow_id_t { // ICMP protocol u8 icmp_type; u8 icmp_code; - // OS interface index - u32 if_index; } flow_id; // Force emitting enums/structs into the ELF @@ -220,6 +221,7 @@ typedef enum global_counters_key_t { NETWORK_EVENTS_ERR_GROUPID_MISMATCH, NETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS, NETWORK_EVENTS_GOOD, + OBSERVED_INTF_MISSED, MAX_COUNTERS, } global_counters_key; @@ -230,7 +232,7 @@ const enum global_counters_key_t *unused9 __attribute__((unused)); struct filter_key_t { u32 prefix_len; u8 ip_data[IP_MAX_LEN]; -} __attribute__((packed)); +} filter_key; // Force emitting enums/structs into the ELF const struct filter_key_t *unused10 __attribute__((unused)); diff --git a/bpf/utils.h b/bpf/utils.h index 64b6a6ce9..face7d000 100644 --- a/bpf/utils.h +++ b/bpf/utils.h @@ -184,12 +184,14 @@ static inline bool is_filter_enabled() { /* * check if flow filter is enabled and if we need to continue processing the packet or not */ -static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason, - u16 eth_protocol, u32 *sampling) { +static __always_inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason, + u16 eth_protocol, u32 *sampling, + u8 direction) { // check if this packet need to be filtered if filtering feature is enabled if (is_filter_enabled()) { filter_action action = ACCEPT; - if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 && + if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling, direction) != + 0 && action != MAX_FILTER_ACTIONS) { // we have matching rules follow through the actions to decide if we should accept or reject the flow // and update global counter for both cases diff --git a/cmd/netobserv-ebpf-agent.go b/cmd/netobserv-ebpf-agent.go index 4da302c44..fe76df419 100644 --- a/cmd/netobserv-ebpf-agent.go +++ b/cmd/netobserv-ebpf-agent.go @@ -51,9 +51,6 @@ func main() { Error("PProf HTTP listener stopped working") }() } - if config.DeduperFCExpiry == 0 { - config.DeduperFCExpiry = 2 * config.CacheActiveTimeout - } logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded") diff --git a/docs/architecture.md b/docs/architecture.md index fea886396..dc7e75c70 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,6 +6,8 @@ The following graph provides a birds' eye view on how the different components a For more info on each component, please check their corresponding Go docs. + + ### Kernel space ```mermaid @@ -33,21 +35,13 @@ flowchart TD ```mermaid flowchart TD E(ebpf.FlowFetcher) --> |"pushes via
RingBuffer"| RB(flow.RingBufTracer) - style E fill:#990 + style E fill:#7CA E --> |"polls
HashMap"| M(flow.MapTracer) RB --> |chan *model.Record| ACC(flow.Accounter) RB -.-> |flushes| M - ACC --> |"chan []*model.Record"| DD(flow.Deduper) - M --> |"chan []*model.Record"| DD - - subgraph Optional - DD - end - - DD --> |"chan []*model.Record"| CL(flow.CapacityLimiter) + ACC --> |"chan []*model.Record"| CL(flow.CapacityLimiter) + M --> |"chan []*model.Record"| CL - CL --> |"chan []*model.Record"| DC(flow.Decorator) - - DC --> |"chan []*model.Record"| EX("export.GRPCProto
or
export.KafkaProto
or
export.DirectFLP") + CL --> |"chan []*model.Record"| EX("export.GRPCProto
or
export.KafkaProto
or
export.DirectFLP") ``` diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 47fab9897..ccf8a7e82 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -119,13 +119,8 @@ type Flows struct { rbTracer *flow.RingBufTracer accounter *flow.Accounter limiter *flow.CapacityLimiter - deduper node.MiddleFunc[[]*model.Record, []*model.Record] exporter node.TerminalFunc[[]*model.Record] - // elements used to decorate flows with extra information - interfaceNamer flow.InterfaceNamer - agentIP net.IP - status Status promoServer *http.Server sampleDecoder *ovnobserv.SampleDecoder @@ -290,6 +285,8 @@ func flowsAgent(cfg *Config, m *metrics.Metrics, } return iface } + model.SetGlobals(agentIP, interfaceNamer) + var promoServer *http.Server if cfg.MetricsEnable { promoServer = promo.InitializePrometheus(m.Settings) @@ -302,26 +299,19 @@ func flowsAgent(cfg *Config, m *metrics.Metrics, rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m) accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m) limiter := flow.NewCapacityLimiter(m) - var deduper node.MiddleFunc[[]*model.Record, []*model.Record] - if cfg.Deduper == DeduperFirstCome { - deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m) - } return &Flows{ - ebpf: fetcher, - exporter: exporter, - interfaces: registerer, - filter: filter, - cfg: cfg, - mapTracer: mapTracer, - rbTracer: rbTracer, - accounter: accounter, - limiter: limiter, - deduper: deduper, - agentIP: agentIP, - interfaceNamer: interfaceNamer, - promoServer: promoServer, - sampleDecoder: s, + ebpf: fetcher, + exporter: exporter, + interfaces: registerer, + filter: filter, + cfg: cfg, + mapTracer: mapTracer, + rbTracer: rbTracer, + accounter: accounter, + limiter: limiter, + promoServer: promoServer, + sampleDecoder: s, }, nil } @@ -517,9 +507,6 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo limiter := node.AsMiddle(f.limiter.Limit, node.ChannelBufferLen(f.cfg.BuffersLength)) - decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer), - node.ChannelBufferLen(f.cfg.BuffersLength)) - ebl := f.cfg.ExporterBufferLength if ebl == 0 { ebl = f.cfg.BuffersLength @@ -530,17 +517,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo rbTracer.SendsTo(accounter) - if f.deduper != nil { - deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength)) - mapTracer.SendsTo(deduper) - accounter.SendsTo(deduper) - deduper.SendsTo(limiter) - } else { - mapTracer.SendsTo(limiter) - accounter.SendsTo(limiter) - } - limiter.SendsTo(decorator) - decorator.SendsTo(export) + mapTracer.SendsTo(limiter) + accounter.SendsTo(limiter) + limiter.SendsTo(export) alog.Debug("starting graph") mapTracer.Start() diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 4df5d352f..908b17a48 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -48,165 +48,72 @@ var ( key1 = ebpf.BpfFlowId{ SrcPort: 123, DstPort: 456, - IfIndex: 3, } - key1Dupe = ebpf.BpfFlowId{ - SrcPort: 123, - DstPort: 456, - IfIndex: 4, - } - key2 = ebpf.BpfFlowId{ SrcPort: 333, DstPort: 532, - IfIndex: 3, } ) -func TestFlowsAgent_Deduplication(t *testing.T) { - export := testAgent(t, &Config{ - CacheActiveTimeout: 10 * time.Millisecond, - CacheMaxFlows: 100, - DeduperJustMark: false, - Deduper: DeduperFirstCome, - }) - - exported := export.Get(t, timeout) - assert.Len(t, exported, 2) - - receivedKeys := 
map[ebpf.BpfFlowId]struct{}{} - - var key1Flows []*model.Record - for _, f := range exported { - require.NotContains(t, receivedKeys, f.ID) - receivedKeys[f.ID] = struct{}{} - switch f.ID { - case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "foo", f.Interface) - key1Flows = append(key1Flows, f) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "bar", f.Interface) - key1Flows = append(key1Flows, f) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - } +func TestFlowsAgent_Decoration(t *testing.T) { + now := uint64(monotime.Now()) + metrics1 := model.BpfFlowContent{ + BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000, + IfIndexFirstSeen: 1, + DirectionFirstSeen: 1, + }, + AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 1, + ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 3, Direction: 0}}, + }, } - assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows) -} - -func TestFlowsAgent_DeduplicationJustMark(t *testing.T) { - export := testAgent(t, &Config{ - CacheActiveTimeout: 10 * time.Millisecond, - CacheMaxFlows: 100, - DeduperJustMark: true, - Deduper: DeduperFirstCome, - }) - - exported := export.Get(t, timeout) - receivedKeys := map[ebpf.BpfFlowId]struct{}{} - - assert.Len(t, exported, 3) - duplicates := 0 - for _, f := range exported { - require.NotContains(t, receivedKeys, f.ID) - receivedKeys[f.ID] = struct{}{} - switch f.ID { - case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - if f.Duplicate { - duplicates++ - } - assert.Equal(t, "foo", f.Interface) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - if f.Duplicate { - duplicates++ - } - assert.Equal(t, "bar", f.Interface) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - } + metrics2 := model.BpfFlowContent{ + BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000, + IfIndexFirstSeen: 4, + DirectionFirstSeen: 0, + }, + AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 2, + ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 1, Direction: 1}, {IfIndex: 99, Direction: 1}}, + }, } - assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported) -} - -func TestFlowsAgent_Deduplication_None(t *testing.T) { - export := testAgent(t, &Config{ - CacheActiveTimeout: 10 * time.Millisecond, - CacheMaxFlows: 100, - Deduper: DeduperNone, - }) - - exported := export.Get(t, timeout) - assert.Len(t, exported, 3) - receivedKeys := map[ebpf.BpfFlowId]struct{}{} - - var key1Flows []*model.Record - for _, f := range exported { - require.NotContains(t, receivedKeys, f.ID) - receivedKeys[f.ID] = struct{}{} - switch f.ID { - case key1: - assert.EqualValues(t, 4, f.Metrics.Packets) - assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "foo", f.Interface) - key1Flows = append(key1Flows, f) - case key1Dupe: - assert.EqualValues(t, 4, f.Metrics.Packets) - 
assert.EqualValues(t, 66, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - assert.Equal(t, "bar", f.Interface) - key1Flows = append(key1Flows, f) - case key2: - assert.EqualValues(t, 7, f.Metrics.Packets) - assert.EqualValues(t, 33, f.Metrics.Bytes) - assert.False(t, f.Duplicate) - } + flows := map[ebpf.BpfFlowId]model.BpfFlowContent{ + key1: metrics1, + key2: metrics2, } - assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows) -} -func TestFlowsAgent_Decoration(t *testing.T) { - export := testAgent(t, &Config{ - CacheActiveTimeout: 10 * time.Millisecond, - CacheMaxFlows: 100, - }) - - exported := export.Get(t, timeout) - assert.Len(t, exported, 3) + exported := testAgent(t, flows) + assert.Len(t, exported, 2) // Tests that the decoration stage has been properly executed. It should // add the interface name and the agent IP for _, f := range exported { assert.Equal(t, agentIP, f.AgentIP.String()) switch f.ID { - case key1, key2: - assert.Equal(t, "foo", f.Interface) + case key1: + assert.Len(t, f.Interfaces, 2) + assert.Equal(t, "eth0", f.Interfaces[0].Interface) + assert.Equal(t, "foo", f.Interfaces[1].Interface) + case key2: + assert.Len(t, f.Interfaces, 3) + assert.Equal(t, "bar", f.Interfaces[0].Interface) + assert.Equal(t, "eth0", f.Interfaces[1].Interface) + assert.Equal(t, "unknown", f.Interfaces[2].Interface) default: - assert.Equal(t, "bar", f.Interface) + assert.Failf(t, "unexpected key", "key: %v", f.ID) } } } -func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { +func testAgent(t *testing.T, flows map[ebpf.BpfFlowId]model.BpfFlowContent) []*model.Record { ebpfTracer := test.NewTracerFake() export := test.NewExporterFake() - agent, err := flowsAgent(cfg, + agent, err := flowsAgent( + &Config{ + CacheActiveTimeout: 10 * time.Millisecond, + CacheMaxFlows: 100, + }, metrics.NewMetrics(&metrics.Settings{}), test.SliceInformerFake{ + {Name: "eth0", Index: 1}, {Name: "foo", Index: 3}, {Name: "bar", Index: 4}, }, ebpfTracer, export.Export, @@ -220,25 +127,6 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake { require.Equal(t, StatusStarted, agent.status) }) - now := uint64(monotime.Now()) - key1Metrics := model.BpfFlowContents{ - { - BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}, - }, - { - BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000}, - }, - } - key2Metrics := model.BpfFlowContents{ - { - BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000}, - }, - } - acc := key1Metrics.Accumulate() - ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]model.BpfFlowContent{ - key1: acc, - key1Dupe: acc, - key2: key2Metrics.Accumulate(), - }) - return export + ebpfTracer.AppendLookupResults(flows) + return export.Get(t, timeout) } diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 9968b01a5..1eebf5b6a 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -11,8 +11,6 @@ var clog = logrus.WithField("component", "config") const ( ListenPoll = "poll" ListenWatch = "watch" - DeduperNone = "none" - DeduperFirstCome = "firstCome" DirectionIngress = "ingress" DirectionEgress = "egress" DirectionBoth = "both" @@ -131,21 +129,6 @@ type Config struct { // CacheActiveTimeout specifies the maximum duration that flows are kept in the accounting // cache before being flushed for its later export CacheActiveTimeout time.Duration 
`env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"` - // Deduper specifies the deduper type. Accepted values are "none" (disabled) and "firstCome". - // When enabled, it will detect duplicate flows (flows that have been detected e.g. through - // both the physical and a virtual interface). - // "firstCome" will forward only flows from the first interface the flows are received from. - Deduper string `env:"DEDUPER" envDefault:"none"` - // DeduperFCExpiry specifies the expiry duration of the flows "firstCome" deduplicator. After - // a flow hasn't been received for that expiry time, the deduplicator forgets it. That means - // that a flow from a connection that has been inactive during that period could be forwarded - // again from a different interface. - // If the value is not set, it will default to 2 * CacheActiveTimeout - DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"` - // DeduperJustMark will just mark duplicates (boolean field) instead of dropping them. - DeduperJustMark bool `env:"DEDUPER_JUST_MARK" envDefault:"false"` - // DeduperMerge will merge duplicated flows and generate list of interfaces and direction pairs - DeduperMerge bool `env:"DEDUPER_MERGE" envDefault:"true"` // Direction allows selecting which flows to trace according to its direction. Accepted values // are "ingress", "egress" or "both" (default). Direction string `env:"DIRECTION" envDefault:"both"` diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go index 5c76bc9ed..3b714d150 100644 --- a/pkg/agent/packets_agent.go +++ b/pkg/agent/packets_agent.go @@ -34,7 +34,7 @@ type Packets struct { exporter node.TerminalFunc[[]*model.PacketRecord] // elements used to decorate flows with extra information - interfaceNamer flow.InterfaceNamer + interfaceNamer model.InterfaceNamer agentIP net.IP status Status diff --git a/pkg/decode/decode_protobuf.go b/pkg/decode/decode_protobuf.go index f98e6b340..7f9fe6170 100644 --- a/pkg/decode/decode_protobuf.go +++ b/pkg/decode/decode_protobuf.go @@ -67,9 +67,14 @@ func RecordToMap(fr *model.Record) config.GenericMap { "AgentIP": fr.AgentIP.String(), } - if fr.Duplicate { - out["Duplicate"] = true + var directions []uint8 + var interfaces []string + for _, intf := range fr.Interfaces { + directions = append(directions, intf.Direction) + interfaces = append(interfaces, intf.Interface) } + out["IfDirections"] = directions + out["Interfaces"] = interfaces if fr.Metrics.Bytes != 0 { out["Bytes"] = fr.Metrics.Bytes @@ -82,22 +87,6 @@ func RecordToMap(fr *model.Record) config.GenericMap { if fr.Metrics.Sampling != 0 { out["Sampling"] = fr.Metrics.Sampling } - var interfaces []string - var directions []int - if len(fr.DupList) != 0 { - for _, m := range fr.DupList { - for key, value := range m { - interfaces = append(interfaces, key) - directions = append(directions, int(model.Direction(value))) - } - } - } else { - interfaces = append(interfaces, fr.Interface) - directions = append(directions, int(fr.ID.Direction)) - } - out["Interfaces"] = interfaces - out["IfDirections"] = directions - if fr.Metrics.EthProtocol == uint16(ethernet.EtherTypeIPv4) || fr.Metrics.EthProtocol == uint16(ethernet.EtherTypeIPv6) { out["SrcAddr"] = model.IP(fr.ID.SrcIp).String() out["DstAddr"] = model.IP(fr.ID.DstIp).String() diff --git a/pkg/decode/decode_protobuf_test.go b/pkg/decode/decode_protobuf_test.go index 50c1ef3bd..061cc1523 100644 --- a/pkg/decode/decode_protobuf_test.go +++ b/pkg/decode/decode_protobuf_test.go @@ -18,7 +18,6 @@ func TestPBFlowToMap(t *testing.T) { someTime := time.Now() 
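To make the shape of the new decoded output concrete: with the decoding changes above, per-flow interface data is exported as two parallel lists ("Interfaces" and "IfDirections") instead of the old Duplicate flag and DupList handling. Below is only a minimal sketch of the resulting entries, with values borrowed from the TestPBFlowToMap expectations in this file; GenericMap is flowlogs-pipeline's map[string]interface{} type, and the other keys (Bytes, SrcAddr, ...) are unchanged.

```go
package main

import "fmt"

func main() {
	// Interface-related keys now emitted for a record carrying two interface
	// entries: names and directions are kept as two aligned slices.
	out := map[string]interface{}{
		"Interfaces":   []string{"5e6e92caa1d51cf", "eth0"},
		"IfDirections": []uint8{0, 1},
	}
	fmt.Println(out)
}
```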
var someDuration time.Duration = 10000000 // 10ms flow := &pbflow.Record{ - Interface: "eth0", DupList: []*pbflow.DupMapEntry{ { Interface: "5e6e92caa1d51cf", @@ -32,10 +31,8 @@ func TestPBFlowToMap(t *testing.T) { EthProtocol: 2048, Bytes: 456, Packets: 123, - Direction: pbflow.Direction_EGRESS, TimeFlowStart: timestamppb.New(someTime), TimeFlowEnd: timestamppb.New(someTime), - Duplicate: true, Network: &pbflow.Network{ SrcAddr: &pbflow.IP{ IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304}, @@ -105,7 +102,7 @@ func TestPBFlowToMap(t *testing.T) { assert.NotZero(t, out["TimeReceived"]) delete(out, "TimeReceived") assert.Equal(t, config.GenericMap{ - "IfDirections": []int{0, 1}, + "IfDirections": []uint8{0, 1}, "Bytes": uint64(456), "SrcAddr": "1.2.3.4", "DstAddr": "5.6.7.8", @@ -114,7 +111,6 @@ func TestPBFlowToMap(t *testing.T) { "SrcMac": "01:02:03:04:05:06", "SrcPort": uint16(23000), "DstPort": uint16(443), - "Duplicate": true, "Etype": uint16(2048), "Packets": uint32(123), "Proto": uint8(6), @@ -156,5 +152,4 @@ func TestPBFlowToMap(t *testing.T) { "ZoneId": uint16(100), "XlatIcmpId": uint8(0), }, out) - } diff --git a/pkg/ebpf/bpf_arm64_bpfel.go b/pkg/ebpf/bpf_arm64_bpfel.go index 44dc2de78..968a8e117 100644 --- a/pkg/ebpf/bpf_arm64_bpfel.go +++ b/pkg/ebpf/bpf_arm64_bpfel.go @@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct { DnsRecord BpfDnsRecordT PktDrops BpfPktDropsT FlowRtt uint64 - NetworkEventsIdx uint8 NetworkEvents [4][8]uint8 - _ [1]byte - EthProtocol uint16 TranslatedFlow BpfTranslatedFlowT + ObservedIntf [4]BpfObservedIntfT + EthProtocol uint16 + NetworkEventsIdx uint8 + NbObservedIntf uint8 _ [4]byte } @@ -45,12 +46,11 @@ type BpfDnsFlowId struct { } type BpfDnsRecordT struct { + Latency uint64 Id uint16 Flags uint16 - _ [4]byte - Latency uint64 Errno uint8 - _ [7]byte + _ [3]byte } type BpfFilterActionT uint32 @@ -93,35 +93,34 @@ type BpfFilterValueT struct { type BpfFlowId BpfFlowIdT type BpfFlowIdT struct { - Direction uint8 SrcIp [16]uint8 DstIp [16]uint8 - _ [1]byte SrcPort uint16 DstPort uint16 TransportProtocol uint8 IcmpType uint8 IcmpCode uint8 - _ [3]byte - IfIndex uint32 + _ [1]byte } type BpfFlowMetrics BpfFlowMetricsT type BpfFlowMetricsT struct { - Lock struct{ Val uint32 } - EthProtocol uint16 - SrcMac [6]uint8 - DstMac [6]uint8 - _ [2]byte - Packets uint32 - Bytes uint64 - StartMonoTimeTs uint64 - EndMonoTimeTs uint64 - Flags uint16 - Errno uint8 - Dscp uint8 - Sampling uint32 + StartMonoTimeTs uint64 + EndMonoTimeTs uint64 + Bytes uint64 + Packets uint32 + EthProtocol uint16 + Flags uint16 + SrcMac [6]uint8 + DstMac [6]uint8 + IfIndexFirstSeen uint32 + Lock struct{ Val uint32 } + Sampling uint32 + DirectionFirstSeen uint8 + Errno uint8 + Dscp uint8 + _ [5]byte } type BpfFlowRecordT struct { @@ -141,17 +140,23 @@ const ( BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6 BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7 BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8 - BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) +type BpfObservedIntfT struct { + Direction uint8 + _ [3]byte + IfIndex uint32 +} + type BpfPktDropsT struct { - Packets uint32 - _ [4]byte Bytes uint64 + Packets uint32 + LatestDropCause uint32 LatestFlags uint16 LatestState uint8 - _ [1]byte - LatestDropCause uint32 + _ [5]byte } type BpfTcpFlagsT uint32 diff --git 
a/pkg/ebpf/bpf_arm64_bpfel.o b/pkg/ebpf/bpf_arm64_bpfel.o index f11c52f93..025691ca1 100644 Binary files a/pkg/ebpf/bpf_arm64_bpfel.o and b/pkg/ebpf/bpf_arm64_bpfel.o differ diff --git a/pkg/ebpf/bpf_powerpc_bpfel.go b/pkg/ebpf/bpf_powerpc_bpfel.go index b6381dd1c..99bad4e85 100644 --- a/pkg/ebpf/bpf_powerpc_bpfel.go +++ b/pkg/ebpf/bpf_powerpc_bpfel.go @@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct { DnsRecord BpfDnsRecordT PktDrops BpfPktDropsT FlowRtt uint64 - NetworkEventsIdx uint8 NetworkEvents [4][8]uint8 - _ [1]byte - EthProtocol uint16 TranslatedFlow BpfTranslatedFlowT + ObservedIntf [4]BpfObservedIntfT + EthProtocol uint16 + NetworkEventsIdx uint8 + NbObservedIntf uint8 _ [4]byte } @@ -45,12 +46,11 @@ type BpfDnsFlowId struct { } type BpfDnsRecordT struct { + Latency uint64 Id uint16 Flags uint16 - _ [4]byte - Latency uint64 Errno uint8 - _ [7]byte + _ [3]byte } type BpfFilterActionT uint32 @@ -93,35 +93,34 @@ type BpfFilterValueT struct { type BpfFlowId BpfFlowIdT type BpfFlowIdT struct { - Direction uint8 SrcIp [16]uint8 DstIp [16]uint8 - _ [1]byte SrcPort uint16 DstPort uint16 TransportProtocol uint8 IcmpType uint8 IcmpCode uint8 - _ [3]byte - IfIndex uint32 + _ [1]byte } type BpfFlowMetrics BpfFlowMetricsT type BpfFlowMetricsT struct { - Lock struct{ Val uint32 } - EthProtocol uint16 - SrcMac [6]uint8 - DstMac [6]uint8 - _ [2]byte - Packets uint32 - Bytes uint64 - StartMonoTimeTs uint64 - EndMonoTimeTs uint64 - Flags uint16 - Errno uint8 - Dscp uint8 - Sampling uint32 + StartMonoTimeTs uint64 + EndMonoTimeTs uint64 + Bytes uint64 + Packets uint32 + EthProtocol uint16 + Flags uint16 + SrcMac [6]uint8 + DstMac [6]uint8 + IfIndexFirstSeen uint32 + Lock struct{ Val uint32 } + Sampling uint32 + DirectionFirstSeen uint8 + Errno uint8 + Dscp uint8 + _ [5]byte } type BpfFlowRecordT struct { @@ -141,17 +140,23 @@ const ( BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6 BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7 BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8 - BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) +type BpfObservedIntfT struct { + Direction uint8 + _ [3]byte + IfIndex uint32 +} + type BpfPktDropsT struct { - Packets uint32 - _ [4]byte Bytes uint64 + Packets uint32 + LatestDropCause uint32 LatestFlags uint16 LatestState uint8 - _ [1]byte - LatestDropCause uint32 + _ [5]byte } type BpfTcpFlagsT uint32 diff --git a/pkg/ebpf/bpf_powerpc_bpfel.o b/pkg/ebpf/bpf_powerpc_bpfel.o index 1d54f4b45..41bac3b88 100644 Binary files a/pkg/ebpf/bpf_powerpc_bpfel.o and b/pkg/ebpf/bpf_powerpc_bpfel.o differ diff --git a/pkg/ebpf/bpf_s390_bpfeb.go b/pkg/ebpf/bpf_s390_bpfeb.go index c13de86c1..08e9d2e22 100644 --- a/pkg/ebpf/bpf_s390_bpfeb.go +++ b/pkg/ebpf/bpf_s390_bpfeb.go @@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct { DnsRecord BpfDnsRecordT PktDrops BpfPktDropsT FlowRtt uint64 - NetworkEventsIdx uint8 NetworkEvents [4][8]uint8 - _ [1]byte - EthProtocol uint16 TranslatedFlow BpfTranslatedFlowT + ObservedIntf [4]BpfObservedIntfT + EthProtocol uint16 + NetworkEventsIdx uint8 + NbObservedIntf uint8 _ [4]byte } @@ -45,12 +46,11 @@ type BpfDnsFlowId struct { } type BpfDnsRecordT struct { + Latency uint64 Id uint16 Flags uint16 - _ [4]byte - Latency uint64 Errno uint8 - _ [7]byte + _ [3]byte } type BpfFilterActionT uint32 @@ -93,35 +93,34 @@ type 
BpfFilterValueT struct { type BpfFlowId BpfFlowIdT type BpfFlowIdT struct { - Direction uint8 SrcIp [16]uint8 DstIp [16]uint8 - _ [1]byte SrcPort uint16 DstPort uint16 TransportProtocol uint8 IcmpType uint8 IcmpCode uint8 - _ [3]byte - IfIndex uint32 + _ [1]byte } type BpfFlowMetrics BpfFlowMetricsT type BpfFlowMetricsT struct { - Lock struct{ Val uint32 } - EthProtocol uint16 - SrcMac [6]uint8 - DstMac [6]uint8 - _ [2]byte - Packets uint32 - Bytes uint64 - StartMonoTimeTs uint64 - EndMonoTimeTs uint64 - Flags uint16 - Errno uint8 - Dscp uint8 - Sampling uint32 + StartMonoTimeTs uint64 + EndMonoTimeTs uint64 + Bytes uint64 + Packets uint32 + EthProtocol uint16 + Flags uint16 + SrcMac [6]uint8 + DstMac [6]uint8 + IfIndexFirstSeen uint32 + Lock struct{ Val uint32 } + Sampling uint32 + DirectionFirstSeen uint8 + Errno uint8 + Dscp uint8 + _ [5]byte } type BpfFlowRecordT struct { @@ -141,17 +140,23 @@ const ( BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6 BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7 BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8 - BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) +type BpfObservedIntfT struct { + Direction uint8 + _ [3]byte + IfIndex uint32 +} + type BpfPktDropsT struct { - Packets uint32 - _ [4]byte Bytes uint64 + Packets uint32 + LatestDropCause uint32 LatestFlags uint16 LatestState uint8 - _ [1]byte - LatestDropCause uint32 + _ [5]byte } type BpfTcpFlagsT uint32 diff --git a/pkg/ebpf/bpf_s390_bpfeb.o b/pkg/ebpf/bpf_s390_bpfeb.o index 1c4e40035..860a0ff52 100644 Binary files a/pkg/ebpf/bpf_s390_bpfeb.o and b/pkg/ebpf/bpf_s390_bpfeb.o differ diff --git a/pkg/ebpf/bpf_x86_bpfel.go b/pkg/ebpf/bpf_x86_bpfel.go index c81facb65..b8f401958 100644 --- a/pkg/ebpf/bpf_x86_bpfel.go +++ b/pkg/ebpf/bpf_x86_bpfel.go @@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct { DnsRecord BpfDnsRecordT PktDrops BpfPktDropsT FlowRtt uint64 - NetworkEventsIdx uint8 NetworkEvents [4][8]uint8 - _ [1]byte - EthProtocol uint16 TranslatedFlow BpfTranslatedFlowT + ObservedIntf [4]BpfObservedIntfT + EthProtocol uint16 + NetworkEventsIdx uint8 + NbObservedIntf uint8 _ [4]byte } @@ -45,12 +46,11 @@ type BpfDnsFlowId struct { } type BpfDnsRecordT struct { + Latency uint64 Id uint16 Flags uint16 - _ [4]byte - Latency uint64 Errno uint8 - _ [7]byte + _ [3]byte } type BpfFilterActionT uint32 @@ -93,35 +93,34 @@ type BpfFilterValueT struct { type BpfFlowId BpfFlowIdT type BpfFlowIdT struct { - Direction uint8 SrcIp [16]uint8 DstIp [16]uint8 - _ [1]byte SrcPort uint16 DstPort uint16 TransportProtocol uint8 IcmpType uint8 IcmpCode uint8 - _ [3]byte - IfIndex uint32 + _ [1]byte } type BpfFlowMetrics BpfFlowMetricsT type BpfFlowMetricsT struct { - Lock struct{ Val uint32 } - EthProtocol uint16 - SrcMac [6]uint8 - DstMac [6]uint8 - _ [2]byte - Packets uint32 - Bytes uint64 - StartMonoTimeTs uint64 - EndMonoTimeTs uint64 - Flags uint16 - Errno uint8 - Dscp uint8 - Sampling uint32 + StartMonoTimeTs uint64 + EndMonoTimeTs uint64 + Bytes uint64 + Packets uint32 + EthProtocol uint16 + Flags uint16 + SrcMac [6]uint8 + DstMac [6]uint8 + IfIndexFirstSeen uint32 + Lock struct{ Val uint32 } + Sampling uint32 + DirectionFirstSeen uint8 + Errno uint8 + Dscp uint8 + _ [5]byte } type BpfFlowRecordT struct { @@ -141,17 +140,23 @@ const ( 
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6 BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7 BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8 - BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9 + BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10 ) +type BpfObservedIntfT struct { + Direction uint8 + _ [3]byte + IfIndex uint32 +} + type BpfPktDropsT struct { - Packets uint32 - _ [4]byte Bytes uint64 + Packets uint32 + LatestDropCause uint32 LatestFlags uint16 LatestState uint8 - _ [1]byte - LatestDropCause uint32 + _ [5]byte } type BpfTcpFlagsT uint32 diff --git a/pkg/ebpf/bpf_x86_bpfel.o b/pkg/ebpf/bpf_x86_bpfel.o index 6c80da7da..2faa53c1a 100644 Binary files a/pkg/ebpf/bpf_x86_bpfel.o and b/pkg/ebpf/bpf_x86_bpfel.o differ diff --git a/pkg/ebpf/gen.go b/pkg/ebpf/gen.go index 3ec45b49c..a95d9ba48 100644 --- a/pkg/ebpf/gen.go +++ b/pkg/ebpf/gen.go @@ -1,4 +1,4 @@ package ebpf // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. -//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t -type filter_action_t -type tcp_flags_t -type translated_flow_t Bpf ../../bpf/flows.c -- -I../../bpf/headers +//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t -type filter_action_t -type tcp_flags_t -type translated_flow_t -type observed_intf_t Bpf ../../bpf/flows.c -- -I../../bpf/headers diff --git a/pkg/exporter/converters_test.go b/pkg/exporter/converters_test.go index 13cafb949..1fad579b0 100644 --- a/pkg/exporter/converters_test.go +++ b/pkg/exporter/converters_test.go @@ -32,7 +32,6 @@ func TestConversions(t *testing.T) { name: "TCP record", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09}, DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d}, SrcPort: 23000, @@ -56,7 +55,7 @@ func TestConversions(t *testing.T) { }, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -86,7 +85,6 @@ func TestConversions(t *testing.T) { name: "UDP record", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09}, DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d}, SrcPort: 23000, @@ -104,7 +102,7 @@ func TestConversions(t *testing.T) { Sampling: 2, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -133,7 +131,6 @@ func TestConversions(t *testing.T) { name: "ICMPv4 record", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09}, DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d}, TransportProtocol: 1, @@ -150,7 +147,7 @@ func TestConversions(t *testing.T) { Dscp: 64, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -178,7 +175,6 @@ func TestConversions(t *testing.T) { name: "ICMPv6 record", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, DstIp: model.IPAddr{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}, TransportProtocol: 58, @@ -195,7 +191,7 @@ func TestConversions(t *testing.T) { Dscp: 64, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -223,7 +219,6 @@ func TestConversions(t *testing.T) { name: "ARP layer2", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{}, DstIp: model.IPAddr{}, SrcPort: 0, @@ -241,7 +236,7 @@ func TestConversions(t *testing.T) { Packets: 128, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -263,7 +258,6 @@ func TestConversions(t *testing.T) { name: "L2 drops", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{}, DstIp: model.IPAddr{}, SrcPort: 0, @@ -288,7 +282,7 @@ func TestConversions(t *testing.T) { }, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -315,7 +309,6 @@ func TestConversions(t *testing.T) { name: "TCP + drop + DNS + RTT record", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09}, DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d}, SrcPort: 23000, @@ -348,7 +341,7 @@ func TestConversions(t *testing.T) { }, }, }, - Interface: "eth0", + Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)}, TimeFlowStart: someTime, TimeFlowEnd: someTime, AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d), @@ -389,7 +382,6 @@ func TestConversions(t *testing.T) { name: "Multiple interfaces record", flow: &model.Record{ ID: ebpf.BpfFlowId{ - Direction: model.DirectionEgress, SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09}, DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d}, SrcPort: 23000, @@ -412,9 +404,9 @@ func TestConversions(t *testing.T) { }, }, }, - DupList: []map[string]uint8{ - {"5e6e92caa1d51cf": 0}, - {"eth0": 1}, + Interfaces: []model.IntfDir{ + model.NewIntfDir("5e6e92caa1d51cf", model.DirectionIngress), + model.NewIntfDir("eth0", model.DirectionEgress), }, TimeFlowStart: someTime, TimeFlowEnd: someTime, diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go index 
ca822abd6..39f0fe42e 100644 --- a/pkg/exporter/grpc_proto_test.go +++ b/pkg/exporter/grpc_proto_test.go @@ -123,7 +123,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) { for i := 0; i < 25000; i++ { input = append(input, &model.Record{Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{ EthProtocol: model.IPv6Type, - }}, AgentIP: net.ParseIP("1111::1111"), Interface: "12345678"}) + }}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDir{model.NewIntfDir("12345678", 0)}}) } flows <- input go exporter.ExportFlows(flows) diff --git a/pkg/exporter/ipfix.go b/pkg/exporter/ipfix.go index 379401ccf..60deb2f79 100644 --- a/pkg/exporter/ipfix.go +++ b/pkg/exporter/ipfix.go @@ -287,7 +287,7 @@ func setIERecordValue(record *model.Record, ieValPtr *entities.InfoElementWithVa case "packetDeltaCount": ieVal.SetUnsigned64Value(uint64(record.Metrics.Packets)) case "interfaceName": - ieVal.SetStringValue(record.Interface) + ieVal.SetStringValue(record.Interfaces[0].Interface) } } func setIEValue(record *model.Record, ieValPtr *entities.InfoElementWithValue) { @@ -296,7 +296,7 @@ func setIEValue(record *model.Record, ieValPtr *entities.InfoElementWithValue) { case "ethernetType": ieVal.SetUnsigned16Value(record.Metrics.EthProtocol) case "flowDirection": - ieVal.SetUnsigned8Value(record.ID.Direction) + ieVal.SetUnsigned8Value(record.Interfaces[0].Direction) case "sourceMacAddress": ieVal.SetMacAddressValue(record.Metrics.SrcMac[:]) case "destinationMacAddress": diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go index 846d9fbc4..9a0f38997 100644 --- a/pkg/exporter/kafka_proto_test.go +++ b/pkg/exporter/kafka_proto_test.go @@ -33,7 +33,6 @@ func TestProtoConversion(t *testing.T) { TimeFlowStart: time.Now().Add(-5 * time.Second), TimeFlowEnd: time.Now(), ID: ebpf.BpfFlowId{ - Direction: 1, SrcIp: model.IPAddrFromNetIP(net.ParseIP("192.1.2.3")), DstIp: model.IPAddrFromNetIP(net.ParseIP("127.3.2.1")), SrcPort: 4321, @@ -43,15 +42,16 @@ func TestProtoConversion(t *testing.T) { }, Metrics: model.BpfFlowContent{ BpfFlowMetrics: &ebpf.BpfFlowMetrics{ - EthProtocol: 3, - SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}, - DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}, - Bytes: 789, - Packets: 987, - Flags: uint16(1), + DirectionFirstSeen: 1, + EthProtocol: 3, + SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}, + DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}, + Bytes: 789, + Packets: 987, + Flags: uint16(1), }, }, - Interface: "veth0", + Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)}, } input <- []*model.Record{&record} @@ -62,10 +62,12 @@ func TestProtoConversion(t *testing.T) { var r pbflow.Record require.NoError(t, proto.Unmarshal(wc.messages[0].Value, &r)) assert.EqualValues(t, 3, r.EthProtocol) - for _, e := range r.DupList { - assert.EqualValues(t, 1, e.Direction) - assert.Equal(t, "veth0", e.Interface) - } + assert.EqualValues(t, 1, r.Direction) + assert.Len(t, r.DupList, 2) + assert.EqualValues(t, 0, r.DupList[0].Direction) + assert.Equal(t, "veth0", r.DupList[0].Interface) + assert.EqualValues(t, 1, r.DupList[1].Direction) + assert.Equal(t, "abcde", r.DupList[1].Interface) assert.EqualValues(t, uint64(0xaabbccddeeff), r.DataLink.SrcMac) assert.EqualValues(t, uint64(0x112233445566), r.DataLink.DstMac) assert.EqualValues(t, uint64(0xC0010203) /* 192.1.2.3 */, r.Network.SrcAddr.GetIpv4()) @@ -88,7 +90,6 @@ func TestIdenticalKeys(t *testing.T) { TimeFlowStart: time.Now().Add(-5 * 
time.Second), TimeFlowEnd: time.Now(), ID: ebpf.BpfFlowId{ - Direction: 1, SrcIp: model.IPAddrFromNetIP(net.ParseIP("192.1.2.3")), DstIp: model.IPAddrFromNetIP(net.ParseIP("127.3.2.1")), SrcPort: 4321, @@ -98,15 +99,16 @@ func TestIdenticalKeys(t *testing.T) { }, Metrics: model.BpfFlowContent{ BpfFlowMetrics: &ebpf.BpfFlowMetrics{ - EthProtocol: 3, - SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}, - DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}, - Bytes: 789, - Packets: 987, - Flags: uint16(1), + DirectionFirstSeen: 1, + EthProtocol: 3, + SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff}, + DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66}, + Bytes: 789, + Packets: 987, + Flags: uint16(1), }, }, - Interface: "veth0", + Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)}, } key1 := getFlowKey(&record) diff --git a/pkg/flow/account.go b/pkg/flow/account.go index f850f960d..9431b3b1b 100644 --- a/pkg/flow/account.go +++ b/pkg/flow/account.go @@ -96,7 +96,8 @@ func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evict monotonicNow := uint64(c.monoClock()) records := make([]*model.Record, 0, len(entries)) for key, metrics := range entries { - records = append(records, model.NewRecord(key, &model.BpfFlowContent{BpfFlowMetrics: metrics}, now, monotonicNow)) + flowContent := model.NewBpfFlowContent(*metrics) + records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow)) } c.metrics.EvictionCounter.WithSourceAndReason("accounter", reason).Inc() c.metrics.EvictedFlowsCounter.WithSourceAndReason("accounter", reason).Add(float64(len(records))) diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go index d91afad10..6bb9cec67 100644 --- a/pkg/flow/account_test.go +++ b/pkg/flow/account_test.go @@ -112,8 +112,8 @@ func TestEvict_MaxEntries(t *testing.T) { }, TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond), - DupList: make([]map[string]uint8, 0), NetworkMonitorEventsMD: make([]config.GenericMap, 0), + Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)}, }, k2: { ID: k2, @@ -124,8 +124,8 @@ func TestEvict_MaxEntries(t *testing.T) { }, TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond), TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond), - DupList: make([]map[string]uint8, 0), NetworkMonitorEventsMD: make([]config.GenericMap, 0), + Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)}, }, }, received) } @@ -194,8 +194,8 @@ func TestEvict_Period(t *testing.T) { }, TimeFlowStart: now.Add(-1000 + 123), TimeFlowEnd: now.Add(-1000 + 789), - DupList: make([]map[string]uint8, 0), NetworkMonitorEventsMD: make([]config.GenericMap, 0), + Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)}, }, *records[0]) records = receiveTimeout(t, evictor) require.Len(t, records, 1) @@ -212,8 +212,8 @@ func TestEvict_Period(t *testing.T) { }, TimeFlowStart: now.Add(-1000 + 1123), TimeFlowEnd: now.Add(-1000 + 1456), - DupList: make([]map[string]uint8, 0), NetworkMonitorEventsMD: make([]config.GenericMap, 0), + Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)}, }, *records[0]) // no more flows are evicted diff --git a/pkg/flow/decorator.go b/pkg/flow/decorator.go deleted file mode 100644 index 75d90bdd3..000000000 --- a/pkg/flow/decorator.go +++ /dev/null @@ -1,24 +0,0 @@ -package flow - -import ( - "net" - - "github.com/netobserv/netobserv-ebpf-agent/pkg/model" -) - -type InterfaceNamer 
func(ifIndex int) string - -// Decorate adds to the flows extra metadata fields that are not directly fetched by eBPF: -// - The interface name (corresponding to the interface index in the flow). -// - The IP address of the agent host. -func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer) func(in <-chan []*model.Record, out chan<- []*model.Record) { - return func(in <-chan []*model.Record, out chan<- []*model.Record) { - for flows := range in { - for _, flow := range flows { - flow.Interface = ifaceNamer(int(flow.ID.IfIndex)) - flow.AgentIP = agentIP - } - out <- flows - } - } -} diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go deleted file mode 100644 index 06f7b07f5..000000000 --- a/pkg/flow/deduper.go +++ /dev/null @@ -1,177 +0,0 @@ -package flow - -import ( - "container/list" - "reflect" - "time" - - "github.com/sirupsen/logrus" - - "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" - "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" - "github.com/netobserv/netobserv-ebpf-agent/pkg/model" -) - -var dlog = logrus.WithField("component", "flow/Deduper") -var timeNow = time.Now - -// deduperCache implement a LRU cache whose elements are evicted if they haven't been accessed -// during the expire duration. -// It is not safe for concurrent access. -type deduperCache struct { - expire time.Duration - // key: ebpf.BpfFlowId with the interface and MACs erased, to detect duplicates - // value: listElement pointing to a struct entry - ifaces map[ebpf.BpfFlowId]*list.Element - // element: entry structs of the ifaces map ordered by expiry time - entries *list.List -} - -type entry struct { - key *ebpf.BpfFlowId - dnsRecord *ebpf.BpfDnsRecordT - flowRTT *uint64 - networkEvents *[model.MaxNetworkEvents][model.NetworkEventsMaxEventsMD]uint8 - ifIndex uint32 - expiryTime time.Time - dupList *[]map[string]uint8 -} - -// Dedupe receives flows and filters these belonging to duplicate interfaces. It will forward -// the flows from the first interface coming to it, until that flow expires in the cache -// (no activity for it during the expiration time) -// The justMark argument tells that the deduper should not drop the duplicate flows but -// set their Duplicate field. 
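With both the user-space Decorate stage and the deduper going away, interface naming and multi-interface attribution now derive from the eBPF fields (if_index_first_seen / direction_first_seen plus the observed_intf list). The model-package code that performs this assembly is not shown in this excerpt, so the following is only a hedged Go sketch of the idea, using a hypothetical interfacesOf helper and the values of the key1 case from TestFlowsAgent_Decoration above.

```go
package main

import "fmt"

// IntfDir mirrors the (interface name, direction) pairs that records now carry.
type IntfDir struct {
	Interface string
	Direction uint8
}

// ObservedIntf mirrors the kernel-side observed_intf_t entries.
type ObservedIntf struct {
	IfIndex   uint32
	Direction uint8
}

// interfacesOf is a hypothetical helper: the interface where the flow was first
// seen comes first, followed by every extra interface the kernel recorded.
func interfacesOf(namer func(ifIndex int) string, firstIfIndex uint32, firstDir uint8,
	observed []ObservedIntf) []IntfDir {
	out := []IntfDir{{Interface: namer(int(firstIfIndex)), Direction: firstDir}}
	for _, o := range observed {
		out = append(out, IntfDir{Interface: namer(int(o.IfIndex)), Direction: o.Direction})
	}
	return out
}

func main() {
	namer := func(i int) string { return map[int]string{1: "eth0", 3: "foo"}[i] }
	// key1 in TestFlowsAgent_Decoration: first seen on ifindex 1 (direction 1),
	// later observed on ifindex 3 (direction 0) -> ["eth0", "foo"].
	fmt.Println(interfacesOf(namer, 1, 1, []ObservedIntf{{IfIndex: 3, Direction: 0}}))
}
```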
diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go
deleted file mode 100644
index 06f7b07f5..000000000
--- a/pkg/flow/deduper.go
+++ /dev/null
@@ -1,177 +0,0 @@
-package flow
-
-import (
-	"container/list"
-	"reflect"
-	"time"
-
-	"github.com/sirupsen/logrus"
-
-	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
-	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
-	"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
-)
-
-var dlog = logrus.WithField("component", "flow/Deduper")
-var timeNow = time.Now
-
-// deduperCache implement a LRU cache whose elements are evicted if they haven't been accessed
-// during the expire duration.
-// It is not safe for concurrent access.
-type deduperCache struct {
-	expire time.Duration
-	// key: ebpf.BpfFlowId with the interface and MACs erased, to detect duplicates
-	// value: listElement pointing to a struct entry
-	ifaces map[ebpf.BpfFlowId]*list.Element
-	// element: entry structs of the ifaces map ordered by expiry time
-	entries *list.List
-}
-
-type entry struct {
-	key *ebpf.BpfFlowId
-	dnsRecord *ebpf.BpfDnsRecordT
-	flowRTT *uint64
-	networkEvents *[model.MaxNetworkEvents][model.NetworkEventsMaxEventsMD]uint8
-	ifIndex uint32
-	expiryTime time.Time
-	dupList *[]map[string]uint8
-}
-
-// Dedupe receives flows and filters these belonging to duplicate interfaces. It will forward
-// the flows from the first interface coming to it, until that flow expires in the cache
-// (no activity for it during the expiration time)
-// The justMark argument tells that the deduper should not drop the duplicate flows but
-// set their Duplicate field.
-func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer, m *metrics.Metrics) func(in <-chan []*model.Record, out chan<- []*model.Record) {
-	cache := &deduperCache{
-		expire: expireTime,
-		entries: list.New(),
-		ifaces: map[ebpf.BpfFlowId]*list.Element{},
-	}
-	return func(in <-chan []*model.Record, out chan<- []*model.Record) {
-		for records := range in {
-			cache.removeExpired()
-			fwd := make([]*model.Record, 0, len(records))
-			for _, record := range records {
-				cache.checkDupe(record, justMark, mergeDup, &fwd, ifaceNamer)
-			}
-			if len(fwd) > 0 {
-				out <- fwd
-				m.EvictionCounter.WithSource("deduper").Inc()
-				m.EvictedFlowsCounter.WithSource("deduper").Add(float64(len(fwd)))
-			}
-			m.BufferSizeGauge.WithBufferName("deduper-list").Set(float64(cache.entries.Len()))
-			m.BufferSizeGauge.WithBufferName("deduper-map").Set(float64(len(cache.ifaces)))
-		}
-	}
-}
-
-// checkDupe check current record if its already available nad if not added to fwd records list
-func (c *deduperCache) checkDupe(r *model.Record, justMark, mergeDup bool, fwd *[]*model.Record, ifaceNamer InterfaceNamer) {
-	mergeEntry := make(map[string]uint8)
-	rk := r.ID
-	// zeroes fields from key that should be ignored from the flow comparison
-	rk.IfIndex = 0
-	rk.Direction = 0
-	if r.Metrics.AdditionalMetrics == nil {
-		r.Metrics.AdditionalMetrics = &ebpf.BpfAdditionalMetrics{}
-	}
-	// If a flow has been accounted previously, whatever its interface was,
-	// it updates the expiry time for that flow
-	if ele, ok := c.ifaces[rk]; ok {
-		fEntry := ele.Value.(*entry)
-		fEntry.expiryTime = timeNow().Add(c.expire)
-		c.entries.MoveToFront(ele)
-		// The input flow is duplicate if its interface is different to the interface
-		// of the non-duplicate flow that was first registered in the cache
-		// except if the new flow has DNS enrichment in this case will enrich the flow in the cache
-		// with DNS info and mark the current flow as duplicate
-		if r.Metrics.AdditionalMetrics.DnsRecord.Latency != 0 && fEntry.dnsRecord.Latency == 0 {
-			// copy DNS record to the cached entry and mark it as duplicate
-			fEntry.dnsRecord.Flags = r.Metrics.AdditionalMetrics.DnsRecord.Flags
-			fEntry.dnsRecord.Id = r.Metrics.AdditionalMetrics.DnsRecord.Id
-			fEntry.dnsRecord.Latency = r.Metrics.AdditionalMetrics.DnsRecord.Latency
-			fEntry.dnsRecord.Errno = r.Metrics.AdditionalMetrics.DnsRecord.Errno
-		}
-		// If the new flow has flowRTT then enrich the flow in the case with the same RTT and mark it duplicate
-		if r.Metrics.AdditionalMetrics.FlowRtt != 0 && *fEntry.flowRTT == 0 {
-			*fEntry.flowRTT = r.Metrics.AdditionalMetrics.FlowRtt
-		}
-		// If the new flows have network events, then enrich the flow in the cache and mark the flow as duplicate
-		for i, md := range r.Metrics.AdditionalMetrics.NetworkEvents {
-			if !model.AllZerosMetaData(md) && model.AllZerosMetaData(fEntry.networkEvents[i]) {
-				copy(fEntry.networkEvents[i][:], md[:])
-			}
-		}
-		if fEntry.ifIndex != r.ID.IfIndex {
-			if justMark {
-				r.Duplicate = true
-				*fwd = append(*fwd, r)
-			}
-			if mergeDup {
-				ifName := ifaceNamer(int(r.ID.IfIndex))
-				mergeEntry[ifName] = r.ID.Direction
-				if dupEntryNew(*fEntry.dupList, mergeEntry) {
-					*fEntry.dupList = append(*fEntry.dupList, mergeEntry)
-					dlog.Debugf("merge list entries dump:")
-					for _, entry := range *fEntry.dupList {
-						for k, v := range entry {
-							dlog.Debugf("interface %s dir %d", k, v)
-						}
-					}
-				}
-			}
-			return
-		}
-		*fwd = append(*fwd, r)
-		return
-	}
-	// The flow has not been accounted previously (or was forgotten after expiration)
-	// so we register it for that concrete interface
-	e := entry{
-		key: &rk,
-		dnsRecord: &r.Metrics.AdditionalMetrics.DnsRecord,
-		flowRTT: &r.Metrics.AdditionalMetrics.FlowRtt,
-		networkEvents: &r.Metrics.AdditionalMetrics.NetworkEvents,
-		ifIndex: r.ID.IfIndex,
-		expiryTime: timeNow().Add(c.expire),
-	}
-	if mergeDup {
-		ifName := ifaceNamer(int(r.ID.IfIndex))
-		mergeEntry[ifName] = r.ID.Direction
-		r.DupList = append(r.DupList, mergeEntry)
-		e.dupList = &r.DupList
-	}
-	c.ifaces[rk] = c.entries.PushFront(&e)
-	*fwd = append(*fwd, r)
-}
-
-func dupEntryNew(dupList []map[string]uint8, mergeEntry map[string]uint8) bool {
-	for _, entry := range dupList {
-		if reflect.DeepEqual(entry, mergeEntry) {
-			return false
-		}
-	}
-	return true
-}
-
-func (c *deduperCache) removeExpired() {
-	now := timeNow()
-	ele := c.entries.Back()
-	evicted := 0
-	for ele != nil && now.After(ele.Value.(*entry).expiryTime) {
-		evicted++
-		c.entries.Remove(ele)
-		fEntry := ele.Value.(*entry)
-		fEntry.dupList = nil
-		delete(c.ifaces, *fEntry.key)
-		ele = c.entries.Back()
-	}
-	if evicted > 0 {
-		dlog.WithFields(logrus.Fields{
-			"current": c.entries.Len(),
-			"evicted": evicted,
-			"expiryTime": c.expire,
-		}).Debug("entries evicted from the deduper cache")
-	}
-}
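The deleted deduper keyed its cache on a `BpfFlowId` with `IfIndex` and `Direction` zeroed out, and accumulated the extra interfaces it saw into `DupList`. A hypothetical helper (not part of the diff) showing how the same per-interface view can now be derived from a record's `Interfaces` list instead, assuming the `IntfDir` shape sketched earlier:

```go
package flow

import "github.com/netobserv/netobserv-ebpf-agent/pkg/model"

// interfacesAsDupList is illustrative only: it rebuilds the old DupList shape
// ([]map[interfaceName]direction) from the Interfaces slice carried by the record.
func interfacesAsDupList(r *model.Record) []map[string]uint8 {
	out := make([]map[string]uint8, 0, len(r.Interfaces))
	for _, intf := range r.Interfaces {
		out = append(out, map[string]uint8{intf.Interface: intf.Direction})
	}
	return out
}
```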
diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go
deleted file mode 100644
index 13c3a5695..000000000
--- a/pkg/flow/deduper_test.go
+++ /dev/null
@@ -1,345 +0,0 @@
-package flow
-
-import (
-	"net"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-
-	"github.com/netobserv/flowlogs-pipeline/pkg/config"
-	"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
-	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
-	"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
-)
-
-var (
-	// the same flow from 2 different interfaces
-	oneIf1 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 1,
-			SrcPort: 123,
-			DstPort: 456,
-			IfIndex: 1,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				SrcMac: model.MacAddr{0x1},
-				DstMac: model.MacAddr{0x1},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "eth0",
-	}
-	oneIf2 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 1,
-			SrcPort: 123,
-			DstPort: 456,
-			IfIndex: 2,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				SrcMac: model.MacAddr{0x2},
-				DstMac: model.MacAddr{0x2},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "123456789",
-	}
-	// another flow from 2 different interfaces and directions
-	twoIf1 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 1,
-			SrcPort: 333,
-			DstPort: 456,
-			IfIndex: 1,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				SrcMac: model.MacAddr{0x1},
-				DstMac: model.MacAddr{0x1},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "eth0",
-	}
-	twoIf2 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 0,
-			SrcPort: 333,
-			DstPort: 456,
-			IfIndex: 2,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				SrcMac: model.MacAddr{0x2},
-				DstMac: model.MacAddr{0x2},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "123456789",
-	}
-	// another flow from 2 different interfaces and directions with DNS latency set on the latest
-	threeIf1 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 1,
-			SrcPort: 433,
-			DstPort: 456,
-			IfIndex: 1,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				SrcMac: model.MacAddr{0x1},
-				DstMac: model.MacAddr{0x1},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "eth0",
-	}
-	threeIf2 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 0,
-			SrcPort: 433,
-			DstPort: 456,
-			IfIndex: 2,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				DstMac: model.MacAddr{0x2},
-				SrcMac: model.MacAddr{0x2},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-			AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
-				DnsRecord: ebpf.BpfDnsRecordT{Id: 1, Flags: 100, Latency: 1000},
-			},
-		},
-		Interface: "123456789",
-		DNSLatency: time.Millisecond,
-	}
-	// another flow from 2 different interfaces and directions with RTT set on the latest
-	fourIf1 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 1,
-			SrcPort: 533,
-			DstPort: 456,
-			IfIndex: 1,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				DstMac: model.MacAddr{0x1},
-				SrcMac: model.MacAddr{0x1},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "eth0",
-	}
-	fourIf2 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 0,
-			SrcPort: 533,
-			DstPort: 456,
-			IfIndex: 2,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				DstMac: model.MacAddr{0x2},
-				SrcMac: model.MacAddr{0x2},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-			AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
-				FlowRtt: 100,
-			},
-		},
-		Interface: "123456789",
-		TimeFlowRtt: 100,
-	}
-	// another flow from 2 different interfaces and directions with NetworkEvents set on the latest
-	fiveIf1 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 1,
-			SrcPort: 633,
-			DstPort: 456,
-			IfIndex: 1,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				DstMac: model.MacAddr{0x1},
-				SrcMac: model.MacAddr{0x1},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-		},
-		Interface: "eth0",
-	}
-	fiveIf2 = &model.Record{
-		ID: ebpf.BpfFlowId{
-			Direction: 0,
-			SrcPort: 633,
-			DstPort: 456,
-			IfIndex: 2,
-		},
-		Metrics: model.BpfFlowContent{
-			BpfFlowMetrics: &ebpf.BpfFlowMetrics{
-				EthProtocol: 1,
-				DstMac: model.MacAddr{0x2},
-				SrcMac: model.MacAddr{0x2},
-				Packets: 2,
-				Bytes: 456,
-				Flags: 1,
-			},
-			AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
-				FlowRtt: 100,
-			},
-		},
-		Interface: "123456789",
-		NetworkMonitorEventsMD: []config.GenericMap{{"Name": "test netpol1"}}}
-)
-
-func TestDedupe(t *testing.T) {
-	input := make(chan []*model.Record, 100)
-	output := make(chan []*model.Record, 100)
-
-	go Dedupe(time.Minute, false, false, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output)
-
-	input <- []*model.Record{
-		oneIf2, // record 1 at interface 2: should be accepted
-		twoIf1, // record 2 at interface 1: should be accepted
-		oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
-		oneIf1, // (same record key, different interface)
-		twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
-		oneIf2, // record 1 at interface 1: should be accepted (same record key, same interface)
-		threeIf1, // record 1 has no DNS so it get enriched with DNS record from the following record
-		threeIf2, // record 2 is duplicate of record1 and have DNS info , should not be accepted
-		fourIf1, // record 1 has no RTT so it get enriched with RTT from the following record
-		fourIf2, // record 2 is duplicate of record1 and have RTT , should not be accepted
-		fiveIf1, // record 1 has no NetworkEvents so it get enriched with NetworkEvents from the following record
-		fiveIf2, // record 2 is duplicate of record1 and have NetworkEvents, should not be accepted
-	}
-	deduped := receiveTimeout(t, output)
-	assert.Equal(t, []*model.Record{oneIf2, twoIf1, oneIf2, threeIf1, fourIf1, fiveIf1}, deduped)
-
-	// should still accept records with same key, same interface,
-	// and discard these with same key, different interface
-	input <- []*model.Record{oneIf1, oneIf2}
-	deduped = receiveTimeout(t, output)
-	assert.Equal(t, []*model.Record{oneIf2}, deduped)
-
-	// make sure flow with no DNS get enriched with the DNS record
-	assert.Equal(t, threeIf1.Metrics.AdditionalMetrics.DnsRecord.Id, threeIf2.Metrics.AdditionalMetrics.DnsRecord.Id)
-	assert.Equal(t, threeIf1.Metrics.AdditionalMetrics.DnsRecord.Flags, threeIf2.Metrics.AdditionalMetrics.DnsRecord.Flags)
-	assert.Equal(t, threeIf1.Metrics.AdditionalMetrics.DnsRecord.Latency, threeIf2.Metrics.AdditionalMetrics.DnsRecord.Latency)
-
-	// make sure flow with no RTT get enriched from the dup flow with RTT
-	assert.Equal(t, fourIf1.Metrics.AdditionalMetrics.FlowRtt, fourIf2.Metrics.AdditionalMetrics.FlowRtt)
-
-	// make sure flow with no NetworkEvents gets enriched from dup flow with NetworkEvents
-	assert.Equal(t, fiveIf1.Metrics.AdditionalMetrics.NetworkEvents, fiveIf2.Metrics.AdditionalMetrics.NetworkEvents)
-}
-
-func TestDedupe_EvictFlows(t *testing.T) {
-	tm := &timerMock{now: time.Now()}
-	timeNow = tm.Now
-	input := make(chan []*model.Record, 100)
-	output := make(chan []*model.Record, 100)
-
-	go Dedupe(15*time.Second, false, false, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output)
-
-	// Should only accept records 1 and 2, at interface 1
-	input <- []*model.Record{oneIf1, twoIf1, oneIf2}
-	assert.Equal(t, []*model.Record{oneIf1, twoIf1},
-		receiveTimeout(t, output))
-
-	tm.now = tm.now.Add(10 * time.Second)
-
-	// After 10 seconds, it still filters existing flows from different interfaces
-	input <- []*model.Record{oneIf2}
-	time.Sleep(100 * time.Millisecond)
-	requireNoEviction(t, output)
-
-	tm.now = tm.now.Add(10 * time.Second)
-
-	// model.Record 2 hasn't been accounted for >expiryTime, so it will accept the it again
-	// whatever the interface.
-	// Since record 1 was accessed 10 seconds ago (=4.20 kernels with LookupAndDelete
-	for iterator.Next(&id, &metrics) {
+	for iterator.Next(&id, &baseMetrics) {
 		count++
 		if err := flowMap.Delete(id); err != nil {
 			log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
 			met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
 		}
-		// We observed that eBFP PerCPU map might insert multiple times the same key in the map
-		// (probably due to race conditions) so we need to re-join metrics again at userspace
-		aggr := model.BpfFlowContent{}
-		for i := range metrics {
-			aggr.AccumulateBase(&metrics[i])
-		}
-		flows[id] = aggr
+		flows[id] = model.NewBpfFlowContent(baseMetrics)
 	}
 	met.BufferSizeGauge.WithBufferName("hashmap-legacy-total").Set(float64(count))
 	met.BufferSizeGauge.WithBufferName("hashmap-legacy-unique").Set(float64(len(flows)))
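The deleted branch in the legacy fetcher re-joined the per-CPU values of a single key in user space, whereas the new code reads one `BpfFlowMetrics` per key and wraps it directly with `model.NewBpfFlowContent`. For context, a sketch of the kind of merge `AccumulateBase` performs; the field names follow the generated eBPF bindings, and the exact rules are assumptions rather than the real implementation:

```go
package tracer

import "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"

// accumulateBase is a simplified stand-in for model.BpfFlowContent.AccumulateBase:
// counters are summed, TCP flags are OR-ed, and the time window is widened to cover
// both samples. Details may differ from the actual method.
func accumulateBase(dst, src *ebpf.BpfFlowMetrics) {
	if dst.EthProtocol == 0 {
		dst.EthProtocol = src.EthProtocol
	}
	dst.Packets += src.Packets
	dst.Bytes += src.Bytes
	dst.Flags |= src.Flags
	if dst.StartMonoTimeTs == 0 || (src.StartMonoTimeTs != 0 && src.StartMonoTimeTs < dst.StartMonoTimeTs) {
		dst.StartMonoTimeTs = src.StartMonoTimeTs
	}
	if src.EndMonoTimeTs > dst.EndMonoTimeTs {
		dst.EndMonoTimeTs = src.EndMonoTimeTs
	}
}
```

In the deleted loop this merge ran once per CPU slot (`for i := range metrics { aggr.AccumulateBase(&metrics[i]) }`); with a single value per key that pass is no longer needed here.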