Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't emit Kubernetes autodiscover events without host #7235

Merged
merged 1 commit into from
Jun 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Negotiate Docker API version from our client instead of using a hardcoded one. {pull}7165[7165]
- Fix delays on autodiscovery events handling caused by blocking runner stops. {pull}7170[7170]
- Error out on invalid Autodiscover template conditions settings. {pull}7200[7200]
- Do not emit Kubernetes autodiscover events for Pods without IP address. {pull}7235[7235]

*Auditbeat*

Expand Down
8 changes: 8 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,16 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod delete: %+v", obj)
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
})
Expand All @@ -120,6 +123,11 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub
containerstatuses []kubernetes.PodContainerStatus) {
host := pod.Status.PodIP

// Do not emit events without host (container is still being configured)
if host == "" {
return
}

// Collect all container IDs and runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
Expand Down
137 changes: 137 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package kubernetes

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
)

func TestGenerateHints(t *testing.T) {
Expand Down Expand Up @@ -129,6 +132,140 @@ func TestGenerateHints(t *testing.T) {
}
}

func TestEmitEvent(t *testing.T) {
tests := []struct {
Message string
Flag string
Pod *kubernetes.Pod
Expected bus.Event
}{
{
Message: "Test common pod start",
Flag: "start",
Pod: &kubernetes.Pod{
Metadata: kubernetes.ObjectMeta{
Name: "filebeat",
Namespace: "default",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: kubernetes.PodStatus{
PodIP: "127.0.0.1",
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Name: "filebeat",
ContainerID: "docker://foobar",
},
},
},
Spec: kubernetes.PodSpec{
NodeName: "node",
Containers: []kubernetes.Container{
{
Image: "elastic/filebeat:6.3.0",
Name: "filebeat",
},
},
},
},
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "foobar",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "docker",
},
"pod": common.MapStr{
"name": "filebeat",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
}, "node": common.MapStr{
"name": "node",
},
},
},
},
},
{
Message: "Test pod without host",
Flag: "start",
Pod: &kubernetes.Pod{
Metadata: kubernetes.ObjectMeta{
Name: "filebeat",
Namespace: "default",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: kubernetes.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Name: "filebeat",
ContainerID: "docker://foobar",
},
},
},
Spec: kubernetes.PodSpec{
NodeName: "node",
Containers: []kubernetes.Container{
{
Image: "elastic/filebeat:6.3.0",
Name: "filebeat",
},
},
},
},
Expected: nil,
},
}

for _, test := range tests {
mapper, err := template.NewConfigMapper(nil)
if err != nil {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
metagen: metaGen,
templates: mapper,
}

listener := p.bus.Subscribe()

p.emit(test.Pod, test.Flag)

select {
case event := <-listener.Events():
assert.Equal(t, test.Expected, event)
case <-time.After(2 * time.Second):
if test.Expected != nil {
t.Fatal("Timeout while waiting for event")
}
}
}
}

func getNestedAnnotations(in common.MapStr) common.MapStr {
out := common.MapStr{}

Expand Down