diff --git a/plugins/inputs/prometheus/README.md b/plugins/inputs/prometheus/README.md
index 74f49a2649c83..074994620cf12 100644
--- a/plugins/inputs/prometheus/README.md
+++ b/plugins/inputs/prometheus/README.md
@@ -63,6 +63,10 @@ in Prometheus format.
   # eg. To scrape pods on a specific node
   # kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
 
+  # Interval at which the cached list of pods is re-synced.
+  # Default is 60 minutes.
+  # cache_refresh_interval = 60
+
   ## Scrape Services available in Consul Catalog
   # [inputs.prometheus.consul]
   #   enabled = true
diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go
index e3217e697d914..f7a78dd8c0497 100644
--- a/plugins/inputs/prometheus/kubernetes.go
+++ b/plugins/inputs/prometheus/kubernetes.go
@@ -18,9 +18,11 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
 type podMetadata struct {
@@ -89,10 +91,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
 				p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
 			}
 		} else {
-			err = p.watchPod(ctx, client)
-			if err != nil {
-				p.Log.Errorf("Unable to watch resources: %s", err.Error())
-			}
+			p.watchPod(ctx, client)
 		}
 	}
 }
@@ -105,48 +104,114 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
 // (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
 // pod, causing errors in the logs. This is only true if the pod going offline is not
 // directed to do so by K8s.
-func (p *Prometheus) watchPod(ctx context.Context, client *kubernetes.Clientset) error {
-	watcher, err := client.CoreV1().Pods(p.PodNamespace).Watch(ctx, metav1.ListOptions{
-		LabelSelector: p.KubernetesLabelSelector,
-		FieldSelector: p.KubernetesFieldSelector,
-	})
-	if err != nil {
-		return err
+func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clientset) {
+	var resyncInterval time.Duration
+
+	if p.CacheRefreshInterval != 0 {
+		resyncInterval = time.Duration(p.CacheRefreshInterval) * time.Minute
+	} else {
+		resyncInterval = 60 * time.Minute
 	}
-	defer watcher.Stop()
 
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		default:
-			for event := range watcher.ResultChan() {
-				pod, ok := event.Object.(*corev1.Pod)
-				if !ok {
-					return fmt.Errorf("Unexpected object when getting pods")
+	informerFactory := informers.NewSharedInformerFactory(clientset, resyncInterval)
+
+	podInformer := informerFactory.Core().V1().Pods()
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(newObj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(newObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			namespace, name, err := cache.SplitMetaNamespaceKey(key)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
+
+			if pod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(pod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(pod, p) &&
+				podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(pod, p.podFieldSelector) {
+				registerPod(pod, p)
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			newKey, err := cache.MetaNamespaceKeyFunc(newObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			newNamespace, newName, err := cache.SplitMetaNamespaceKey(newKey)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			newPod, _ := clientset.CoreV1().Pods(newNamespace).Get(ctx, newName, metav1.GetOptions{})
+
+			if newPod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(newPod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(newPod, p) &&
+				podHasMatchingLabelSelector(newPod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(newPod, p.podFieldSelector) {
+				if newPod.GetDeletionTimestamp() == nil {
+					registerPod(newPod, p)
 				}
+			}
+
+			oldKey, err := cache.MetaNamespaceKeyFunc(oldObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
+
+			oldNamespace, oldName, err := cache.SplitMetaNamespaceKey(oldKey)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			oldPod, _ := clientset.CoreV1().Pods(oldNamespace).Get(ctx, oldName, metav1.GetOptions{})
 
-				// If the pod is not "ready", there will be no ip associated with it.
-				if pod.Annotations["prometheus.io/scrape"] != "true" ||
-					!podReady(pod.Status.ContainerStatuses) {
-					continue
+			if oldPod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(oldPod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(oldPod, p) &&
+				podHasMatchingLabelSelector(oldPod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(oldPod, p.podFieldSelector) {
+				if oldPod.GetDeletionTimestamp() != nil {
+					unregisterPod(oldPod, p)
 				}
+			}
+		},
+		DeleteFunc: func(oldObj interface{}) {
+			key, err := cache.MetaNamespaceKeyFunc(oldObj)
+			if err != nil {
+				p.Log.Errorf("getting key from cache: %s", err.Error())
+			}
 
-				switch event.Type {
-				case watch.Added:
-					registerPod(pod, p)
-				case watch.Modified:
-					// To avoid multiple actions for each event, unregister on the first event
-					// in the delete sequence, when the containers are still "ready".
-					if pod.GetDeletionTimestamp() != nil {
-						unregisterPod(pod, p)
-					} else {
-						registerPod(pod, p)
-					}
+			namespace, name, err := cache.SplitMetaNamespaceKey(key)
+			if err != nil {
+				p.Log.Errorf("splitting key into namespace and name: %s", err.Error())
+			}
+
+			pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
+
+			if pod.Annotations["prometheus.io/scrape"] == "true" &&
+				podReady(pod.Status.ContainerStatuses) &&
+				podHasMatchingNamespace(pod, p) &&
+				podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
+				podHasMatchingFieldSelector(pod, p.podFieldSelector) {
+				if pod.GetDeletionTimestamp() != nil {
+					unregisterPod(pod, p)
 				}
 			}
-		}
-	}
+		},
+	})
+
+	informerFactory.Start(wait.NeverStop)
+	informerFactory.WaitForCacheSync(wait.NeverStop)
+
+	<-ctx.Done()
 }
 
 func (p *Prometheus) cAdvisor(ctx context.Context, bearerToken string) error {
@@ -372,11 +437,10 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) {
 		return
 	}
 
-	p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
-
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	if _, ok := p.kubernetesPods[targetURL.String()]; ok {
+		p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
 		delete(p.kubernetesPods, targetURL.String())
 		p.Log.Debugf("will stop scraping for %q", targetURL.String())
 	}
diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go
index 2f8e17f196b32..de14449ab4462 100644
--- a/plugins/inputs/prometheus/prometheus.go
+++ b/plugins/inputs/prometheus/prometheus.go
@@ -84,6 +84,9 @@ type Prometheus struct {
 	podFieldSelector  fields.Selector
 	isNodeScrapeScope bool
 
+	// Only used when monitor_kubernetes_pods = true
+	CacheRefreshInterval int `toml:"cache_refresh_interval"`
+
 	// List of consul services to scrape
 	consulServices map[string]URLAndAddress
 }
@@ -140,6 +143,10 @@ var sampleConfig = `
   # eg. To scrape pods on a specific node
   # kubernetes_field_selector = "spec.nodeName=$HOSTNAME"
 
+  # Interval at which the cached list of pods is re-synced.
+  # Default is 60 minutes.
+  # cache_refresh_interval = 60
+
   ## Scrape Services available in Consul Catalog
   # [inputs.prometheus.consul]
   #   enabled = true
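
For illustration, here is how the new option might be set in an `[[inputs.prometheus]]` block. This snippet is not part of the change; the 30-minute value is arbitrary, and the other lines simply echo options already shown in the plugin's sample config above.

```toml
[[inputs.prometheus]]
  ## Discover and scrape Kubernetes pods annotated with prometheus.io/scrape (existing option).
  monitor_kubernetes_pods = true

  ## eg. To scrape pods on a specific node (existing option).
  # kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

  ## New option: interval, in minutes, at which the informer re-syncs the cached
  ## pod list. Unset or 0 falls back to the 60-minute default.
  cache_refresh_interval = 30
```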
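And a minimal, self-contained sketch of the shared-informer pattern the rewritten `watchPod` relies on. This is not code from the PR: it assumes in-cluster credentials, the 60-minute resync only mirrors the default behind `cache_refresh_interval`, and the handler bodies are print placeholders rather than the plugin's registerPod/unregisterPod logic.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Assumes the process runs inside a cluster; the plugin builds its client the same way.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Every resync period the informer re-delivers the cached pods to the handlers,
	// which is what cache_refresh_interval controls (60 minutes by default).
	factory := informers.NewSharedInformerFactory(clientset, 60*time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Printf("add/resync: %s/%s\n", pod.Namespace, pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod := newObj.(*corev1.Pod)
			fmt.Printf("update: %s/%s\n", pod.Namespace, pod.Name)
		},
		DeleteFunc: func(obj interface{}) {
			// Deletes can arrive as DeletedFinalStateUnknown, so type-assert defensively.
			if pod, ok := obj.(*corev1.Pod); ok {
				fmt.Printf("delete: %s/%s\n", pod.Namespace, pod.Name)
			}
		},
	})

	stop := make(chan struct{})
	factory.Start(stop)
	if !cache.WaitForCacheSync(stop, podInformer.HasSynced) {
		panic("pod informer cache never synced")
	}

	select {} // block forever; the plugin instead waits on its context
}
```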