Fix Node and container resource limit metrics missing intermittently (#41453) (#41484)

* Fix Pod and container resource limit metrics missing intermittently

* Add another exception to typecheck linter

(cherry picked from commit e7cc6fc)

Co-authored-by: Mikołaj Świątek <[email protected]>
mergify[bot] and swiatekm authored Oct 30, 2024
1 parent 05125a9 commit f5ecf88
Showing 3 changed files with 341 additions and 74 deletions.
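At a high level, the change below moves node and container resource-limit bookkeeping out of the enrichers' metadata-update path and into the shared watcher's add/update/delete event handlers, so the metrics repository is repopulated on every watcher (re)sync rather than only when metadata is regenerated. A minimal sketch of that shape (the pod type and flat repository here are simplified stand-ins, not the Beats API):

package main

import "fmt"

// Stand-ins, not the Beats types: a pod and a flat metrics repository.
type pod struct {
	name       string
	coresLimit float64 // CPU limit in cores
}

func main() {
	limits := map[string]float64{}

	// The fix registers the same refresh logic for add and update events,
	// and a teardown for deletes, so the repository tracks watcher state.
	onAddOrUpdate := func(p pod) { limits[p.name] = p.coresLimit }
	onDelete := func(p pod) { delete(limits, p.name) }

	onAddOrUpdate(pod{name: "web", coresLimit: 0.5})
	fmt.Println(limits["web"]) // 0.5
	onDelete(pod{name: "web"})
	fmt.Println(len(limits)) // 0
}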
3 changes: 3 additions & 0 deletions .golangci.yml
@@ -30,6 +30,9 @@ issues:
- text: "imported and not used"
linters:
- typecheck
- text: "previous case"
linters:
- typecheck
# From mage we are printing to the console to ourselves
- path: (.*magefile.go|.*dev-tools/mage/.*)
linters: forbidigo
155 changes: 94 additions & 61 deletions metricbeat/module/kubernetes/util/kubernetes.go
@@ -33,7 +33,7 @@ import (
k8sclientmeta "k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
k8sresource "k8s.io/apimachinery/pkg/api/resource"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
@@ -101,7 +101,8 @@ type metaWatcher struct {

metricsetsUsing []string // list of metricsets using this shared watcher (e.g. pod, container, state_pod)

enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
metricsRepo *MetricsRepo // used to update container metrics derived from metadata, like resource limits

nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster
restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope (pod, state_pod)
@@ -311,6 +312,7 @@ func createWatcher(
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
resourceWatchers *Watchers,
metricsRepo *MetricsRepo,
namespace string,
extraWatcher bool) (bool, error) {

@@ -388,22 +390,73 @@
watcher: watcher,
started: false, // not started yet
enrichers: make(map[string]*enricher),
metricsRepo: metricsRepo,
metricsetsUsing: make([]string, 0),
restartWatcher: nil,
nodeScope: nodeScope,
}
resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher

// Add event handlers to the watcher. They invalidate the enricher cache and keep resource metrics up to date.
addEventHandlerToWatcher(resourceMetaWatcher, resourceWatchers)
addEventHandlersToWatcher(resourceMetaWatcher, resourceWatchers)

return true, nil
}

// addEventHandlerToWatcher adds an event handler to the watcher that invalidates the cache of enrichers attached
// to the watcher.
func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) {
notifyFunc := func(obj interface{}) {
// addEventHandlersToWatcher adds event handlers to the watcher that invalidate the cache of enrichers
// attached to the watcher and update container and node metrics on Pod and Node events.
func addEventHandlersToWatcher(
metaWatcher *metaWatcher,
resourceWatchers *Watchers,
) {
containerMetricsUpdateFunc := func(pod *kubernetes.Pod) {
nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(pod.Spec.NodeName)
podId := NewPodId(pod.Namespace, pod.Name)
podStore, _ := nodeStore.AddPodStore(podId)

for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
metrics := NewContainerMetrics()

if cpu, ok := container.Resources.Limits["cpu"]; ok {
if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := container.Resources.Limits["memory"]; ok {
if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
}
}

containerStore, _ := podStore.AddContainerStore(container.Name)
containerStore.SetContainerMetrics(metrics)
}
}

containerMetricsDeleteFunc := func(pod *kubernetes.Pod) {
podId := NewPodId(pod.Namespace, pod.Name)
nodeStore := metaWatcher.metricsRepo.GetNodeStore(pod.Spec.NodeName)
nodeStore.DeletePodStore(podId)
}

nodeMetricsUpdateFunc := func(node *kubernetes.Node) {
nodeName := node.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := node.Status.Capacity["cpu"]; ok {
if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := node.Status.Capacity["memory"]; ok {
if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)
}

clearMetadataCacheFunc := func(obj interface{}) {
enrichers := make(map[string]*enricher, len(metaWatcher.enrichers))

resourceWatchers.lock.Lock()
@@ -420,10 +473,35 @@ func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watche
enricher.Unlock()
}
}

metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {}, // do nothing
UpdateFunc: notifyFunc,
DeleteFunc: notifyFunc,
AddFunc: func(obj interface{}) {
switch res := obj.(type) {
case *kubernetes.Pod:
containerMetricsUpdateFunc(res)
case *kubernetes.Node:
nodeMetricsUpdateFunc(res)
}
},
UpdateFunc: func(obj interface{}) {
clearMetadataCacheFunc(obj)
switch res := obj.(type) {
case *kubernetes.Pod:
containerMetricsUpdateFunc(res)
case *kubernetes.Node:
nodeMetricsUpdateFunc(res)
}
},
DeleteFunc: func(obj interface{}) {
clearMetadataCacheFunc(obj)
switch res := obj.(type) {
case *kubernetes.Pod:
containerMetricsDeleteFunc(res)
case *kubernetes.Node:
nodeName := res.GetObjectMeta().GetName()
metaWatcher.metricsRepo.DeleteNodeStore(nodeName)
}
},
})
}
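For reference, the quantity conversions used by the handlers above can be exercised standalone with the same apimachinery package: MilliValue()/1000 yields cores and Value() yields bytes. A small runnable example (MustParse is used here for brevity; the handlers above use ParseQuantity and skip the metric on error):

package main

import (
	"fmt"

	k8sresource "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	cpu := k8sresource.MustParse("500m")  // half a core
	mem := k8sresource.MustParse("256Mi") // 256 mebibytes

	fmt.Println(float64(cpu.MilliValue()) / 1000) // 0.5 (cores)
	fmt.Println(float64(mem.Value()))             // 2.68435456e+08 (bytes)
}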

@@ -481,6 +559,7 @@ func createAllWatchers(
config *kubernetesConfig,
log *logp.Logger,
resourceWatchers *Watchers,
metricsRepo *MetricsRepo,
) error {
res := getResource(resourceName)
if res == nil {
@@ -494,7 +573,7 @@
// Create the main watcher for the given resource.
// For example pod metricset's main watcher will be pod watcher.
// If it fails, we return an error, so we can stop the extra watchers from creating.
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, false)
if err != nil {
return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
} else if created {
@@ -509,7 +588,7 @@
for _, extra := range extraWatchers {
extraRes := getResource(extra)
if extraRes != nil {
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, true)
if err != nil {
log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
} else {
@@ -654,7 +733,7 @@ func NewResourceMetadataEnricher(
metricsetName := base.Name()
resourceName := getResourceName(metricsetName)
// Create all watchers needed for this metricset
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers, metricsRepo)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
@@ -680,20 +759,13 @@
// It is responsible for generating the metadata for a detected resource by executing the metadata generator's Generate method.
// It is a common handler for all resource watchers. The kind of resource (e.g. pod or deployment) is checked inside the function.
// It returns a map with a resource identifier (i.e. namespace-resource_name) as key and the metadata as value.
updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen, metricsRepo)
updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen)

// deleteFunc to be used as the resource watcher's delete handler.
// The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted).
// It returns the identifier of the resource.
deleteFunc := func(r kubernetes.Resource) []string {
accessor, _ := meta.Accessor(r)

switch r := r.(type) {
case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metricsRepo.DeleteNodeStore(nodeName)
}

id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
@@ -772,7 +844,7 @@ func NewContainerMetadataEnricher(

metricsetName := base.Name()

err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers, metricsRepo)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
@@ -802,27 +874,8 @@
mapStatuses(pod.Status.ContainerStatuses)
mapStatuses(pod.Status.InitContainerStatuses)

nodeStore, _ := metricsRepo.AddNodeStore(pod.Spec.NodeName)
podId := NewPodId(pod.Namespace, pod.Name)
podStore, _ := nodeStore.AddPodStore(podId)

for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
cmeta := mapstr.M{}
metrics := NewContainerMetrics()

if cpu, ok := container.Resources.Limits["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := container.Resources.Limits["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
}
}

containerStore, _ := podStore.AddContainerStore(container.Name)
containerStore.SetContainerMetrics(metrics)

if s, ok := statuses[container.Name]; ok {
// Extracting id and runtime ECS fields from ContainerID
@@ -849,9 +902,6 @@
if !ok {
base.Logger().Debugf("Error while casting event: %s", ok)
}
podId := NewPodId(pod.Namespace, pod.Name)
nodeStore := metricsRepo.GetNodeStore(pod.Spec.NodeName)
nodeStore.DeletePodStore(podId)

for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
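The store calls this commit relocates into the watcher handlers (AddNodeStore, AddPodStore, AddContainerStore, SetContainerMetrics) imply a node -> pod -> container hierarchy. A hypothetical miniature of one level of that hierarchy; the shapes, including the second return value meaning "created", are assumptions inferred only from the call sites in this diff:

package main

import "fmt"

type nodeStore struct{ pods map[string]struct{} }

type metricsRepo struct{ nodes map[string]*nodeStore }

// addNodeStore returns the existing store for a node or creates one,
// reporting whether a new store was created (assumed semantics of the
// discarded second return value in the diff).
func (r *metricsRepo) addNodeStore(name string) (*nodeStore, bool) {
	if s, ok := r.nodes[name]; ok {
		return s, false
	}
	s := &nodeStore{pods: map[string]struct{}{}}
	r.nodes[name] = s
	return s, true
}

func (r *metricsRepo) deleteNodeStore(name string) { delete(r.nodes, name) }

func main() {
	repo := &metricsRepo{nodes: map[string]*nodeStore{}}
	ns, created := repo.addNodeStore("node-1")
	fmt.Println(created, len(ns.pods)) // true 0
	_, created = repo.addNodeStore("node-1")
	fmt.Println(created) // false (store reused)
	repo.deleteNodeStore("node-1")
	fmt.Println(len(repo.nodes)) // 0
}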
@@ -1217,7 +1267,6 @@ func getEventMetadataFunc(
logger *logp.Logger,
generalMetaGen *metadata.Resource,
specificMetaGen metadata.MetaGen,
metricsRepo *MetricsRepo,
) func(r kubernetes.Resource) map[string]mapstr.M {
return func(r kubernetes.Resource) map[string]mapstr.M {
accessor, accErr := meta.Accessor(r)
@@ -1233,23 +1282,7 @@
switch r := r.(type) {
case *kubernetes.Pod:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := r.Status.Capacity["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := r.Status.Capacity["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)

return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
case *kubernetes.Deployment:
return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
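With the metrics bookkeeping gone, getEventMetadataFunc is purely a resource-to-metadata mapper. The identifier scheme described in its comment (name, prefixed by the namespace when present) can be sketched with apimachinery's accessor; the "/" separator below is illustrative only, since the real code delegates to its own join helper:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// buildID derives a namespace-qualified identifier for any Kubernetes
// object exposing standard object metadata.
func buildID(obj interface{}) (string, error) {
	accessor, err := meta.Accessor(obj)
	if err != nil {
		return "", err
	}
	id := accessor.GetName()
	if ns := accessor.GetNamespace(); ns != "" {
		id = ns + "/" + id // illustrative separator
	}
	return id, nil
}

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"}}
	fmt.Println(buildID(pod)) // default/web <nil>
}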
Diff of the third changed file (244 additions, 13 deletions) not loaded on this page.
