Skip to content

Commit

Permalink
Fix cleanup when autodiscover pods are terminated (elastic#14259) (el…
Browse files Browse the repository at this point in the history
…astic#14268)

In some cases pods termination doesn't originate a delete event in the
API watchers. Detect termination also by checking if a deletion
timestamp exists in update events.

(cherry picked from commit 9de0061)
  • Loading branch information
jsoriano authored Nov 5, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 65f7a01 commit 7b7502e
Showing 3 changed files with 60 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Affecting all Beats*

- Fix a race condition with the Kafka pipeline client, it is possible that `Close()` get called before `Connect()` . {issue}11945[11945]
- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259]

*Auditbeat*

51 changes: 38 additions & 13 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -109,24 +109,49 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
}

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
AddFunc: p.handleAdd,
UpdateFunc: p.handleUpdate,
DeleteFunc: p.handleDelete,
})

return p, nil
}

// handleAdd emits a start event for the given pod
func (p *Provider) handleAdd(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}

// handleUpdate emits events for a given pod depending on the state of the pod,
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources assotiated to the pod.
func (p *Provider) handleUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
p.logger.Debugf("Watcher Pod update (terminating): %+v", obj)
// Pod is terminating, don't reload its configuration and ignore the event
// if some pod is still running, we will receive more events when containers
// terminate.
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
return
}
}
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") })
} else {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(pod, "stop")
p.emit(pod, "start")
}
}

// handleDelete emits a stop event for the given pod
func (p *Provider) handleDelete(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}

// Start for Runner interface.
func (p *Provider) Start() {
if err := p.watcher.Start(); err != nil {
25 changes: 21 additions & 4 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
@@ -120,14 +120,19 @@ func New(cfg *common.Config) (processors.Processor, error) {

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
processor.addPod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: adding pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
processor.removePod(obj.(*kubernetes.Pod))
processor.addPod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: updating pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
processor.removePod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: removing pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.removePod(pod)
},
})

@@ -163,6 +168,18 @@ func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) {
}
}

func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) {
k.removePod(pod)

// Add it again only if it is not being deleted
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
logp.Debug("kubernetes", "%v: removing pod being terminated: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
return
}

k.addPod(pod)
}

func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) {
indexes := k.indexers.GetIndexes(pod)
for _, idx := range indexes {

0 comments on commit 7b7502e

Please sign in to comment.