diff --git a/bpf/flows.c b/bpf/flows.c
index a226af3c8..70c7a9ee6 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -79,6 +79,23 @@ static inline void update_dns(additional_metrics *extra_metrics, pkt_info *pkt,
}
}
+static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8 direction) {
+ if (value->nb_observed_intf < MAX_OBSERVED_INTERFACES) {
+ for (u8 i = 0; i < value->nb_observed_intf; i++) {
+ if (value->observed_intf[i].if_index == if_index &&
+ value->observed_intf[i].direction == direction) {
+ return;
+ }
+ }
+ value->observed_intf[value->nb_observed_intf].if_index = if_index;
+ value->observed_intf[value->nb_observed_intf].direction = direction;
+ value->nb_observed_intf++;
+ } else {
+ increase_counter(OBSERVED_INTF_MISSED);
+ BPF_PRINTK("observed interface missed (array capacity reached) for ifindex %d\n", if_index);
+ }
+}
+
static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
u32 filter_sampling = 0;
@@ -110,13 +127,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}
- //Set extra fields
- id.if_index = skb->ifindex;
- id.direction = direction;
-
// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
- bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling);
+ bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling,
+ direction);
if (filter_sampling == 0) {
filter_sampling = sampling;
}
@@ -137,19 +151,47 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
- update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
+ if (aggregate_flow->if_index_first_seen == skb->ifindex) {
+ update_existing_flow(aggregate_flow, &pkt, len, filter_sampling);
+ } else if (skb->ifindex != 0) {
+ // Only record that this flow was also observed on this interface
+ additional_metrics *extra_metrics =
+ (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
+ if (extra_metrics != NULL) {
+ add_observed_intf(extra_metrics, skb->ifindex, direction);
+ } else {
+ additional_metrics new_metrics = {
+ .eth_protocol = eth_protocol,
+ .start_mono_time_ts = pkt.current_ts,
+ .end_mono_time_ts = pkt.current_ts,
+ };
+ add_observed_intf(&new_metrics, skb->ifindex, direction);
+ long ret =
+ bpf_map_update_elem(&additional_flow_metrics, &id, &new_metrics, BPF_NOEXIST);
+ if (ret == -EEXIST) {
+ extra_metrics =
+ (additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
+ if (extra_metrics != NULL) {
+ add_observed_intf(extra_metrics, skb->ifindex, direction);
+ }
+ } else if (ret != 0 && trace_messages) {
+ bpf_printk("error creating new observed_intf: %d\n", ret);
+ }
+ }
+ }
} else {
// Key does not exist in the map, and will need to create a new entry.
- flow_metrics new_flow = {
- .packets = 1,
- .bytes = len,
- .eth_protocol = eth_protocol,
- .start_mono_time_ts = pkt.current_ts,
- .end_mono_time_ts = pkt.current_ts,
- .flags = pkt.flags,
- .dscp = pkt.dscp,
- .sampling = filter_sampling,
- };
+ flow_metrics new_flow;
+ __builtin_memset(&new_flow, 0, sizeof(new_flow));
+ new_flow.if_index_first_seen = skb->ifindex;
+ new_flow.direction_first_seen = direction;
+ new_flow.packets = 1;
+ new_flow.bytes = len;
+ new_flow.eth_protocol = eth_protocol;
+ new_flow.start_mono_time_ts = pkt.current_ts;
+ new_flow.end_mono_time_ts = pkt.current_ts;
+ new_flow.dscp = pkt.dscp;
+ new_flow.sampling = filter_sampling;
__builtin_memcpy(new_flow.dst_mac, eth->h_dest, ETH_ALEN);
__builtin_memcpy(new_flow.src_mac, eth->h_source, ETH_ALEN);
@@ -194,9 +236,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// Update additional metrics (per-CPU map)
if (pkt.dns_id != 0 || dns_errno != 0) {
- // hack on id will be removed with dedup-in-kernel work
- id.direction = 0;
- id.if_index = 0;
additional_metrics *extra_metrics =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (extra_metrics != NULL) {
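With deduplication now handled in the kernel, the interface that first sees a flow owns its flow_metrics entry, while any other interface/direction pair that later sees the same flow is only appended to observed_intf in the per-flow additional_metrics. The sketch below illustrates how user space can rebuild the full interface list from if_index_first_seen plus the observed_intf entries; it uses simplified stand-in structs rather than the generated ebpf.Bpf* bindings, and the namer function is a placeholder for the agent's interface namer.

```go
package main

import "fmt"

// Simplified stand-ins for the generated BpfFlowMetrics / BpfAdditionalMetrics types.
type flowMetrics struct {
	IfIndexFirstSeen   uint32
	DirectionFirstSeen uint8
}

type observedIntf struct {
	Direction uint8
	IfIndex   uint32
}

type additionalMetrics struct {
	NbObservedIntf uint8
	ObservedIntf   [4]observedIntf // MAX_OBSERVED_INTERFACES == 4
}

type intfDir struct {
	Interface string
	Direction uint8
}

// interfaces rebuilds the ordered interface/direction list for a flow:
// the first-seen interface comes first, followed by the extra pairs the
// kernel recorded in observed_intf.
func interfaces(m flowMetrics, a *additionalMetrics, namer func(int) string) []intfDir {
	out := []intfDir{{Interface: namer(int(m.IfIndexFirstSeen)), Direction: m.DirectionFirstSeen}}
	if a == nil {
		return out
	}
	for i := uint8(0); i < a.NbObservedIntf && int(i) < len(a.ObservedIntf); i++ {
		o := a.ObservedIntf[i]
		out = append(out, intfDir{Interface: namer(int(o.IfIndex)), Direction: o.Direction})
	}
	return out
}

func main() {
	namer := func(idx int) string { return fmt.Sprintf("if-%d", idx) }
	m := flowMetrics{IfIndexFirstSeen: 4, DirectionFirstSeen: 0}
	extra := &additionalMetrics{NbObservedIntf: 1, ObservedIntf: [4]observedIntf{{IfIndex: 1, Direction: 1}}}
	fmt.Println(interfaces(m, extra, namer)) // [{if-4 0} {if-1 1}]
}
```

The decorated Interfaces lists expected by TestFlowsAgent_Decoration further down follow the same ordering: the first-seen interface first, then the observed ones.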
diff --git a/bpf/flows_filter.h b/bpf/flows_filter.h
index 74e8553aa..43b211bfb 100644
--- a/bpf/flows_filter.h
+++ b/bpf/flows_filter.h
@@ -33,7 +33,8 @@ static __always_inline int is_equal_ip(u8 *ip1, u8 *ip2, u8 len) {
static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_t *key,
filter_action *action, u8 len, u8 offset,
- u16 flags, u32 drop_reason, u32 *sampling) {
+ u16 flags, u32 drop_reason, u32 *sampling,
+ u8 direction) {
int result = 0;
struct filter_value_t *rule = (struct filter_value_t *)bpf_map_lookup_elem(&filter_map, key);
@@ -161,7 +162,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
if (!is_zero_ip(rule->ip, len)) {
// for Ingress side we can filter using dstIP and for Egress side we can filter using srcIP
- if (id->direction == INGRESS) {
+ if (direction == INGRESS) {
if (is_equal_ip(rule->ip, id->dst_ip + offset, len)) {
BPF_PRINTK("dstIP matched\n");
result++;
@@ -181,7 +182,7 @@ static __always_inline int do_flow_filter_lookup(flow_id *id, struct filter_key_
}
if (rule->direction != MAX_DIRECTION) {
- if (rule->direction == id->direction) {
+ if (rule->direction == direction) {
BPF_PRINTK("direction matched\n");
result++;
} else {
@@ -237,7 +238,8 @@ static __always_inline int flow_filter_setup_lookup_key(flow_id *id, struct filt
* check if the flow match filter rule and return >= 1 if the flow is to be dropped
*/
static __always_inline int is_flow_filtered(flow_id *id, filter_action *action, u16 flags,
- u32 drop_reason, u16 eth_protocol, u32 *sampling) {
+ u32 drop_reason, u16 eth_protocol, u32 *sampling,
+ u8 direction) {
struct filter_key_t key;
u8 len, offset;
int result = 0;
@@ -251,7 +253,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}
- result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
+ result = do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling,
+ direction);
// we have a match so return
if (result > 0) {
return result;
@@ -263,7 +266,8 @@ static __always_inline int is_flow_filtered(flow_id *id, filter_action *action,
return result;
}
- return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling);
+ return do_flow_filter_lookup(id, &key, action, len, offset, flags, drop_reason, sampling,
+ direction);
}
#endif //__FLOWS_FILTER_H__
diff --git a/bpf/network_events_monitoring.h b/bpf/network_events_monitoring.h
index ef1839f87..96bac5a1f 100644
--- a/bpf/network_events_monitoring.h
+++ b/bpf/network_events_monitoring.h
@@ -94,17 +94,14 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
}
// check if this packet need to be filtered if filtering feature is enabled
- bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
+ bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
- for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
- id.direction = dir;
- ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
- if (ret == 0) {
- return ret;
- }
+ ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
+ if (ret == 0) {
+ return ret;
}
// there is no matching flows so lets create new one and add the network event metadata
diff --git a/bpf/pca.h b/bpf/pca.h
index 7f4acd563..8c77d08f8 100644
--- a/bpf/pca.h
+++ b/bpf/pca.h
@@ -53,12 +53,8 @@ static inline bool validate_pca_filter(struct __sk_buff *skb, direction dir) {
return false;
}
- //Set extra fields
- id.if_index = skb->ifindex;
- id.direction = dir;
-
// check if this packet need to be filtered if filtering feature is enabled
- bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL);
+ bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, NULL, dir);
if (skip) {
return false;
}
diff --git a/bpf/pkt_drops.h b/bpf/pkt_drops.h
index 5684caba6..a0733197c 100644
--- a/bpf/pkt_drops.h
+++ b/bpf/pkt_drops.h
@@ -64,7 +64,7 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
}
// check if this packet need to be filtered if filtering feature is enabled
- bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL);
+ bool skip = check_and_do_flow_filtering(&id, flags, reason, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
diff --git a/bpf/pkt_translation.h b/bpf/pkt_translation.h
index 00c0397da..1602665c3 100644
--- a/bpf/pkt_translation.h
+++ b/bpf/pkt_translation.h
@@ -163,7 +163,7 @@ static inline int trace_nat_manip_pkt(struct nf_conn *ct, struct sk_buff *skb) {
}
// check if this packet need to be filtered if filtering feature is enabled
- bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
+ bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
diff --git a/bpf/rtt_tracker.h b/bpf/rtt_tracker.h
index f3ac87478..c102557af 100644
--- a/bpf/rtt_tracker.h
+++ b/bpf/rtt_tracker.h
@@ -59,7 +59,7 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
rtt *= 1000u;
// check if this packet need to be filtered if filtering feature is enabled
- bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL);
+ bool skip = check_and_do_flow_filtering(&id, flags, 0, eth_protocol, NULL, 0);
if (skip) {
return 0;
}
diff --git a/bpf/types.h b/bpf/types.h
index 558ba2f17..ca876393e 100644
--- a/bpf/types.h
+++ b/bpf/types.h
@@ -66,6 +66,7 @@ typedef __u64 u64;
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4
+#define MAX_OBSERVED_INTERFACES 4
// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum direction_t {
@@ -80,26 +81,29 @@ const enum direction_t *unused1 __attribute__((unused));
const u8 ip4in6[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff};
typedef struct flow_metrics_t {
- struct bpf_spin_lock lock;
- u16 eth_protocol;
- // L2 data link layer
- u8 src_mac[ETH_ALEN];
- u8 dst_mac[ETH_ALEN];
- u32 packets;
- u64 bytes;
// Flow start and end times as monotomic timestamps in nanoseconds
// as output from bpf_ktime_get_ns()
u64 start_mono_time_ts;
u64 end_mono_time_ts;
+ u64 bytes;
+ u32 packets;
+ u16 eth_protocol;
// TCP Flags from https://www.ietf.org/rfc/rfc793.txt
u16 flags;
+ // L2 data link layer
+ u8 src_mac[ETH_ALEN];
+ u8 dst_mac[ETH_ALEN];
+ // OS interface index
+ u32 if_index_first_seen;
+ struct bpf_spin_lock lock;
+ u32 sampling;
+ u8 direction_first_seen;
// The positive errno of a failed map insertion that caused a flow
// to be sent via ringbuffer.
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
- u32 sampling;
} flow_metrics;
// Force emitting enums/structs into the ELF
@@ -109,22 +113,20 @@ typedef struct additional_metrics_t {
u64 start_mono_time_ts;
u64 end_mono_time_ts;
struct dns_record_t {
+ u64 latency;
u16 id;
u16 flags;
- u64 latency;
u8 errno;
} dns_record;
struct pkt_drops_t {
- u32 packets;
u64 bytes;
+ u32 packets;
+ u32 latest_drop_cause;
u16 latest_flags;
u8 latest_state;
- u32 latest_drop_cause;
} pkt_drops;
u64 flow_rtt;
- u8 network_events_idx;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
- u16 eth_protocol;
struct translated_flow_t {
u8 saddr[IP_MAX_LEN];
u8 daddr[IP_MAX_LEN];
@@ -133,23 +135,24 @@ typedef struct additional_metrics_t {
u16 zone_id;
u8 icmp_id;
} translated_flow;
+ struct observed_intf_t {
+ u8 direction;
+ u32 if_index;
+ } observed_intf[MAX_OBSERVED_INTERFACES];
+ u16 eth_protocol;
+ u8 network_events_idx;
+ u8 nb_observed_intf;
} additional_metrics;
// Force emitting enums/structs into the ELF
const struct additional_metrics_t *unused3 __attribute__((unused));
-
-// Force emitting enums/structs into the ELF
const struct dns_record_t *unused4 __attribute__((unused));
-
-// Force emitting enums/structs into the ELF
const struct pkt_drops_t *unused5 __attribute__((unused));
-
-// Force emitting struct translated_flow_t into the ELF.
const struct translated_flow_t *unused6 __attribute__((unused));
+const struct observed_intf_t *unused13 __attribute__((unused));
// Attributes that uniquely identify a flow
typedef struct flow_id_t {
- u8 direction;
// L3 network layer
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
@@ -162,8 +165,6 @@ typedef struct flow_id_t {
// ICMP protocol
u8 icmp_type;
u8 icmp_code;
- // OS interface index
- u32 if_index;
} flow_id;
// Force emitting enums/structs into the ELF
@@ -220,6 +221,7 @@ typedef enum global_counters_key_t {
NETWORK_EVENTS_ERR_GROUPID_MISMATCH,
NETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS,
NETWORK_EVENTS_GOOD,
+ OBSERVED_INTF_MISSED,
MAX_COUNTERS,
} global_counters_key;
@@ -230,7 +232,7 @@ const enum global_counters_key_t *unused9 __attribute__((unused));
struct filter_key_t {
u32 prefix_len;
u8 ip_data[IP_MAX_LEN];
-} __attribute__((packed));
+} filter_key;
// Force emitting enums/structs into the ELF
const struct filter_key_t *unused10 __attribute__((unused));
diff --git a/bpf/utils.h b/bpf/utils.h
index 64b6a6ce9..face7d000 100644
--- a/bpf/utils.h
+++ b/bpf/utils.h
@@ -184,12 +184,14 @@ static inline bool is_filter_enabled() {
/*
* check if flow filter is enabled and if we need to continue processing the packet or not
*/
-static inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
- u16 eth_protocol, u32 *sampling) {
+static __always_inline bool check_and_do_flow_filtering(flow_id *id, u16 flags, u32 drop_reason,
+ u16 eth_protocol, u32 *sampling,
+ u8 direction) {
// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
filter_action action = ACCEPT;
- if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling) != 0 &&
+ if (is_flow_filtered(id, &action, flags, drop_reason, eth_protocol, sampling, direction) !=
+ 0 &&
action != MAX_FILTER_ACTIONS) {
// we have matching rules follow through the actions to decide if we should accept or reject the flow
// and update global counter for both cases
diff --git a/cmd/netobserv-ebpf-agent.go b/cmd/netobserv-ebpf-agent.go
index 4da302c44..fe76df419 100644
--- a/cmd/netobserv-ebpf-agent.go
+++ b/cmd/netobserv-ebpf-agent.go
@@ -51,9 +51,6 @@ func main() {
Error("PProf HTTP listener stopped working")
}()
}
- if config.DeduperFCExpiry == 0 {
- config.DeduperFCExpiry = 2 * config.CacheActiveTimeout
- }
logrus.WithField("configuration", fmt.Sprintf("%#v", config)).Debugf("configuration loaded")
diff --git a/docs/architecture.md b/docs/architecture.md
index fea886396..dc7e75c70 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -6,6 +6,8 @@ The following graph provides a birds' eye view on how the different components a
For more info on each component, please check their corresponding Go docs.
+
+
### Kernel space
```mermaid
@@ -33,21 +35,13 @@ flowchart TD
```mermaid
flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via
RingBuffer"| RB(flow.RingBufTracer)
- style E fill:#990
+ style E fill:#7CA
E --> |"polls
HashMap"| M(flow.MapTracer)
RB --> |chan *model.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
- ACC --> |"chan []*model.Record"| DD(flow.Deduper)
- M --> |"chan []*model.Record"| DD
-
- subgraph Optional
- DD
- end
-
- DD --> |"chan []*model.Record"| CL(flow.CapacityLimiter)
+ ACC --> |"chan []*model.Record"| CL(flow.CapacityLimiter)
+ M --> |"chan []*model.Record"| CL
- CL --> |"chan []*model.Record"| DC(flow.Decorator)
-
- DC --> |"chan []*model.Record"| EX("export.GRPCProto
- or
- export.KafkaProto
- or
- export.DirectFLP")
+ CL --> |"chan []*model.Record"| EX("export.GRPCProto
+ or
+ export.KafkaProto
+ or
+ export.DirectFLP")
```
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index 47fab9897..ccf8a7e82 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -119,13 +119,8 @@ type Flows struct {
rbTracer *flow.RingBufTracer
accounter *flow.Accounter
limiter *flow.CapacityLimiter
- deduper node.MiddleFunc[[]*model.Record, []*model.Record]
exporter node.TerminalFunc[[]*model.Record]
- // elements used to decorate flows with extra information
- interfaceNamer flow.InterfaceNamer
- agentIP net.IP
-
status Status
promoServer *http.Server
sampleDecoder *ovnobserv.SampleDecoder
@@ -290,6 +285,8 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
}
return iface
}
+ model.SetGlobals(agentIP, interfaceNamer)
+
var promoServer *http.Server
if cfg.MetricsEnable {
promoServer = promo.InitializePrometheus(m.Settings)
@@ -302,26 +299,19 @@ func flowsAgent(cfg *Config, m *metrics.Metrics,
rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.CacheActiveTimeout, m)
accounter := flow.NewAccounter(cfg.CacheMaxFlows, cfg.CacheActiveTimeout, time.Now, monotime.Now, m)
limiter := flow.NewCapacityLimiter(m)
- var deduper node.MiddleFunc[[]*model.Record, []*model.Record]
- if cfg.Deduper == DeduperFirstCome {
- deduper = flow.Dedupe(cfg.DeduperFCExpiry, cfg.DeduperJustMark, cfg.DeduperMerge, interfaceNamer, m)
- }
return &Flows{
- ebpf: fetcher,
- exporter: exporter,
- interfaces: registerer,
- filter: filter,
- cfg: cfg,
- mapTracer: mapTracer,
- rbTracer: rbTracer,
- accounter: accounter,
- limiter: limiter,
- deduper: deduper,
- agentIP: agentIP,
- interfaceNamer: interfaceNamer,
- promoServer: promoServer,
- sampleDecoder: s,
+ ebpf: fetcher,
+ exporter: exporter,
+ interfaces: registerer,
+ filter: filter,
+ cfg: cfg,
+ mapTracer: mapTracer,
+ rbTracer: rbTracer,
+ accounter: accounter,
+ limiter: limiter,
+ promoServer: promoServer,
+ sampleDecoder: s,
}, nil
}
@@ -517,9 +507,6 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
limiter := node.AsMiddle(f.limiter.Limit,
node.ChannelBufferLen(f.cfg.BuffersLength))
- decorator := node.AsMiddle(flow.Decorate(f.agentIP, f.interfaceNamer),
- node.ChannelBufferLen(f.cfg.BuffersLength))
-
ebl := f.cfg.ExporterBufferLength
if ebl == 0 {
ebl = f.cfg.BuffersLength
@@ -530,17 +517,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
rbTracer.SendsTo(accounter)
- if f.deduper != nil {
- deduper := node.AsMiddle(f.deduper, node.ChannelBufferLen(f.cfg.BuffersLength))
- mapTracer.SendsTo(deduper)
- accounter.SendsTo(deduper)
- deduper.SendsTo(limiter)
- } else {
- mapTracer.SendsTo(limiter)
- accounter.SendsTo(limiter)
- }
- limiter.SendsTo(decorator)
- decorator.SendsTo(export)
+ mapTracer.SendsTo(limiter)
+ accounter.SendsTo(limiter)
+ limiter.SendsTo(export)
alog.Debug("starting graph")
mapTracer.Start()
diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go
index 4df5d352f..908b17a48 100644
--- a/pkg/agent/agent_test.go
+++ b/pkg/agent/agent_test.go
@@ -48,165 +48,72 @@ var (
key1 = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
- IfIndex: 3,
}
- key1Dupe = ebpf.BpfFlowId{
- SrcPort: 123,
- DstPort: 456,
- IfIndex: 4,
- }
-
key2 = ebpf.BpfFlowId{
SrcPort: 333,
DstPort: 532,
- IfIndex: 3,
}
)
-func TestFlowsAgent_Deduplication(t *testing.T) {
- export := testAgent(t, &Config{
- CacheActiveTimeout: 10 * time.Millisecond,
- CacheMaxFlows: 100,
- DeduperJustMark: false,
- Deduper: DeduperFirstCome,
- })
-
- exported := export.Get(t, timeout)
- assert.Len(t, exported, 2)
-
- receivedKeys := map[ebpf.BpfFlowId]struct{}{}
-
- var key1Flows []*model.Record
- for _, f := range exported {
- require.NotContains(t, receivedKeys, f.ID)
- receivedKeys[f.ID] = struct{}{}
- switch f.ID {
- case key1:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- assert.Equal(t, "foo", f.Interface)
- key1Flows = append(key1Flows, f)
- case key1Dupe:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- assert.Equal(t, "bar", f.Interface)
- key1Flows = append(key1Flows, f)
- case key2:
- assert.EqualValues(t, 7, f.Metrics.Packets)
- assert.EqualValues(t, 33, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- }
+func TestFlowsAgent_Decoration(t *testing.T) {
+ now := uint64(monotime.Now())
+ metrics1 := model.BpfFlowContent{
+ BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000,
+ IfIndexFirstSeen: 1,
+ DirectionFirstSeen: 1,
+ },
+ AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 1,
+ ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 3, Direction: 0}},
+ },
}
- assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
-}
-
-func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
- export := testAgent(t, &Config{
- CacheActiveTimeout: 10 * time.Millisecond,
- CacheMaxFlows: 100,
- DeduperJustMark: true,
- Deduper: DeduperFirstCome,
- })
-
- exported := export.Get(t, timeout)
- receivedKeys := map[ebpf.BpfFlowId]struct{}{}
-
- assert.Len(t, exported, 3)
- duplicates := 0
- for _, f := range exported {
- require.NotContains(t, receivedKeys, f.ID)
- receivedKeys[f.ID] = struct{}{}
- switch f.ID {
- case key1:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- if f.Duplicate {
- duplicates++
- }
- assert.Equal(t, "foo", f.Interface)
- case key1Dupe:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- if f.Duplicate {
- duplicates++
- }
- assert.Equal(t, "bar", f.Interface)
- case key2:
- assert.EqualValues(t, 7, f.Metrics.Packets)
- assert.EqualValues(t, 33, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- }
+ metrics2 := model.BpfFlowContent{
+ BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000,
+ IfIndexFirstSeen: 4,
+ DirectionFirstSeen: 0,
+ },
+ AdditionalMetrics: &ebpf.BpfAdditionalMetrics{NbObservedIntf: 2,
+ ObservedIntf: [model.MaxObservedInterfaces]ebpf.BpfObservedIntfT{{IfIndex: 1, Direction: 1}, {IfIndex: 99, Direction: 1}},
+ },
}
- assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
-}
-
-func TestFlowsAgent_Deduplication_None(t *testing.T) {
- export := testAgent(t, &Config{
- CacheActiveTimeout: 10 * time.Millisecond,
- CacheMaxFlows: 100,
- Deduper: DeduperNone,
- })
-
- exported := export.Get(t, timeout)
- assert.Len(t, exported, 3)
- receivedKeys := map[ebpf.BpfFlowId]struct{}{}
-
- var key1Flows []*model.Record
- for _, f := range exported {
- require.NotContains(t, receivedKeys, f.ID)
- receivedKeys[f.ID] = struct{}{}
- switch f.ID {
- case key1:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- assert.Equal(t, "foo", f.Interface)
- key1Flows = append(key1Flows, f)
- case key1Dupe:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- assert.Equal(t, "bar", f.Interface)
- key1Flows = append(key1Flows, f)
- case key2:
- assert.EqualValues(t, 7, f.Metrics.Packets)
- assert.EqualValues(t, 33, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- }
+ flows := map[ebpf.BpfFlowId]model.BpfFlowContent{
+ key1: metrics1,
+ key2: metrics2,
}
- assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
-}
-func TestFlowsAgent_Decoration(t *testing.T) {
- export := testAgent(t, &Config{
- CacheActiveTimeout: 10 * time.Millisecond,
- CacheMaxFlows: 100,
- })
-
- exported := export.Get(t, timeout)
- assert.Len(t, exported, 3)
+ exported := testAgent(t, flows)
+ assert.Len(t, exported, 2)
// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
for _, f := range exported {
assert.Equal(t, agentIP, f.AgentIP.String())
switch f.ID {
- case key1, key2:
- assert.Equal(t, "foo", f.Interface)
+ case key1:
+ assert.Len(t, f.Interfaces, 2)
+ assert.Equal(t, "eth0", f.Interfaces[0].Interface)
+ assert.Equal(t, "foo", f.Interfaces[1].Interface)
+ case key2:
+ assert.Len(t, f.Interfaces, 3)
+ assert.Equal(t, "bar", f.Interfaces[0].Interface)
+ assert.Equal(t, "eth0", f.Interfaces[1].Interface)
+ assert.Equal(t, "unknown", f.Interfaces[2].Interface)
default:
- assert.Equal(t, "bar", f.Interface)
+ assert.Failf(t, "unexpected key", "key: %v", f.ID)
}
}
}
-func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
+func testAgent(t *testing.T, flows map[ebpf.BpfFlowId]model.BpfFlowContent) []*model.Record {
ebpfTracer := test.NewTracerFake()
export := test.NewExporterFake()
- agent, err := flowsAgent(cfg,
+ agent, err := flowsAgent(
+ &Config{
+ CacheActiveTimeout: 10 * time.Millisecond,
+ CacheMaxFlows: 100,
+ },
metrics.NewMetrics(&metrics.Settings{}),
test.SliceInformerFake{
+ {Name: "eth0", Index: 1},
{Name: "foo", Index: 3},
{Name: "bar", Index: 4},
}, ebpfTracer, export.Export,
@@ -220,25 +127,6 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
require.Equal(t, StatusStarted, agent.status)
})
- now := uint64(monotime.Now())
- key1Metrics := model.BpfFlowContents{
- {
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
- },
- {
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
- },
- }
- key2Metrics := model.BpfFlowContents{
- {
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
- },
- }
- acc := key1Metrics.Accumulate()
- ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]model.BpfFlowContent{
- key1: acc,
- key1Dupe: acc,
- key2: key2Metrics.Accumulate(),
- })
- return export
+ ebpfTracer.AppendLookupResults(flows)
+ return export.Get(t, timeout)
}
diff --git a/pkg/agent/config.go b/pkg/agent/config.go
index 9968b01a5..1eebf5b6a 100644
--- a/pkg/agent/config.go
+++ b/pkg/agent/config.go
@@ -11,8 +11,6 @@ var clog = logrus.WithField("component", "config")
const (
ListenPoll = "poll"
ListenWatch = "watch"
- DeduperNone = "none"
- DeduperFirstCome = "firstCome"
DirectionIngress = "ingress"
DirectionEgress = "egress"
DirectionBoth = "both"
@@ -131,21 +129,6 @@ type Config struct {
// CacheActiveTimeout specifies the maximum duration that flows are kept in the accounting
// cache before being flushed for its later export
CacheActiveTimeout time.Duration `env:"CACHE_ACTIVE_TIMEOUT" envDefault:"5s"`
- // Deduper specifies the deduper type. Accepted values are "none" (disabled) and "firstCome".
- // When enabled, it will detect duplicate flows (flows that have been detected e.g. through
- // both the physical and a virtual interface).
- // "firstCome" will forward only flows from the first interface the flows are received from.
- Deduper string `env:"DEDUPER" envDefault:"none"`
- // DeduperFCExpiry specifies the expiry duration of the flows "firstCome" deduplicator. After
- // a flow hasn't been received for that expiry time, the deduplicator forgets it. That means
- // that a flow from a connection that has been inactive during that period could be forwarded
- // again from a different interface.
- // If the value is not set, it will default to 2 * CacheActiveTimeout
- DeduperFCExpiry time.Duration `env:"DEDUPER_FC_EXPIRY"`
- // DeduperJustMark will just mark duplicates (boolean field) instead of dropping them.
- DeduperJustMark bool `env:"DEDUPER_JUST_MARK" envDefault:"false"`
- // DeduperMerge will merge duplicated flows and generate list of interfaces and direction pairs
- DeduperMerge bool `env:"DEDUPER_MERGE" envDefault:"true"`
// Direction allows selecting which flows to trace according to its direction. Accepted values
// are "ingress", "egress" or "both" (default).
Direction string `env:"DIRECTION" envDefault:"both"`
diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go
index 5c76bc9ed..3b714d150 100644
--- a/pkg/agent/packets_agent.go
+++ b/pkg/agent/packets_agent.go
@@ -34,7 +34,7 @@ type Packets struct {
exporter node.TerminalFunc[[]*model.PacketRecord]
// elements used to decorate flows with extra information
- interfaceNamer flow.InterfaceNamer
+ interfaceNamer model.InterfaceNamer
agentIP net.IP
status Status
diff --git a/pkg/decode/decode_protobuf.go b/pkg/decode/decode_protobuf.go
index f98e6b340..7f9fe6170 100644
--- a/pkg/decode/decode_protobuf.go
+++ b/pkg/decode/decode_protobuf.go
@@ -67,9 +67,14 @@ func RecordToMap(fr *model.Record) config.GenericMap {
"AgentIP": fr.AgentIP.String(),
}
- if fr.Duplicate {
- out["Duplicate"] = true
+ var directions []uint8
+ var interfaces []string
+ for _, intf := range fr.Interfaces {
+ directions = append(directions, intf.Direction)
+ interfaces = append(interfaces, intf.Interface)
}
+ out["IfDirections"] = directions
+ out["Interfaces"] = interfaces
if fr.Metrics.Bytes != 0 {
out["Bytes"] = fr.Metrics.Bytes
@@ -82,22 +87,6 @@ func RecordToMap(fr *model.Record) config.GenericMap {
if fr.Metrics.Sampling != 0 {
out["Sampling"] = fr.Metrics.Sampling
}
- var interfaces []string
- var directions []int
- if len(fr.DupList) != 0 {
- for _, m := range fr.DupList {
- for key, value := range m {
- interfaces = append(interfaces, key)
- directions = append(directions, int(model.Direction(value)))
- }
- }
- } else {
- interfaces = append(interfaces, fr.Interface)
- directions = append(directions, int(fr.ID.Direction))
- }
- out["Interfaces"] = interfaces
- out["IfDirections"] = directions
-
if fr.Metrics.EthProtocol == uint16(ethernet.EtherTypeIPv4) || fr.Metrics.EthProtocol == uint16(ethernet.EtherTypeIPv6) {
out["SrcAddr"] = model.IP(fr.ID.SrcIp).String()
out["DstAddr"] = model.IP(fr.ID.DstIp).String()
diff --git a/pkg/decode/decode_protobuf_test.go b/pkg/decode/decode_protobuf_test.go
index 50c1ef3bd..061cc1523 100644
--- a/pkg/decode/decode_protobuf_test.go
+++ b/pkg/decode/decode_protobuf_test.go
@@ -18,7 +18,6 @@ func TestPBFlowToMap(t *testing.T) {
someTime := time.Now()
var someDuration time.Duration = 10000000 // 10ms
flow := &pbflow.Record{
- Interface: "eth0",
DupList: []*pbflow.DupMapEntry{
{
Interface: "5e6e92caa1d51cf",
@@ -32,10 +31,8 @@ func TestPBFlowToMap(t *testing.T) {
EthProtocol: 2048,
Bytes: 456,
Packets: 123,
- Direction: pbflow.Direction_EGRESS,
TimeFlowStart: timestamppb.New(someTime),
TimeFlowEnd: timestamppb.New(someTime),
- Duplicate: true,
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
@@ -105,7 +102,7 @@ func TestPBFlowToMap(t *testing.T) {
assert.NotZero(t, out["TimeReceived"])
delete(out, "TimeReceived")
assert.Equal(t, config.GenericMap{
- "IfDirections": []int{0, 1},
+ "IfDirections": []uint8{0, 1},
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
@@ -114,7 +111,6 @@ func TestPBFlowToMap(t *testing.T) {
"SrcMac": "01:02:03:04:05:06",
"SrcPort": uint16(23000),
"DstPort": uint16(443),
- "Duplicate": true,
"Etype": uint16(2048),
"Packets": uint32(123),
"Proto": uint8(6),
@@ -156,5 +152,4 @@ func TestPBFlowToMap(t *testing.T) {
"ZoneId": uint16(100),
"XlatIcmpId": uint8(0),
}, out)
-
}
diff --git a/pkg/ebpf/bpf_arm64_bpfel.go b/pkg/ebpf/bpf_arm64_bpfel.go
index 44dc2de78..968a8e117 100644
--- a/pkg/ebpf/bpf_arm64_bpfel.go
+++ b/pkg/ebpf/bpf_arm64_bpfel.go
@@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct {
DnsRecord BpfDnsRecordT
PktDrops BpfPktDropsT
FlowRtt uint64
- NetworkEventsIdx uint8
NetworkEvents [4][8]uint8
- _ [1]byte
- EthProtocol uint16
TranslatedFlow BpfTranslatedFlowT
+ ObservedIntf [4]BpfObservedIntfT
+ EthProtocol uint16
+ NetworkEventsIdx uint8
+ NbObservedIntf uint8
_ [4]byte
}
@@ -45,12 +46,11 @@ type BpfDnsFlowId struct {
}
type BpfDnsRecordT struct {
+ Latency uint64
Id uint16
Flags uint16
- _ [4]byte
- Latency uint64
Errno uint8
- _ [7]byte
+ _ [3]byte
}
type BpfFilterActionT uint32
@@ -93,35 +93,34 @@ type BpfFilterValueT struct {
type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct {
- Direction uint8
SrcIp [16]uint8
DstIp [16]uint8
- _ [1]byte
SrcPort uint16
DstPort uint16
TransportProtocol uint8
IcmpType uint8
IcmpCode uint8
- _ [3]byte
- IfIndex uint32
+ _ [1]byte
}
type BpfFlowMetrics BpfFlowMetricsT
type BpfFlowMetricsT struct {
- Lock struct{ Val uint32 }
- EthProtocol uint16
- SrcMac [6]uint8
- DstMac [6]uint8
- _ [2]byte
- Packets uint32
- Bytes uint64
- StartMonoTimeTs uint64
- EndMonoTimeTs uint64
- Flags uint16
- Errno uint8
- Dscp uint8
- Sampling uint32
+ StartMonoTimeTs uint64
+ EndMonoTimeTs uint64
+ Bytes uint64
+ Packets uint32
+ EthProtocol uint16
+ Flags uint16
+ SrcMac [6]uint8
+ DstMac [6]uint8
+ IfIndexFirstSeen uint32
+ Lock struct{ Val uint32 }
+ Sampling uint32
+ DirectionFirstSeen uint8
+ Errno uint8
+ Dscp uint8
+ _ [5]byte
}
type BpfFlowRecordT struct {
@@ -141,17 +140,23 @@ const (
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7
BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8
- BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10
)
+type BpfObservedIntfT struct {
+ Direction uint8
+ _ [3]byte
+ IfIndex uint32
+}
+
type BpfPktDropsT struct {
- Packets uint32
- _ [4]byte
Bytes uint64
+ Packets uint32
+ LatestDropCause uint32
LatestFlags uint16
LatestState uint8
- _ [1]byte
- LatestDropCause uint32
+ _ [5]byte
}
type BpfTcpFlagsT uint32
diff --git a/pkg/ebpf/bpf_arm64_bpfel.o b/pkg/ebpf/bpf_arm64_bpfel.o
index f11c52f93..025691ca1 100644
Binary files a/pkg/ebpf/bpf_arm64_bpfel.o and b/pkg/ebpf/bpf_arm64_bpfel.o differ
diff --git a/pkg/ebpf/bpf_powerpc_bpfel.go b/pkg/ebpf/bpf_powerpc_bpfel.go
index b6381dd1c..99bad4e85 100644
--- a/pkg/ebpf/bpf_powerpc_bpfel.go
+++ b/pkg/ebpf/bpf_powerpc_bpfel.go
@@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct {
DnsRecord BpfDnsRecordT
PktDrops BpfPktDropsT
FlowRtt uint64
- NetworkEventsIdx uint8
NetworkEvents [4][8]uint8
- _ [1]byte
- EthProtocol uint16
TranslatedFlow BpfTranslatedFlowT
+ ObservedIntf [4]BpfObservedIntfT
+ EthProtocol uint16
+ NetworkEventsIdx uint8
+ NbObservedIntf uint8
_ [4]byte
}
@@ -45,12 +46,11 @@ type BpfDnsFlowId struct {
}
type BpfDnsRecordT struct {
+ Latency uint64
Id uint16
Flags uint16
- _ [4]byte
- Latency uint64
Errno uint8
- _ [7]byte
+ _ [3]byte
}
type BpfFilterActionT uint32
@@ -93,35 +93,34 @@ type BpfFilterValueT struct {
type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct {
- Direction uint8
SrcIp [16]uint8
DstIp [16]uint8
- _ [1]byte
SrcPort uint16
DstPort uint16
TransportProtocol uint8
IcmpType uint8
IcmpCode uint8
- _ [3]byte
- IfIndex uint32
+ _ [1]byte
}
type BpfFlowMetrics BpfFlowMetricsT
type BpfFlowMetricsT struct {
- Lock struct{ Val uint32 }
- EthProtocol uint16
- SrcMac [6]uint8
- DstMac [6]uint8
- _ [2]byte
- Packets uint32
- Bytes uint64
- StartMonoTimeTs uint64
- EndMonoTimeTs uint64
- Flags uint16
- Errno uint8
- Dscp uint8
- Sampling uint32
+ StartMonoTimeTs uint64
+ EndMonoTimeTs uint64
+ Bytes uint64
+ Packets uint32
+ EthProtocol uint16
+ Flags uint16
+ SrcMac [6]uint8
+ DstMac [6]uint8
+ IfIndexFirstSeen uint32
+ Lock struct{ Val uint32 }
+ Sampling uint32
+ DirectionFirstSeen uint8
+ Errno uint8
+ Dscp uint8
+ _ [5]byte
}
type BpfFlowRecordT struct {
@@ -141,17 +140,23 @@ const (
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7
BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8
- BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10
)
+type BpfObservedIntfT struct {
+ Direction uint8
+ _ [3]byte
+ IfIndex uint32
+}
+
type BpfPktDropsT struct {
- Packets uint32
- _ [4]byte
Bytes uint64
+ Packets uint32
+ LatestDropCause uint32
LatestFlags uint16
LatestState uint8
- _ [1]byte
- LatestDropCause uint32
+ _ [5]byte
}
type BpfTcpFlagsT uint32
diff --git a/pkg/ebpf/bpf_powerpc_bpfel.o b/pkg/ebpf/bpf_powerpc_bpfel.o
index 1d54f4b45..41bac3b88 100644
Binary files a/pkg/ebpf/bpf_powerpc_bpfel.o and b/pkg/ebpf/bpf_powerpc_bpfel.o differ
diff --git a/pkg/ebpf/bpf_s390_bpfeb.go b/pkg/ebpf/bpf_s390_bpfeb.go
index c13de86c1..08e9d2e22 100644
--- a/pkg/ebpf/bpf_s390_bpfeb.go
+++ b/pkg/ebpf/bpf_s390_bpfeb.go
@@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct {
DnsRecord BpfDnsRecordT
PktDrops BpfPktDropsT
FlowRtt uint64
- NetworkEventsIdx uint8
NetworkEvents [4][8]uint8
- _ [1]byte
- EthProtocol uint16
TranslatedFlow BpfTranslatedFlowT
+ ObservedIntf [4]BpfObservedIntfT
+ EthProtocol uint16
+ NetworkEventsIdx uint8
+ NbObservedIntf uint8
_ [4]byte
}
@@ -45,12 +46,11 @@ type BpfDnsFlowId struct {
}
type BpfDnsRecordT struct {
+ Latency uint64
Id uint16
Flags uint16
- _ [4]byte
- Latency uint64
Errno uint8
- _ [7]byte
+ _ [3]byte
}
type BpfFilterActionT uint32
@@ -93,35 +93,34 @@ type BpfFilterValueT struct {
type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct {
- Direction uint8
SrcIp [16]uint8
DstIp [16]uint8
- _ [1]byte
SrcPort uint16
DstPort uint16
TransportProtocol uint8
IcmpType uint8
IcmpCode uint8
- _ [3]byte
- IfIndex uint32
+ _ [1]byte
}
type BpfFlowMetrics BpfFlowMetricsT
type BpfFlowMetricsT struct {
- Lock struct{ Val uint32 }
- EthProtocol uint16
- SrcMac [6]uint8
- DstMac [6]uint8
- _ [2]byte
- Packets uint32
- Bytes uint64
- StartMonoTimeTs uint64
- EndMonoTimeTs uint64
- Flags uint16
- Errno uint8
- Dscp uint8
- Sampling uint32
+ StartMonoTimeTs uint64
+ EndMonoTimeTs uint64
+ Bytes uint64
+ Packets uint32
+ EthProtocol uint16
+ Flags uint16
+ SrcMac [6]uint8
+ DstMac [6]uint8
+ IfIndexFirstSeen uint32
+ Lock struct{ Val uint32 }
+ Sampling uint32
+ DirectionFirstSeen uint8
+ Errno uint8
+ Dscp uint8
+ _ [5]byte
}
type BpfFlowRecordT struct {
@@ -141,17 +140,23 @@ const (
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7
BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8
- BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10
)
+type BpfObservedIntfT struct {
+ Direction uint8
+ _ [3]byte
+ IfIndex uint32
+}
+
type BpfPktDropsT struct {
- Packets uint32
- _ [4]byte
Bytes uint64
+ Packets uint32
+ LatestDropCause uint32
LatestFlags uint16
LatestState uint8
- _ [1]byte
- LatestDropCause uint32
+ _ [5]byte
}
type BpfTcpFlagsT uint32
diff --git a/pkg/ebpf/bpf_s390_bpfeb.o b/pkg/ebpf/bpf_s390_bpfeb.o
index 1c4e40035..860a0ff52 100644
Binary files a/pkg/ebpf/bpf_s390_bpfeb.o and b/pkg/ebpf/bpf_s390_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_x86_bpfel.go b/pkg/ebpf/bpf_x86_bpfel.go
index c81facb65..b8f401958 100644
--- a/pkg/ebpf/bpf_x86_bpfel.go
+++ b/pkg/ebpf/bpf_x86_bpfel.go
@@ -18,11 +18,12 @@ type BpfAdditionalMetrics struct {
DnsRecord BpfDnsRecordT
PktDrops BpfPktDropsT
FlowRtt uint64
- NetworkEventsIdx uint8
NetworkEvents [4][8]uint8
- _ [1]byte
- EthProtocol uint16
TranslatedFlow BpfTranslatedFlowT
+ ObservedIntf [4]BpfObservedIntfT
+ EthProtocol uint16
+ NetworkEventsIdx uint8
+ NbObservedIntf uint8
_ [4]byte
}
@@ -45,12 +46,11 @@ type BpfDnsFlowId struct {
}
type BpfDnsRecordT struct {
+ Latency uint64
Id uint16
Flags uint16
- _ [4]byte
- Latency uint64
Errno uint8
- _ [7]byte
+ _ [3]byte
}
type BpfFilterActionT uint32
@@ -93,35 +93,34 @@ type BpfFilterValueT struct {
type BpfFlowId BpfFlowIdT
type BpfFlowIdT struct {
- Direction uint8
SrcIp [16]uint8
DstIp [16]uint8
- _ [1]byte
SrcPort uint16
DstPort uint16
TransportProtocol uint8
IcmpType uint8
IcmpCode uint8
- _ [3]byte
- IfIndex uint32
+ _ [1]byte
}
type BpfFlowMetrics BpfFlowMetricsT
type BpfFlowMetricsT struct {
- Lock struct{ Val uint32 }
- EthProtocol uint16
- SrcMac [6]uint8
- DstMac [6]uint8
- _ [2]byte
- Packets uint32
- Bytes uint64
- StartMonoTimeTs uint64
- EndMonoTimeTs uint64
- Flags uint16
- Errno uint8
- Dscp uint8
- Sampling uint32
+ StartMonoTimeTs uint64
+ EndMonoTimeTs uint64
+ Bytes uint64
+ Packets uint32
+ EthProtocol uint16
+ Flags uint16
+ SrcMac [6]uint8
+ DstMac [6]uint8
+ IfIndexFirstSeen uint32
+ Lock struct{ Val uint32 }
+ Sampling uint32
+ DirectionFirstSeen uint8
+ Errno uint8
+ Dscp uint8
+ _ [5]byte
}
type BpfFlowRecordT struct {
@@ -141,17 +140,23 @@ const (
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_GROUPID_MISMATCH BpfGlobalCountersKeyT = 6
BpfGlobalCountersKeyTNETWORK_EVENTS_ERR_UPDATE_MAP_FLOWS BpfGlobalCountersKeyT = 7
BpfGlobalCountersKeyTNETWORK_EVENTS_GOOD BpfGlobalCountersKeyT = 8
- BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTOBSERVED_INTF_MISSED BpfGlobalCountersKeyT = 9
+ BpfGlobalCountersKeyTMAX_COUNTERS BpfGlobalCountersKeyT = 10
)
+type BpfObservedIntfT struct {
+ Direction uint8
+ _ [3]byte
+ IfIndex uint32
+}
+
type BpfPktDropsT struct {
- Packets uint32
- _ [4]byte
Bytes uint64
+ Packets uint32
+ LatestDropCause uint32
LatestFlags uint16
LatestState uint8
- _ [1]byte
- LatestDropCause uint32
+ _ [5]byte
}
type BpfTcpFlagsT uint32
diff --git a/pkg/ebpf/bpf_x86_bpfel.o b/pkg/ebpf/bpf_x86_bpfel.o
index 6c80da7da..2faa53c1a 100644
Binary files a/pkg/ebpf/bpf_x86_bpfel.o and b/pkg/ebpf/bpf_x86_bpfel.o differ
diff --git a/pkg/ebpf/gen.go b/pkg/ebpf/gen.go
index 3ec45b49c..a95d9ba48 100644
--- a/pkg/ebpf/gen.go
+++ b/pkg/ebpf/gen.go
@@ -1,4 +1,4 @@
package ebpf
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
-//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t -type filter_action_t -type tcp_flags_t -type translated_flow_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
+//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64,ppc64le,s390x -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t -type global_counters_key_t -type direction_t -type filter_action_t -type tcp_flags_t -type translated_flow_t -type observed_intf_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
diff --git a/pkg/exporter/converters_test.go b/pkg/exporter/converters_test.go
index 13cafb949..1fad579b0 100644
--- a/pkg/exporter/converters_test.go
+++ b/pkg/exporter/converters_test.go
@@ -32,7 +32,6 @@ func TestConversions(t *testing.T) {
name: "TCP record",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09},
DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d},
SrcPort: 23000,
@@ -56,7 +55,7 @@ func TestConversions(t *testing.T) {
},
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -86,7 +85,6 @@ func TestConversions(t *testing.T) {
name: "UDP record",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09},
DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d},
SrcPort: 23000,
@@ -104,7 +102,7 @@ func TestConversions(t *testing.T) {
Sampling: 2,
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -133,7 +131,6 @@ func TestConversions(t *testing.T) {
name: "ICMPv4 record",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09},
DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d},
TransportProtocol: 1,
@@ -150,7 +147,7 @@ func TestConversions(t *testing.T) {
Dscp: 64,
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -178,7 +175,6 @@ func TestConversions(t *testing.T) {
name: "ICMPv6 record",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
DstIp: model.IPAddr{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26},
TransportProtocol: 58,
@@ -195,7 +191,7 @@ func TestConversions(t *testing.T) {
Dscp: 64,
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -223,7 +219,6 @@ func TestConversions(t *testing.T) {
name: "ARP layer2",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{},
DstIp: model.IPAddr{},
SrcPort: 0,
@@ -241,7 +236,7 @@ func TestConversions(t *testing.T) {
Packets: 128,
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -263,7 +258,6 @@ func TestConversions(t *testing.T) {
name: "L2 drops",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{},
DstIp: model.IPAddr{},
SrcPort: 0,
@@ -288,7 +282,7 @@ func TestConversions(t *testing.T) {
},
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -315,7 +309,6 @@ func TestConversions(t *testing.T) {
name: "TCP + drop + DNS + RTT record",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09},
DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d},
SrcPort: 23000,
@@ -348,7 +341,7 @@ func TestConversions(t *testing.T) {
},
},
},
- Interface: "eth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("eth0", model.DirectionEgress)},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
@@ -389,7 +382,6 @@ func TestConversions(t *testing.T) {
name: "Multiple interfaces record",
flow: &model.Record{
ID: ebpf.BpfFlowId{
- Direction: model.DirectionEgress,
SrcIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x06, 0x07, 0x08, 0x09},
DstIp: model.IPAddr{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x0b, 0x0c, 0x0d},
SrcPort: 23000,
@@ -412,9 +404,9 @@ func TestConversions(t *testing.T) {
},
},
},
- DupList: []map[string]uint8{
- {"5e6e92caa1d51cf": 0},
- {"eth0": 1},
+ Interfaces: []model.IntfDir{
+ model.NewIntfDir("5e6e92caa1d51cf", model.DirectionIngress),
+ model.NewIntfDir("eth0", model.DirectionEgress),
},
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go
index ca822abd6..39f0fe42e 100644
--- a/pkg/exporter/grpc_proto_test.go
+++ b/pkg/exporter/grpc_proto_test.go
@@ -123,7 +123,7 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
for i := 0; i < 25000; i++ {
input = append(input, &model.Record{Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{
EthProtocol: model.IPv6Type,
- }}, AgentIP: net.ParseIP("1111::1111"), Interface: "12345678"})
+ }}, AgentIP: net.ParseIP("1111::1111"), Interfaces: []model.IntfDir{model.NewIntfDir("12345678", 0)}})
}
flows <- input
go exporter.ExportFlows(flows)
diff --git a/pkg/exporter/ipfix.go b/pkg/exporter/ipfix.go
index 379401ccf..60deb2f79 100644
--- a/pkg/exporter/ipfix.go
+++ b/pkg/exporter/ipfix.go
@@ -287,7 +287,7 @@ func setIERecordValue(record *model.Record, ieValPtr *entities.InfoElementWithVa
case "packetDeltaCount":
ieVal.SetUnsigned64Value(uint64(record.Metrics.Packets))
case "interfaceName":
- ieVal.SetStringValue(record.Interface)
+ ieVal.SetStringValue(record.Interfaces[0].Interface)
}
}
func setIEValue(record *model.Record, ieValPtr *entities.InfoElementWithValue) {
@@ -296,7 +296,7 @@ func setIEValue(record *model.Record, ieValPtr *entities.InfoElementWithValue) {
case "ethernetType":
ieVal.SetUnsigned16Value(record.Metrics.EthProtocol)
case "flowDirection":
- ieVal.SetUnsigned8Value(record.ID.Direction)
+ ieVal.SetUnsigned8Value(record.Interfaces[0].Direction)
case "sourceMacAddress":
ieVal.SetMacAddressValue(record.Metrics.SrcMac[:])
case "destinationMacAddress":
diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go
index 846d9fbc4..9a0f38997 100644
--- a/pkg/exporter/kafka_proto_test.go
+++ b/pkg/exporter/kafka_proto_test.go
@@ -33,7 +33,6 @@ func TestProtoConversion(t *testing.T) {
TimeFlowStart: time.Now().Add(-5 * time.Second),
TimeFlowEnd: time.Now(),
ID: ebpf.BpfFlowId{
- Direction: 1,
SrcIp: model.IPAddrFromNetIP(net.ParseIP("192.1.2.3")),
DstIp: model.IPAddrFromNetIP(net.ParseIP("127.3.2.1")),
SrcPort: 4321,
@@ -43,15 +42,16 @@ func TestProtoConversion(t *testing.T) {
},
Metrics: model.BpfFlowContent{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 3,
- SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff},
- DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66},
- Bytes: 789,
- Packets: 987,
- Flags: uint16(1),
+ DirectionFirstSeen: 1,
+ EthProtocol: 3,
+ SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff},
+ DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66},
+ Bytes: 789,
+ Packets: 987,
+ Flags: uint16(1),
},
},
- Interface: "veth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)},
}
input <- []*model.Record{&record}
@@ -62,10 +62,12 @@ func TestProtoConversion(t *testing.T) {
var r pbflow.Record
require.NoError(t, proto.Unmarshal(wc.messages[0].Value, &r))
assert.EqualValues(t, 3, r.EthProtocol)
- for _, e := range r.DupList {
- assert.EqualValues(t, 1, e.Direction)
- assert.Equal(t, "veth0", e.Interface)
- }
+ assert.EqualValues(t, 1, r.Direction)
+ assert.Len(t, r.DupList, 2)
+ assert.EqualValues(t, 0, r.DupList[0].Direction)
+ assert.Equal(t, "veth0", r.DupList[0].Interface)
+ assert.EqualValues(t, 1, r.DupList[1].Direction)
+ assert.Equal(t, "abcde", r.DupList[1].Interface)
assert.EqualValues(t, uint64(0xaabbccddeeff), r.DataLink.SrcMac)
assert.EqualValues(t, uint64(0x112233445566), r.DataLink.DstMac)
assert.EqualValues(t, uint64(0xC0010203) /* 192.1.2.3 */, r.Network.SrcAddr.GetIpv4())
@@ -88,7 +90,6 @@ func TestIdenticalKeys(t *testing.T) {
TimeFlowStart: time.Now().Add(-5 * time.Second),
TimeFlowEnd: time.Now(),
ID: ebpf.BpfFlowId{
- Direction: 1,
SrcIp: model.IPAddrFromNetIP(net.ParseIP("192.1.2.3")),
DstIp: model.IPAddrFromNetIP(net.ParseIP("127.3.2.1")),
SrcPort: 4321,
@@ -98,15 +99,16 @@ func TestIdenticalKeys(t *testing.T) {
},
Metrics: model.BpfFlowContent{
BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 3,
- SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff},
- DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66},
- Bytes: 789,
- Packets: 987,
- Flags: uint16(1),
+ DirectionFirstSeen: 1,
+ EthProtocol: 3,
+ SrcMac: [...]byte{0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff},
+ DstMac: [...]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66},
+ Bytes: 789,
+ Packets: 987,
+ Flags: uint16(1),
},
},
- Interface: "veth0",
+ Interfaces: []model.IntfDir{model.NewIntfDir("veth0", 0), model.NewIntfDir("abcde", 1)},
}
key1 := getFlowKey(&record)
diff --git a/pkg/flow/account.go b/pkg/flow/account.go
index f850f960d..9431b3b1b 100644
--- a/pkg/flow/account.go
+++ b/pkg/flow/account.go
@@ -96,7 +96,8 @@ func (c *Accounter) evict(entries map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics, evict
monotonicNow := uint64(c.monoClock())
records := make([]*model.Record, 0, len(entries))
for key, metrics := range entries {
- records = append(records, model.NewRecord(key, &model.BpfFlowContent{BpfFlowMetrics: metrics}, now, monotonicNow))
+ flowContent := model.NewBpfFlowContent(*metrics)
+ records = append(records, model.NewRecord(key, &flowContent, now, monotonicNow))
}
c.metrics.EvictionCounter.WithSourceAndReason("accounter", reason).Inc()
c.metrics.EvictedFlowsCounter.WithSourceAndReason("accounter", reason).Add(float64(len(records)))
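The accounter no longer feeds a separate decorator stage. Given the model.SetGlobals(agentIP, interfaceNamer) call added in agent.go and the "[namer unset] 0" placeholder expected by the tests below, decoration presumably happens inside pkg/model at record-construction time. The following is a rough, assumption-laden sketch of that behavior using simplified local types; SetGlobals, NewRecord and the placeholder format are taken from the diff, everything else is illustrative.

```go
package main

import (
	"fmt"
	"net"
)

type intfDir struct {
	Interface string
	Direction uint8
}

type record struct {
	AgentIP    net.IP
	Interfaces []intfDir
}

// Package-level decoration state, set once at agent startup (mirrors the assumed model.SetGlobals).
var (
	agentIP net.IP
	namer   = func(idx int) string { return fmt.Sprintf("[namer unset] %d", idx) }
)

func setGlobals(ip net.IP, ifaceNamer func(int) string) {
	agentIP = ip
	namer = ifaceNamer
}

// newRecord decorates the record at construction time instead of in a pipeline stage.
func newRecord(ifIndexFirstSeen uint32, directionFirstSeen uint8) *record {
	return &record{
		AgentIP:    agentIP,
		Interfaces: []intfDir{{Interface: namer(int(ifIndexFirstSeen)), Direction: directionFirstSeen}},
	}
}

func main() {
	fmt.Println(newRecord(3, 0).Interfaces) // [{[namer unset] 3 0}] until setGlobals is called
	setGlobals(net.ParseIP("10.0.0.1"), func(idx int) string { return fmt.Sprintf("eth%d", idx) })
	fmt.Println(newRecord(3, 0).Interfaces) // [{eth3 0}]
}
```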
diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go
index d91afad10..6bb9cec67 100644
--- a/pkg/flow/account_test.go
+++ b/pkg/flow/account_test.go
@@ -112,8 +112,8 @@ func TestEvict_MaxEntries(t *testing.T) {
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
- DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
+ Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
},
k2: {
ID: k2,
@@ -124,8 +124,8 @@ func TestEvict_MaxEntries(t *testing.T) {
},
TimeFlowStart: now.Add(-(1000 - 456) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 456) * time.Nanosecond),
- DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
+ Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
},
}, received)
}
@@ -194,8 +194,8 @@ func TestEvict_Period(t *testing.T) {
},
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
- DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
+ Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
@@ -212,8 +212,8 @@ func TestEvict_Period(t *testing.T) {
},
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
- DupList: make([]map[string]uint8, 0),
NetworkMonitorEventsMD: make([]config.GenericMap, 0),
+ Interfaces: []model.IntfDir{model.NewIntfDir("[namer unset] 0", 0)},
}, *records[0])
// no more flows are evicted
diff --git a/pkg/flow/decorator.go b/pkg/flow/decorator.go
deleted file mode 100644
index 75d90bdd3..000000000
--- a/pkg/flow/decorator.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package flow
-
-import (
- "net"
-
- "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
-)
-
-type InterfaceNamer func(ifIndex int) string
-
-// Decorate adds to the flows extra metadata fields that are not directly fetched by eBPF:
-// - The interface name (corresponding to the interface index in the flow).
-// - The IP address of the agent host.
-func Decorate(agentIP net.IP, ifaceNamer InterfaceNamer) func(in <-chan []*model.Record, out chan<- []*model.Record) {
- return func(in <-chan []*model.Record, out chan<- []*model.Record) {
- for flows := range in {
- for _, flow := range flows {
- flow.Interface = ifaceNamer(int(flow.ID.IfIndex))
- flow.AgentIP = agentIP
- }
- out <- flows
- }
- }
-}
diff --git a/pkg/flow/deduper.go b/pkg/flow/deduper.go
deleted file mode 100644
index 06f7b07f5..000000000
--- a/pkg/flow/deduper.go
+++ /dev/null
@@ -1,177 +0,0 @@
-package flow
-
-import (
- "container/list"
- "reflect"
- "time"
-
- "github.com/sirupsen/logrus"
-
- "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
- "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
- "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
-)
-
-var dlog = logrus.WithField("component", "flow/Deduper")
-var timeNow = time.Now
-
-// deduperCache implement a LRU cache whose elements are evicted if they haven't been accessed
-// during the expire duration.
-// It is not safe for concurrent access.
-type deduperCache struct {
- expire time.Duration
- // key: ebpf.BpfFlowId with the interface and MACs erased, to detect duplicates
- // value: listElement pointing to a struct entry
- ifaces map[ebpf.BpfFlowId]*list.Element
- // element: entry structs of the ifaces map ordered by expiry time
- entries *list.List
-}
-
-type entry struct {
- key *ebpf.BpfFlowId
- dnsRecord *ebpf.BpfDnsRecordT
- flowRTT *uint64
- networkEvents *[model.MaxNetworkEvents][model.NetworkEventsMaxEventsMD]uint8
- ifIndex uint32
- expiryTime time.Time
- dupList *[]map[string]uint8
-}
-
-// Dedupe receives flows and filters out those seen on duplicate interfaces. It forwards
-// the flows from the first interface they are seen on, until the flow expires from the cache
-// (no activity for it during the expiration time).
-// The justMark argument tells the deduper not to drop duplicate flows but to set their
-// Duplicate field instead.
-func Dedupe(expireTime time.Duration, justMark, mergeDup bool, ifaceNamer InterfaceNamer, m *metrics.Metrics) func(in <-chan []*model.Record, out chan<- []*model.Record) {
- cache := &deduperCache{
- expire: expireTime,
- entries: list.New(),
- ifaces: map[ebpf.BpfFlowId]*list.Element{},
- }
- return func(in <-chan []*model.Record, out chan<- []*model.Record) {
- for records := range in {
- cache.removeExpired()
- fwd := make([]*model.Record, 0, len(records))
- for _, record := range records {
- cache.checkDupe(record, justMark, mergeDup, &fwd, ifaceNamer)
- }
- if len(fwd) > 0 {
- out <- fwd
- m.EvictionCounter.WithSource("deduper").Inc()
- m.EvictedFlowsCounter.WithSource("deduper").Add(float64(len(fwd)))
- }
- m.BufferSizeGauge.WithBufferName("deduper-list").Set(float64(cache.entries.Len()))
- m.BufferSizeGauge.WithBufferName("deduper-map").Set(float64(len(cache.ifaces)))
- }
- }
-}
-
-// checkDupe checks whether the current record is already cached and, if not, adds it to the fwd records list
-func (c *deduperCache) checkDupe(r *model.Record, justMark, mergeDup bool, fwd *[]*model.Record, ifaceNamer InterfaceNamer) {
- mergeEntry := make(map[string]uint8)
- rk := r.ID
- // zeroes the key fields that should be ignored in the flow comparison
- rk.IfIndex = 0
- rk.Direction = 0
- if r.Metrics.AdditionalMetrics == nil {
- r.Metrics.AdditionalMetrics = &ebpf.BpfAdditionalMetrics{}
- }
- // If a flow has been accounted previously, whatever its interface was,
- // it updates the expiry time for that flow
- if ele, ok := c.ifaces[rk]; ok {
- fEntry := ele.Value.(*entry)
- fEntry.expiryTime = timeNow().Add(c.expire)
- c.entries.MoveToFront(ele)
- // The input flow is a duplicate if its interface differs from the interface of the
- // non-duplicate flow that was first registered in the cache.
- // If the new flow carries DNS enrichment, the cached flow is enriched with that DNS
- // info and the current flow is still marked as duplicate.
- if r.Metrics.AdditionalMetrics.DnsRecord.Latency != 0 && fEntry.dnsRecord.Latency == 0 {
- // copy DNS record to the cached entry and mark it as duplicate
- fEntry.dnsRecord.Flags = r.Metrics.AdditionalMetrics.DnsRecord.Flags
- fEntry.dnsRecord.Id = r.Metrics.AdditionalMetrics.DnsRecord.Id
- fEntry.dnsRecord.Latency = r.Metrics.AdditionalMetrics.DnsRecord.Latency
- fEntry.dnsRecord.Errno = r.Metrics.AdditionalMetrics.DnsRecord.Errno
- }
- // If the new flow has flowRTT, then enrich the flow in the cache with the same RTT and mark it duplicate
- if r.Metrics.AdditionalMetrics.FlowRtt != 0 && *fEntry.flowRTT == 0 {
- *fEntry.flowRTT = r.Metrics.AdditionalMetrics.FlowRtt
- }
- // If the new flow has network events, then enrich the flow in the cache and mark the flow as duplicate
- for i, md := range r.Metrics.AdditionalMetrics.NetworkEvents {
- if !model.AllZerosMetaData(md) && model.AllZerosMetaData(fEntry.networkEvents[i]) {
- copy(fEntry.networkEvents[i][:], md[:])
- }
- }
- if fEntry.ifIndex != r.ID.IfIndex {
- if justMark {
- r.Duplicate = true
- *fwd = append(*fwd, r)
- }
- if mergeDup {
- ifName := ifaceNamer(int(r.ID.IfIndex))
- mergeEntry[ifName] = r.ID.Direction
- if dupEntryNew(*fEntry.dupList, mergeEntry) {
- *fEntry.dupList = append(*fEntry.dupList, mergeEntry)
- dlog.Debugf("merge list entries dump:")
- for _, entry := range *fEntry.dupList {
- for k, v := range entry {
- dlog.Debugf("interface %s dir %d", k, v)
- }
- }
- }
- }
- return
- }
- *fwd = append(*fwd, r)
- return
- }
- // The flow has not been accounted previously (or was forgotten after expiration)
- // so we register it for that concrete interface
- e := entry{
- key: &rk,
- dnsRecord: &r.Metrics.AdditionalMetrics.DnsRecord,
- flowRTT: &r.Metrics.AdditionalMetrics.FlowRtt,
- networkEvents: &r.Metrics.AdditionalMetrics.NetworkEvents,
- ifIndex: r.ID.IfIndex,
- expiryTime: timeNow().Add(c.expire),
- }
- if mergeDup {
- ifName := ifaceNamer(int(r.ID.IfIndex))
- mergeEntry[ifName] = r.ID.Direction
- r.DupList = append(r.DupList, mergeEntry)
- e.dupList = &r.DupList
- }
- c.ifaces[rk] = c.entries.PushFront(&e)
- *fwd = append(*fwd, r)
-}
-
-func dupEntryNew(dupList []map[string]uint8, mergeEntry map[string]uint8) bool {
- for _, entry := range dupList {
- if reflect.DeepEqual(entry, mergeEntry) {
- return false
- }
- }
- return true
-}
-
-func (c *deduperCache) removeExpired() {
- now := timeNow()
- ele := c.entries.Back()
- evicted := 0
- for ele != nil && now.After(ele.Value.(*entry).expiryTime) {
- evicted++
- c.entries.Remove(ele)
- fEntry := ele.Value.(*entry)
- fEntry.dupList = nil
- delete(c.ifaces, *fEntry.key)
- ele = c.entries.Back()
- }
- if evicted > 0 {
- dlog.WithFields(logrus.Fields{
- "current": c.entries.Len(),
- "evicted": evicted,
- "expiryTime": c.expire,
- }).Debug("entries evicted from the deduper cache")
- }
-}
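
The deduper removed above is, at its core, an expiring LRU keyed by the flow ID with the interface, direction and MACs zeroed: the first interface seen for a key "wins", later interfaces are duplicates, and idle keys are forgotten after the expiry period. The following is a minimal, self-contained sketch of that pattern only; it uses a plain string key in place of ebpf.BpfFlowId and omits the justMark/mergeDup enrichment logic, so it is an illustration of the cache mechanics, not a drop-in replacement.

package main

import (
	"container/list"
	"fmt"
	"time"
)

// entry records which interface first reported a given flow key and when it expires.
type entry struct {
	key        string
	ifIndex    uint32
	expiryTime time.Time
}

type dedupeCache struct {
	expire  time.Duration
	entries *list.List               // entries ordered by recency (front = most recently accessed)
	byKey   map[string]*list.Element // flow key -> element holding an *entry
}

func newDedupeCache(expire time.Duration) *dedupeCache {
	return &dedupeCache{expire: expire, entries: list.New(), byKey: map[string]*list.Element{}}
}

// isDuplicate reports whether the key was already seen on a different interface.
func (c *dedupeCache) isDuplicate(key string, ifIndex uint32) bool {
	c.removeExpired()
	if ele, ok := c.byKey[key]; ok {
		e := ele.Value.(*entry)
		e.expiryTime = time.Now().Add(c.expire) // refresh expiry on every access
		c.entries.MoveToFront(ele)
		return e.ifIndex != ifIndex
	}
	// first sighting: register the key for this concrete interface
	e := &entry{key: key, ifIndex: ifIndex, expiryTime: time.Now().Add(c.expire)}
	c.byKey[key] = c.entries.PushFront(e)
	return false
}

// removeExpired evicts entries that have not been accessed within the expire duration.
func (c *dedupeCache) removeExpired() {
	now := time.Now()
	for ele := c.entries.Back(); ele != nil; ele = c.entries.Back() {
		e := ele.Value.(*entry)
		if !now.After(e.expiryTime) {
			return
		}
		c.entries.Remove(ele)
		delete(c.byKey, e.key)
	}
}

func main() {
	c := newDedupeCache(time.Minute)
	fmt.Println(c.isDuplicate("10.0.0.1:123->10.0.0.2:456", 2)) // false: first sighting
	fmt.Println(c.isDuplicate("10.0.0.1:123->10.0.0.2:456", 1)) // true: same flow, other interface
	fmt.Println(c.isDuplicate("10.0.0.1:123->10.0.0.2:456", 2)) // false: same flow, same interface
}
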
diff --git a/pkg/flow/deduper_test.go b/pkg/flow/deduper_test.go
deleted file mode 100644
index 13c3a5695..000000000
--- a/pkg/flow/deduper_test.go
+++ /dev/null
@@ -1,345 +0,0 @@
-package flow
-
-import (
- "net"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/netobserv/flowlogs-pipeline/pkg/config"
- "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
- "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
- "github.com/netobserv/netobserv-ebpf-agent/pkg/model"
-)
-
-var (
- // the same flow from 2 different interfaces
- oneIf1 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 1,
- SrcPort: 123,
- DstPort: 456,
- IfIndex: 1,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- SrcMac: model.MacAddr{0x1},
- DstMac: model.MacAddr{0x1},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "eth0",
- }
- oneIf2 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 1,
- SrcPort: 123,
- DstPort: 456,
- IfIndex: 2,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- SrcMac: model.MacAddr{0x2},
- DstMac: model.MacAddr{0x2},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "123456789",
- }
- // another flow from 2 different interfaces and directions
- twoIf1 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 1,
- SrcPort: 333,
- DstPort: 456,
- IfIndex: 1,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- SrcMac: model.MacAddr{0x1},
- DstMac: model.MacAddr{0x1},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "eth0",
- }
- twoIf2 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 0,
- SrcPort: 333,
- DstPort: 456,
- IfIndex: 2,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- SrcMac: model.MacAddr{0x2},
- DstMac: model.MacAddr{0x2},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "123456789",
- }
- // another flow from 2 different interfaces and directions with DNS latency set on the latest
- threeIf1 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 1,
- SrcPort: 433,
- DstPort: 456,
- IfIndex: 1,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- SrcMac: model.MacAddr{0x1},
- DstMac: model.MacAddr{0x1},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "eth0",
- }
- threeIf2 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 0,
- SrcPort: 433,
- DstPort: 456,
- IfIndex: 2,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- DstMac: model.MacAddr{0x2},
- SrcMac: model.MacAddr{0x2},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
- DnsRecord: ebpf.BpfDnsRecordT{Id: 1, Flags: 100, Latency: 1000},
- },
- },
- Interface: "123456789",
- DNSLatency: time.Millisecond,
- }
- // another flow from 2 different interfaces and directions with RTT set on the latest
- fourIf1 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 1,
- SrcPort: 533,
- DstPort: 456,
- IfIndex: 1,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- DstMac: model.MacAddr{0x1},
- SrcMac: model.MacAddr{0x1},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "eth0",
- }
- fourIf2 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 0,
- SrcPort: 533,
- DstPort: 456,
- IfIndex: 2,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- DstMac: model.MacAddr{0x2},
- SrcMac: model.MacAddr{0x2},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
- FlowRtt: 100,
- },
- },
- Interface: "123456789",
- TimeFlowRtt: 100,
- }
- // another flow from 2 different interfaces and directions with NetworkEvents set on the latest
- fiveIf1 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 1,
- SrcPort: 633,
- DstPort: 456,
- IfIndex: 1,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- DstMac: model.MacAddr{0x1},
- SrcMac: model.MacAddr{0x1},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- },
- Interface: "eth0",
- }
- fiveIf2 = &model.Record{
- ID: ebpf.BpfFlowId{
- Direction: 0,
- SrcPort: 633,
- DstPort: 456,
- IfIndex: 2,
- },
- Metrics: model.BpfFlowContent{
- BpfFlowMetrics: &ebpf.BpfFlowMetrics{
- EthProtocol: 1,
- DstMac: model.MacAddr{0x2},
- SrcMac: model.MacAddr{0x2},
- Packets: 2,
- Bytes: 456,
- Flags: 1,
- },
- AdditionalMetrics: &ebpf.BpfAdditionalMetrics{
- FlowRtt: 100,
- },
- },
- Interface: "123456789",
- NetworkMonitorEventsMD: []config.GenericMap{{"Name": "test netpol1"}}}
-)
-
-func TestDedupe(t *testing.T) {
- input := make(chan []*model.Record, 100)
- output := make(chan []*model.Record, 100)
-
- go Dedupe(time.Minute, false, false, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output)
-
- input <- []*model.Record{
- oneIf2, // record 1 at interface 2: should be accepted
- twoIf1, // record 2 at interface 1: should be accepted
- oneIf1, // record 1 duplicate at interface 1: should NOT be accepted
- oneIf1, // (same record key, different interface)
- twoIf2, // record 2 duplicate at interface 2: should NOT be accepted
- oneIf2, // record 1 at interface 2: should be accepted (same record key, same interface)
- threeIf1, // record 1 has no DNS so it gets enriched with the DNS record from the following record
- threeIf2, // record 2 is a duplicate of record 1 and has DNS info: should not be accepted
- fourIf1, // record 1 has no RTT so it gets enriched with the RTT from the following record
- fourIf2, // record 2 is a duplicate of record 1 and has RTT: should not be accepted
- fiveIf1, // record 1 has no NetworkEvents so it gets enriched with the NetworkEvents from the following record
- fiveIf2, // record 2 is a duplicate of record 1 and has NetworkEvents: should not be accepted
- }
- deduped := receiveTimeout(t, output)
- assert.Equal(t, []*model.Record{oneIf2, twoIf1, oneIf2, threeIf1, fourIf1, fiveIf1}, deduped)
-
- // should still accept records with same key, same interface,
- // and discard these with same key, different interface
- input <- []*model.Record{oneIf1, oneIf2}
- deduped = receiveTimeout(t, output)
- assert.Equal(t, []*model.Record{oneIf2}, deduped)
-
- // make sure the flow with no DNS gets enriched with the DNS record
- assert.Equal(t, threeIf1.Metrics.AdditionalMetrics.DnsRecord.Id, threeIf2.Metrics.AdditionalMetrics.DnsRecord.Id)
- assert.Equal(t, threeIf1.Metrics.AdditionalMetrics.DnsRecord.Flags, threeIf2.Metrics.AdditionalMetrics.DnsRecord.Flags)
- assert.Equal(t, threeIf1.Metrics.AdditionalMetrics.DnsRecord.Latency, threeIf2.Metrics.AdditionalMetrics.DnsRecord.Latency)
-
- // make sure the flow with no RTT gets enriched from the dup flow with RTT
- assert.Equal(t, fourIf1.Metrics.AdditionalMetrics.FlowRtt, fourIf2.Metrics.AdditionalMetrics.FlowRtt)
-
- // make sure flow with no NetworkEvents gets enriched from dup flow with NetworkEvents
- assert.Equal(t, fiveIf1.Metrics.AdditionalMetrics.NetworkEvents, fiveIf2.Metrics.AdditionalMetrics.NetworkEvents)
-}
-
-func TestDedupe_EvictFlows(t *testing.T) {
- tm := &timerMock{now: time.Now()}
- timeNow = tm.Now
- input := make(chan []*model.Record, 100)
- output := make(chan []*model.Record, 100)
-
- go Dedupe(15*time.Second, false, false, interfaceNamer, metrics.NewMetrics(&metrics.Settings{}))(input, output)
-
- // Should only accept records 1 and 2, at interface 1
- input <- []*model.Record{oneIf1, twoIf1, oneIf2}
- assert.Equal(t, []*model.Record{oneIf1, twoIf1},
- receiveTimeout(t, output))
-
- tm.now = tm.now.Add(10 * time.Second)
-
- // After 10 seconds, it still filters existing flows from different interfaces
- input <- []*model.Record{oneIf2}
- time.Sleep(100 * time.Millisecond)
- requireNoEviction(t, output)
-
- tm.now = tm.now.Add(10 * time.Second)
-
- // model.Record 2 hasn't been accounted for >expiryTime, so the deduper will accept it again
- // whatever the interface.
- // Since record 1 was accessed 10 seconds ago (…)
	// … >=4.20 kernels with LookupAndDelete
- for iterator.Next(&id, &metrics) {
+ for iterator.Next(&id, &baseMetrics) {
count++
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).Warnf("couldn't delete flow entry")
met.Errors.WithErrorName("flow-fetcher-legacy", "CannotDeleteFlows").Inc()
}
- // We observed that the eBPF PerCPU map might insert the same key multiple times
- // (probably due to race conditions), so we need to re-join the metrics in userspace
- aggr := model.BpfFlowContent{}
- for i := range metrics {
- aggr.AccumulateBase(&metrics[i])
- }
- flows[id] = aggr
+ flows[id] = model.NewBpfFlowContent(baseMetrics)
}
met.BufferSizeGauge.WithBufferName("hashmap-legacy-total").Set(float64(count))
met.BufferSizeGauge.WithBufferName("hashmap-legacy-unique").Set(float64(len(flows)))
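
The loop removed in the hunk above re-joined the per-CPU values returned for a single key into one aggregate before storing it, which is no longer needed once the lookup returns a single base-metrics value per key. The sketch below illustrates only that accumulation pattern; flowMetrics is a simplified stand-in for the generated BpfFlowMetrics struct (the field set shown is an assumption), and accumulate plays the role that AccumulateBase played in the removed code.

package main

import "fmt"

// flowMetrics is a simplified stand-in for the generated per-flow counters.
type flowMetrics struct {
	Packets uint32
	Bytes   uint64
}

// accumulate re-joins the per-CPU values of a single flow key into one aggregate,
// as the removed legacy loop did in userspace.
func accumulate(perCPU []flowMetrics) flowMetrics {
	var aggr flowMetrics
	for i := range perCPU {
		aggr.Packets += perCPU[i].Packets
		aggr.Bytes += perCPU[i].Bytes
	}
	return aggr
}

func main() {
	// One slot per possible CPU, as returned when iterating a BPF per-CPU hash map.
	perCPU := []flowMetrics{{Packets: 3, Bytes: 1800}, {Packets: 1, Bytes: 60}, {}}
	fmt.Printf("%+v\n", accumulate(perCPU)) // {Packets:4 Bytes:1860}
}
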