Skip to content

Commit

Permalink
fix(pkg/bpf): Use channel to process events (#1671)
Browse files Browse the repository at this point in the history
Processing events in the same goroutine as the ring buffer reader
requires acquiring a mutex, which blocks ringbuf event processing
causing a backlog. To avoid this, send events via a buffered
channel to a dedicated event processing goroutine to ensure
that the ringbuf remains unblocked. This has decreased CPU
load on my machine from 1-3% to 0-1%.

Signed-off-by: Dave Tucker <[email protected]>
  • Loading branch information
dave-tucker authored Aug 5, 2024
1 parent df3a4b6 commit c99e399
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 56 deletions.
5 changes: 5 additions & 0 deletions cmd/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
apiserverEnabled = flag.Bool("apiserver", true, "if apiserver is disabled, we collect pod information from kubelet")
redfishCredFilePath = flag.String("redfish-cred-file-path", "", "path to the redfish credential file")
exposeEstimatedIdlePower = flag.Bool("expose-estimated-idle-power", false, "estimated idle power is meaningful only if Kepler is running on bare-metal or when there is only one virtual machine on the node")
bpfDebugMetricsEnabled = flag.Bool("bpf-debug-metrics", false, "whether to enable debug metrics for eBPF")
)

func healthProbe(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -150,6 +151,10 @@ func main() {
klog.Fatalf("failed to create eBPF exporter: %v", err)
}
defer bpfExporter.Detach()
if *bpfDebugMetricsEnabled {
bpfExporter.RegisterMetrics(registry)
}

stopCh := make(chan struct{})
bpfErrCh := make(chan error)
go func() {
Expand Down
164 changes: 108 additions & 56 deletions pkg/bpf/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/jaypipes/ghw"
"github.com/prometheus/client_golang/prometheus"
"github.com/sustainable-computing-io/kepler/pkg/config"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -51,6 +52,10 @@ type exporter struct {
enabledHardwareCounters sets.Set[string]
enabledSoftwareCounters sets.Set[string]

eventsRead prometheus.Counter
eventsProcessed prometheus.Counter
channelDepth prometheus.GaugeFunc

// Locks processMetrics and freedPIDs.
// Acquired in CollectProcesses - to prevent new events from being processed
// while summarizing the metrics and resetting the counters.
Expand All @@ -61,6 +66,9 @@ type exporter struct {
mu *sync.Mutex
processMetrics map[uint32]*bpfMetrics
freedPIDs []int

ringbufReader *ringbuf.Reader
eventsChan chan *keplerEvent
}

func NewExporter() (Exporter, error) {
Expand All @@ -70,14 +78,41 @@ func NewExporter() (Exporter, error) {
enabledSoftwareCounters: sets.New[string](),
mu: &sync.Mutex{},
processMetrics: make(map[uint32]*bpfMetrics),
eventsChan: make(chan *keplerEvent, 1024),
}
e.eventsRead = prometheus.NewCounter(prometheus.CounterOpts{
Name: "kepler_bpf_exporter_events_read_total",
Help: "Total number of events read from the ring buffer.",
})
e.eventsProcessed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "kepler_bpf_exporter_events_processed_total",
Help: "Total number of events processed.",
})
e.channelDepth = prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Name: "kepler_bpf_exporter_events_channel_depth",
Help: "Current depth of the events channel",
},
func() float64 {
if e.eventsChan == nil {
return 0
}
return float64(len(e.eventsChan))
},
)
err := e.attach()
if err != nil {
e.Detach()
}
return e, err
}

// RegisterMetrics registers the exporter's internal debug collectors
// (events read, events processed, and channel depth) with the given
// Prometheus registry. It panics if any collector is already registered.
func (e *exporter) RegisterMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		e.eventsRead,
		e.eventsProcessed,
		e.channelDepth,
	)
}

