diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index 649c372bfd63..30ed913060ba 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -284,6 +284,7 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet var ( annotations = common.MapStr{} nsAnn = common.MapStr{} + events = make([]bus.Event, 0) ) for k, v := range pod.GetObjectMeta().GetAnnotations() { safemapstr.Put(annotations, k, v) @@ -299,7 +300,6 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet } } - emitted := 0 // Emit container and port information for _, c := range containers { // If it doesn't have an ID, container doesn't exist in @@ -345,8 +345,7 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet "kubernetes": meta, }, } - p.publish(event) - emitted++ + events = append(events, event) } for _, port := range c.Ports { @@ -361,16 +360,16 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet "kubernetes": meta, }, } - p.publish(event) - emitted++ + events = append(events, event) } } - // Finally publish a pod level event so that hints that have no exposed ports can get processed. + // Publish a pod level event so that hints that have no exposed ports can get processed. // Log hints would just ignore this event as there is no ${data.container.id} - // Publish the pod level hint only if atleast one container level hint was emitted. This ensures that there is + // Publish the pod level hint only if at least one container level hint was generated. This ensures that there is // no unnecessary pod level events emitted prematurely. - if emitted != 0 { + // We publish the pod level hint first so that it doesn't override a valid container level event. + if len(events) != 0 { meta := p.metagen.Generate(pod) // Information that can be used in discovering a workload @@ -392,6 +391,10 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet }, } p.publish(event) + } + // Publish the container level hints finally. + for _, event := range events { + p.publish(event) } } diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 7e5d51b23b06..2dacee2b9fc6 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -353,7 +353,7 @@ func TestEmitEvent(t *testing.T) { Message string Flag string Pod *kubernetes.Pod - Expected bus.Event + Expected []bus.Event }{ { Message: "Test common pod start", @@ -389,44 +389,227 @@ func TestEmitEvent(t *testing.T) { }, }, }, - Expected: bus.Event{ - "start": true, - "host": "127.0.0.1", - "port": 0, - "id": cid, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "foobar", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "docker", + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, - "node": common.MapStr{ - "name": "node", + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "namespace": "default", - "annotations": common.MapStr{}, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + }, + }, + { + Message: "Test common pod start with multiple ports exposed", + Flag: "start", + Pod: &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, }, - "meta": common.MapStr{ + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + ContainerID: containerID, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + Ports: []v1.ContainerPort{ + { + ContainerPort: 8080, + }, + { + ContainerPort: 9090, + }, + }, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "start": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": int32(8080), + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "start": true, + "host": "127.0.0.1", + "port": int32(9090), + "id": cid, + "provider": UUID, "kubernetes": common.MapStr{ - "namespace": "default", "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, "pod": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ "name": "filebeat", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ + }, + "node": common.MapStr{ "name": "node", }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, + "config": []*common.Config{}, }, - "config": []*common.Config{}, }, }, { @@ -522,44 +705,75 @@ func TestEmitEvent(t *testing.T) { }, }, }, - Expected: bus.Event{ - "stop": true, - "host": "", - "id": cid, - "port": 0, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "", - }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + Expected: []bus.Event{ + { + "stop": true, + "host": "", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "node": common.MapStr{ - "name": "node", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, - "namespace": "default", - "annotations": common.MapStr{}, + "config": []*common.Config{}, }, - "meta": common.MapStr{ + { + "stop": true, + "host": "", + "id": cid, + "port": 0, + "provider": UUID, "kubernetes": common.MapStr{ - "namespace": "default", "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, "pod": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ "name": "filebeat", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ + }, + "node": common.MapStr{ "name": "node", }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, + "config": []*common.Config{}, }, - "config": []*common.Config{}, }, }, { @@ -592,44 +806,176 @@ func TestEmitEvent(t *testing.T) { }, }, }, - Expected: bus.Event{ - "stop": true, - "host": "127.0.0.1", - "port": 0, - "id": cid, - "provider": UUID, - "kubernetes": common.MapStr{ - "container": common.MapStr{ - "id": "", - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - "runtime": "", + Expected: []bus.Event{ + { + "stop": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "pod": common.MapStr{ - "name": "filebeat", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, }, - "node": common.MapStr{ - "name": "node", + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, }, - "namespace": "default", - "annotations": common.MapStr{}, + "config": []*common.Config{}, }, - "meta": common.MapStr{ + }, + }, + { + Message: "Test stop pod without container id", + Flag: "stop", + Pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + TypeMeta: typeMeta, + Status: v1.PodStatus{ + PodIP: podIP, + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: name, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: node, + Containers: []kubernetes.Container{ + { + Image: containerImage, + Name: name, + }, + }, + }, + }, + Expected: []bus.Event{ + { + "stop": true, + "host": "127.0.0.1", + "id": uid, + "provider": UUID, + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, + }, + { + "stop": true, + "host": "127.0.0.1", + "port": 0, + "id": cid, + "provider": UUID, "kubernetes": common.MapStr{ - "namespace": "default", "container": common.MapStr{ - "name": "filebeat", - "image": "elastic/filebeat:6.3.0", - }, "pod": common.MapStr{ + "id": "", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "", + }, + "pod": common.MapStr{ "name": "filebeat", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", - }, "node": common.MapStr{ + }, + "node": common.MapStr{ "name": "node", }, + "namespace": "default", + "annotations": common.MapStr{}, }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + }, "pod": common.MapStr{ + "name": "filebeat", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + "config": []*common.Config{}, }, - "config": []*common.Config{}, }, }, } @@ -663,14 +1009,17 @@ func TestEmitEvent(t *testing.T) { pod.emit(test.Pod, test.Flag) - select { - case event := <-listener.Events(): - assert.Equal(t, test.Expected, event, test.Message) - case <-time.After(2 * time.Second): - if test.Expected != nil { - t.Fatal("Timeout while waiting for event") + for i := 0; i < len(test.Expected); i++ { + select { + case event := <-listener.Events(): + assert.Equal(t, test.Expected[i], event, test.Message) + case <-time.After(2 * time.Second): + if test.Expected != nil { + t.Fatal("Timeout while waiting for event") + } } } + }) } } diff --git a/metricbeat/autodiscover/builder/hints/metrics.go b/metricbeat/autodiscover/builder/hints/metrics.go index 52eba34a1ff5..37ec6f150e0f 100644 --- a/metricbeat/autodiscover/builder/hints/metrics.go +++ b/metricbeat/autodiscover/builder/hints/metrics.go @@ -133,7 +133,6 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c moduleConfig := common.MapStr{ "module": mod, "metricsets": msets, - "hosts": hosts, "timeout": tout, "period": ival, "enabled": true, @@ -154,15 +153,30 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c moduleConfig["password"] = password } - logp.Debug("hints.builder", "generated config: %v", moduleConfig) + // If there are hosts that match, ensure that there is a module config for each valid host. + // We do this because every config that is from a Pod that has an exposed port will generate a valid + // module config. However, the pod level hint will generate a config with all hosts that are defined in the + // config. To make sure that these pod level configs get deduped, it is essential that we generate exactly one + // module config per host. + if len(hosts) != 0 { + for _, h := range hosts { + mod := moduleConfig.Clone() + mod["hosts"] = []string{h} + + logp.Debug("hints.builder", "generated config: %v", mod) + + // Create config object + cfg := m.generateConfig(mod) + configs = append(configs, cfg) + } + } else { + logp.Debug("hints.builder", "generated config: %v", moduleConfig) - // Create config object - cfg, err := common.NewConfigFrom(moduleConfig) - if err != nil { - logp.Debug("hints.builder", "config merge failed with error: %v", err) + // Create config object + cfg := m.generateConfig(moduleConfig) + configs = append(configs, cfg) } - logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) - configs = append(configs, cfg) + } // Apply information in event to the template to generate the final config @@ -171,6 +185,15 @@ func (m *metricHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*c return template.ApplyConfigTemplate(event, configs, options...) } +func (m *metricHints) generateConfig(mod common.MapStr) *common.Config { + cfg, err := common.NewConfigFrom(mod) + if err != nil { + logp.Debug("hints.builder", "config merge failed with error: %v", err) + } + logp.Debug("hints.builder", "generated config: %+v", common.DebugString(cfg, true)) + return cfg +} + func (m *metricHints) getModule(hints common.MapStr) string { return builder.GetHintString(hints, m.Key, module) } diff --git a/metricbeat/autodiscover/builder/hints/metrics_test.go b/metricbeat/autodiscover/builder/hints/metrics_test.go index d2d0462a0476..4badd9c2a026 100644 --- a/metricbeat/autodiscover/builder/hints/metrics_test.go +++ b/metricbeat/autodiscover/builder/hints/metrics_test.go @@ -409,6 +409,66 @@ func TestGenerateHints(t *testing.T) { }, }, }, + { + message: "Module with multiple hosts returns the right number of hints. Pod level hints need to be one per host", + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090, ${data.host}:9091", + }, + }, + }, + len: 2, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9090"}, + }, + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9091"}, + }, + }, + }, + { + message: "Module with multiple hosts and an exposed port creates a config for just the exposed port", + event: bus.Event{ + "host": "1.2.3.4", + "port": 9091, + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "mockmoduledefaults", + "namespace": "test", + "hosts": "${data.host}:9090, ${data.host}:9091", + }, + }, + }, + len: 1, + result: []common.MapStr{ + { + "module": "mockmoduledefaults", + "namespace": "test", + "metricsets": []string{"default"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + "hosts": []interface{}{"1.2.3.4:9091"}, + }, + }, + }, } for _, test := range tests { mockRegister := mb.NewRegister() @@ -423,7 +483,7 @@ func TestGenerateHints(t *testing.T) { logger: logp.NewLogger("hints.builder"), } cfgs := m.CreateConfig(test.event) - assert.Equal(t, len(cfgs), test.len) + assert.Equal(t, len(cfgs), test.len, test.message) // The check below helps skipping config validation if there is no config supposed to be emitted. if len(cfgs) == 0 {