diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3e9dcd68d68..8f21ddb1182 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* - Fix a race condition with the Kafka pipeline client, it is possible that `Close()` get called before `Connect()` . {issue}11945[11945] +- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 98c729c3329..6040e139421 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -109,24 +109,49 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis } watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - p.logger.Debugf("Watcher Pod add: %+v", obj) - p.emit(obj.(*kubernetes.Pod), "start") - }, - UpdateFunc: func(obj interface{}) { - p.logger.Debugf("Watcher Pod update: %+v", obj) - p.emit(obj.(*kubernetes.Pod), "stop") - p.emit(obj.(*kubernetes.Pod), "start") - }, - DeleteFunc: func(obj interface{}) { - p.logger.Debugf("Watcher Pod delete: %+v", obj) - time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) - }, + AddFunc: p.handleAdd, + UpdateFunc: p.handleUpdate, + DeleteFunc: p.handleDelete, }) return p, nil } +// handleAdd emits a start event for the given pod +func (p *Provider) handleAdd(obj interface{}) { + p.logger.Debugf("Watcher Pod add: %+v", obj) + p.emit(obj.(*kubernetes.Pod), "start") +} + +// handleUpdate emits events for a given pod depending on the state of the pod, +// if it is terminating, a stop event is scheduled, if not, a stop and a start +// events are sent sequentially to recreate the resources assotiated to the pod. +func (p *Provider) handleUpdate(obj interface{}) { + pod := obj.(*kubernetes.Pod) + if pod.GetObjectMeta().GetDeletionTimestamp() != nil { + p.logger.Debugf("Watcher Pod update (terminating): %+v", obj) + // Pod is terminating, don't reload its configuration and ignore the event + // if some pod is still running, we will receive more events when containers + // terminate. + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running != nil { + return + } + } + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") }) + } else { + p.logger.Debugf("Watcher Pod update: %+v", obj) + p.emit(pod, "stop") + p.emit(pod, "start") + } +} + +// handleDelete emits a stop event for the given pod +func (p *Provider) handleDelete(obj interface{}) { + p.logger.Debugf("Watcher Pod delete: %+v", obj) + time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) +} + // Start for Runner interface. func (p *Provider) Start() { if err := p.watcher.Start(); err != nil { diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index ea635314b07..d8cfe20d791 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -120,14 +120,19 @@ func New(cfg *common.Config) (processors.Processor, error) { watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - processor.addPod(obj.(*kubernetes.Pod)) + pod := obj.(*kubernetes.Pod) + logp.Debug("kubernetes", "%v: adding pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + processor.addPod(pod) }, UpdateFunc: func(obj interface{}) { - processor.removePod(obj.(*kubernetes.Pod)) - processor.addPod(obj.(*kubernetes.Pod)) + pod := obj.(*kubernetes.Pod) + logp.Debug("kubernetes", "%v: updating pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + processor.updatePod(pod) }, DeleteFunc: func(obj interface{}) { - processor.removePod(obj.(*kubernetes.Pod)) + pod := obj.(*kubernetes.Pod) + logp.Debug("kubernetes", "%v: removing pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + processor.removePod(pod) }, }) @@ -163,6 +168,18 @@ func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) { } } +func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) { + k.removePod(pod) + + // Add it again only if it is not being deleted + if pod.GetObjectMeta().GetDeletionTimestamp() != nil { + logp.Debug("kubernetes", "%v: removing pod being terminated: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName()) + return + } + + k.addPod(pod) +} + func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) { indexes := k.indexers.GetIndexes(pod) for _, idx := range indexes {