Fix Metricbeat k8s metadata sometimes not being present at startup #41216

Merged: 2 commits, Oct 17, 2024
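In broad strokes, the diff below drops the old flow in which every watcher event eagerly pushed metadata into each enricher (with a metadataObjects set tracking what had been seen), and instead gives each enricher a metadataCache that is filled lazily from the watcher's store when a key is not cached yet. A minimal sketch of that lookup pattern, with invented names and none of the actual Beats plumbing:

package sketch

import (
	"sync"

	"github.com/elastic/elastic-agent-libs/mapstr"
)

// lazyCache mimics the role of the enricher's metadataCache in this PR: a hit returns a
// copy of the cached metadata, and a miss falls back to a store lookup (here just a
// function) so metadata is available even if the cache was created after the watcher
// already processed the resource.
type lazyCache struct {
	sync.Mutex
	data   map[string]mapstr.M
	lookup func(key string) (mapstr.M, bool) // stands in for the watcher store + updateFunc
}

func (c *lazyCache) get(key string) mapstr.M {
	c.Lock()
	defer c.Unlock()
	if m, ok := c.data[key]; ok {
		return m.Clone() // hand out a copy so callers cannot mutate the cache
	}
	if m, ok := c.lookup(key); ok { // cache miss: ask the store directly
		c.data[key] = m
		return m.Clone()
	}
	return nil // metadata genuinely not available yet
}

The real change also rewires the watcher event handlers so that updates and deletes only invalidate this cache (see addEventHandlerToWatcher below).
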
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add AWS OwningAccount support for cross account monitoring {issue}40570[40570] {pull}40691[40691]
- Use namespace for GetListMetrics when exists in AWS {pull}41022[41022]
- Fix http server helper SSL config. {pull}39405[39405]
- Fix Kubernetes metadata sometimes not being present after startup {pull}41216[41216]

*Osquerybeat*

280 changes: 131 additions & 149 deletions metricbeat/module/kubernetes/util/kubernetes.go
@@ -20,6 +20,7 @@ package util
import (
"errors"
"fmt"
"maps"
"strings"
"sync"
"time"
@@ -39,6 +40,10 @@ import (
"github.com/elastic/beats/v7/metricbeat/mb"
)

// Resource metadata keys are composed of multiple parts - usually just the namespace and name. This string is the
// separator between the parts when treating the key as a single string.
const resourceMetadataKeySeparator = "/"

type kubernetesConfig struct {
KubeConfig string `config:"kube_config"`
KubeAdm bool `config:"use_kubeadm"`
@@ -67,12 +72,13 @@ type Enricher interface {

type enricher struct {
sync.RWMutex
metadata map[string]mapstr.M
metadataCache map[string]mapstr.M
index func(mapstr.M) string
updateFunc func(kubernetes.Resource) map[string]mapstr.M
deleteFunc func(kubernetes.Resource) []string
metricsetName string
resourceName string
watcher *metaWatcher
isPod bool
config *kubernetesConfig
log *logp.Logger
@@ -90,8 +96,7 @@ type metaWatcher struct {

metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod)

enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
metadataObjects map[string]bool // representation of a set of ids(in the form of namespace_name-resource_name) of each object received by the watcher's handler functions
enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher

nodeScope bool // whether this watcher should watch for resources in current node or in whole cluster
restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod)
@@ -179,10 +184,10 @@ func getExtraWatchers(resourceName string, addResourceMetadata *metadata.AddReso
// in order to be able to retrieve 2nd layer Owner metadata like in case of:
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if addResourceMetadata != nil && addResourceMetadata.Deployment {
if addResourceMetadata.Deployment {
extra = append(extra, ReplicaSetResource)
}
if addResourceMetadata != nil && addResourceMetadata.CronJob {
if addResourceMetadata.CronJob {
extra = append(extra, JobResource)
}
return extra
@@ -320,47 +325,82 @@ func createWatcher(
// Check if a watcher for the specific resource already exists.
resourceMetaWatcher, ok := resourceWatchers.metaWatchersMap[resourceName]

// If it does not exist, create the resourceMetaWatcher.
if !ok {
// Check if we need to add namespace to the watcher's options.
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}
resourceWatchers.metaWatchersMap[resourceName] = &metaWatcher{
watcher: watcher,
started: false, // not started yet
metadataObjects: make(map[string]bool),
enrichers: make(map[string]*enricher),
metricsetsUsing: make([]string, 0),
restartWatcher: nil,
nodeScope: nodeScope,
}
return true, nil
} else if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope {
// It might happen that the watcher already exists, but is only being used to monitor the resources
// of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track
// the resources of whole cluster(e.g. in case of state_pod metricset).
// If it is the case, then we need to update the watcher by changing its watch options (removing options.Node)
// A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options.
// The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options.

if isNamespaced(resourceName) {
options.Namespace = namespace
// If the watcher already exists, update its scope if needed and return without creating a new one
if ok {
if resourceMetaWatcher.nodeScope != nodeScope && resourceMetaWatcher.nodeScope {
// It might happen that the watcher already exists, but is only being used to monitor the resources
// of a single node(e.g. created by pod metricset). In that case, we need to check if we are trying to create a new watcher that will track
// the resources of whole cluster(e.g. in case of state_pod metricset).
// If it is the case, then we need to update the watcher by changing its watch options (removing options.Node)
// A running watcher cannot be updated directly. Instead, we must create a new one with the correct watch options.
// The new restartWatcher must be identical to the old watcher, including the same handler function, with the only difference being the watch options.

if isNamespaced(resourceName) {
options.Namespace = namespace
}
restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}
// update the handler of the restartWatcher to match the current watcher's handler.
restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler())
resourceMetaWatcher.restartWatcher = restartWatcher
resourceMetaWatcher.nodeScope = nodeScope
}
restartWatcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
return false, nil
}
// Watcher doesn't exist, create it

// Check if we need to add namespace to the watcher's options.
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
if err != nil {
return false, err
}

resourceMetaWatcher = &metaWatcher{
watcher: watcher,
started: false, // not started yet
enrichers: make(map[string]*enricher),
metricsetsUsing: make([]string, 0),
restartWatcher: nil,
nodeScope: nodeScope,
}
resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher

// Add event handlers to the watcher. The only action we need to do here is invalidate the enricher cache.
addEventHandlerToWatcher(resourceMetaWatcher, resourceWatchers)

return true, nil
}

// addEventHandlerToWatcher adds an event handler to the watcher that invalidates the cache of enrichers attached
// to the watcher.
func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) {
notifyFunc := func(obj interface{}) {
enrichers := make(map[string]*enricher, len(metaWatcher.enrichers))

resourceWatchers.lock.Lock()
maps.Copy(enrichers, metaWatcher.enrichers)
resourceWatchers.lock.Unlock()

for _, enricher := range enrichers {
enricher.Lock()
ids := enricher.deleteFunc(obj.(kubernetes.Resource))
// update this enricher's cache by removing all metadata entries for these ids
for _, id := range ids {
delete(enricher.metadataCache, id)
}
enricher.Unlock()
}
// update the handler of the restartWatcher to match the current watcher's handler.
restartWatcher.AddEventHandler(resourceMetaWatcher.watcher.GetEventHandler())
resourceMetaWatcher.restartWatcher = restartWatcher
resourceMetaWatcher.nodeScope = nodeScope
}
return false, nil
metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {}, // do nothing
UpdateFunc: notifyFunc,
DeleteFunc: notifyFunc,
})
}

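The handler above intentionally leaves AddFunc as a no-op: newly added resources are picked up lazily by the store lookup in getMetadata further down, so only updates and deletions need to evict stale entries. A stripped-down sketch of that eviction step, with the Beats types replaced by plain strings and invented names:

package sketch

import "sync"

// evictOnChange captures the shape of notifyFunc above: for an updated or deleted object
// it asks each enricher which cache keys belong to it (deleteFunc in the real code) and
// drops them, so the next Enrich call repopulates them from the watcher store.
type evictOnChange struct {
	mu      sync.Mutex
	cache   map[string]string         // stands in for enricher.metadataCache
	keysFor func(obj string) []string // stands in for enricher.deleteFunc
}

func (e *evictOnChange) onUpdateOrDelete(obj string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	for _, id := range e.keysFor(obj) {
		delete(e.cache, id)
	}
}
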
// addToMetricsetsUsing adds metricset identified by metricsetUsing to the list of resources using the shared watcher
Expand Down Expand Up @@ -613,6 +653,7 @@ func NewResourceMetadataEnricher(
return &nilEnricher{}
}

_, _ = specificMetaGen, generalMetaGen // necessary for earlier versions of golangci-lint
// updateFunc to be used as the resource watchers add and update handler.
// The handler function is executed when a watcher is triggered(i.e. new/updated resource).
// It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method.
@@ -922,7 +963,7 @@ func getString(m mapstr.M, key string) string {
}

func join(fields ...string) string {
return strings.Join(fields, ":")
return strings.Join(fields, resourceMetadataKeySeparator)
}

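Using "/" as the separator (the old code used ":") makes the keys produced by join line up with the {namespace}/{name} keys of the watcher store, which is what the store lookup further down depends on. A couple of illustrative values, with invented resource names:

package sketch

import "strings"

// joinKey mirrors the join helper above with the new "/" separator.
func joinKey(fields ...string) string { return strings.Join(fields, "/") }

var (
	podKey       = joinKey("default", "web-75c6f-abcde")          // "default/web-75c6f-abcde", same as the store key
	containerKey = joinKey("default", "web-75c6f-abcde", "nginx") // "default/web-75c6f-abcde/nginx"
)
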
// buildMetadataEnricher builds and returns a metadata enricher for a given metricset.
@@ -940,7 +981,7 @@ func buildMetadataEnricher(
log *logp.Logger) *enricher {

enricher := &enricher{
metadata: map[string]mapstr.M{},
metadataCache: map[string]mapstr.M{},
index: indexFunc,
updateFunc: updateFunc,
deleteFunc: deleteFunc,
@@ -958,104 +999,7 @@ if resourceMetaWatcher != nil {
if resourceMetaWatcher != nil {
// Append the new enricher to watcher's enrichers map.
resourceMetaWatcher.enrichers[metricsetName] = enricher

// Check if this shared watcher has already detected resources and collected their
// metadata for another enricher.
// In that case, for each resource, call the updateFunc of the current enricher to
// generate its metadata. This is needed in cases where the watcher has already been
// notified for new/updated resources while the enricher for current metricset has not
// built yet (example is pod, state_pod metricsets).
for key := range resourceMetaWatcher.metadataObjects {
obj, exists, err := resourceMetaWatcher.watcher.Store().GetByKey(key)
if err != nil {
log.Errorf("Error trying to get the object from the store: %s", err)
} else {
if exists {
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
for id, metadata := range newMetadataEvents {
enricher.metadata[id] = metadata
}
}
}
}

// AddEventHandler sets add, update and delete methods of watcher.
// Those methods are triggered when an event is detected for a
// resource creation, update or deletion.
resourceMetaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

// Add object(detected resource) to the list of metadata objects of this watcher,
// so it can be used by enrichers created after the event is triggered.
// The identifier of the object is in the form of namespace/name so that
// it can be easily fetched from watcher's store in previous step.
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
resourceMetaWatcher.metadataObjects[id] = true
// Execute the updateFunc of each enricher associated with this watcher.
for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
newMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
// add the new metadata to the watcher received metadata
for id, metadata := range newMetadataEvents {
enricher.metadata[id] = metadata
}
enricher.Unlock()
}
},
UpdateFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

// Add object to the list of metadata objects of this watcher
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
resourceMetaWatcher.metadataObjects[id] = true

for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
updatedMetadataEvents := enricher.updateFunc(obj.(kubernetes.Resource))
for id, metadata := range updatedMetadataEvents {
enricher.metadata[id] = metadata
}
enricher.Unlock()
}
},
DeleteFunc: func(obj interface{}) {
resourceWatchers.lock.Lock()
defer resourceWatchers.lock.Unlock()

// Remove object from the list of metadata objects of this watcher
accessor, _ := meta.Accessor(obj.(kubernetes.Resource))
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = namespace + "/" + id
}
delete(resourceMetaWatcher.metadataObjects, id)

for _, enricher := range resourceMetaWatcher.enrichers {
enricher.Lock()
ids := enricher.deleteFunc(obj.(kubernetes.Resource))
// update this watcher events by removing all the metadata[id]
for _, id := range ids {
delete(enricher.metadata, id)
}
enricher.Unlock()
}
},
})
enricher.watcher = resourceMetaWatcher
}

return enricher
@@ -1142,11 +1086,8 @@ func (e *enricher) Stop(resourceWatchers *Watchers) {
// This method is executed whenever a new event is created and about to be published.
// The enricher's index method is used to retrieve the resource identifier from each event.
func (e *enricher) Enrich(events []mapstr.M) {
e.RLock()
defer e.RUnlock()

for _, event := range events {
if meta := e.metadata[e.index(event)]; meta != nil {
if meta := e.getMetadata(event); meta != nil {
k8s, err := meta.GetValue("kubernetes")
if err != nil {
continue
@@ -1163,10 +1104,9 @@ func (e *enricher) Enrich(events []mapstr.M) {
}

// don't apply pod metadata to module level
k8sMeta = k8sMeta.Clone()
delete(k8sMeta, "pod")
}
ecsMeta := meta.Clone()
ecsMeta := meta
Review thread on the line ecsMeta := meta:

Contributor: nit: not required since the Clone is now handled by the e.getMetadata(...)

Contributor (author): This is just renaming the variable for clarity. Maps are reference types in Go, so this really just copies a pointer.

Contributor: I know that maps are built-in references, ty 🙂 I have to admit this is the first time I've seen a new variable assigned in the same scope purely for clarity; I have seen comments added, or the variable renamed at creation time, for that reason.

Contributor (author): The real meaning of this series of statements is "ecsMeta is meta without the kubernetes key". It was clearer when we were actually cloning meta, but we don't need to do that anymore. I can change it to only use meta if you think this is confusing.

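To make the point in the thread concrete: assigning one mapstr.M to another copies only a map reference, so both names see the same data, and Clone is needed only when an independent copy must survive later mutations. A standalone illustration, not part of the diff:

package sketch

import "github.com/elastic/elastic-agent-libs/mapstr"

// aliasVsClone: ecsMeta := meta is just a second name for the same map, while
// meta.Clone() is a deep copy that a later delete does not touch.
func aliasVsClone() (afterAlias, afterClone mapstr.M) {
	meta := mapstr.M{"kubernetes": mapstr.M{"pod": mapstr.M{"name": "example"}}}

	ecsMeta := meta           // alias: shares the underlying map with meta
	cloneMeta := meta.Clone() // independent deep copy

	_ = ecsMeta.Delete("kubernetes") // removes the key from meta as well

	return meta, cloneMeta // meta no longer has "kubernetes"; cloneMeta still does
}
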
err = ecsMeta.Delete("kubernetes")
if err != nil {
logp.Debug("kubernetes", "Failed to delete field '%s': %s", "kubernetes", err)
@@ -1180,6 +1120,48 @@
}
}

// getMetadata returns metadata for the given event. If the metadata doesn't exist in the cache, we try to get it
// from the watcher store.
// The returned map is a copy, owned by the caller.
func (e *enricher) getMetadata(event mapstr.M) mapstr.M {
e.Lock()
defer e.Unlock()
metaKey := e.index(event)
eventMeta := e.metadataCache[metaKey]
if eventMeta == nil {
e.updateMetadataCacheFromWatcher(metaKey)
eventMeta = e.metadataCache[metaKey]
}
if eventMeta != nil {
eventMeta = eventMeta.Clone()
}
return eventMeta
}

// updateMetadataCacheFromWatcher updates the metadata cache for the given key with data from the watcher.
func (e *enricher) updateMetadataCacheFromWatcher(key string) {
storeKey := getWatcherStoreKeyFromMetadataKey(key)
if res, exists, _ := e.watcher.watcher.Store().GetByKey(storeKey); exists {
eventMetaMap := e.updateFunc(res.(kubernetes.Resource))
for k, v := range eventMetaMap {
e.metadataCache[k] = v
}
}
}

// getWatcherStoreKeyFromMetadataKey returns a watcher store key for a given metadata cache key. These are identical
// for nearly all resources, and have the form `{namespace}/{name}`, with the exception of containers, where it's
// `{namespace}/{pod_name}/{container_name}`. In that case, we want the Pod key, so we drop the final part.
func getWatcherStoreKeyFromMetadataKey(metaKey string) string {
parts := strings.Split(metaKey, resourceMetadataKeySeparator)
if len(parts) <= 2 { // normal K8s resource
return metaKey
}

// container, we need to remove the final part to get the Pod key
return strings.Join(parts[:2], resourceMetadataKeySeparator)
}

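A small self-contained check of the collapse rule described above; the helper is re-implemented locally because the real function is unexported, and the key values are invented:

package sketch

import (
	"strings"
	"testing"
)

// collapse mirrors getWatcherStoreKeyFromMetadataKey: container keys
// ({namespace}/{pod}/{container}) collapse to the pod key, everything else passes through.
func collapse(key string) string {
	parts := strings.Split(key, "/")
	if len(parts) <= 2 {
		return key
	}
	return strings.Join(parts[:2], "/")
}

func TestCollapse(t *testing.T) {
	cases := map[string]string{
		"default/web-75c6f-abcde":       "default/web-75c6f-abcde", // pod key passes through
		"default/web-75c6f-abcde/nginx": "default/web-75c6f-abcde", // container key -> pod key
		"kube-system/coredns":           "kube-system/coredns",     // other namespaced resources unchanged
	}
	for in, want := range cases {
		if got := collapse(in); got != want {
			t.Errorf("collapse(%q) = %q, want %q", in, got, want)
		}
	}
}
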
func CreateEvent(event mapstr.M, namespace string) (mb.Event, error) {
var moduleFieldsMapStr mapstr.M
moduleFields, ok := event[mb.ModuleDataKey]