fix(inputs.prometheus): moved from watcher to informer #10932

Merged · 6 commits · Apr 8, 2022

4 changes: 4 additions & 0 deletions plugins/inputs/prometheus/README.md
@@ -63,6 +63,10 @@ in Prometheus format.
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

# Cache refresh interval: how often the cached pod list is re-synced, in minutes.
# Default is 60 minutes.
# cache_refresh_interval = 60

## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true
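For reference, a hedged sketch of how the new option might slot into a plugin configuration; the surrounding settings are illustrative and not part of this diff:

```toml
[[inputs.prometheus]]
  ## Scrape Kubernetes pods annotated with prometheus.io/scrape=true
  monitor_kubernetes_pods = true

  ## Re-sync the informer's cached pod list every 60 minutes (the default).
  ## Only used when monitor_kubernetes_pods = true.
  cache_refresh_interval = 60
```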
146 changes: 105 additions & 41 deletions plugins/inputs/prometheus/kubernetes.go
@@ -18,9 +18,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

type podMetadata struct {
@@ -89,10 +91,7 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
}
} else {
err = p.watchPod(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
}
p.watchPod(ctx, client)
}
}
}
@@ -105,48 +104,114 @@ func (p *Prometheus) startK8s(ctx context.Context) error {
// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape
// pod, causing errors in the logs. This is only true if the pod going offline is not
// directed to do so by K8s.
func (p *Prometheus) watchPod(ctx context.Context, client *kubernetes.Clientset) error {
watcher, err := client.CoreV1().Pods(p.PodNamespace).Watch(ctx, metav1.ListOptions{
LabelSelector: p.KubernetesLabelSelector,
FieldSelector: p.KubernetesFieldSelector,
})
if err != nil {
return err
func (p *Prometheus) watchPod(ctx context.Context, clientset *kubernetes.Clientset) {
var resyncinterval time.Duration

if p.CacheRefreshInterval != 0 {
resyncinterval = time.Duration(p.CacheRefreshInterval) * time.Minute
} else {
resyncinterval = 60 * time.Minute
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
return nil
default:
for event := range watcher.ResultChan() {
pod, ok := event.Object.(*corev1.Pod)
if !ok {
return fmt.Errorf("Unexpected object when getting pods")
informerfactory := informers.NewSharedInformerFactory(clientset, resyncinterval)

podinformer := informerfactory.Core().V1().Pods()
podinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}

pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})

if pod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(pod.Status.ContainerStatuses) &&
podHasMatchingNamespace(pod, p) &&
podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
podHasMatchingFieldSelector(pod, p.podFieldSelector) {
registerPod(pod, p)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
newKey, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}

newNamespace, newName, err := cache.SplitMetaNamespaceKey(newKey)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}

newPod, _ := clientset.CoreV1().Pods(newNamespace).Get(ctx, newName, metav1.GetOptions{})

if newPod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(newPod.Status.ContainerStatuses) &&
podHasMatchingNamespace(newPod, p) &&
podHasMatchingLabelSelector(newPod, p.podLabelSelector) &&
podHasMatchingFieldSelector(newPod, p.podFieldSelector) {
if newPod.GetDeletionTimestamp() == nil {
registerPod(newPod, p)
}
}

oldKey, err := cache.MetaNamespaceKeyFunc(oldObj)
if err != nil {
p.Log.Errorf("getting key from cache %s\n", err.Error())
}

oldNamespace, oldName, err := cache.SplitMetaNamespaceKey(oldKey)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}

oldPod, _ := clientset.CoreV1().Pods(oldNamespace).Get(ctx, oldName, metav1.GetOptions{})

// If the pod is not "ready", there will be no ip associated with it.
if pod.Annotations["prometheus.io/scrape"] != "true" ||
!podReady(pod.Status.ContainerStatuses) {
continue
if oldPod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(oldPod.Status.ContainerStatuses) &&
podHasMatchingNamespace(oldPod, p) &&
podHasMatchingLabelSelector(oldPod, p.podLabelSelector) &&
podHasMatchingFieldSelector(oldPod, p.podFieldSelector) {
if oldPod.GetDeletionTimestamp() != nil {
unregisterPod(oldPod, p)
}
}
},
DeleteFunc: func(oldObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(oldObj)
if err != nil {
p.Log.Errorf("getting key from cache %s", err.Error())
}

switch event.Type {
case watch.Added:
registerPod(pod, p)
case watch.Modified:
// To avoid multiple actions for each event, unregister on the first event
// in the delete sequence, when the containers are still "ready".
if pod.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} else {
registerPod(pod, p)
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
p.Log.Errorf("splitting key into namespace and name %s\n", err.Error())
}

pod, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})

if pod.Annotations["prometheus.io/scrape"] == "true" &&
podReady(pod.Status.ContainerStatuses) &&
podHasMatchingNamespace(pod, p) &&
podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
podHasMatchingFieldSelector(pod, p.podFieldSelector) {
if pod.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
}
}
}
}
},
})

informerfactory.Start(wait.NeverStop)
informerfactory.WaitForCacheSync(wait.NeverStop)

<-ctx.Done()
}

func (p *Prometheus) cAdvisor(ctx context.Context, bearerToken string) error {
@@ -372,11 +437,10 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) {
return
}

p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)

p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.kubernetesPods[targetURL.String()]; ok {
p.Log.Debugf("registered a delete request for %q in namespace %q", pod.Name, pod.Namespace)
delete(p.kubernetesPods, targetURL.String())
p.Log.Debugf("will stop scraping for %q", targetURL.String())
}
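As a companion to the kubernetes.go changes above, here is a minimal, self-contained sketch of the shared-informer pattern this PR adopts: a SharedInformerFactory re-syncs its pod cache on an interval and delivers add/delete callbacks, replacing the raw Watch stream. The function name watchPods and the handler bodies are illustrative only, not the plugin's code.

```go
package main

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

// watchPods is an illustrative stand-in for the plugin's watchPod: it builds a
// shared informer for pods and reacts to cache events instead of a Watch stream.
func watchPods(stopCh <-chan struct{}) error {
	cfg, err := rest.InClusterConfig() // assumes in-cluster credentials
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}

	// Re-sync the cached pod list every 60 minutes, mirroring the plugin's default.
	factory := informers.NewSharedInformerFactory(clientset, 60*time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if pod, ok := obj.(*corev1.Pod); ok && pod.Annotations["prometheus.io/scrape"] == "true" {
				_ = pod // register the pod as a scrape target here
			}
		},
		DeleteFunc: func(obj interface{}) {
			if pod, ok := obj.(*corev1.Pod); ok {
				_ = pod // unregister the pod here
			}
		},
	})

	factory.Start(stopCh)                                 // run the informer in the background
	cache.WaitForCacheSync(stopCh, podInformer.HasSynced) // block until the first list completes
	<-stopCh
	return nil
}

func main() {
	stop := make(chan struct{})
	if err := watchPods(stop); err != nil {
		panic(err)
	}
}
```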
7 changes: 7 additions & 0 deletions plugins/inputs/prometheus/prometheus.go
@@ -84,6 +84,9 @@ type Prometheus struct {
podFieldSelector fields.Selector
isNodeScrapeScope bool

// Only for monitor_kubernetes_pods=true
CacheRefreshInterval int `toml:"cache_refresh_interval"`

// List of consul services to scrape
consulServices map[string]URLAndAddress
}
@@ -140,6 +143,10 @@ var sampleConfig = `
# eg. To scrape pods on a specific node
# kubernetes_field_selector = "spec.nodeName=$HOSTNAME"

# Cache refresh interval: how often the cached pod list is re-synced, in minutes.
# Default is 60 minutes.
# cache_refresh_interval = 60

## Scrape Services available in Consul Catalog
# [inputs.prometheus.consul]
# enabled = true