diff --git a/.golangci.yml b/.golangci.yml
index 52c4fb79720..936215ea909 100755
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -30,6 +30,9 @@ issues:
     - text: "imported and not used"
      linters:
        - typecheck
+    - text: "previous case"
+      linters:
+        - typecheck
    # From mage we are printing to the console to ourselves
    - path: (.*magefile.go|.*dev-tools/mage/.*)
      linters: forbidigo
diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go
index 1f301b72571..08ab23674d8 100644
--- a/metricbeat/module/kubernetes/util/kubernetes.go
+++ b/metricbeat/module/kubernetes/util/kubernetes.go
@@ -33,7 +33,7 @@ import (
     k8sclientmeta "k8s.io/client-go/metadata"
 
     "k8s.io/apimachinery/pkg/api/meta"
-    "k8s.io/apimachinery/pkg/api/resource"
+    k8sresource "k8s.io/apimachinery/pkg/api/resource"
 
     "github.com/elastic/elastic-agent-autodiscover/kubernetes"
     "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
@@ -101,7 +101,8 @@ type metaWatcher struct {
     metricsetsUsing []string // list of metricsets using this shared watcher(e.g. pod, container, state_pod)
 
-    enrichers map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
+    enrichers   map[string]*enricher // map of enrichers using this watcher. The key is the metricset name. Each metricset has its own enricher
+    metricsRepo *MetricsRepo         // used to update container metrics derived from metadata, like resource limits
 
     nodeScope      bool               // whether this watcher should watch for resources in current node or in whole cluster
     restartWatcher kubernetes.Watcher // whether this watcher needs a restart. Only relevant in leader nodes due to metricsets with different nodescope(pod, state_pod)
@@ -311,6 +312,7 @@ func createWatcher(
     client k8sclient.Interface,
     metadataClient k8sclientmeta.Interface,
     resourceWatchers *Watchers,
+    metricsRepo *MetricsRepo,
     namespace string,
     extraWatcher bool) (bool, error) {
@@ -388,6 +390,7 @@ func createWatcher(
         watcher:         watcher,
         started:         false, // not started yet
         enrichers:       make(map[string]*enricher),
+        metricsRepo:     metricsRepo,
         metricsetsUsing: make([]string, 0),
         restartWatcher:  nil,
         nodeScope:       nodeScope,
@@ -395,15 +398,65 @@ func createWatcher(
     resourceWatchers.metaWatchersMap[resourceName] = resourceMetaWatcher
 
     // Add event handlers to the watcher. The only action we need to do here is invalidate the enricher cache.
-    addEventHandlerToWatcher(resourceMetaWatcher, resourceWatchers)
+    addEventHandlersToWatcher(resourceMetaWatcher, resourceWatchers)
 
     return true, nil
 }
 
-// addEventHandlerToWatcher adds an event handler to the watcher that invalidates the cache of enrichers attached
-// to the watcher.
-func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) {
-    notifyFunc := func(obj interface{}) {
+// addEventHandlersToWatcher adds event handlers to the watcher that invalidate the cache of enrichers attached
+// to the watcher and update the metrics repository on Pod and Node change events.
+func addEventHandlersToWatcher(
+    metaWatcher *metaWatcher,
+    resourceWatchers *Watchers,
+) {
+    containerMetricsUpdateFunc := func(pod *kubernetes.Pod) {
+        nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(pod.Spec.NodeName)
+        podId := NewPodId(pod.Namespace, pod.Name)
+        podStore, _ := nodeStore.AddPodStore(podId)
+
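+        // Derive each container's CPU limit (cores) and memory limit (bytes) and record them in the pod's container stores.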
+        for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
+            metrics := NewContainerMetrics()
+
+            if cpu, ok := container.Resources.Limits["cpu"]; ok {
+                if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
+                    metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
+                }
+            }
+            if memory, ok := container.Resources.Limits["memory"]; ok {
+                if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
+                    metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
+                }
+            }
+
+            containerStore, _ := podStore.AddContainerStore(container.Name)
+            containerStore.SetContainerMetrics(metrics)
+        }
+    }
+
+    containerMetricsDeleteFunc := func(pod *kubernetes.Pod) {
+        podId := NewPodId(pod.Namespace, pod.Name)
+        nodeStore := metaWatcher.metricsRepo.GetNodeStore(pod.Spec.NodeName)
+        nodeStore.DeletePodStore(podId)
+    }
+
+    nodeMetricsUpdateFunc := func(node *kubernetes.Node) {
+        nodeName := node.GetObjectMeta().GetName()
+        metrics := NewNodeMetrics()
+        if cpu, ok := node.Status.Capacity["cpu"]; ok {
+            if q, err := k8sresource.ParseQuantity(cpu.String()); err == nil {
+                metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
+            }
+        }
+        if memory, ok := node.Status.Capacity["memory"]; ok {
+            if q, err := k8sresource.ParseQuantity(memory.String()); err == nil {
+                metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
+            }
+        }
+        nodeStore, _ := metaWatcher.metricsRepo.AddNodeStore(nodeName)
+        nodeStore.SetNodeMetrics(metrics)
+    }
+
+    clearMetadataCacheFunc := func(obj interface{}) {
         enrichers := make(map[string]*enricher, len(metaWatcher.enrichers))
 
         resourceWatchers.lock.Lock()
@@ -420,10 +473,35 @@ func addEventHandlerToWatcher(metaWatcher *metaWatcher, resourceWatchers *Watchers) {
             enricher.Unlock()
         }
     }
+
     metaWatcher.watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
-        AddFunc:    func(obj interface{}) {}, // do nothing
-        UpdateFunc: notifyFunc,
-        DeleteFunc: notifyFunc,
+        AddFunc: func(obj interface{}) {
+            switch res := obj.(type) {
+            case *kubernetes.Pod:
+                containerMetricsUpdateFunc(res)
+            case *kubernetes.Node:
+                nodeMetricsUpdateFunc(res)
+            }
+        },
+        UpdateFunc: func(obj interface{}) {
+            clearMetadataCacheFunc(obj)
+            switch res := obj.(type) {
+            case *kubernetes.Pod:
+                containerMetricsUpdateFunc(res)
+            case *kubernetes.Node:
+                nodeMetricsUpdateFunc(res)
+            }
+        },
+        DeleteFunc: func(obj interface{}) {
+            clearMetadataCacheFunc(obj)
+            switch res := obj.(type) {
+            case *kubernetes.Pod:
+                containerMetricsDeleteFunc(res)
+            case *kubernetes.Node:
+                nodeName := res.GetObjectMeta().GetName()
+                metaWatcher.metricsRepo.DeleteNodeStore(nodeName)
+            }
+        },
     })
 }
 
@@ -481,6 +559,7 @@ func createAllWatchers(
     config *kubernetesConfig,
     log *logp.Logger,
     resourceWatchers *Watchers,
+    metricsRepo *MetricsRepo,
 ) error {
     res := getResource(resourceName)
     if res == nil {
@@ -494,7 +573,7 @@ func createAllWatchers(
     // Create the main watcher for the given resource.
     // For example pod metricset's main watcher will be pod watcher.
     // If it fails, we return an error, so we can stop the extra watchers from creating.
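+    // The shared metricsRepo is handed to each watcher so its event handlers can keep node and container metrics current.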
-    created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
+    created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, false)
     if err != nil {
         return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
     } else if created {
@@ -509,7 +588,7 @@ func createAllWatchers(
     for _, extra := range extraWatchers {
         extraRes := getResource(extra)
         if extraRes != nil {
-            created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
+            created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, metricsRepo, config.Namespace, true)
             if err != nil {
                 log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
             } else {
@@ -654,7 +733,7 @@ func NewResourceMetadataEnricher(
     metricsetName := base.Name()
     resourceName := getResourceName(metricsetName)
     // Create all watchers needed for this metricset
-    err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
+    err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers, metricsRepo)
     if err != nil {
         log.Errorf("Error starting the watchers: %s", err)
         return &nilEnricher{}
@@ -680,20 +759,13 @@ func NewResourceMetadataEnricher(
     // It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method.
     // It is a common handler for all resource watchers. The kind of resource(e.g. pod or deployment) is checked inside the function.
     // It returns a map of a resource identifier(i.e. namespace-resource_name) as key and the metadata as value.
-    updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen, metricsRepo)
+    updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen)
 
     // deleteFunc to be used as the resource watcher's delete handler.
     // The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted).
     // It returns the identifier of the resource.
     deleteFunc := func(r kubernetes.Resource) []string {
         accessor, _ := meta.Accessor(r)
-
-        switch r := r.(type) {
-        case *kubernetes.Node:
-            nodeName := r.GetObjectMeta().GetName()
-            metricsRepo.DeleteNodeStore(nodeName)
-        }
-
         id := accessor.GetName()
         namespace := accessor.GetNamespace()
         if namespace != "" {
@@ -772,7 +844,7 @@ func NewContainerMetadataEnricher(
 
     metricsetName := base.Name()
 
-    err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
+    err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers, metricsRepo)
     if err != nil {
         log.Errorf("Error starting the watchers: %s", err)
         return &nilEnricher{}
@@ -802,27 +874,8 @@ func NewContainerMetadataEnricher(
         mapStatuses(pod.Status.ContainerStatuses)
         mapStatuses(pod.Status.InitContainerStatuses)
 
-        nodeStore, _ := metricsRepo.AddNodeStore(pod.Spec.NodeName)
-        podId := NewPodId(pod.Namespace, pod.Name)
-        podStore, _ := nodeStore.AddPodStore(podId)
-
         for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
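+            // Resource limits are no longer computed here; the shared watcher event handlers populate the metrics repo instead.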
             cmeta := mapstr.M{}
-            metrics := NewContainerMetrics()
-
-            if cpu, ok := container.Resources.Limits["cpu"]; ok {
-                if q, err := resource.ParseQuantity(cpu.String()); err == nil {
-                    metrics.CoresLimit = NewFloat64Metric(float64(q.MilliValue()) / 1000)
-                }
-            }
-            if memory, ok := container.Resources.Limits["memory"]; ok {
-                if q, err := resource.ParseQuantity(memory.String()); err == nil {
-                    metrics.MemoryLimit = NewFloat64Metric(float64(q.Value()))
-                }
-            }
-
-            containerStore, _ := podStore.AddContainerStore(container.Name)
-            containerStore.SetContainerMetrics(metrics)
 
             if s, ok := statuses[container.Name]; ok {
                 // Extracting id and runtime ECS fields from ContainerID
@@ -849,9 +902,6 @@ func NewContainerMetadataEnricher(
             if !ok {
                 base.Logger().Debugf("Error while casting event: %s", ok)
             }
-            podId := NewPodId(pod.Namespace, pod.Name)
-            nodeStore := metricsRepo.GetNodeStore(pod.Spec.NodeName)
-            nodeStore.DeletePodStore(podId)
 
             for _, container := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
                 id := join(pod.ObjectMeta.GetNamespace(), pod.GetObjectMeta().GetName(), container.Name)
@@ -1217,7 +1267,6 @@ func getEventMetadataFunc(
     logger *logp.Logger,
     generalMetaGen *metadata.Resource,
     specificMetaGen metadata.MetaGen,
-    metricsRepo *MetricsRepo,
 ) func(r kubernetes.Resource) map[string]mapstr.M {
     return func(r kubernetes.Resource) map[string]mapstr.M {
         accessor, accErr := meta.Accessor(r)
@@ -1233,23 +1282,7 @@ func getEventMetadataFunc(
         switch r := r.(type) {
         case *kubernetes.Pod:
             return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
-        case *kubernetes.Node:
-            nodeName := r.GetObjectMeta().GetName()
-            metrics := NewNodeMetrics()
-            if cpu, ok := r.Status.Capacity["cpu"]; ok {
-                if q, err := resource.ParseQuantity(cpu.String()); err == nil {
-                    metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
-                }
-            }
-            if memory, ok := r.Status.Capacity["memory"]; ok {
-                if q, err := resource.ParseQuantity(memory.String()); err == nil {
-                    metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
-                }
-            }
-            nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
-            nodeStore.SetNodeMetrics(metrics)
-            return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
         case *kubernetes.Deployment:
             return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go
index ec2309b08bf..3f38e7656b1 100644
--- a/metricbeat/module/kubernetes/util/kubernetes_test.go
+++ b/metricbeat/module/kubernetes/util/kubernetes_test.go
@@ -22,6 +22,8 @@ import (
     "testing"
     "time"
 
+    "k8s.io/apimachinery/pkg/api/resource"
+
     "github.com/elastic/beats/v7/metricbeat/mb"
 
     "github.com/stretchr/testify/assert"
@@ -71,6 +73,7 @@ func TestWatchOptions(t *testing.T) {
 
 func TestCreateWatcher(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     client := k8sfake.NewSimpleClientset()
     metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
@@ -84,7 +87,16 @@ func TestCreateWatcher(t *testing.T) {
     options, err := getWatchOptions(config, false, client, log)
     require.NoError(t, err)
 
-    created, err := createWatcher(NamespaceResource, &kubernetes.Node{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
+    created, err := createWatcher(
+        NamespaceResource,
+        &kubernetes.Node{},
+        *options,
+        client,
+        metadataClient,
+        resourceWatchers,
+        metricsRepo,
+        config.Namespace,
+        false)
     require.True(t, created)
     require.NoError(t, err)
 
@@ -94,7 +106,15 @@ func TestCreateWatcher(t *testing.T) {
     require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher)
     resourceWatchers.lock.Unlock()
 
-    created, err = createWatcher(NamespaceResource, &kubernetes.Namespace{}, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
+    created, err = createWatcher(
+        NamespaceResource,
+        &kubernetes.Namespace{},
+        *options, client,
+        metadataClient,
+        resourceWatchers,
+        metricsRepo,
+        config.Namespace,
+        true)
     require.False(t, created)
     require.NoError(t, err)
 
@@ -104,7 +124,15 @@ func TestCreateWatcher(t *testing.T) {
     require.NotNil(t, resourceWatchers.metaWatchersMap[NamespaceResource].watcher)
     resourceWatchers.lock.Unlock()
 
-    created, err = createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
+    created, err = createWatcher(
+        DeploymentResource,
+        &kubernetes.Deployment{},
+        *options, client,
+        metadataClient,
+        resourceWatchers,
+        metricsRepo,
+        config.Namespace,
+        false)
     require.True(t, created)
     require.NoError(t, err)
 
@@ -117,6 +145,7 @@ func TestAddToMetricsetsUsing(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     client := k8sfake.NewSimpleClientset()
     metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
@@ -131,7 +160,15 @@ func TestAddToMetricsetsUsing(t *testing.T) {
     require.NoError(t, err)
 
     // Create the new entry with watcher and nil string array first
-    created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
+    created, err := createWatcher(
+        DeploymentResource,
+        &kubernetes.Deployment{},
+        *options, client,
+        metadataClient,
+        resourceWatchers,
+        metricsRepo,
+        config.Namespace,
+        false)
     require.True(t, created)
     require.NoError(t, err)
 
@@ -155,6 +192,7 @@ func TestAddToMetricsetsUsing(t *testing.T) {
 
 func TestRemoveFromMetricsetsUsing(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     client := k8sfake.NewSimpleClientset()
     metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
@@ -169,7 +207,16 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) {
     require.NoError(t, err)
 
     // Create the new entry with watcher and nil string array first
-    created, err := createWatcher(DeploymentResource, &kubernetes.Deployment{}, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
+    created, err := createWatcher(
+        DeploymentResource,
+        &kubernetes.Deployment{},
+        *options,
+        client,
+        metadataClient,
+        resourceWatchers,
+        metricsRepo,
+        config.Namespace,
+        false)
     require.True(t, created)
     require.NoError(t, err)
 
@@ -194,8 +241,141 @@ func TestRemoveFromMetricsetsUsing(t *testing.T) {
     require.Equal(t, 0, size)
 }
 
+func TestWatcherContainerMetrics(t *testing.T) {
+    resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
+
+    containerName := "test"
+    cpuLimit := resource.MustParse("100m")
+    memoryLimit := resource.MustParse("100Mi")
+    pod := &v1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            UID:  types.UID("mockuid"),
+            Name: "enrich",
+            Labels: map[string]string{
+                "label": "value",
+            },
+            Namespace: "default",
+        },
+        Spec: v1.PodSpec{
+            NodeName: "test-node",
+            Containers: []v1.Container{
+                {
+                    Name: containerName,
+                    Resources: v1.ResourceRequirements{
+                        Limits: v1.ResourceList{
+                            v1.ResourceCPU:    cpuLimit,
+                            v1.ResourceMemory: memoryLimit,
+                        },
+                    },
+                },
+            },
+        },
+    }
+    podId := NewPodId(pod.Namespace, pod.Name)
+    resourceWatchers.lock.Lock()
+
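+    // Register a mock watcher backed by the metrics repo so Pod add/update/delete events can be replayed by hand.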
+    watcher := newMockWatcher()
+    metaWatcher := &metaWatcher{
+        watcher:         watcher,
+        started:         false,
+        metricsetsUsing: []string{"pod"},
+        enrichers:       make(map[string]*enricher),
+        metricsRepo:     metricsRepo,
+    }
+    resourceWatchers.metaWatchersMap[PodResource] = metaWatcher
+    addEventHandlersToWatcher(metaWatcher, resourceWatchers)
+    resourceWatchers.lock.Unlock()
+
+    // add Pod and verify container metrics are present and valid
+    watcher.handler.OnAdd(pod)
+
+    containerStore := metricsRepo.GetNodeStore(pod.Spec.NodeName).GetPodStore(podId).GetContainerStore(containerName)
+    metrics := containerStore.GetContainerMetrics()
+    require.NotNil(t, metrics)
+    assert.Equal(t, 0.1, metrics.CoresLimit.Value)
+    assert.Equal(t, 100*1024*1024.0, metrics.MemoryLimit.Value)
+
+    // modify the limit and verify the new value is present
+    pod.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = resource.MustParse("200m")
+    watcher.handler.OnUpdate(pod)
+    metrics = containerStore.GetContainerMetrics()
+    require.NotNil(t, metrics)
+    assert.Equal(t, 0.2, metrics.CoresLimit.Value)
+
+    // delete the pod and verify no metrics are present
+    watcher.handler.OnDelete(pod)
+    containerStore = metricsRepo.GetNodeStore(pod.Spec.NodeName).GetPodStore(podId).GetContainerStore(containerName)
+    metrics = containerStore.GetContainerMetrics()
+    require.NotNil(t, metrics)
+    assert.Nil(t, metrics.CoresLimit)
+    assert.Nil(t, metrics.MemoryLimit)
+}
+
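+// TestWatcherNodeMetrics mirrors the container test for node capacity: adding, updating, and deleting a node should be reflected in the metrics repo.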
+func TestWatcherNodeMetrics(t *testing.T) {
+    resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
+
+    cpuLimit := resource.MustParse("100m")
+    memoryLimit := resource.MustParse("100Mi")
+    node := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            UID:  types.UID("mockuid"),
+            Name: "enrich",
+            Labels: map[string]string{
+                "label": "value",
+            },
+            Namespace: "default",
+        },
+        Status: v1.NodeStatus{
+            Capacity: v1.ResourceList{
+                v1.ResourceCPU:    cpuLimit,
+                v1.ResourceMemory: memoryLimit,
+            },
+        },
+    }
+    resourceWatchers.lock.Lock()
+
+    watcher := newMockWatcher()
+    metaWatcher := &metaWatcher{
+        watcher:         watcher,
+        started:         false,
+        metricsetsUsing: []string{"pod"},
+        enrichers:       make(map[string]*enricher),
+        metricsRepo:     metricsRepo,
+    }
+    resourceWatchers.metaWatchersMap[NodeResource] = metaWatcher
+    addEventHandlersToWatcher(metaWatcher, resourceWatchers)
+    resourceWatchers.lock.Unlock()
+
+    // add node and verify node metrics are present and valid
+    watcher.handler.OnAdd(node)
+
+    nodeStore := metricsRepo.GetNodeStore(node.Name)
+    metrics := nodeStore.GetNodeMetrics()
+    require.NotNil(t, metrics)
+    assert.Equal(t, 0.1, metrics.CoresAllocatable.Value)
+    assert.Equal(t, 100*1024*1024.0, metrics.MemoryAllocatable.Value)
+
+    // modify the capacity and verify the new value is present
+    node.Status.Capacity[v1.ResourceCPU] = resource.MustParse("200m")
+    watcher.handler.OnUpdate(node)
+    metrics = nodeStore.GetNodeMetrics()
+    require.NotNil(t, metrics)
+    assert.Equal(t, 0.2, metrics.CoresAllocatable.Value)
+
+    // delete the node and verify no metrics are present
+    watcher.handler.OnDelete(node)
+    nodeStore = metricsRepo.GetNodeStore(node.Name)
+    metrics = nodeStore.GetNodeMetrics()
+    require.NotNil(t, metrics)
+    assert.Nil(t, metrics.CoresAllocatable)
+    assert.Nil(t, metrics.MemoryAllocatable)
+}
+
 func TestCreateAllWatchers(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     client := k8sfake.NewSimpleClientset()
     metadataClient := k8smetafake.NewSimpleMetadataClient(k8smetafake.NewTestScheme())
@@ -211,7 +391,16 @@ func TestCreateAllWatchers(t *testing.T) {
     log := logp.NewLogger("test")
 
     // Start watchers based on a resource that does not exist should cause an error
-    err := createAllWatchers(client, metadataClient, "does-not-exist", "does-not-exist", false, config, log, resourceWatchers)
+    err := createAllWatchers(
+        client,
+        metadataClient,
+        "does-not-exist",
+        "does-not-exist",
+        false,
+        config,
+        log,
+        resourceWatchers,
+        metricsRepo)
     require.Error(t, err)
     resourceWatchers.lock.Lock()
     require.Equal(t, 0, len(resourceWatchers.metaWatchersMap))
@@ -220,7 +409,16 @@ func TestCreateAllWatchers(t *testing.T) {
     // Start watcher for a resource that requires other resources, should start all the watchers
     metricsetPod := "pod"
     extras := getExtraWatchers(PodResource, config.AddResourceMetadata)
-    err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers)
+    err = createAllWatchers(
+        client,
+        metadataClient,
+        metricsetPod,
+        PodResource,
+        false,
+        config,
+        log,
+        resourceWatchers,
+        metricsRepo)
     require.NoError(t, err)
 
     // Check that all the required watchers are in the map
@@ -235,6 +433,7 @@ func TestCreateAllWatchers(t *testing.T) {
 
 func TestCreateMetaGen(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     commonMetaConfig := metadata.Config{}
     commonConfig, err := conf.NewConfigFrom(&commonMetaConfig)
@@ -259,7 +458,16 @@ func TestCreateMetaGen(t *testing.T) {
     // Create the watchers necessary for the metadata generator
     metricsetDeployment := "state_deployment"
-    err = createAllWatchers(client, metadataClient, metricsetDeployment, DeploymentResource, false, config, log, resourceWatchers)
+    err = createAllWatchers(
+        client,
+        metadataClient,
+        metricsetDeployment,
+        DeploymentResource,
+        false,
+        config,
+        log,
+        resourceWatchers,
+        metricsRepo)
     require.NoError(t, err)
 
     // Create the generators, this time without error
@@ -269,6 +477,7 @@ func TestCreateMetaGenSpecific(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     commonMetaConfig := metadata.Config{}
     commonConfig, err := conf.NewConfigFrom(&commonMetaConfig)
@@ -302,7 +511,16 @@ func TestCreateMetaGenSpecific(t *testing.T) {
     require.Error(t, err)
 
     // Create the pod resource + the extras
-    err = createAllWatchers(client, metadataClient, metricsetPod, PodResource, false, config, log, resourceWatchers)
+    err = createAllWatchers(
+        client,
+        metadataClient,
+        metricsetPod,
+        PodResource,
+        false,
+        config,
+        log,
+        resourceWatchers,
+        metricsRepo)
     require.NoError(t, err)
 
     _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, PodResource, resourceWatchers)
@@ -315,7 +533,16 @@ func TestCreateMetaGenSpecific(t *testing.T) {
 
     // Create the service resource + the extras
     metricsetService := "state_service"
-    err = createAllWatchers(client, metadataClient, metricsetService, ServiceResource, false, config, log, resourceWatchers)
+    err = createAllWatchers(
+        client,
+        metadataClient,
+        metricsetService,
+        ServiceResource,
+        false,
+        config,
+        log,
+        resourceWatchers,
+        metricsRepo)
     require.NoError(t, err)
 
     _, err = createMetadataGenSpecific(client, commonConfig, config.AddResourceMetadata, ServiceResource, resourceWatchers)
@@ -478,6 +705,7 @@ func TestBuildMetadataEnricher_Start_Stop_SameResources(t *testing.T) {
 func TestBuildMetadataEnricher_EventHandler(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     resourceWatchers.lock.Lock()
     watcher := &metaWatcher{
@@ -485,9 +713,10 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) {
         started:         false,
         metricsetsUsing: []string{"pod"},
         enrichers:       make(map[string]*enricher),
+        metricsRepo:     metricsRepo,
     }
     resourceWatchers.metaWatchersMap[PodResource] = watcher
-    addEventHandlerToWatcher(watcher, resourceWatchers)
+    addEventHandlersToWatcher(watcher, resourceWatchers)
     resourceWatchers.lock.Unlock()
 
     funcs := mockFuncs{}
@@ -603,6 +832,7 @@ func TestBuildMetadataEnricher_EventHandler(t *testing.T) {
 
 func TestBuildMetadataEnricher_PartialMetadata(t *testing.T) {
     resourceWatchers := NewWatchers()
+    metricsRepo := NewMetricsRepo()
 
     resourceWatchers.lock.Lock()
     watcher := &metaWatcher{
@@ -612,9 +842,10 @@ func TestBuildMetadataEnricher_PartialMetadata(t *testing.T) {
         started:         false,
         metricsetsUsing: []string{"replicaset"},
         enrichers:       make(map[string]*enricher),
+        metricsRepo:     metricsRepo,
     }
     resourceWatchers.metaWatchersMap[ReplicaSetResource] = watcher
-    addEventHandlerToWatcher(watcher, resourceWatchers)
+    addEventHandlersToWatcher(watcher, resourceWatchers)
     resourceWatchers.lock.Unlock()
 
     isController := true
@@ -655,7 +886,7 @@ func TestBuildMetadataEnricher_PartialMetadata(t *testing.T) {
     client := k8sfake.NewSimpleClientset()
     generalMetaGen := metadata.NewResourceMetadataGenerator(commonConfig, client)
 
-    updateFunc := getEventMetadataFunc(log, generalMetaGen, nil, nil)
+    updateFunc := getEventMetadataFunc(log, generalMetaGen, nil)
     deleteFunc := func(r kubernetes.Resource) []string {
         accessor, _ := meta.Accessor(r)