fix(inputs.prometheus): moved from watcher to informer (#10932)
(cherry picked from commit 777f8bf)
mesksr authored and Sebastian Spaink committed Apr 25, 2022
1 parent fc8301a commit ca76637
Showing 3 changed files with 116 additions and 41 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/prometheus/README.md
@@ -63,6 +63,10 @@ in Prometheus format.
   # eg. To scrape pods on a specific node
   # kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
 
+  # Cache refresh interval sets how often the list of pods is re-synced.
+  # Default is 60 minutes.
+  # cache_refresh_interval = 60
+
   ## Scrape Services available in Consul Catalog
   # [inputs.prometheus.consul]
   # enabled = true
146 changes: 105 additions & 41 deletions plugins/inputs/prometheus/kubernetes.go
@@ -18,9 +18,11 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
 type podMetadata struct {
@@ -89,10 +91,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
 				p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
 			}
 		} else {
-			err = p.watchPod(ctx, client)
-			if err != nil {
-				p.Log.Errorf("Unable to watch resources: %s", err.Error())
-			}
+			p.watchPod(ctx, client)
 		}
 	}
 }
@@ -105,48 +104,114 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
 // (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
 // pod, causing errors in the logs. This is only true if the pod going offline is not
 // directed to do so by K8s.
-func (p *Prometheus) watchPod(ctx context.Context, client *kubernetes.Clientset) error {
-	watcher, err := client.CoreV1().Pods(p.PodNamespace).Watch(ctx, metav1.ListOptions{
-		LabelSelector: p.KubernetesLabelSelector,
-		FieldSelector: p.KubernetesFieldSelector,
-	})
-	if err != nil {
-		return err
-	}
-	defer watcher.Stop()
-
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			for event := range watcher.ResultChan() {
-				pod, ok := event.Object.(*corev1.Pod)
-				if !ok {
-					return fmt.Errorf("Unexpected object when getting pods")
-				}
-
-				// If the pod is not "ready", there will be no ip associated with it.
-				if pod.Annotations["prometheus.io/scrape"] != "true" ||
-					!podReady(pod.Status.ContainerStatuses) {
-					continue
-				}
-
-				switch event.Type {
-				case watch.Added:
-					registerPod(pod, p)
-				case watch.Modified:
-					// To avoid multiple actions for each event, unregister on the first event
-					// in the delete sequence, when the containers are still "ready".
-					if pod.GetDeletionTimestamp() != nil {
-						unregisterPod(pod, p)
-					} else {
-						registerPod(pod, p)
-					}
-				}
-			}
-		}
-	}
-}
+func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clientset) {
+	var resyncinterval time.Duration
+
+	if p.CacheRefreshInterval != 0 {
+		resyncinterval = time.Duration(p.CacheRefreshInterval) * time.Minute
+	} else {
+		resyncinterval = 60 * time.Minute
+	}
+
+	informerfactory := informers.NewSharedInformerFactory(clientset, resyncinterval)
+
+	podinformer := informerfactory.Core().V1().Pods()
+	podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(newObj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(newObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			namespace, name, err := cache.SplitMetaNamespaceKey(key)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
+
+			if pod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(pod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(pod, p) &&
+				podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(pod, p.podFieldSelector) {
+				registerPod(pod, p)
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			newKey, err := cache.MetaNamespaceKeyFunc(newObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			newNamespace, newName, err := cache.SplitMetaNamespaceKey(newKey)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			newPod, _ := clientset.CoreV1().Pods(newNamespace).Get(ctx, newName, metav1.GetOptions{})
+
+			if newPod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(newPod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(newPod, p) &&
+				podHasMatchingLabelSelector(newPod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(newPod, p.podFieldSelector) {
+				// Register the pod unless it is in the middle of being deleted.
+				if newPod.GetDeletionTimestamp() == nil {
+					registerPod(newPod, p)
+				}
+			}
+
+			oldKey, err := cache.MetaNamespaceKeyFunc(oldObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			oldNamespace, oldName, err := cache.SplitMetaNamespaceKey(oldKey)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			oldPod, _ := clientset.CoreV1().Pods(oldNamespace).Get(ctx, oldName, metav1.GetOptions{})
+
+			// If the pod is not "ready", there will be no ip associated with it.
+			if oldPod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(oldPod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(oldPod, p) &&
+				podHasMatchingLabelSelector(oldPod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(oldPod, p.podFieldSelector) {
+				// Unregister on the first event in the delete sequence,
+				// while the containers are still "ready".
+				if oldPod.GetDeletionTimestamp() != nil {
+					unregisterPod(oldPod, p)
+				}
+			}
+		},
+		DeleteFunc: func(oldObj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(oldObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			namespace, name, err := cache.SplitMetaNamespaceKey(key)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
+
+			if pod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(pod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(pod, p) &&
+				podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(pod, p.podFieldSelector) {
+				if pod.GetDeletionTimestamp() != nil {
+					unregisterPod(pod, p)
+				}
+			}
+		},
+	})
+
+	informerfactory.Start(wait.NeverStop)
+	informerfactory.WaitForCacheSync(wait.NeverStop)
+
+	<-ctx.Done()
+}
 
 func (p *Prometheus) cAdvisor(ctx context.Context, bearerToken string) error {
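
For reference, a minimal, self-contained sketch of the shared-informer pattern the new watchPod relies on. The names (main, the print statements) are ours, and it deviates from the plugin code above in two ways: it reads the pod straight from the event object instead of issuing a live Get, and it guards against client-go's tombstone objects on delete. Treat it as an illustration of the informer API under those assumptions, not the plugin's implementation.

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// In-cluster configuration; out-of-cluster setups would use clientcmd.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Full re-sync every 60 minutes, mirroring the plugin's default above.
	factory := informers.NewSharedInformerFactory(clientset, 60*time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("add: %s/%s\n", pod.Namespace, pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod := newObj.(*corev1.Pod)
			if pod.GetDeletionTimestamp() != nil {
				fmt.Printf("terminating: %s/%s\n", pod.Namespace, pod.Name)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// A delete can deliver a tombstone instead of the object itself.
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			pod := obj.(*corev1.Pod)
			fmt.Printf("delete: %s/%s\n", pod.Namespace, pod.Name)
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, podInformer.HasSynced)
	select {} // block; a real caller would wait on a context instead
}

On each resync the informer replays the cached pods through UpdateFunc, which is how a missed watch event eventually gets repaired; that is what makes cache_refresh_interval meaningful.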
@@ -372,11 +437,10 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) {
 		return
 	}
 
-	p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
-
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	if _, ok := p.kubernetesPods[targetURL.String()]; ok {
+		p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
 		delete(p.kubernetesPods, targetURL.String())
 		p.Log.Debugf("will stop scraping for %q", targetURL.String())
 	}
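
The reordering above means the "registered a delete request" message is only emitted for pods that are actually being tracked, and only while the lock is held. A tiny sketch of the pattern, with a hypothetical registry type standing in for the plugin's pod map:

package main

import (
	"log"
	"sync"
)

// registry is a hypothetical stand-in for the plugin's map of scraped pods.
type registry struct {
	mu      sync.Mutex
	targets map[string]struct{}
}

// deleteTarget mirrors the reordered unregisterPod: both debug lines now
// sit behind the lock and the existence check, so a delete event for a
// pod that was never registered logs nothing.
func (r *registry) deleteTarget(url string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.targets[url]; !ok {
		return
	}
	log.Printf("registered a delete request for %q", url)
	delete(r.targets, url)
	log.Printf("will stop scraping for %q", url)
}

func main() {
	r := &registry{targets: map[string]struct{}{"http://10.1.2.3:9102/metrics": {}}}
	r.deleteTarget("http://10.1.2.3:9102/metrics") // tracked: logs and removes
	r.deleteTarget("http://10.1.2.3:9102/metrics") // already gone: silent
}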
7 changes: 7 additions & 0 deletions plugins/inputs/prometheus/prometheus.go
@@ -84,6 +84,9 @@ type Prometheus struct {
 	podFieldSelector  fields.Selector
 	isNodeScrapeScope bool
 
+	// Only for monitor_kubernetes_pods=true
+	CacheRefreshInterval int `toml:"cache_refresh_interval"`
+
 	// List of consul services to scrape
 	consulServices map[string]URLAndAddress
 }
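
A small sketch of how the new field translates into the informer's resync period, with the zero value ("not set") falling back to the 60-minute default, matching the branch at the top of watchPod above. The resyncPeriod helper name is ours, not the plugin's:

package main

import (
	"fmt"
	"time"
)

// resyncPeriod converts cache_refresh_interval (minutes) into the duration
// handed to the shared informer factory, treating 0 as "use the default".
func resyncPeriod(cacheRefreshIntervalMinutes int) time.Duration {
	if cacheRefreshIntervalMinutes != 0 {
		return time.Duration(cacheRefreshIntervalMinutes) * time.Minute
	}
	return 60 * time.Minute
}

func main() {
	fmt.Println(resyncPeriod(0))  // 1h0m0s, the default
	fmt.Println(resyncPeriod(15)) // 15m0s
}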
@@ -140,6 +143,10 @@ var sampleConfig = `
   # eg. To scrape pods on a specific node
   # kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
 
+  # Cache refresh interval sets how often the list of pods is re-synced.
+  # Default is 60 minutes.
+  # cache_refresh_interval = 60
+
   ## Scrape Services available in Consul Catalog
   # [inputs.prometheus.consul]
   # enabled = true