func (e *exporter) SupportedMetrics() SupportedMetrics {
return SupportedMetrics{
HardwareCounters: e.enabledHardwareCounters.Clone(),
Expand Down Expand Up @@ -220,29 +255,38 @@ func (e *exporter) Detach() {
}

// Start opens the ring buffer and runs the reader and event-processor
// goroutines until stopChan is closed. It blocks until both goroutines
// have exited and returns any error from opening the ring buffer.
func (e *exporter) Start(stopChan <-chan struct{}) error {
	var err error
	e.ringbufReader, err = ringbuf.NewReader(e.bpfObjects.Rb)
	if err != nil {
		return fmt.Errorf("failed to create ring buffer reader: %w", err)
	}

	// Close the reader exactly once, whether shutdown is triggered by
	// stopChan or by Start returning for any other reason.
	var closeOnce sync.Once
	closeReader := func() {
		closeOnce.Do(func() { e.ringbufReader.Close() })
	}
	defer closeReader()

	// ringBufReader only observes stopChan between reads, so a goroutine
	// blocked in ReadInto would otherwise hang until the next event
	// arrives. Closing the reader on stop interrupts the pending read
	// (it returns ringbuf.ErrClosed), letting ringBufReader exit promptly.
	go func() {
		<-stopChan
		closeReader()
	}()

	wg := &sync.WaitGroup{}
	wg.Add(2)
	go e.ringBufReader(wg, stopChan)
	go e.eventProcessor(wg, stopChan)
	wg.Wait()

	return nil
}

func (e *exporter) ringBufReader(wg *sync.WaitGroup, stopChan <-chan struct{}) {
defer wg.Done()
for {
var record *ringbuf.Record

select {
case <-stopChan:
if err := rd.Close(); err != nil {
return fmt.Errorf("closing ring buffer reader: %w", err)
}
return nil
return
default:
var event keplerEvent
record = new(ringbuf.Record)

err := rd.ReadInto(record)
err := e.ringbufReader.ReadInto(record)
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
return nil
return
}
if errors.Is(err, ringbuf.ErrFlushed) {
record.RawSample = record.RawSample[:0]
Expand All @@ -255,9 +299,63 @@ func (e *exporter) Start(stopChan <-chan struct{}) error {
klog.Errorf("parsing ringbuf event: %s", err)
continue
}
// process events on another channel to avoid blocking the ring buffer reader
e.eventsChan <- &event
e.eventsRead.Inc()
}
}
}

// Process the event
e.handleEvent(event)
// eventProcessor consumes events from eventsChan and folds them into the
// per-PID metrics table until stopChan is closed. It runs as a dedicated
// goroutine so the ring buffer reader never blocks on the metrics mutex.
func (e *exporter) eventProcessor(wg *sync.WaitGroup, stopChan <-chan struct{}) {
	defer wg.Done()
	for {
		select {
		case <-stopChan:
			return
		case event := <-e.eventsChan:
			e.mu.Lock()

			// Look up (or lazily create) the entry for the task going on CPU.
			metrics, ok := e.processMetrics[event.Pid]
			if !ok {
				metrics = newBpfMetrics()
				e.processMetrics[event.Pid] = metrics
			}

			switch event.EventType {
			case uint64(keplerEventTypeSCHED_SWITCH):
				// Start counters for the task being scheduled ON this CPU.
				metrics.CPUCyles.Start(event.CpuId, event.Tid, event.CpuCycles)
				metrics.CPUInstructions.Start(event.CpuId, event.Tid, event.CpuInstr)
				metrics.CacheMiss.Start(event.CpuId, event.Tid, event.CacheMiss)
				metrics.CPUTime.Start(event.CpuId, event.Tid, event.Ts)

				// Stop counters for the task going OFF the CPU, creating
				// its entry if this is the first time we have seen it.
				prev, found := e.processMetrics[event.OffcpuPid]
				if !found {
					prev = newBpfMetrics()
					e.processMetrics[event.OffcpuPid] = prev
				}
				prev.CPUCyles.Stop(event.CpuId, event.OffcpuTid, event.CpuCycles)
				prev.CPUInstructions.Stop(event.CpuId, event.OffcpuTid, event.CpuInstr)
				prev.CacheMiss.Stop(event.CpuId, event.OffcpuTid, event.CacheMiss)
				prev.CPUTime.Stop(event.CpuId, event.OffcpuTid, event.Ts)
				prev.CGroupID = event.OffcpuCgroupId
			case uint64(keplerEventTypePAGE_CACHE_HIT):
				metrics.PageCacheHit++
			case uint64(keplerEventTypeIRQ):
				switch event.IrqNumber {
				case uint32(keplerIrqTypeNET_TX):
					metrics.TxIRQ++
				case uint32(keplerIrqTypeNET_RX):
					metrics.RxIRQ++
				case uint32(keplerIrqTypeBLOCK):
					metrics.BlockIRQ++
				}
			case uint64(keplerEventTypeFREE):
				// Remember freed PIDs so CollectProcesses can prune them.
				e.freedPIDs = append(e.freedPIDs, int(event.Pid))
			}

			e.mu.Unlock()
			e.eventsProcessed.Inc()
		}
	}
}
Expand Down Expand Up @@ -343,52 +441,6 @@ func (p *PerCPUCounter) Reset() {
p.Total = 0
}

// handleEvent folds a single keplerEvent into the per-PID metrics table.
// It holds e.mu for the duration of the update so it cannot race with
// CollectProcesses summarizing and resetting the counters.
func (e *exporter) handleEvent(event keplerEvent) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Look up (or lazily create) the entry for the task going on CPU.
	p, ok := e.processMetrics[event.Pid]
	if !ok {
		p = newBpfMetrics()
		e.processMetrics[event.Pid] = p
	}

	switch event.EventType {
	case uint64(keplerEventTypeSCHED_SWITCH):
		// Handle the new task going on CPU.
		p.CPUCyles.Start(event.CpuId, event.Tid, event.CpuCycles)
		p.CPUInstructions.Start(event.CpuId, event.Tid, event.CpuInstr)
		p.CacheMiss.Start(event.CpuId, event.Tid, event.CacheMiss)
		p.CPUTime.Start(event.CpuId, event.Tid, event.Ts)

		// Handle the task going OFF CPU, creating its entry if needed.
		offcpu, found := e.processMetrics[event.OffcpuPid]
		if !found {
			offcpu = newBpfMetrics()
			e.processMetrics[event.OffcpuPid] = offcpu
		}
		offcpu.CPUCyles.Stop(event.CpuId, event.OffcpuTid, event.CpuCycles)
		offcpu.CPUInstructions.Stop(event.CpuId, event.OffcpuTid, event.CpuInstr)
		offcpu.CacheMiss.Stop(event.CpuId, event.OffcpuTid, event.CacheMiss)
		offcpu.CPUTime.Stop(event.CpuId, event.OffcpuTid, event.Ts)
		offcpu.CGroupID = event.OffcpuCgroupId
	case uint64(keplerEventTypePAGE_CACHE_HIT):
		p.PageCacheHit += 1
	case uint64(keplerEventTypeIRQ):
		switch event.IrqNumber {
		case uint32(keplerIrqTypeNET_TX):
			p.TxIRQ += 1
		case uint32(keplerIrqTypeNET_RX):
			p.RxIRQ += 1
		case uint32(keplerIrqTypeBLOCK):
			p.BlockIRQ += 1
		}
		// NOTE: the redundant `return` that previously ended this case has
		// been removed — Go switch cases never fall through, so it was a no-op.
	case uint64(keplerEventTypeFREE):
		// Remember freed PIDs so CollectProcesses can prune them.
		e.freedPIDs = append(e.freedPIDs, int(event.Pid))
	}
}

func (e *exporter) CollectProcesses() (ProcessMetricsCollection, error) {
e.mu.Lock()
defer e.mu.Unlock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/bpf/test_utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bpf

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sustainable-computing-io/kepler/pkg/config"
"k8s.io/apimachinery/pkg/util/sets"
)
Expand Down Expand Up @@ -68,3 +69,5 @@ func (m *mockExporter) CollectProcesses() (ProcessMetricsCollection, error) {
FreedPIDs: []int{0},
}, nil
}

// RegisterMetrics is a no-op: the mock exporter exposes no Prometheus
// collectors, but must still satisfy the Exporter interface.
func (m *mockExporter) RegisterMetrics(registry *prometheus.Registry) {}
2 changes: 2 additions & 0 deletions pkg/bpf/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package bpf

import (
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
)

Expand All @@ -25,6 +26,7 @@ type Exporter interface {
Detach()
CollectProcesses() (ProcessMetricsCollection, error)
Start(<-chan struct{}) error
RegisterMetrics(registry *prometheus.Registry)
}

type ProcessMetrics struct {
Expand Down

0 comments on commit c99e399

Please sign in to comment.