From 0baec479033aaca3d1744ec3010e156afd0075e8 Mon Sep 17 00:00:00 2001 From: Dave Tucker Date: Sat, 18 May 2024 17:44:01 +0100 Subject: [PATCH] feat: Fixed eBPF Feature Detection (#1443) On working to migrate to cilium/ebpf I started to hit issues with nil pointer dereferences that indicated that something odd was happening between eBPF feature detection and metrics export. I discovered that there were additional global variables in many of the packages whose values are set before eBPF feature detection has happened. It appears that in these cases - through the various layers of indirection - we may try and read/write metrics that do not exist causing a nil pointer dereference. There appear to be several places where we're accessing map keys without checking whether they are nil - that will need to be fixed in a follow up. This PR attempts to at least ensure that from the BPF side that the keys we expect to exist, do actually exist in the maps that matter. eBPF feature detection now produces a bpf.SupportedMetrics struct, which is now consumed in various places in the codebase to ensure that the metrics we're exporting match those supported by the eBPF exporter. 
Signed-off-by: Dave Tucker --- cmd/exporter/exporter.go | 2 +- go.mod | 2 +- pkg/bpf/exporter.go | 163 ++++++++++-------- pkg/bpf/exporter_stub.go | 13 +- pkg/bpf/test_utils.go | 54 +++--- pkg/bpf/types.go | 20 ++- pkg/collector/metric_collector.go | 30 ++-- pkg/collector/metric_collector_test.go | 7 +- .../accelerator/process_gpu_collector.go | 11 +- .../bpf/process_bpf_collector.go | 37 ++-- pkg/collector/stats/benchmark_test.go | 5 +- pkg/collector/stats/container_stats.go | 5 +- pkg/collector/stats/container_stats_test.go | 3 +- pkg/collector/stats/node_stats.go | 5 +- pkg/collector/stats/process_stats.go | 11 +- pkg/collector/stats/stats.go | 19 +- pkg/collector/stats/stats_test.go | 11 +- pkg/collector/stats/test_utils.go | 20 +-- pkg/collector/stats/utils.go | 37 +--- pkg/collector/stats/utils_test.go | 38 ---- pkg/collector/stats/vm_stats.go | 6 +- pkg/collector/stats/vm_stats_test.go | 3 +- pkg/collector/utils.go | 4 +- pkg/config/config.go | 7 - pkg/kubernetes/watcher.go | 22 +-- pkg/manager/manager.go | 7 +- pkg/manager/manager_test.go | 2 +- pkg/metrics/consts/conts.go | 15 -- pkg/metrics/container/metrics.go | 25 ++- pkg/metrics/metricfactory/metric_factory.go | 23 +-- pkg/metrics/process/metrics.go | 24 +-- pkg/metrics/prometheus_collector.go | 15 +- pkg/metrics/prometheus_collector_test.go | 9 +- pkg/metrics/utils/utils.go | 26 ++- pkg/metrics/virtualmachine/metrics.go | 24 +-- pkg/model/benchmark_test.go | 5 +- pkg/model/model.go | 5 +- pkg/model/process_energy.go | 13 +- pkg/model/process_energy_test.go | 7 +- 39 files changed, 348 insertions(+), 387 deletions(-) delete mode 100644 pkg/collector/stats/utils_test.go diff --git a/cmd/exporter/exporter.go b/cmd/exporter/exporter.go index 874924f509..d8f25df664 100644 --- a/cmd/exporter/exporter.go +++ b/cmd/exporter/exporter.go @@ -110,7 +110,7 @@ func main() { } defer bpfExporter.Detach() - stats.InitAvailableParamAndMetrics(bpfExporter.GetEnabledBPFHWCounters(), 
bpfExporter.GetEnabledBPFSWCounters()) + stats.InitAvailableParamAndMetrics() if config.EnabledGPU { klog.Infof("Initializing the GPU collector") diff --git a/go.mod b/go.mod index 29ca368f0a..125f798398 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/prometheus/common v0.48.0 github.com/prometheus/prometheus v0.48.1 github.com/sirupsen/logrus v1.9.0 - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 golang.org/x/sys v0.20.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.28.2 @@ -75,6 +74,7 @@ require ( github.com/prometheus/procfs v0.12.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/testify v1.8.4 // indirect + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/term v0.20.0 // indirect diff --git a/pkg/bpf/exporter.go b/pkg/bpf/exporter.go index be63187b04..8c92a5fe64 100644 --- a/pkg/bpf/exporter.go +++ b/pkg/bpf/exporter.go @@ -33,8 +33,8 @@ import ( "github.com/jaypipes/ghw" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/utils" - "golang.org/x/exp/slices" "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" ) @@ -43,19 +43,10 @@ const ( bpfAssestsLocation = "/var/lib/kepler/bpfassets" cpuOnline = "/sys/devices/system/cpu/online" bpfPerfArraySuffix = "_event_reader" - - // Per /sys/kernel/debug/tracing/events/irq/softirq_entry/format - // { 0, "HI" }, { 1, "TIMER" }, { 2, "NET_TX" }, { 3, "NET_RX" }, { 4, "BLOCK" }, { 5, "IRQ_POLL" }, { 6, "TASKLET" }, { 7, "SCHED" }, { 8, "HRTIMER" }, { 9, "RCU" } - - // IRQ vector to IRQ number - IRQNetTX = 2 - IRQNetRX = 3 - IRQBlock = 4 - - TableProcessName = "processes" - TableCPUFreqName = "cpu_freq_array" - MapSize = 10240 - CPUNumSize = 128 + TableProcessName = "processes" + TableCPUFreqName = "cpu_freq_array" + MapSize = 10240 + CPUNumSize = 128 ) type exporter struct { @@ -66,11 +57,10 @@ 
type exporter struct { cpuCores int // due to performance reason we keep an empty struct to verify if a new read is also empty emptyct ProcessBPFMetrics - hardwareCountersEnabled bool byteOrder binary.ByteOrder perfEventFds []int - enabledHardwareCounters []string - enabledSoftwareCounters []string + enabledHardwareCounters sets.Set[string] + enabledSoftwareCounters sets.Set[string] } func NewExporter() (Exporter, error) { @@ -80,11 +70,10 @@ func NewExporter() (Exporter, error) { ebpfBatchGetAndDelete: true, cpuCores: getCPUCores(), emptyct: ProcessBPFMetrics{}, - hardwareCountersEnabled: true, byteOrder: utils.DetermineHostByteOrder(), perfEventFds: []int{}, - enabledHardwareCounters: []string{}, - enabledSoftwareCounters: []string{}, + enabledHardwareCounters: sets.New[string](), + enabledSoftwareCounters: sets.New[string](), } err := e.attach() if err != nil { @@ -98,16 +87,11 @@ type perfCounter struct { EvConfig int } -func (e *exporter) GetEnabledBPFHWCounters() []string { - return e.enabledHardwareCounters -} - -func (e *exporter) GetEnabledBPFSWCounters() []string { - return e.enabledSoftwareCounters -} - -func (e *exporter) HardwareCountersEnabled() bool { - return e.hardwareCountersEnabled +func (e *exporter) SupportedMetrics() SupportedMetrics { + return SupportedMetrics{ + HardwareCounters: e.enabledHardwareCounters.Clone(), + SoftwareCounters: e.enabledSoftwareCounters.Clone(), + } } func getLibbpfObjectFilePath(byteOrder binary.ByteOrder) (string, error) { @@ -132,7 +116,7 @@ func getLibbpfObjectFilePath(byteOrder binary.ByteOrder) (string, error) { return nil }) if err != nil { - return "", fmt.Errorf("failed to find bpf object file: %v", err) + return "", fmt.Errorf("failed to find bpf object file: %w", err) } if len(matches) < 1 { return "", fmt.Errorf("failed to find bpf object file: no matches found") @@ -146,12 +130,12 @@ func getLibbpfObjectFilePath(byteOrder binary.ByteOrder) (string, error) { func (e *exporter) attach() error { 
libbpfObjectFilePath, err := getLibbpfObjectFilePath(e.byteOrder) if err != nil { - return fmt.Errorf("failed to load module: %v", err) + return fmt.Errorf("failed to load module: %w", err) } e.module, err = bpf.NewModuleFromFile(libbpfObjectFilePath) if err != nil { - return fmt.Errorf("failed to load eBPF module from libbpf object: %v", err) + return fmt.Errorf("failed to load eBPF module from libbpf object: %w", err) } // resize array entries @@ -162,59 +146,62 @@ func (e *exporter) attach() error { } for _, arrayName := range toResize { if err = resizeArrayEntries(e.module, arrayName, e.cpuCores); err != nil { - return fmt.Errorf("failed to resize array %s: %v\n", arrayName, err) + return fmt.Errorf("failed to resize array %s: %w", arrayName, err) } } // set the sample rate, this must be done before loading the object sampleRate := config.BPFSampleRate if err := e.module.InitGlobalVariable("SAMPLE_RATE", int32(sampleRate)); err != nil { - return fmt.Errorf("failed to set sample rate: %v", err) + return fmt.Errorf("failed to set sample rate: %w", err) } if err := e.module.BPFLoadObject(); err != nil { - return fmt.Errorf("failed to load eBPF object: %v", err) + return fmt.Errorf("failed to load eBPF object: %w", err) } // attach to kprobe__finish_task_switch kprobe function prog, err := e.module.GetProgram("kepler_sched_switch_trace") if err != nil { - return fmt.Errorf("failed to get kepler_sched_switch_trace: %v", err) + return fmt.Errorf("failed to get kepler_sched_switch_trace: %w", err) } if _, err = prog.AttachGeneric(); err != nil { klog.Infof("failed to attach tracepoint/sched/sched_switch: %v", err) } else { - e.enabledSoftwareCounters = append(e.enabledSoftwareCounters, config.CPUTime) + e.enabledSoftwareCounters[config.CPUTime] = struct{}{} } if config.ExposeIRQCounterMetrics { - // attach softirq_entry tracepoint to kepler_irq_trace function - irq_prog, err := e.module.GetProgram("kepler_irq_trace") - if err != nil { - klog.Warningf("could not get 
kepler_irq_trace: %v", err) - // disable IRQ metric - config.ExposeIRQCounterMetrics = false - } else { + err := func() error { + // attach softirq_entry tracepoint to kepler_irq_trace function + irq_prog, err := e.module.GetProgram("kepler_irq_trace") + if err != nil { + return fmt.Errorf("could not get kepler_irq_trace: %w", err) + } if _, err := irq_prog.AttachGeneric(); err != nil { - klog.Warningf("could not attach irq/softirq_entry: %v", err) - // disable IRQ metric - config.ExposeIRQCounterMetrics = false + return fmt.Errorf("could not attach irq/softirq_entry: %w", err) } - e.enabledSoftwareCounters = append(e.enabledSoftwareCounters, SoftIRQEvents...) + e.enabledSoftwareCounters[config.IRQNetTXLabel] = struct{}{} + e.enabledSoftwareCounters[config.IRQNetRXLabel] = struct{}{} + e.enabledSoftwareCounters[config.IRQBlockLabel] = struct{}{} + return nil + }() + if err != nil { + klog.Warningf("IRQ tracing disabled: %v", err) } } // attach function page_write, err := e.module.GetProgram("kepler_write_page_trace") if err != nil { - return fmt.Errorf("failed to get kepler_write_page_trace: %v", err) + return fmt.Errorf("failed to get kepler_write_page_trace: %w", err) } else { _, err = page_write.AttachTracepoint("writeback", "writeback_dirty_folio") if err != nil { klog.Warningf("failed to attach tp/writeback/writeback_dirty_folio: %v. Kepler will not collect page cache write events. This will affect the DRAM power model estimation on VMs.", err) } else { - e.enabledSoftwareCounters = append(e.enabledSoftwareCounters, config.PageCacheHit) + e.enabledSoftwareCounters[config.PageCacheHit] = struct{}{} } } @@ -226,47 +213,75 @@ func (e *exporter) attach() error { if _, err = page_read.AttachGeneric(); err != nil { klog.Warningf("failed to attach fentry/mark_page_accessed: %v. Kepler will not collect page cache read events. 
This will affect the DRAM power model estimation on VMs.", err) } else { - if !slices.Contains(e.enabledSoftwareCounters, config.PageCacheHit) { - e.enabledSoftwareCounters = append(e.enabledSoftwareCounters, config.PageCacheHit) - } + e.enabledSoftwareCounters[config.PageCacheHit] = struct{}{} } } + if !config.ExposeHardwareCounterMetrics { + klog.Infof("Hardware counter metrics are disabled") + return nil + } + // attach performance counter fd to BPF_PERF_EVENT_ARRAY - counters := map[string]perfCounter{ + hardwareCounters := map[string]perfCounter{ config.CPUCycle: {unix.PERF_TYPE_HARDWARE, unix.PERF_COUNT_HW_CPU_CYCLES}, // CPURefCycles aren't populated from the eBPF programs // If this is a bug, we should fix that and bring this map back // config.CPURefCycle: {unix.PERF_TYPE_HARDWARE, unix.PERF_COUNT_HW_REF_CPU_CYCLES, true}, config.CPUInstruction: {unix.PERF_TYPE_HARDWARE, unix.PERF_COUNT_HW_INSTRUCTIONS}, config.CacheMiss: {unix.PERF_TYPE_HARDWARE, unix.PERF_COUNT_HW_CACHE_MISSES}, - config.TaskClock: {unix.PERF_TYPE_SOFTWARE, unix.PERF_COUNT_SW_TASK_CLOCK}, } - for arrayName, counter := range counters { + cleanup := func() error { + unixClosePerfEvents(e.perfEventFds) + e.perfEventFds = []int{} + e.enabledHardwareCounters.Clear() + return nil + } + + for arrayName, counter := range hardwareCounters { bpfPerfArrayName := arrayName + bpfPerfArraySuffix bpfMap, perfErr := e.module.GetMap(bpfPerfArrayName) if perfErr != nil { klog.Warningf("could not get ebpf map for perf event %s: %v\n", bpfPerfArrayName, perfErr) - continue - } else { - fds, perfErr := unixOpenPerfEvent(counter.EvType, counter.EvConfig, e.cpuCores) - if perfErr != nil { - // some hypervisors don't expose perf counters - klog.Warningf("could not attach perf event %s: %v. 
Are you using a VM?\n", bpfPerfArrayName, perfErr) - // if any counter is not enabled, we need disable HardwareCountersEnabled - e.hardwareCountersEnabled = false - } - for i, fd := range fds { - err = bpfMap.Update(unsafe.Pointer(&i), unsafe.Pointer(&fd)) - if err != nil { - return fmt.Errorf("failed to update bpf map: %v", err) - } + return cleanup() + } + fds, perfErr := unixOpenPerfEvent(counter.EvType, counter.EvConfig, e.cpuCores) + if perfErr != nil { + klog.Warningf("could not attach perf event %s: %v. Are you using a VM?\n", bpfPerfArrayName, perfErr) + return cleanup() + } + for i, fd := range fds { + err = bpfMap.Update(unsafe.Pointer(&i), unsafe.Pointer(&fd)) + if err != nil { + klog.Warningf("failed to update bpf map: %v", err) + return cleanup() } - e.perfEventFds = append(e.perfEventFds, fds...) - e.enabledHardwareCounters = append(e.enabledHardwareCounters, arrayName) } + e.perfEventFds = append(e.perfEventFds, fds...) + e.enabledHardwareCounters[arrayName] = struct{}{} } + + // attach task clock perf event. this is a software counter, not a hardware counter + bpfPerfArrayName := config.TaskClock + bpfPerfArraySuffix + bpfMap, err := e.module.GetMap(bpfPerfArrayName) + if err != nil { + return fmt.Errorf("could not get ebpf map for perf event %s: %w", bpfPerfArrayName, err) + } + fds, perfErr := unixOpenPerfEvent(unix.PERF_TYPE_SOFTWARE, unix.PERF_COUNT_SW_TASK_CLOCK, e.cpuCores) + if perfErr != nil { + return fmt.Errorf("could not attach perf event %s: %w", bpfPerfArrayName, perfErr) + } + for i, fd := range fds { + err = bpfMap.Update(unsafe.Pointer(&i), unsafe.Pointer(&fd)) + if err != nil { + klog.Warningf("failed to update bpf map: %v", err) + return cleanup() + } + } + e.perfEventFds = append(e.perfEventFds, fds...) 
+ e.enabledSoftwareCounters[config.TaskClock] = struct{}{} + klog.Infof("Successfully load eBPF module from libbpf object") return nil } @@ -362,7 +377,7 @@ func unixOpenPerfEvent(typ, conf, cpuCores int) ([]int, error) { cloexecFlags := unix.PERF_FLAG_FD_CLOEXEC fd, err := unix.PerfEventOpen(sysAttr, -1, int(i), -1, cloexecFlags) if fd < 0 { - return nil, fmt.Errorf("failed to open bpf perf event on cpu %d: %v", i, err) + return nil, fmt.Errorf("failed to open bpf perf event on cpu %d: %w", i, err) } fds = append(fds, int(fd)) } diff --git a/pkg/bpf/exporter_stub.go b/pkg/bpf/exporter_stub.go index c025a6ff91..ab7fc7291a 100644 --- a/pkg/bpf/exporter_stub.go +++ b/pkg/bpf/exporter_stub.go @@ -19,18 +19,19 @@ limitations under the License. package bpf +import "k8s.io/apimachinery/pkg/util/sets" + type stubAttacher struct{} func NewExporter() (Exporter, error) { return &stubAttacher{}, nil } -func (a *stubAttacher) GetEnabledBPFHWCounters() []string { - return []string{} -} - -func (a *stubAttacher) GetEnabledBPFSWCounters() []string { - return []string{} +func (a *stubAttacher) SupportedMetrics() SupportedMetrics { + return SupportedMetrics{ + HardwareCounters: sets.New[string](), + SoftwareCounters: sets.New[string](), + } } func (a *stubAttacher) Detach() { diff --git a/pkg/bpf/test_utils.go b/pkg/bpf/test_utils.go index 157a9be9d3..b5269bc145 100644 --- a/pkg/bpf/test_utils.go +++ b/pkg/bpf/test_utils.go @@ -1,19 +1,46 @@ package bpf -import "github.com/sustainable-computing-io/kepler/pkg/config" +import ( + "github.com/sustainable-computing-io/kepler/pkg/config" + "k8s.io/apimachinery/pkg/util/sets" +) type mockExporter struct { - hardwareCountersEnabled bool + softwareCounters sets.Set[string] + hardwareCounters sets.Set[string] } -func NewMockExporter(hardwareCountersEnabled bool) Exporter { +func DefaultSupportedMetrics() SupportedMetrics { + return SupportedMetrics{ + HardwareCounters: defaultHardwareCounters(), + SoftwareCounters: 
defaultSoftwareCounters(), + } +} + +func defaultHardwareCounters() sets.Set[string] { + return sets.New(config.CPUCycle, config.CPUInstruction, config.CacheMiss, config.TaskClock) +} + +func defaultSoftwareCounters() sets.Set[string] { + swCounters := sets.New(config.CPUTime, config.PageCacheHit) + if config.ExposeIRQCounterMetrics { + swCounters.Insert(config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel) + } + return swCounters +} + +func NewMockExporter(bpfSupportedMetrics SupportedMetrics) Exporter { return &mockExporter{ - hardwareCountersEnabled: hardwareCountersEnabled, + softwareCounters: bpfSupportedMetrics.SoftwareCounters.Clone(), + hardwareCounters: bpfSupportedMetrics.HardwareCounters.Clone(), } } -func (m *mockExporter) HardwareCountersEnabled() bool { - return m.hardwareCountersEnabled +func (m *mockExporter) SupportedMetrics() SupportedMetrics { + return SupportedMetrics{ + HardwareCounters: m.hardwareCounters, + SoftwareCounters: m.softwareCounters, + } } func (m *mockExporter) Detach() {} @@ -39,18 +66,3 @@ func (m *mockExporter) CollectProcesses() ([]ProcessBPFMetrics, error) { func (m *mockExporter) CollectCPUFreq() (map[int32]uint64, error) { return map[int32]uint64{0: 0}, nil } - -func (m *mockExporter) GetEnabledBPFHWCounters() []string { - if !m.hardwareCountersEnabled { - return []string{} - } - return []string{config.CPUCycle, config.CPUInstruction, config.CacheMiss, config.TaskClock} -} - -func (m *mockExporter) GetEnabledBPFSWCounters() []string { - swCounters := []string{config.CPUTime, config.TaskClock, config.PageCacheHit} - if config.ExposeIRQCounterMetrics { - swCounters = append(swCounters, SoftIRQEvents...) 
- } - return swCounters -} diff --git a/pkg/bpf/types.go b/pkg/bpf/types.go index aea9f4b5e2..a5be755f71 100644 --- a/pkg/bpf/types.go +++ b/pkg/bpf/types.go @@ -18,17 +18,29 @@ package bpf import ( "github.com/sustainable-computing-io/kepler/pkg/config" + "k8s.io/apimachinery/pkg/util/sets" ) -var SoftIRQEvents = []string{config.IRQNetTXLabel, config.IRQNetRXLabel, config.IRQBlockLabel} +const ( + // Per /sys/kernel/debug/tracing/events/irq/softirq_entry/format + // { 0, "HI" }, { 1, "TIMER" }, { 2, "NET_TX" }, { 3, "NET_RX" }, { 4, "BLOCK" }, { 5, "IRQ_POLL" }, { 6, "TASKLET" }, { 7, "SCHED" }, { 8, "HRTIMER" }, { 9, "RCU" } + + // IRQ vector to IRQ number + IRQNetTX = 2 + IRQNetRX = 3 + IRQBlock = 4 +) type Exporter interface { - HardwareCountersEnabled() bool + SupportedMetrics() SupportedMetrics Detach() CollectProcesses() ([]ProcessBPFMetrics, error) CollectCPUFreq() (map[int32]uint64, error) - GetEnabledBPFHWCounters() []string - GetEnabledBPFSWCounters() []string +} + +type SupportedMetrics struct { + HardwareCounters sets.Set[string] + SoftwareCounters sets.Set[string] } // must be in sync with bpf program diff --git a/pkg/collector/metric_collector.go b/pkg/collector/metric_collector.go index 4edc613af8..e72d0ad288 100644 --- a/pkg/collector/metric_collector.go +++ b/pkg/collector/metric_collector.go @@ -58,17 +58,21 @@ type Collector struct { // VMStats holds the aggregated processes metrics for all virtual machines VMStats map[string]*stats.VMStats - // Attacher handles the attachment of the bpf probes + // bpfExporter handles gathering metrics from bpf probes bpfExporter bpf.Exporter + // bpfSupportedMetrics holds the supported metrics by the bpf exporter + bpfSupportedMetrics bpf.SupportedMetrics } func NewCollector(bpfExporter bpf.Exporter) *Collector { + bpfSupportedMetrics := bpfExporter.SupportedMetrics() c := &Collector{ - NodeStats: *stats.NewNodeStats(bpfExporter.HardwareCountersEnabled()), - ContainerStats: map[string]*stats.ContainerStats{}, 
- ProcessStats: map[uint64]*stats.ProcessStats{}, - VMStats: map[string]*stats.VMStats{}, - bpfExporter: bpfExporter, + NodeStats: *stats.NewNodeStats(bpfSupportedMetrics), + ContainerStats: map[string]*stats.ContainerStats{}, + ProcessStats: map[uint64]*stats.ProcessStats{}, + VMStats: map[string]*stats.VMStats{}, + bpfExporter: bpfExporter, + bpfSupportedMetrics: bpfSupportedMetrics, } return c } @@ -85,10 +89,10 @@ func (c *Collector) Initialize() error { // For local estimator, there is endpoint provided, thus we should let // model component decide whether/how to init model.CreatePowerEstimatorModels( - stats.ProcessFeaturesNames, + stats.GetProcessFeatureNames(c.bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, - c.bpfExporter.HardwareCountersEnabled(), + c.bpfSupportedMetrics, ) return nil @@ -163,7 +167,7 @@ func (c *Collector) updateResourceUtilizationMetrics() { // updateNodeAvgCPUFrequencyFromEBPF updates the average CPU frequency in each core func (c *Collector) updateNodeAvgCPUFrequencyFromEBPF() { // update the cpu frequency using hardware counters when available because reading files can be very expensive - if config.IsExposeCPUFrequencyMetricsEnabled() && c.bpfExporter.HardwareCountersEnabled() { + if config.IsExposeCPUFrequencyMetricsEnabled() && c.bpfSupportedMetrics.HardwareCounters.Has(config.CPUFrequency) { cpuFreq, err := c.bpfExporter.CollectCPUFreq() if err == nil { for cpu, freq := range cpuFreq { @@ -177,7 +181,7 @@ func (c *Collector) updateNodeAvgCPUFrequencyFromEBPF() { func (c *Collector) updateNodeResourceUtilizationMetrics(wg *sync.WaitGroup) { defer wg.Done() if config.IsExposeQATMetricsEnabled() && qat.IsQATCollectionSupported() { - accelerator.UpdateNodeQATMetrics(stats.NewNodeStats(c.bpfExporter.HardwareCountersEnabled())) + accelerator.UpdateNodeQATMetrics(stats.NewNodeStats(c.bpfSupportedMetrics)) } if config.ExposeCPUFrequencyMetrics { c.updateNodeAvgCPUFrequencyFromEBPF() @@ -190,7 +194,7 
@@ func (c *Collector) updateProcessResourceUtilizationMetrics(wg *sync.WaitGroup) // we first updates the bpf which is resposible to include new processes in the ProcessStats collection resourceBpf.UpdateProcessBPFMetrics(c.bpfExporter, c.ProcessStats) if config.EnabledGPU && gpu.IsGPUCollectionSupported() { - accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats, c.bpfExporter.HardwareCountersEnabled()) + accelerator.UpdateProcessGPUUtilizationMetrics(c.ProcessStats, c.bpfSupportedMetrics) } } @@ -230,7 +234,7 @@ func (c *Collector) AggregateProcessResourceUtilizationMetrics() { if config.IsExposeVMStatsEnabled() { if process.VMID != "" { if _, ok := c.VMStats[process.VMID]; !ok { - c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfExporter.HardwareCountersEnabled()) + c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics) } c.VMStats[process.VMID].ResourceUsage[metricName].AddDeltaStat(id, delta) foundVM[process.VMID] = true @@ -315,7 +319,7 @@ func (c *Collector) AggregateProcessEnergyUtilizationMetrics() { if config.IsExposeVMStatsEnabled() { if process.VMID != "" { if _, ok := c.VMStats[process.VMID]; !ok { - c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfExporter.HardwareCountersEnabled()) + c.VMStats[process.VMID] = stats.NewVMStats(process.PID, process.VMID, c.bpfSupportedMetrics) } c.VMStats[process.VMID].EnergyUsage[metricName].AddDeltaStat(id, delta) } diff --git a/pkg/collector/metric_collector_test.go b/pkg/collector/metric_collector_test.go index 0ff7e9d812..81d9600434 100644 --- a/pkg/collector/metric_collector_test.go +++ b/pkg/collector/metric_collector_test.go @@ -39,10 +39,11 @@ func newMockCollector(mockAttacher bpf.Exporter) *Collector { var _ = Describe("Test Collector Unit", func() { It("Get container power", func() { - bpfExporter := bpf.NewMockExporter(true) + bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics()) metricCollector 
:= newMockCollector(bpfExporter) // The default estimator model is the ratio - model.CreatePowerEstimatorModels(stats.ProcessFeaturesNames, stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, false) + bpfSupportedMetrics := bpfExporter.SupportedMetrics() + model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpfSupportedMetrics) // update container and node metrics metricCollector.UpdateProcessEnergyUtilizationMetrics() metricCollector.AggregateProcessEnergyUtilizationMetrics() @@ -56,7 +57,7 @@ var _ = Describe("Test Collector Unit", func() { }) It("HandleInactiveContainers without error", func() { - bpfExporter := bpf.NewMockExporter(true) + bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics()) metricCollector := newMockCollector(bpfExporter) foundContainer := make(map[string]bool) foundContainer["container1"] = true diff --git a/pkg/collector/resourceutilization/accelerator/process_gpu_collector.go b/pkg/collector/resourceutilization/accelerator/process_gpu_collector.go index eb6e649857..1adb559617 100644 --- a/pkg/collector/resourceutilization/accelerator/process_gpu_collector.go +++ b/pkg/collector/resourceutilization/accelerator/process_gpu_collector.go @@ -21,6 +21,7 @@ import ( "os" "time" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/cgroup" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" @@ -42,26 +43,26 @@ var ( ) // UpdateProcessGPUUtilizationMetrics reads the GPU metrics of each process using the GPU -func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats, hardwareCountersEnabled bool) { +func UpdateProcessGPUUtilizationMetrics(processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) { // calculate the gpu's processes energy consumption for each 
gpu migDevices := gpu.GetMIGInstances() for _, device := range gpu.GetGpus() { // we need to use MIG device handler if the GPU has MIG slices, otherwise, we use the GPU device handler if _, hasMIG := migDevices[device.GPUID]; !hasMIG { - addGPUUtilizationToProcessStats(processStats, device, device.GPUID, hardwareCountersEnabled) + addGPUUtilizationToProcessStats(processStats, device, device.GPUID, bpfSupportedMetrics) } else { // if the device has MIG slices, we should collect the process information directly from the MIG device handler for _, migDevice := range migDevices[device.GPUID] { // device.GPUID is equal to migDevice.ParentGpuID // we add the process metrics with the parent GPU ID, so that the Ratio power model will use this data to split the GPU power among the process - addGPUUtilizationToProcessStats(processStats, migDevice, migDevice.ParentGpuID, hardwareCountersEnabled) + addGPUUtilizationToProcessStats(processStats, migDevice, migDevice.ParentGpuID, bpfSupportedMetrics) } } } lastUtilizationTimestamp = time.Now() } -func addGPUUtilizationToProcessStats(processStats map[uint64]*stats.ProcessStats, device gpu_source.Device, gpuID int, hardwareCountersEnabled bool) { +func addGPUUtilizationToProcessStats(processStats map[uint64]*stats.ProcessStats, device gpu_source.Device, gpuID int, bpfSupportedMetrics bpf.SupportedMetrics) { var err error var processesUtilization map[uint32]gpu_source.ProcessUtilizationSample if processesUtilization, err = gpu.GetProcessResourceUtilizationPerDevice(device, time.Since(lastUtilizationTimestamp)); err != nil { @@ -93,7 +94,7 @@ func addGPUUtilizationToProcessStats(processStats map[uint64]*stats.ProcessStats } } } - processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command, hardwareCountersEnabled) + processStats[uintPid] = stats.NewProcessStats(uintPid, uint64(0), containerID, vmID, command, bpfSupportedMetrics) } gpuName := fmt.Sprintf("%d", gpuID) // GPU ID or Parent GPU ID for MIG 
slices processStats[uintPid].ResourceUsage[config.GPUComputeUtilization].AddDeltaStat(gpuName, uint64(processUtilization.ComputeUtil)) diff --git a/pkg/collector/resourceutilization/bpf/process_bpf_collector.go b/pkg/collector/resourceutilization/bpf/process_bpf_collector.go index 493dc8d791..befc5fe967 100644 --- a/pkg/collector/resourceutilization/bpf/process_bpf_collector.go +++ b/pkg/collector/resourceutilization/bpf/process_bpf_collector.go @@ -33,22 +33,32 @@ import ( type ProcessBPFMetrics = bpf.ProcessBPFMetrics // update software counter metrics -func updateSWCounters(key uint64, ct *ProcessBPFMetrics, processStats map[uint64]*stats.ProcessStats) { +func updateSWCounters(key uint64, ct *ProcessBPFMetrics, processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) { // update ebpf metrics // first update CPU time and Page Cache Hit - processStats[key].ResourceUsage[config.CPUTime].AddDeltaStat(utils.GenericSocketID, ct.ProcessRunTime) - processStats[key].ResourceUsage[config.TaskClock].AddDeltaStat(utils.GenericSocketID, ct.TaskClockTime) - processStats[key].ResourceUsage[config.PageCacheHit].AddDeltaStat(utils.GenericSocketID, ct.PageCacheHit/(1000*1000)) - // update IRQ vector. 
Soft IRQ events has the events ordered - for i, event := range bpf.SoftIRQEvents { - processStats[key].ResourceUsage[event].AddDeltaStat(utils.GenericSocketID, uint64(ct.VecNR[i])) + for counterKey := range bpfSupportedMetrics.SoftwareCounters { + switch counterKey { + case config.CPUTime: + processStats[key].ResourceUsage[config.CPUTime].AddDeltaStat(utils.GenericSocketID, ct.ProcessRunTime) + case config.TaskClock: + processStats[key].ResourceUsage[config.TaskClock].AddDeltaStat(utils.GenericSocketID, ct.TaskClockTime) + case config.PageCacheHit: + processStats[key].ResourceUsage[config.PageCacheHit].AddDeltaStat(utils.GenericSocketID, ct.PageCacheHit/(1000*1000)) + case config.IRQNetTXLabel: + processStats[key].ResourceUsage[config.IRQNetTXLabel].AddDeltaStat(utils.GenericSocketID, uint64(ct.VecNR[bpf.IRQNetTX])) + case config.IRQNetRXLabel: + processStats[key].ResourceUsage[config.IRQNetRXLabel].AddDeltaStat(utils.GenericSocketID, uint64(ct.VecNR[bpf.IRQNetRX])) + case config.IRQBlockLabel: + processStats[key].ResourceUsage[config.IRQBlockLabel].AddDeltaStat(utils.GenericSocketID, uint64(ct.VecNR[bpf.IRQBlock])) + default: + klog.Errorf("counter %s is not supported\n", counterKey) + } } } // update hardware counter metrics -func updateHWCounters(key uint64, ct *ProcessBPFMetrics, processStats map[uint64]*stats.ProcessStats) { - // update HW counters - for _, counterKey := range stats.AvailableBPFHWCounters { +func updateHWCounters(key uint64, ct *ProcessBPFMetrics, processStats map[uint64]*stats.ProcessStats, bpfSupportedMetrics bpf.SupportedMetrics) { + for counterKey := range bpfSupportedMetrics.HardwareCounters { var val uint64 var event string switch counterKey { @@ -115,10 +125,11 @@ func UpdateProcessBPFMetrics(bpfExporter bpf.Exporter, processStats map[uint64]* mapKey = 1 } + bpfSupportedMetrics := bpfExporter.SupportedMetrics() var ok bool var pStat *stats.ProcessStats if pStat, ok = processStats[mapKey]; !ok { - pStat = stats.NewProcessStats(ct.PID, 
ct.CGroupID, containerID, vmID, comm, bpfExporter.HardwareCountersEnabled()) + pStat = stats.NewProcessStats(ct.PID, ct.CGroupID, containerID, vmID, comm, bpfSupportedMetrics) processStats[mapKey] = pStat } else if pStat.Command == "" { pStat.Command = comm @@ -126,7 +137,7 @@ func UpdateProcessBPFMetrics(bpfExporter bpf.Exporter, processStats map[uint64]* // when the process metrics are updated, reset the idle counter pStat.IdleCounter = 0 - updateSWCounters(mapKey, &ct, processStats) - updateHWCounters(mapKey, &ct, processStats) + updateSWCounters(mapKey, &ct, processStats, bpfSupportedMetrics) + updateHWCounters(mapKey, &ct, processStats, bpfSupportedMetrics) } } diff --git a/pkg/collector/stats/benchmark_test.go b/pkg/collector/stats/benchmark_test.go index 868f332f91..d4433f71bb 100644 --- a/pkg/collector/stats/benchmark_test.go +++ b/pkg/collector/stats/benchmark_test.go @@ -28,7 +28,7 @@ func benchmarkNtesting(b *testing.B, processNumber int) { // enable metrics stats.SetMockedCollectorMetrics() // create node node metrics - bpfExporter := bpf.NewMockExporter(true) + bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics()) metricCollector := collector.NewCollector(bpfExporter) // create processes @@ -38,7 +38,8 @@ func benchmarkNtesting(b *testing.B, processNumber int) { metricCollector.AggregateProcessResourceUtilizationMetrics() // The default estimator model is the ratio - model.CreatePowerEstimatorModels(stats.ProcessFeaturesNames, stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, true) + bpfSupportedMetrics := bpf.DefaultSupportedMetrics() + model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpfSupportedMetrics) // update container and node metrics b.ReportAllocs() diff --git a/pkg/collector/stats/container_stats.go b/pkg/collector/stats/container_stats.go index 19ec5ec6c2..587ce6f2d4 100644 --- 
a/pkg/collector/stats/container_stats.go +++ b/pkg/collector/stats/container_stats.go @@ -19,6 +19,7 @@ package stats import ( "fmt" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/cgroup" "github.com/sustainable-computing-io/kepler/pkg/collector/stats/types" "github.com/sustainable-computing-io/kepler/pkg/config" @@ -39,9 +40,9 @@ type ContainerStats struct { } // NewContainerStats creates a new ContainerStats instance -func NewContainerStats(containerName, podName, podNamespace, containerID string, hardwareCountersEnabled bool) *ContainerStats { +func NewContainerStats(containerName, podName, podNamespace, containerID string, bpfSupportedMetrics bpf.SupportedMetrics) *ContainerStats { c := &ContainerStats{ - Stats: *NewStats(hardwareCountersEnabled), + Stats: *NewStats(bpfSupportedMetrics), PIDS: make(map[uint64]bool), ContainerID: containerID, PodName: podName, diff --git a/pkg/collector/stats/container_stats_test.go b/pkg/collector/stats/container_stats_test.go index fc53c446fb..6ad0f108ce 100644 --- a/pkg/collector/stats/container_stats_test.go +++ b/pkg/collector/stats/container_stats_test.go @@ -19,6 +19,7 @@ package stats import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" ) @@ -26,7 +27,7 @@ var _ = Describe("Test Container Metric", func() { It("Test ResetDeltaValues", func() { SetMockedCollectorMetrics() - c := NewContainerStats("containerA", "podA", "test", "containerIDA", true) + c := NewContainerStats("containerA", "podA", "test", "containerIDA", bpf.DefaultSupportedMetrics()) c.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000) c.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000) c.ResourceUsage[config.CacheMiss].SetDeltaStat(MockedSocketID, 30000) diff --git a/pkg/collector/stats/node_stats.go b/pkg/collector/stats/node_stats.go index e03227bef1..d68b7d0429 100644 --- a/pkg/collector/stats/node_stats.go +++ b/pkg/collector/stats/node_stats.go @@ -19,6 +19,7 @@ package stats import ( "fmt" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator/gpu" "github.com/sustainable-computing-io/kepler/pkg/utils" @@ -42,9 +43,9 @@ type NodeStats struct { IdleResUtilization map[string]uint64 } -func NewNodeStats(hardwareCountersEnabled bool) *NodeStats { +func NewNodeStats(bpfSupportedMetrics bpf.SupportedMetrics) *NodeStats { return &NodeStats{ - Stats: *NewStats(hardwareCountersEnabled), + Stats: *NewStats(bpfSupportedMetrics), IdleResUtilization: map[string]uint64{}, } } diff --git a/pkg/collector/stats/process_stats.go b/pkg/collector/stats/process_stats.go index 1f5086e499..ef575eccac 100644 --- a/pkg/collector/stats/process_stats.go +++ b/pkg/collector/stats/process_stats.go @@ -18,13 +18,8 @@ package stats import ( "fmt" -) -var ( - // ProcessMetricNames holds the list of names of the container metric - ProcessMetricNames []string - // ProcessFeaturesNames holds all the feature name of the container stats. 
This is specific for the machine-learning based models. - ProcessFeaturesNames []string + "github.com/sustainable-computing-io/kepler/pkg/bpf" ) type ProcessStats struct { @@ -38,14 +33,14 @@ type ProcessStats struct { } // NewProcessStats creates a new ProcessStats instance -func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string, hardwareCountersEnabled bool) *ProcessStats { +func NewProcessStats(pid, cGroupID uint64, containerID, vmID, command string, bpfSupportedMetrics bpf.SupportedMetrics) *ProcessStats { p := &ProcessStats{ PID: pid, CGroupID: cGroupID, ContainerID: containerID, VMID: vmID, Command: command, - Stats: *NewStats(hardwareCountersEnabled), + Stats: *NewStats(bpfSupportedMetrics), } return p } diff --git a/pkg/collector/stats/stats.go b/pkg/collector/stats/stats.go index 5a8ffe06b9..0bb23a022d 100644 --- a/pkg/collector/stats/stats.go +++ b/pkg/collector/stats/stats.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats/types" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator/gpu" @@ -27,10 +28,6 @@ import ( ) var ( - // AvailableBPFSWCounters holds a list of eBPF counters that might be collected - AvailableBPFSWCounters []string - // AvailableBPFHWCounters holds a list of hardware counters that might be collected - AvailableBPFHWCounters []string // AvailableCGroupMetrics holds a list of cgroup metrics exposed by the cgroup that might be collected AvailableCGroupMetrics []string // AvailableAbsEnergyMetrics holds a list of absolute energy metrics @@ -39,8 +36,6 @@ var ( AvailableDynEnergyMetrics []string // AvailableIdleEnergyMetrics holds a list of idle energy metrics AvailableIdleEnergyMetrics []string - // CPUHardwareCounterEnabled defined if hardware counters should be accounted and exported - CPUHardwareCounterEnabled = false ) type Stats 
struct { @@ -49,7 +44,7 @@ type Stats struct { } // NewStats creates a new Stats instance -func NewStats(hardwareCountersEnabled bool) *Stats { +func NewStats(bpfSupportedMetrics bpf.SupportedMetrics) *Stats { m := &Stats{ ResourceUsage: make(map[string]*types.UInt64StatCollection), EnergyUsage: make(map[string]*types.UInt64StatCollection), @@ -66,8 +61,12 @@ func NewStats(hardwareCountersEnabled bool) *Stats { // initialize the resource utilization metrics in the map resMetrics := []string{} - resMetrics = append(resMetrics, AvailableBPFHWCounters...) - resMetrics = append(resMetrics, AvailableBPFSWCounters...) + for metricName := range bpfSupportedMetrics.HardwareCounters { + resMetrics = append(resMetrics, metricName) + } + for metricName := range bpfSupportedMetrics.SoftwareCounters { + resMetrics = append(resMetrics, metricName) + } // CGroup metrics are deprecated, it will be removed in the future resMetrics = append(resMetrics, AvailableCGroupMetrics...) for _, metricName := range resMetrics { @@ -83,7 +82,7 @@ func NewStats(hardwareCountersEnabled bool) *Stats { m.ResourceUsage[config.QATUtilization] = types.NewUInt64StatCollection() } - if config.IsExposeCPUFrequencyMetricsEnabled() && hardwareCountersEnabled { + if config.IsExposeCPUFrequencyMetricsEnabled() && bpfSupportedMetrics.HardwareCounters.Has(config.CPUFrequency) { m.ResourceUsage[config.CPUFrequency] = types.NewUInt64StatCollection() } diff --git a/pkg/collector/stats/stats_test.go b/pkg/collector/stats/stats_test.go index 194ad69794..dd7ffbbb9c 100644 --- a/pkg/collector/stats/stats_test.go +++ b/pkg/collector/stats/stats_test.go @@ -5,27 +5,26 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" ) var _ = Describe("Stats", func() { It("Test InitAvailableParamAndMetrics", func() { config.ExposeHardwareCounterMetrics = false - clearPlatformDependentAvailability() - // why metric depends on cgroup? - // why here is a null pointer? - InitAvailableParamAndMetrics([]string{}, []string{}) + supportedMetrics := bpf.DefaultSupportedMetrics() + InitAvailableParamAndMetrics() if runtime.GOOS == "linux" { exp := []string{ config.BytesReadIO, config.BytesWriteIO, config.BlockDevicesIO, } - Expect(len(ProcessFeaturesNames) >= len(exp)).To(BeTrue()) + Expect(len(GetProcessFeatureNames(supportedMetrics)) >= len(exp)).To(BeTrue()) } if runtime.GOOS == "darwin" { exp := []string{config.BlockDevicesIO} - Expect(len(ProcessFeaturesNames) >= len(exp)).To(BeTrue()) + Expect(len(GetProcessFeatureNames(supportedMetrics)) >= len(exp)).To(BeTrue()) } }) }) diff --git a/pkg/collector/stats/test_utils.go b/pkg/collector/stats/test_utils.go index 21c833cc48..004e2b072b 100644 --- a/pkg/collector/stats/test_utils.go +++ b/pkg/collector/stats/test_utils.go @@ -19,6 +19,7 @@ package stats import ( "strconv" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator/gpu" "k8s.io/klog/v2" @@ -36,15 +37,6 @@ func SetMockedCollectorMetrics() { klog.Fatalln(err) } // initialize the Available metrics since they are used to create a new processMetrics instance - AvailableBPFHWCounters = []string{ - config.CPUCycle, - config.CPUInstruction, - config.CacheMiss, - } - AvailableBPFSWCounters = []string{ - config.CPUTime, - config.PageCacheHit, - } AvailableCGroupMetrics = []string{ config.CgroupfsMemory, config.CgroupfsKernelMemory, @@ -56,12 +48,6 @@ func SetMockedCollectorMetrics() { config.CgroupfsWriteIO, config.BlockDevicesIO, } - // 
ProcessFeaturesNames is used by the nodeMetrics to extract the resource usage. Only the metrics in ProcessFeaturesNames will be used. - ProcessFeaturesNames = []string{} - ProcessFeaturesNames = append(ProcessFeaturesNames, AvailableBPFSWCounters...) - ProcessFeaturesNames = append(ProcessFeaturesNames, AvailableBPFHWCounters...) - ProcessFeaturesNames = append(ProcessFeaturesNames, AvailableCGroupMetrics...) - AvailableAbsEnergyMetrics = []string{ config.AbsEnergyInCore, config.AbsEnergyInDRAM, config.AbsEnergyInUnCore, config.AbsEnergyInPkg, config.AbsEnergyInGPU, config.AbsEnergyInOther, config.AbsEnergyInPlatform, @@ -94,7 +80,7 @@ func createMockedProcessMetric(idx int) *ProcessStats { vmID := "vm" + strconv.Itoa(idx) command := "command" + strconv.Itoa(idx) uintPid := uint64(idx) - processMetrics := NewProcessStats(uintPid, uintPid, containerID, vmID, command, true) + processMetrics := NewProcessStats(uintPid, uintPid, containerID, vmID, command, bpf.DefaultSupportedMetrics()) // counter - attacher package processMetrics.ResourceUsage[config.CPUCycle].SetDeltaStat(MockedSocketID, 30000) processMetrics.ResourceUsage[config.CPUInstruction].SetDeltaStat(MockedSocketID, 30000) @@ -106,7 +92,7 @@ func createMockedProcessMetric(idx int) *ProcessStats { // CreateMockedNodeStats creates a node metric with power consumption and add the process resource utilization func CreateMockedNodeStats() NodeStats { - nodeMetrics := NewNodeStats(true) + nodeMetrics := NewNodeStats(bpf.DefaultSupportedMetrics()) // add power metrics // add first values to be the idle power nodeMetrics.EnergyUsage[config.AbsEnergyInPkg].SetDeltaStat(MockedSocketID, 5000) // mili joules diff --git a/pkg/collector/stats/utils.go b/pkg/collector/stats/utils.go index 4116591e04..2dd6148a78 100644 --- a/pkg/collector/stats/utils.go +++ b/pkg/collector/stats/utils.go @@ -27,6 +27,7 @@ import ( "k8s.io/klog/v2" + "github.com/sustainable-computing-io/kepler/pkg/bpf" 
"github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/sensors/accelerator/gpu" @@ -52,15 +53,12 @@ type CPUS struct { cpusInfo []CPUModelData } -func InitAvailableParamAndMetrics(enabledHwCounters, enabledSwCounters []string) { - AvailableBPFHWCounters = enabledHwCounters - AvailableBPFSWCounters = enabledSwCounters +func InitAvailableParamAndMetrics() { AvailableCGroupMetrics = []string{ config.CgroupfsMemory, config.CgroupfsKernelMemory, config.CgroupfsTCPMemory, config.CgroupfsCPU, config.CgroupfsSystemCPU, config.CgroupfsUserCPU, config.CgroupfsReadIO, config.CgroupfsWriteIO, config.BlockDevicesIO, } - CPUHardwareCounterEnabled = isCounterStatEnabled(config.CPUInstruction) AvailableAbsEnergyMetrics = []string{ config.AbsEnergyInCore, config.AbsEnergyInDRAM, config.AbsEnergyInUnCore, config.AbsEnergyInPkg, config.AbsEnergyInGPU, config.AbsEnergyInOther, config.AbsEnergyInPlatform, @@ -73,22 +71,19 @@ func InitAvailableParamAndMetrics(enabledHwCounters, enabledSwCounters []string) config.IdleEnergyInCore, config.IdleEnergyInDRAM, config.IdleEnergyInUnCore, config.IdleEnergyInPkg, config.IdleEnergyInGPU, config.IdleEnergyInOther, config.IdleEnergyInPlatform, } - - // defined in utils to init metrics - setEnabledProcessMetrics() } -func getProcessFeatureNames() []string { +func GetProcessFeatureNames(bpfSupportedMetrics bpf.SupportedMetrics) []string { var metrics []string // bpf software counter metrics - metrics = append(metrics, AvailableBPFSWCounters...) - klog.V(3).Infof("Available ebpf software counters: %v", AvailableBPFSWCounters) - + for counterKey := range bpfSupportedMetrics.SoftwareCounters { + metrics = append(metrics, counterKey) + } // bpf hardware counter metrics - if config.IsHCMetricsEnabled() { - metrics = append(metrics, AvailableBPFHWCounters...) 
- klog.V(3).Infof("Available ebpf hardware counters: %v", AvailableBPFHWCounters) + for counterKey := range bpfSupportedMetrics.HardwareCounters { + metrics = append(metrics, counterKey) } + klog.V(3).Infof("Available ebpf counters: %v", metrics) // gpu metric if config.EnabledGPU && gpu.IsGPUCollectionSupported() { @@ -105,20 +100,6 @@ func getProcessFeatureNames() []string { return metrics } -func setEnabledProcessMetrics() { - ProcessMetricNames = []string{} - ProcessFeaturesNames = getProcessFeatureNames() -} - -func isCounterStatEnabled(label string) bool { - for _, counter := range AvailableBPFHWCounters { - if counter == label { - return true - } - } - return false -} - func GetNodeName() string { if nodeName := os.Getenv("NODE_NAME"); nodeName != "" { return nodeName diff --git a/pkg/collector/stats/utils_test.go b/pkg/collector/stats/utils_test.go deleted file mode 100644 index 0e5ba2110d..0000000000 --- a/pkg/collector/stats/utils_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package stats - -import ( - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - - "github.com/sustainable-computing-io/kepler/pkg/config" -) - -func clearPlatformDependentAvailability() { - AvailableBPFSWCounters = []string{config.CPUTime} - AvailableBPFHWCounters = []string{} - AvailableCGroupMetrics = []string{} - - ProcessFeaturesNames = getProcessFeatureNames() -} - -var _ = Describe("Test Metric Unit", func() { - It("Test isCounterStatEnabled for True", func() { - AvailableBPFHWCounters = []string{config.BlockDevicesIO} - exp := isCounterStatEnabled(config.CPUTime) - Expect(exp).To(BeFalse()) - }) - - It("Test isCounterStatEnabled for False", func() { - AvailableBPFHWCounters = []string{config.BlockDevicesIO} - exp := isCounterStatEnabled("") - Expect(exp).To(BeFalse()) - }) - - It("Test setEnabledProcessMetrics", func() { - config.ExposeHardwareCounterMetrics = false - clearPlatformDependentAvailability() - setEnabledProcessMetrics() - exp := []string{config.CPUTime} - Expect(exp).To(Equal(ProcessFeaturesNames)) - }) -}) diff --git a/pkg/collector/stats/vm_stats.go b/pkg/collector/stats/vm_stats.go index ea2b34f20a..2855bfb705 100644 --- a/pkg/collector/stats/vm_stats.go +++ b/pkg/collector/stats/vm_stats.go @@ -16,6 +16,8 @@ limitations under the License. 
package stats +import "github.com/sustainable-computing-io/kepler/pkg/bpf" + var ( // VMMetricNames holds the list of names of the vm metric VMMetricNames []string @@ -34,11 +36,11 @@ type VMStats struct { } // NewVMStats creates a new VMStats instance -func NewVMStats(pid uint64, vmID string, hardwareCountersEnabled bool) *VMStats { +func NewVMStats(pid uint64, vmID string, bpfSupportedMetrics bpf.SupportedMetrics) *VMStats { vm := &VMStats{ PID: pid, VMID: vmID, - Stats: *NewStats(hardwareCountersEnabled), + Stats: *NewStats(bpfSupportedMetrics), } return vm } diff --git a/pkg/collector/stats/vm_stats_test.go b/pkg/collector/stats/vm_stats_test.go index a18c380dd9..1ba5ad2ddc 100644 --- a/pkg/collector/stats/vm_stats_test.go +++ b/pkg/collector/stats/vm_stats_test.go @@ -3,6 +3,7 @@ package stats import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" ) @@ -10,7 +11,7 @@ var _ = Describe("VMMetric", func() { It("Test ResetDeltaValues", func() { SetMockedCollectorMetrics() - vm := NewVMStats(0, "name", true) + vm := NewVMStats(0, "name", bpf.DefaultSupportedMetrics()) vm.ResourceUsage[config.CPUTime].AddDeltaStat("socket0", 30000) vm.ResetDeltaValues() Expect(vm.ResourceUsage[config.CPUTime].SumAllDeltaValues()).To(Equal(uint64(0))) diff --git a/pkg/collector/utils.go b/pkg/collector/utils.go index 26917ea13a..18feced185 100644 --- a/pkg/collector/utils.go +++ b/pkg/collector/utils.go @@ -36,7 +36,7 @@ func (c *Collector) createContainerStatsIfNotExist(containerID string, cGroupID, if !kubernetes.IsWatcherEnabled { info, _ := cgroup.GetContainerInfo(cGroupID, pid, withCGroupID) c.ContainerStats[containerID] = stats.NewContainerStats( - info.ContainerName, info.PodName, info.Namespace, containerID, c.bpfExporter.HardwareCountersEnabled()) + info.ContainerName, info.PodName, info.Namespace, containerID, c.bpfSupportedMetrics) } else { name := 
utils.SystemProcessName namespace := utils.SystemProcessNamespace @@ -47,7 +47,7 @@ func (c *Collector) createContainerStatsIfNotExist(containerID string, cGroupID, } // We feel the info with generic values because the watcher will eventually update it. c.ContainerStats[containerID] = stats.NewContainerStats( - name, name, namespace, containerID, c.bpfExporter.HardwareCountersEnabled()) + name, name, namespace, containerID, c.bpfSupportedMetrics) } } else { // TODO set only the most resource intensive PID for the container diff --git a/pkg/config/config.go b/pkg/config/config.go index 6572f2a8b3..132488ccf4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -453,17 +453,10 @@ func GetModelConfigMap() map[string]string { return configMap } -func IsHCMetricsEnabled() bool { - return ExposeHardwareCounterMetrics -} func IsCgroupMetricsEnabled() bool { return ExposeCgroupMetrics } -func IsIRQCounterMetricsEnabled() bool { - return ExposeIRQCounterMetrics -} - func SetGpuUsageMetric(metric string) { GpuUsageMetric = metric } diff --git a/pkg/kubernetes/watcher.go b/pkg/kubernetes/watcher.go index 3d873b1087..a6f3a3aeda 100644 --- a/pkg/kubernetes/watcher.go +++ b/pkg/kubernetes/watcher.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" ) @@ -48,11 +49,11 @@ type ObjListWatcher struct { // Lock to syncronize the collector update with the watcher Mx *sync.Mutex - k8sCli *kubernetes.Clientset - ResourceKind string - informer cache.SharedInformer - stopChannel chan struct{} - hardwareCountersEnabled bool + k8sCli *kubernetes.Clientset + ResourceKind string + informer cache.SharedInformer + stopChannel chan struct{} + bpfSupportedMetrics bpf.SupportedMetrics // ContainerStats holds all container energy and resource usage metrics ContainerStats 
*map[string]*stats.ContainerStats @@ -82,11 +83,12 @@ func newK8sClient() *kubernetes.Clientset { return clientset } -func NewObjListWatcher() *ObjListWatcher { +func NewObjListWatcher(bpfSupportedMetrics bpf.SupportedMetrics) *ObjListWatcher { w := &ObjListWatcher{ - stopChannel: make(chan struct{}), - k8sCli: newK8sClient(), - ResourceKind: podResourceType, + stopChannel: make(chan struct{}), + k8sCli: newK8sClient(), + ResourceKind: podResourceType, + bpfSupportedMetrics: bpfSupportedMetrics, } if w.k8sCli == nil || !config.EnableAPIServer { return w @@ -206,7 +208,7 @@ func (w *ObjListWatcher) fillInfo(pod *k8sv1.Pod, containers []k8sv1.ContainerSt continue } if _, exist = (*w.ContainerStats)[containerID]; !exist { - (*w.ContainerStats)[containerID] = stats.NewContainerStats(containers[j].Name, pod.Name, pod.Namespace, containerID, w.hardwareCountersEnabled) + (*w.ContainerStats)[containerID] = stats.NewContainerStats(containers[j].Name, pod.Name, pod.Namespace, containerID, w.bpfSupportedMetrics) } klog.V(5).Infof("receiving container %s %s %s %s", containers[j].Name, pod.Name, pod.Namespace, containerID) (*w.ContainerStats)[containerID].ContainerName = containers[j].Name diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index de52673462..c233946438 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -43,15 +43,16 @@ type CollectorManager struct { func New(bpfExporter bpf.Exporter) *CollectorManager { manager := &CollectorManager{} + supportedMetrics := bpfExporter.SupportedMetrics() manager.StatsCollector = collector.NewCollector(bpfExporter) - manager.PrometheusCollector = exporter.NewPrometheusExporter() + manager.PrometheusCollector = exporter.NewPrometheusExporter(supportedMetrics) // the collector and prometheusExporter share structures and collections manager.PrometheusCollector.NewProcessCollector(manager.StatsCollector.ProcessStats) manager.PrometheusCollector.NewContainerCollector(manager.StatsCollector.ContainerStats) 
manager.PrometheusCollector.NewVMCollector(manager.StatsCollector.VMStats) manager.PrometheusCollector.NewNodeCollector(&manager.StatsCollector.NodeStats) - // configure the wather - manager.Watcher = kubernetes.NewObjListWatcher() + // configure the watcher + manager.Watcher = kubernetes.NewObjListWatcher(supportedMetrics) manager.Watcher.Mx = &manager.PrometheusCollector.Mx manager.Watcher.ContainerStats = &manager.StatsCollector.ContainerStats manager.Watcher.Run() diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 40fbb7bd29..07c55f2f89 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -12,7 +12,7 @@ import ( var _ = Describe("Manager", func() { It("Should work properly", func() { - bpfExporter := bpf.NewMockExporter(true) + bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics()) CollectorManager := New(bpfExporter) err := CollectorManager.Start() Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/metrics/consts/conts.go b/pkg/metrics/consts/conts.go index 1a467b0244..735fab6da5 100644 --- a/pkg/metrics/consts/conts.go +++ b/pkg/metrics/consts/conts.go @@ -70,21 +70,6 @@ var ( config.IdleEnergyInGPU, config.IdleEnergyInPlatform, } - HCMetricNames = []string{ - config.CPUCycle, - config.CPUInstruction, - config.CacheMiss, - } - SCMetricNames = []string{ - config.CPUTime, - config.TaskClock, - config.PageCacheHit, - } - IRQMetricNames = []string{ - config.IRQNetTXLabel, - config.IRQNetRXLabel, - config.IRQBlockLabel, - } CGroupMetricNames = []string{ config.CgroupfsCPU, config.CgroupfsMemory, diff --git a/pkg/metrics/container/metrics.go b/pkg/metrics/container/metrics.go index a6b6f41ddd..36df11f0df 100644 --- a/pkg/metrics/container/metrics.go +++ b/pkg/metrics/container/metrics.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/prometheus/client_golang/prometheus" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" 
"github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/metrics/consts" @@ -41,14 +42,17 @@ type collector struct { // Lock to syncronize the collector update with prometheus exporter Mx *sync.Mutex + + bpfSupportedMetrics bpf.SupportedMetrics } -func NewContainerCollector(containerMetrics map[string]*stats.ContainerStats, mx *sync.Mutex) prometheus.Collector { +func NewContainerCollector(containerMetrics map[string]*stats.ContainerStats, mx *sync.Mutex, bpfSupportedMetrics bpf.SupportedMetrics) prometheus.Collector { c := &collector{ - ContainerStats: containerMetrics, - descriptions: make(map[string]*prometheus.Desc), - collectors: make(map[string]metricfactory.PromMetric), - Mx: mx, + ContainerStats: containerMetrics, + descriptions: make(map[string]*prometheus.Desc), + collectors: make(map[string]metricfactory.PromMetric), + Mx: mx, + bpfSupportedMetrics: bpfSupportedMetrics, } c.initMetrics() return c @@ -59,15 +63,11 @@ func (c *collector) initMetrics() { if !config.IsExposeContainerStatsEnabled() { return } - for name, desc := range metricfactory.HCMetricsPromDesc(context) { + for name, desc := range metricfactory.HCMetricsPromDesc(context, c.bpfSupportedMetrics) { c.descriptions[name] = desc c.collectors[name] = metricfactory.NewPromCounter(desc) } - for name, desc := range metricfactory.SCMetricsPromDesc(context) { - c.descriptions[name] = desc - c.collectors[name] = metricfactory.NewPromCounter(desc) - } - for name, desc := range metricfactory.IRQMetricsPromDesc(context) { + for name, desc := range metricfactory.SCMetricsPromDesc(context, c.bpfSupportedMetrics) { c.descriptions[name] = desc c.collectors[name] = metricfactory.NewPromCounter(desc) } @@ -99,8 +99,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { c.Mx.Lock() for _, container := range c.ContainerStats { utils.CollectEnergyMetrics(ch, container, c.collectors) - utils.CollectResUtilizationMetrics(ch, container, c.collectors) - + 
utils.CollectResUtilizationMetrics(ch, container, c.collectors, c.bpfSupportedMetrics) // update container total joules c.collectTotalEnergyMetrics(ch, container) } diff --git a/pkg/metrics/metricfactory/metric_factory.go b/pkg/metrics/metricfactory/metric_factory.go index 6f12d17c25..2fb03c29d1 100644 --- a/pkg/metrics/metricfactory/metric_factory.go +++ b/pkg/metrics/metricfactory/metric_factory.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/prometheus/client_golang/prometheus" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/metrics/consts" modeltypes "github.com/sustainable-computing-io/kepler/pkg/model/types" @@ -65,30 +66,18 @@ func energyMetricsPromDesc(context, name, source string) (desc *prometheus.Desc) return MetricsPromDesc(context, name, consts.EnergyMetricNameSuffix, source, labels) } -func HCMetricsPromDesc(context string) (descriptions map[string]*prometheus.Desc) { +func HCMetricsPromDesc(context string, bpfSupportedMetrics bpf.SupportedMetrics) (descriptions map[string]*prometheus.Desc) { descriptions = make(map[string]*prometheus.Desc) - if config.IsHCMetricsEnabled() { - for _, name := range consts.HCMetricNames { - descriptions[name] = resMetricsPromDesc(context, name, "bpf") - } - } - return descriptions -} - -func SCMetricsPromDesc(context string) (descriptions map[string]*prometheus.Desc) { - descriptions = make(map[string]*prometheus.Desc) - for _, name := range consts.SCMetricNames { + for name := range bpfSupportedMetrics.HardwareCounters { descriptions[name] = resMetricsPromDesc(context, name, "bpf") } return descriptions } -func IRQMetricsPromDesc(context string) (descriptions map[string]*prometheus.Desc) { +func SCMetricsPromDesc(context string, bpfSupportedMetrics bpf.SupportedMetrics) (descriptions map[string]*prometheus.Desc) { descriptions = make(map[string]*prometheus.Desc) - if config.IsIRQCounterMetricsEnabled() { - 
for _, name := range consts.IRQMetricNames { - descriptions[name] = resMetricsPromDesc(context, name, "bpf") - } + for name := range bpfSupportedMetrics.SoftwareCounters { + descriptions[name] = resMetricsPromDesc(context, name, "bpf") } return descriptions } diff --git a/pkg/metrics/process/metrics.go b/pkg/metrics/process/metrics.go index 0cf029f9a0..82035bf2d1 100644 --- a/pkg/metrics/process/metrics.go +++ b/pkg/metrics/process/metrics.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/prometheus/client_golang/prometheus" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/metrics/metricfactory" @@ -40,14 +41,17 @@ type collector struct { // Lock to syncronize the collector update with prometheus exporter Mx *sync.Mutex + + bpfSupportedMetrics bpf.SupportedMetrics } -func NewProcessCollector(processMetrics map[uint64]*stats.ProcessStats, mx *sync.Mutex) prometheus.Collector { +func NewProcessCollector(processMetrics map[uint64]*stats.ProcessStats, mx *sync.Mutex, bpfSupportedMetrics bpf.SupportedMetrics) prometheus.Collector { c := &collector{ - ProcessStats: processMetrics, - descriptions: make(map[string]*prometheus.Desc), - collectors: make(map[string]metricfactory.PromMetric), - Mx: mx, + ProcessStats: processMetrics, + descriptions: make(map[string]*prometheus.Desc), + collectors: make(map[string]metricfactory.PromMetric), + Mx: mx, + bpfSupportedMetrics: bpfSupportedMetrics, } c.initMetrics() return c @@ -58,15 +62,11 @@ func (c *collector) initMetrics() { if !config.IsExposeProcessStatsEnabled() { return } - for name, desc := range metricfactory.HCMetricsPromDesc(context) { - c.descriptions[name] = desc - c.collectors[name] = metricfactory.NewPromCounter(desc) - } - for name, desc := range metricfactory.SCMetricsPromDesc(context) { + for name, desc := range 
metricfactory.HCMetricsPromDesc(context, c.bpfSupportedMetrics) { c.descriptions[name] = desc c.collectors[name] = metricfactory.NewPromCounter(desc) } - for name, desc := range metricfactory.IRQMetricsPromDesc(context) { + for name, desc := range metricfactory.SCMetricsPromDesc(context, c.bpfSupportedMetrics) { c.descriptions[name] = desc c.collectors[name] = metricfactory.NewPromCounter(desc) } @@ -94,7 +94,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { c.Mx.Lock() for _, process := range c.ProcessStats { utils.CollectEnergyMetrics(ch, process, c.collectors) - utils.CollectResUtilizationMetrics(ch, process, c.collectors) + utils.CollectResUtilizationMetrics(ch, process, c.collectors, c.bpfSupportedMetrics) } c.Mx.Unlock() } diff --git a/pkg/metrics/prometheus_collector.go b/pkg/metrics/prometheus_collector.go index 6afb48c0dd..98f5b379d8 100644 --- a/pkg/metrics/prometheus_collector.go +++ b/pkg/metrics/prometheus_collector.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/metrics/container" @@ -44,26 +45,30 @@ type PrometheusExporter struct { // Lock to syncronize the collector update with prometheus exporter Mx sync.Mutex + + bpfSupportedMetrics bpf.SupportedMetrics } // NewPrometheusExporter creates a new prometheus exporter -func NewPrometheusExporter() *PrometheusExporter { - return &PrometheusExporter{} +func NewPrometheusExporter(bpfSupportedMetrics bpf.SupportedMetrics) *PrometheusExporter { + return &PrometheusExporter{ + bpfSupportedMetrics: bpfSupportedMetrics, + } } // NewProcessCollector creates a new prometheus collector for process metrics func (e *PrometheusExporter) NewProcessCollector(processMetrics map[uint64]*stats.ProcessStats) { - 
e.ProcessStatsCollector = process.NewProcessCollector(processMetrics, &e.Mx) + e.ProcessStatsCollector = process.NewProcessCollector(processMetrics, &e.Mx, e.bpfSupportedMetrics) } // NewContainerCollector creates a new prometheus collector for container metrics func (e *PrometheusExporter) NewContainerCollector(containerMetrics map[string]*stats.ContainerStats) { - e.ContainerStatsCollector = container.NewContainerCollector(containerMetrics, &e.Mx) + e.ContainerStatsCollector = container.NewContainerCollector(containerMetrics, &e.Mx, e.bpfSupportedMetrics) } // NewVMCollector creates a new prometheus collector for vm metrics func (e *PrometheusExporter) NewVMCollector(vmMetrics map[string]*stats.VMStats) { - e.VMStatsCollector = virtualmachine.NewVMCollector(vmMetrics, &e.Mx) + e.VMStatsCollector = virtualmachine.NewVMCollector(vmMetrics, &e.Mx, e.bpfSupportedMetrics) } // NewNodeCollector creates a new prometheus collector for node metrics diff --git a/pkg/metrics/prometheus_collector_test.go b/pkg/metrics/prometheus_collector_test.go index 48b90ae9ff..3ded24a329 100644 --- a/pkg/metrics/prometheus_collector_test.go +++ b/pkg/metrics/prometheus_collector_test.go @@ -70,7 +70,7 @@ var _ = Describe("Test Prometheus Collector Unit", func() { processStats := stats.CreateMockedProcessStats(2) nodeStats := stats.CreateMockedNodeStats() - bpfExporter := bpf.NewMockExporter(true) + bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics()) metricCollector := collector.NewCollector(bpfExporter) metricCollector.ProcessStats = processStats metricCollector.NodeStats = nodeStats @@ -78,7 +78,8 @@ var _ = Describe("Test Prometheus Collector Unit", func() { metricCollector.AggregateProcessResourceUtilizationMetrics() // the collector and prometheusExporter share structures and collections - exporter := NewPrometheusExporter() + bpfSupportedMetrics := bpfExporter.SupportedMetrics() + exporter := NewPrometheusExporter(bpfSupportedMetrics) 
exporter.NewProcessCollector(metricCollector.ProcessStats) exporter.NewContainerCollector(metricCollector.ContainerStats) exporter.NewVMCollector(metricCollector.VMStats) @@ -86,10 +87,10 @@ var _ = Describe("Test Prometheus Collector Unit", func() { nodeStats.UpdateDynEnergy() - model.CreatePowerEstimatorModels(stats.ProcessFeaturesNames, + model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, - true) + bpfSupportedMetrics) model.UpdateProcessEnergy(processStats, &nodeStats) // get metrics from prometheus diff --git a/pkg/metrics/utils/utils.go b/pkg/metrics/utils/utils.go index 9803d68fa9..b1c299a2b3 100644 --- a/pkg/metrics/utils/utils.go +++ b/pkg/metrics/utils/utils.go @@ -20,6 +20,7 @@ import ( "strconv" "github.com/prometheus/client_golang/prometheus" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/metrics/consts" @@ -42,32 +43,21 @@ func CollectEnergyMetrics(ch chan<- prometheus.Metric, instance interface{}, col } } -func CollectResUtilizationMetrics(ch chan<- prometheus.Metric, instance interface{}, collectors map[string]metricfactory.PromMetric) { +func CollectResUtilizationMetrics(ch chan<- prometheus.Metric, instance interface{}, collectors map[string]metricfactory.PromMetric, bpfSupportedMetrics bpf.SupportedMetrics) { // collect the BPF Software Counters - for _, collectorName := range consts.SCMetricNames { + for collectorName := range bpfSupportedMetrics.SoftwareCounters { CollectResUtil(ch, instance, collectorName, collectors[collectorName]) } - - if config.IsIRQCounterMetricsEnabled() { - for _, collectorName := range consts.IRQMetricNames { - CollectResUtil(ch, instance, collectorName, collectors[collectorName]) - } - } - - // collect the BPF Hardware Counters - if 
config.IsHCMetricsEnabled() { - for _, collectorName := range consts.HCMetricNames { - CollectResUtil(ch, instance, collectorName, collectors[collectorName]) - } + for collectorName := range bpfSupportedMetrics.HardwareCounters { + CollectResUtil(ch, instance, collectorName, collectors[collectorName]) } - // collect the deprecated cGroup metrics, this metrics will be removed in the future if config.IsCgroupMetricsEnabled() { for _, collectorName := range consts.CGroupMetricNames { CollectResUtil(ch, instance, collectorName, collectors[collectorName]) } } - + klog.Info("Collecting GPU metrics") if config.EnabledGPU && gpu.IsGPUCollectionSupported() { for _, collectorName := range consts.GPUMetricNames { CollectResUtil(ch, instance, collectorName, collectors[collectorName]) @@ -138,6 +128,10 @@ func CollectResUtil(ch chan<- prometheus.Metric, instance interface{}, metricNam collect(ch, collector, value, labelValues) } } else { + if _, exist := container.ResourceUsage[metricName]; !exist { + klog.Errorf("ContainerStats %s does not have metric %s\n", container.ContainerID, metricName) + return + } value = float64(container.ResourceUsage[metricName].SumAllAggrValues()) labelValues = []string{container.ContainerID, container.PodName, container.ContainerName, container.Namespace} collect(ch, collector, value, labelValues) diff --git a/pkg/metrics/virtualmachine/metrics.go b/pkg/metrics/virtualmachine/metrics.go index 79050d9a22..f389b74ea3 100644 --- a/pkg/metrics/virtualmachine/metrics.go +++ b/pkg/metrics/virtualmachine/metrics.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/prometheus/client_golang/prometheus" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/metrics/metricfactory" @@ -40,14 +41,17 @@ type collector struct { // Lock to syncronize the collector update with prometheus exporter Mx 
*sync.Mutex + + bpfSupportedMetrics bpf.SupportedMetrics } -func NewVMCollector(vmMetrics map[string]*stats.VMStats, mx *sync.Mutex) prometheus.Collector { +func NewVMCollector(vmMetrics map[string]*stats.VMStats, mx *sync.Mutex, bpfSupportedMetrics bpf.SupportedMetrics) prometheus.Collector { c := &collector{ - VMStats: vmMetrics, - descriptions: make(map[string]*prometheus.Desc), - collectors: make(map[string]metricfactory.PromMetric), - Mx: mx, + VMStats: vmMetrics, + descriptions: make(map[string]*prometheus.Desc), + collectors: make(map[string]metricfactory.PromMetric), + Mx: mx, + bpfSupportedMetrics: bpfSupportedMetrics, } c.initMetrics() return c @@ -58,15 +62,11 @@ func (c *collector) initMetrics() { if !config.IsExposeVMStatsEnabled() { return } - for name, desc := range metricfactory.HCMetricsPromDesc(context) { - c.descriptions[name] = desc - c.collectors[name] = metricfactory.NewPromCounter(desc) - } - for name, desc := range metricfactory.SCMetricsPromDesc(context) { + for name, desc := range metricfactory.HCMetricsPromDesc(context, c.bpfSupportedMetrics) { c.descriptions[name] = desc c.collectors[name] = metricfactory.NewPromCounter(desc) } - for name, desc := range metricfactory.IRQMetricsPromDesc(context) { + for name, desc := range metricfactory.SCMetricsPromDesc(context, c.bpfSupportedMetrics) { c.descriptions[name] = desc c.collectors[name] = metricfactory.NewPromCounter(desc) } @@ -90,7 +90,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { c.Mx.Lock() for _, vm := range c.VMStats { utils.CollectEnergyMetrics(ch, vm, c.collectors) - utils.CollectResUtilizationMetrics(ch, vm, c.collectors) + utils.CollectResUtilizationMetrics(ch, vm, c.collectors, c.bpfSupportedMetrics) } c.Mx.Unlock() } diff --git a/pkg/model/benchmark_test.go b/pkg/model/benchmark_test.go index cebbfcc506..71f3950766 100644 --- a/pkg/model/benchmark_test.go +++ b/pkg/model/benchmark_test.go @@ -33,7 +33,7 @@ func benchmarkNtesting(b *testing.B, processNumber int) 
{ // enable metrics stats.SetMockedCollectorMetrics() // create node node metrics - bpfExporter := bpf.NewMockExporter(true) + bpfExporter := bpf.NewMockExporter(bpf.DefaultSupportedMetrics()) metricCollector := collector.NewCollector(bpfExporter) // create processes @@ -43,7 +43,8 @@ func benchmarkNtesting(b *testing.B, processNumber int) { metricCollector.AggregateProcessResourceUtilizationMetrics() // The default estimator model is the ratio - model.CreatePowerEstimatorModels(stats.ProcessFeaturesNames, stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, true) + bpfSupportedMetrics := bpf.DefaultSupportedMetrics() + model.CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), stats.NodeMetadataFeatureNames, stats.NodeMetadataFeatureValues, bpfSupportedMetrics) // update container and node metrics b.ReportAllocs() diff --git a/pkg/model/model.go b/pkg/model/model.go index 1755464967..82cc106613 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/model/estimator/local" "github.com/sustainable-computing-io/kepler/pkg/model/estimator/local/regressor" @@ -73,9 +74,9 @@ type PowerModelInterface interface { } // CreatePowerEstimatorModels checks validity of power model and set estimate functions -func CreatePowerEstimatorModels(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues []string, hardwareCountersEnabled bool) { +func CreatePowerEstimatorModels(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues []string, bpfSupportedMetrics bpf.SupportedMetrics) { config.InitModelConfigMap() - CreateProcessPowerEstimatorModel(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, hardwareCountersEnabled) + CreateProcessPowerEstimatorModel(processFeatureNames, 
systemMetaDataFeatureNames, systemMetaDataFeatureValues, bpfSupportedMetrics) // Node power estimator uses the process features to estimate node power, expect for the Ratio power model that contains additional metrics. CreateNodePlatformPoweEstimatorModel(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues) CreateNodeComponentPoweEstimatorModel(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues) diff --git a/pkg/model/process_energy.go b/pkg/model/process_energy.go index b14e003744..9419724264 100644 --- a/pkg/model/process_energy.go +++ b/pkg/model/process_energy.go @@ -19,6 +19,7 @@ package model import ( "fmt" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/model/types" @@ -34,7 +35,7 @@ var ( ) // createProcessPowerModelConfig: the process component power model must be set by default. -func createProcessPowerModelConfig(powerSourceTarget string, processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues []string, energySource string, hardwareCountersEnabled bool) (modelConfig *types.ModelConfig) { +func createProcessPowerModelConfig(powerSourceTarget string, processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues []string, energySource string, bpfSupportedMetrics bpf.SupportedMetrics) (modelConfig *types.ModelConfig) { modelConfig = CreatePowerModelConfig(powerSourceTarget) if modelConfig == nil { return nil @@ -53,7 +54,7 @@ func createProcessPowerModelConfig(powerSourceTarget string, processFeatureNames pkgUsageMetric := config.CoreUsageMetric coreUsageMetric := config.CoreUsageMetric dramUsageMetric := config.DRAMUsageMetric - if !hardwareCountersEnabled { + if !bpfSupportedMetrics.HardwareCounters.Has(config.CPUTime) { // Given that there is no HW counter in some scenarios (e.g. 
on VMs), we have to use CPUTime data. // Although a busy CPU is more likely to be accessing memory the CPU utilization (CPUTime) does not directly // represent memory access, but it remains the only viable proxy available to approximate such information. @@ -86,7 +87,7 @@ func createProcessPowerModelConfig(powerSourceTarget string, processFeatureNames }...) } else if powerSourceTarget == config.ProcessPlatformPowerKey { platformUsageMetric := config.CoreUsageMetric - if !hardwareCountersEnabled { + if !bpfSupportedMetrics.HardwareCounters.Has(config.CPUTime) { // Given that there is no HW counter in some scenarios (e.g. on VMs), we have to use CPUTime data. platformUsageMetric = config.CPUTime } @@ -104,9 +105,9 @@ func createProcessPowerModelConfig(powerSourceTarget string, processFeatureNames return modelConfig } -func CreateProcessPowerEstimatorModel(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues []string, hardwareCountersEnabled bool) { +func CreateProcessPowerEstimatorModel(processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues []string, bpfSupportedMetrics bpf.SupportedMetrics) { var err error - modelConfig := createProcessPowerModelConfig(config.ProcessPlatformPowerKey, processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, types.PlatformEnergySource, hardwareCountersEnabled) + modelConfig := createProcessPowerModelConfig(config.ProcessPlatformPowerKey, processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, types.PlatformEnergySource, bpfSupportedMetrics) modelConfig.IsNodePowerModel = false ProcessPlatformPowerModel, err = createPowerModelEstimator(modelConfig) if err == nil { @@ -116,7 +117,7 @@ func CreateProcessPowerEstimatorModel(processFeatureNames, systemMetaDataFeature klog.Infof("Failed to create %s Power Model to estimate Process Platform Power: %v\n", modelConfig.ModelType.String()+"/"+modelConfig.ModelOutputType.String(), err) } - modelConfig = 
createProcessPowerModelConfig(config.ProcessComponentsPowerKey, processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, types.ComponentEnergySource, hardwareCountersEnabled) + modelConfig = createProcessPowerModelConfig(config.ProcessComponentsPowerKey, processFeatureNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, types.ComponentEnergySource, bpfSupportedMetrics) modelConfig.IsNodePowerModel = false ProcessComponentPowerModel, err = createPowerModelEstimator(modelConfig) if err == nil { diff --git a/pkg/model/process_energy_test.go b/pkg/model/process_energy_test.go index 60920bd3b7..67b1f576f5 100644 --- a/pkg/model/process_energy_test.go +++ b/pkg/model/process_energy_test.go @@ -21,6 +21,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/sustainable-computing-io/kepler/pkg/bpf" "github.com/sustainable-computing-io/kepler/pkg/collector/stats" "github.com/sustainable-computing-io/kepler/pkg/config" "github.com/sustainable-computing-io/kepler/pkg/sensors/components" @@ -67,7 +68,8 @@ var _ = Describe("ProcessPower", func() { os.Setenv("MODEL_CONFIG", configStr) // getEstimatorMetrics - CreatePowerEstimatorModels(stats.ProcessFeaturesNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, true) + bpfSupportedMetrics := bpf.DefaultSupportedMetrics() + CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), systemMetaDataFeatureNames, systemMetaDataFeatureValues, bpfSupportedMetrics) // initialize the node energy with aggregated energy, which will be used to calculate delta energy // add first values to be the idle power @@ -102,7 +104,8 @@ var _ = Describe("ProcessPower", func() { os.Setenv("MODEL_CONFIG", configStr) // getEstimatorMetrics - CreatePowerEstimatorModels(stats.ProcessFeaturesNames, systemMetaDataFeatureNames, systemMetaDataFeatureValues, true) + bpfSupportedMetrics := bpf.DefaultSupportedMetrics() + 
CreatePowerEstimatorModels(stats.GetProcessFeatureNames(bpfSupportedMetrics), systemMetaDataFeatureNames, systemMetaDataFeatureValues, bpfSupportedMetrics) // initialize the node energy with aggregated energy, which will be used to calculate delta energy // add first values to be the idle power