Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore performances in filtering case #496

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bpf/configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

// Constant definitions, to be overridden by the invoker
volatile const u32 sampling = 0;
volatile const u8 has_filter_sampling = 0;
volatile const u8 trace_messages = 0;
volatile const u8 enable_rtt = 0;
volatile const u8 enable_pca = 0;
Expand Down
20 changes: 10 additions & 10 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,13 @@ static inline void add_observed_intf(additional_metrics *value, u32 if_index, u8
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
u32 filter_sampling = 0;

if (!is_filter_enabled()) {
if (!has_filter_sampling) {
// When no filter sampling is defined, run the sampling check at the earliest for better performances
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
do_sampling = 0;
return TC_ACT_OK;
}
filter_sampling = sampling;
do_sampling = 1;
}

Expand All @@ -128,9 +127,10 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}

// check if this packet need to be filtered if filtering feature is enabled
if (is_filter_enabled()) {
bool skip = check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling,
direction);
u32 filter_sampling = 0;
bool skip =
check_and_do_flow_filtering(&id, pkt.flags, 0, eth_protocol, &filter_sampling, direction);
if (has_filter_sampling) {
if (filter_sampling == 0) {
filter_sampling = sampling;
}
Expand All @@ -140,9 +140,9 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
return TC_ACT_OK;
}
do_sampling = 1;
if (skip) {
return TC_ACT_OK;
}
}
if (skip) {
return TC_ACT_OK;
}

int dns_errno = 0;
Expand Down
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
25 changes: 14 additions & 11 deletions pkg/tracer/flow_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,14 @@ type FilterConfig struct {
}

type Filter struct {
// eBPF objs to create/update eBPF maps
objects *ebpf.BpfObjects
config []*FilterConfig
config []*FilterConfig
}

func NewFilter(objects *ebpf.BpfObjects, cfg []*FilterConfig) *Filter {
return &Filter{
objects: objects,
config: cfg,
}
func NewFilter(cfg []*FilterConfig) *Filter {
return &Filter{config: cfg}
}

func (f *Filter) ProgramFilter() error {

func (f *Filter) ProgramFilter(objects *ebpf.BpfObjects) error {
for _, config := range f.config {
log.Infof("Flow filter config: %v", f.config)
key, err := f.getFilterKey(config)
Expand All @@ -55,7 +49,7 @@ func (f *Filter) ProgramFilter() error {
return fmt.Errorf("failed to get filter value: %w", err)
}

err = f.objects.FilterMap.Update(key, val, cilium.UpdateAny)
err = objects.FilterMap.Update(key, val, cilium.UpdateAny)
if err != nil {
return fmt.Errorf("failed to update filter map: %w", err)
}
Expand Down Expand Up @@ -264,3 +258,12 @@ func ConvertFilterPortsToInstr(intPort int32, rangePorts, ports string) intstr.I
}
return intstr.FromInt32(intPort)
}

func (f *Filter) hasSampling() uint8 {
for _, r := range f.config {
if r.FilterSample > 0 {
return 1
}
}
return 0
}
21 changes: 15 additions & 6 deletions pkg/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
dnsLatencyMap = "dns_flows"
// constants defined in flows.c as "volatile const"
constSampling = "sampling"
constHasFilterSampling = "has_filter_sampling"
constTraceMessages = "trace_messages"
constEnableRtt = "enable_rtt"
constEnableDNSTracking = "enable_dns_tracking"
Expand Down Expand Up @@ -114,6 +115,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
var err error
objects := ebpf.BpfObjects{}
var pinDir string
var filter *Filter
if cfg.EnableFlowFilter {
filter = NewFilter(cfg.FilterConfig)
}

if !cfg.UseEbpfManager {
if err := rlimit.RemoveMemlock(); err != nil {
Expand Down Expand Up @@ -162,8 +167,10 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}

enableFlowFiltering := 0
if cfg.EnableFlowFilter {
hasFilterSampling := uint8(0)
if filter != nil {
enableFlowFiltering = 1
hasFilterSampling = filter.hasSampling()
}
enableNetworkEventsMonitoring := 0
if cfg.EnableNetworkEventsMonitoring {
Expand All @@ -178,7 +185,9 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
enablePktTranslation = 1
}
if err := spec.RewriteConstants(map[string]interface{}{
// When adding constants here, remember to delete them in NewPacketFetcher
constSampling: uint32(cfg.Sampling),
constHasFilterSampling: hasFilterSampling,
constTraceMessages: uint8(traceMsgs),
constEnableRtt: uint8(enableRtt),
constEnableDNSTracking: uint8(enableDNSTracking),
Expand Down Expand Up @@ -326,9 +335,8 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
}
}

if cfg.EnableFlowFilter {
f := NewFilter(&objects, cfg.FilterConfig)
if err := f.ProgramFilter(); err != nil {
if filter != nil {
if err := filter.ProgramFilter(&objects); err != nil {
return nil, fmt.Errorf("programming flow filter: %w", err)
}
}
Expand Down Expand Up @@ -1291,6 +1299,7 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) {
delete(spec.Programs, aggregatedFlowsMap)
delete(spec.Programs, additionalFlowMetrics)
delete(spec.Programs, constSampling)
delete(spec.Programs, constHasFilterSampling)
delete(spec.Programs, constTraceMessages)
delete(spec.Programs, constEnableDNSTracking)
delete(spec.Programs, constDNSTrackingPort)
Expand Down Expand Up @@ -1330,8 +1339,8 @@ func NewPacketFetcher(cfg *FlowFetcherConfig) (*PacketFetcher, error) {
},
}

f := NewFilter(&objects, cfg.FilterConfig)
if err := f.ProgramFilter(); err != nil {
f := NewFilter(cfg.FilterConfig)
if err := f.ProgramFilter(&objects); err != nil {
return nil, fmt.Errorf("programming flow filter: %w", err)
}

Expand Down
Loading