diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 973b0aa4db3..b3ec8886839 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Negotiate Docker API version from our client instead of using a hardcoded one. {pull}7165[7165] - Fix delays on autodiscovery events handling caused by blocking runner stops. {pull}7170[7170] - Error out on invalid Autodiscover template conditions settings. {pull}7200[7200] +- Do not emit Kubernetes autodiscover events for Pods without IP address. {pull}7235[7235] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 786dacd400e..d3b3880ff57 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -87,13 +87,16 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj kubernetes.Resource) { + logp.Debug("kubernetes", "Watcher Pod add: %+v", obj) p.emit(obj.(*kubernetes.Pod), "start") }, UpdateFunc: func(obj kubernetes.Resource) { + logp.Debug("kubernetes", "Watcher Pod update: %+v", obj) p.emit(obj.(*kubernetes.Pod), "stop") p.emit(obj.(*kubernetes.Pod), "start") }, DeleteFunc: func(obj kubernetes.Resource) { + logp.Debug("kubernetes", "Watcher Pod delete: %+v", obj) time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) }, }) @@ -120,6 +123,11 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub containerstatuses []kubernetes.PodContainerStatus) { host := pod.Status.PodIP + // Do not emit events without host (container is still being configured) + if host == "" { + return + } + // Collect all container IDs and runtimes from status information. containerIDs := map[string]string{} runtimes := map[string]string{} diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go index 810692eff25..4a005597625 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -2,11 +2,14 @@ package kubernetes import ( "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/common/kubernetes" ) func TestGenerateHints(t *testing.T) { @@ -129,6 +132,140 @@ func TestGenerateHints(t *testing.T) { } } +func TestEmitEvent(t *testing.T) { + tests := []struct { + Message string + Flag string + Pod *kubernetes.Pod + Expected bus.Event + }{ + { + Message: "Test common pod start", + Flag: "start", + Pod: &kubernetes.Pod{ + Metadata: kubernetes.ObjectMeta{ + Name: "filebeat", + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: kubernetes.PodStatus{ + PodIP: "127.0.0.1", + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: "filebeat", + ContainerID: "docker://foobar", + }, + }, + }, + Spec: kubernetes.PodSpec{ + NodeName: "node", + Containers: []kubernetes.Container{ + { + Image: "elastic/filebeat:6.3.0", + Name: "filebeat", + }, + }, + }, + }, + Expected: bus.Event{ + "start": true, + "host": "127.0.0.1", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + }, + }, + { + Message: "Test pod without host", + Flag: "start", + Pod: &kubernetes.Pod{ + Metadata: kubernetes.ObjectMeta{ + Name: "filebeat", + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: kubernetes.PodStatus{ + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: "filebeat", + ContainerID: "docker://foobar", + }, + }, + }, + Spec: kubernetes.PodSpec{ + NodeName: "node", + Containers: []kubernetes.Container{ + { + Image: "elastic/filebeat:6.3.0", + Name: "filebeat", + }, + }, + }, + }, + Expected: nil, + }, + } + + for _, test := range tests { + mapper, err := template.NewConfigMapper(nil) + if err != nil { + t.Fatal(err) + } + + metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig()) + if err != nil { + t.Fatal(err) + } + + p := &Provider{ + config: defaultConfig(), + bus: bus.New("test"), + metagen: metaGen, + templates: mapper, + } + + listener := p.bus.Subscribe() + + p.emit(test.Pod, test.Flag) + + select { + case event := <-listener.Events(): + assert.Equal(t, test.Expected, event) + case <-time.After(2 * time.Second): + if test.Expected != nil { + t.Fatal("Timeout while waiting for event") + } + } + } +} + func getNestedAnnotations(in common.MapStr) common.MapStr { out := common.MapStr{}