Subject: [PATCH] support for older kernels --- Index: pkg/flow/tracer_ringbuf.go IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/pkg/flow/tracer_ringbuf.go b/pkg/flow/tracer_ringbuf.go --- a/pkg/flow/tracer_ringbuf.go (revision 434968e1d41e05dea89fa96d5652f9ed126a09a0) +++ b/pkg/flow/tracer_ringbuf.go (date 1721768181441) @@ -5,6 +5,7 @@ "context" "errors" "fmt" + "github.com/cilium/ebpf/perf" "sync/atomic" "syscall" "time" @@ -30,6 +31,7 @@ type ringBufReader interface { ReadRingBuf() (ringbuf.Record, error) + ReadPerf() (perf.Record, error) } // stats supports atomic logging of ringBuffer metrics @@ -76,7 +78,7 @@ } func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh chan<- *RawRecord) error { - event, err := m.ringBuffer.ReadRingBuf() + event, err := m.ringBuffer.ReadPerf() if err != nil { m.metrics.Errors.WithErrorName("ringbuffer", "CannotReadRingbuffer").Inc() return fmt.Errorf("reading from ring buffer: %w", err) Index: bpf/maps_definition.h IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/bpf/maps_definition.h b/bpf/maps_definition.h --- a/bpf/maps_definition.h (revision 434968e1d41e05dea89fa96d5652f9ed126a09a0) +++ b/bpf/maps_definition.h (date 1721814042775) @@ -3,9 +3,17 @@ #include -// Common Ringbuffer as a conduit for ingress/egress flows to userspace +// Common PerfEvent Array as a conduit for ingress/egress flows to userspace struct { - __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __type(key, u32); + __type(value, u32); + __uint(max_entries, 1 << 18); +} direct_flows SEC(".maps"); + + +struct { + __uint(type, BPF_MAP:_TYPE_RINGBUF); __uint(max_entries, 1 << 24); } direct_flows SEC(".maps"); Index: pkg/ebpf/tracer.go IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go --- a/pkg/ebpf/tracer.go (revision 434968e1d41e05dea89fa96d5652f9ed126a09a0) +++ b/pkg/ebpf/tracer.go (date 1721804675852) @@ -60,6 +60,7 @@ egressFilters map[ifaces.Interface]*netlink.BpfFilter ingressFilters map[ifaces.Interface]*netlink.BpfFilter ringbufReader *ringbuf.Reader + perfReader *perf.Reader cacheMaxSize int enableIngress bool enableEgress bool @@ -169,7 +170,7 @@ var rttFentryLink, rttKprobeLink link.Link if cfg.EnableRTT { rttFentryLink, err = link.AttachTracing(link.TracingOptions{ - Program: objects.BpfPrograms.TcpRcvFentry, + Program: objects.BpfPrograms.TcpRcvKprobe, }) if err != nil { log.Warningf("failed to attach the BPF program to tcpReceiveFentry: %v fallback to use kprobe", err) @@ -182,14 +183,14 @@ } // read events from igress+egress ringbuffer - flows, err := ringbuf.NewReader(objects.DirectFlows) + flows, err := perf.NewReader(objects.DirectFlows, os.Getpagesize()) if err != nil { - return nil, fmt.Errorf("accessing to ringbuffer: %w", err) + return nil, fmt.Errorf("accessing to perf: %w", err) } return &FlowFetcher{ objects: &objects, - ringbufReader: flows, + perfReader: flows, egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, @@ -488,6 +489,11 @@ errs = append(errs, err) } } + if m.perfReader != nil { + if err := m.perfReader.Close(); err != nil { + errs = append(errs, err) + } + } if m.objects != nil { if err := m.objects.TcEgressFlowParse.Close(); err != nil { errs = append(errs, err) @@ -594,6 +600,10 @@ return m.ringbufReader.Read() } +func (m *FlowFetcher) ReadPerf() (perf.Record, error) { + return m.perfReader.Read() +} + // LookupAndDeleteMap reads all the entries from the eBPF map and removes them from it. // TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively // Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md @@ -717,8 +727,8 @@ TcIngressPcaParse *ebpf.Program `ebpf:"tc_ingress_pca_parse"` TcxEgressPcaParse *ebpf.Program `ebpf:"tcx_egress_pca_parse"` TcxIngressPcaParse *ebpf.Program `ebpf:"tcx_ingress_pca_parse"` - TCPRcvFentry *ebpf.Program `ebpf:"tcp_rcv_fentry"` - TCPRcvKprobe *ebpf.Program `ebpf:"tcp_rcv_kprobe"` + //TCPRcvFentry *ebpf.Program `ebpf:"tcp_rcv_fentry"` + TCPRcvKprobe *ebpf.Program `ebpf:"tcp_rcv_kprobe"` } type NewBpfObjects struct { NewBpfPrograms @@ -752,7 +762,7 @@ objects.TcIngressPcaParse = newObjects.TcIngressPcaParse objects.TcxEgressPcaParse = newObjects.TcxEgressPcaParse objects.TcxIngressPcaParse = newObjects.TcxIngressPcaParse - objects.TcpRcvFentry = newObjects.TCPRcvFentry + //objects.TcpRcvFentry = newObjects.TCPRcvFentry objects.TcpRcvKprobe = newObjects.TCPRcvKprobe objects.KfreeSkb = nil } else { Index: pkg/agent/agent.go IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go --- a/pkg/agent/agent.go (revision 434968e1d41e05dea89fa96d5652f9ed126a09a0) +++ b/pkg/agent/agent.go (date 1721768181434) @@ -4,6 +4,7 @@ "context" "errors" "fmt" + "github.com/cilium/ebpf/perf" "io" "net" "net/http" @@ -130,6 +131,7 @@ LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics DeleteMapsStaleEntries(timeOut time.Duration) ReadRingBuf() (ringbuf.Record, error) + ReadPerf() (perf.Record, error) } // FlowsAgent instantiates a new agent, given a configuration. Index: bpf/flows.c IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/bpf/flows.c b/bpf/flows.c --- a/bpf/flows.c (revision 434968e1d41e05dea89fa96d5652f9ed126a09a0) +++ b/bpf/flows.c (date 1721735473400) @@ -152,17 +152,18 @@ } new_flow.errno = -ret; - flow_record *record = - (flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0); - if (!record) { + + flow_record record; + record.id = id; + record.metrics = new_flow; + ret = bpf_perf_event_output(skb, &direct_flows, BPF_F_CURRENT_CPU, &record, + sizeof(record)); + if (ret) { if (trace_messages) { - bpf_printk("couldn't reserve space in the ringbuf. Dropping flow"); + bpf_printk("failed to send flow via perf event: %d\n", ret); } return TC_ACT_OK; } - record->id = id; - record->metrics = new_flow; - bpf_ringbuf_submit(record, 0); } } return TC_ACT_OK; Index: bpf/rtt_tracker.h IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/bpf/rtt_tracker.h b/bpf/rtt_tracker.h --- a/bpf/rtt_tracker.h (revision 434968e1d41e05dea89fa96d5652f9ed126a09a0) +++ b/bpf/rtt_tracker.h (date 1721769269405) @@ -155,14 +155,6 @@ return 0; } -SEC("fentry/tcp_rcv_established") -int BPF_PROG(tcp_rcv_fentry, struct sock *sk, struct sk_buff *skb) { - if (sk == NULL || skb == NULL || do_sampling == 0) { - return 0; - } - return calculate_flow_rtt_tcp(sk, skb); -} - SEC("kprobe/tcp_rcv_established") int BPF_KPROBE(tcp_rcv_kprobe, struct sock *sk, struct sk_buff *skb) { if (sk == NULL || skb == NULL || do_sampling == 0) {