Skip to content

Commit

Permalink
Cherry-pick #14259 to 7.5: Fix cleanup when autodiscover pods are ter…
Browse files Browse the repository at this point in the history
…minated (#14265)

In some cases pods termination doesn't originate a delete event in the
API watchers. Detect termination also by checking if a deletion
timestamp exists in update events.

(cherry picked from commit 1c36118)
  • Loading branch information
jsoriano authored Oct 28, 2019
1 parent a0f7fca commit e6ce8ef
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Affecting all Beats*

- Fix a race condition with the Kafka pipeline client, it is possible that `Close()` get called before `Connect()` . {issue}11945[11945]
- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259]

*Auditbeat*

Expand Down
51 changes: 38 additions & 13 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,49 @@ func AutodiscoverBuilder(bus bus.Bus, uuid uuid.UUID, c *common.Config) (autodis
}

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
AddFunc: p.handleAdd,
UpdateFunc: p.handleUpdate,
DeleteFunc: p.handleDelete,
})

return p, nil
}

// handleAdd emits a start event for the given pod
func (p *Provider) handleAdd(obj interface{}) {
p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
}

// handleUpdate emits events for a given pod depending on the state of the pod,
// if it is terminating, a stop event is scheduled, if not, a stop and a start
// events are sent sequentially to recreate the resources assotiated to the pod.
func (p *Provider) handleUpdate(obj interface{}) {
pod := obj.(*kubernetes.Pod)
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
p.logger.Debugf("Watcher Pod update (terminating): %+v", obj)
// Pod is terminating, don't reload its configuration and ignore the event
// if some pod is still running, we will receive more events when containers
// terminate.
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
return
}
}
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(pod, "stop") })
} else {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(pod, "stop")
p.emit(pod, "start")
}
}

// handleDelete emits a stop event for the given pod
func (p *Provider) handleDelete(obj interface{}) {
p.logger.Debugf("Watcher Pod delete: %+v", obj)
time.AfterFunc(p.config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
}

// Start for Runner interface.
func (p *Provider) Start() {
if err := p.watcher.Start(); err != nil {
Expand Down
31 changes: 24 additions & 7 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func New(cfg *common.Config) (processors.Processor, error) {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
if err != nil {
if kubernetes.IsInCluster(config.KubeConfig) {
logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config", "add_kubernetes_metadata")
logp.Debug("kubernetes", "%v: could not create kubernetes client using in_cluster config: %v", "add_kubernetes_metadata", err)
} else if config.KubeConfig == "" {
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG"))
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", os.Getenv("KUBECONFIG"), err)
} else {
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v", "add_kubernetes_metadata", config.KubeConfig)
logp.Debug("kubernetes", "%v: could not create kubernetes client using config: %v: %v", "add_kubernetes_metadata", config.KubeConfig, err)
}
return processor, nil
}
Expand Down Expand Up @@ -152,14 +152,19 @@ func New(cfg *common.Config) (processors.Processor, error) {

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
processor.addPod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: adding pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
processor.removePod(obj.(*kubernetes.Pod))
processor.addPod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: updating pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
processor.removePod(obj.(*kubernetes.Pod))
pod := obj.(*kubernetes.Pod)
logp.Debug("kubernetes", "%v: removing pod: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
processor.removePod(pod)
},
})

Expand Down Expand Up @@ -198,6 +203,18 @@ func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) {
}
}

func (k *kubernetesAnnotator) updatePod(pod *kubernetes.Pod) {
k.removePod(pod)

// Add it again only if it is not being deleted
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
logp.Debug("kubernetes", "%v: removing pod being terminated: %s/%s", "add_kubernetes_metadata", pod.GetNamespace(), pod.GetName())
return
}

k.addPod(pod)
}

func (k *kubernetesAnnotator) removePod(pod *kubernetes.Pod) {
indexes := k.indexers.GetIndexes(pod)
for _, idx := range indexes {
Expand Down

0 comments on commit e6ce8ef

Please sign in to comment.