From 57a9b926dd0766899ff74502cc9d5d32a0898b02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Fri, 20 Sep 2024 14:52:00 +0200 Subject: [PATCH 1/2] Reduce the amount of stored ReplicaSet data Use a transform function to drop all data except for the owner references, which we need to find the Deployment name. --- .../composable/providers/kubernetes/pod.go | 23 +- .../providers/kubernetes/watcher.go | 285 ++++++++++++++++++ 2 files changed, 306 insertions(+), 2 deletions(-) create mode 100644 internal/pkg/composable/providers/kubernetes/watcher.go diff --git a/internal/pkg/composable/providers/kubernetes/pod.go b/internal/pkg/composable/providers/kubernetes/pod.go index 996b35ee9c4..37311845a7d 100644 --- a/internal/pkg/composable/providers/kubernetes/pod.go +++ b/internal/pkg/composable/providers/kubernetes/pod.go @@ -9,6 +9,8 @@ import ( "sync" "time" + v1 "k8s.io/api/apps/v1" + "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" "github.com/elastic/elastic-agent-autodiscover/utils" @@ -104,11 +106,12 @@ func NewPodEventer( // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ + // use a custom watcher here, so we can provide a transform function and limit the data we're storing + replicaSetWatcher, err = NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Namespace: cfg.Namespace, HonorReSyncs: true, - }, nil) + }, nil, removeUnnecessaryReplicaSetData) if err != nil { logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err) } @@ -539,3 +542,19 @@ func hintsCheck(annotations mapstr.M, container string, prefix string, validate } return hints, incorrecthints } + +// removeUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute +// Pod metadata. Which is just the name and owner references. +func removeUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) { + old, ok := obj.(*v1.ReplicaSet) + if !ok { + return nil, fmt.Errorf("obj is not a ReplicaSet") + } + transformed := v1.ReplicaSet{} + transformed.ObjectMeta = kubernetes.ObjectMeta{ + Name: old.GetName(), + Namespace: old.GetNamespace(), + OwnerReferences: old.GetOwnerReferences(), + } + return transformed, nil +} diff --git a/internal/pkg/composable/providers/kubernetes/watcher.go b/internal/pkg/composable/providers/kubernetes/watcher.go new file mode 100644 index 00000000000..eda0af70705 --- /dev/null +++ b/internal/pkg/composable/providers/kubernetes/watcher.go @@ -0,0 +1,285 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package kubernetes + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + autodiscoverK8s "github.com/elastic/elastic-agent-autodiscover/kubernetes" + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + add = "add" + update = "update" + delete = "delete" +) + +var ( + accessor = meta.NewAccessor() +) + +// some type aliases to avoid unnecessarily long declarations +type ( + ResourceEventHandler = autodiscoverK8s.ResourceEventHandler + Resource = autodiscoverK8s.Resource + Namespace = autodiscoverK8s.Namespace + Node = autodiscoverK8s.Node + WatchOptions = autodiscoverK8s.WatchOptions + Watcher = autodiscoverK8s.Watcher +) + +type item struct { + object interface{} + objectRaw interface{} + state string +} + +type watcher struct { + client kubernetes.Interface + informer cache.SharedInformer + store cache.Store + queue workqueue.Interface //nolint:staticcheck // TODO: use the typed version + ctx context.Context + stop context.CancelFunc + handler ResourceEventHandler + logger *logp.Logger + cachedObject runtime.Object +} + +// NOTE: This watcher implementation is identical to the one in autodiscovery, with the single difference +// that it allows setting a transform function on the informer. +// This is necessary to avoid storing a lot of unnecessary ReplicaSet data. + +// NewWatcher initializes the watcher client to provide a events handler for +// resource from the cluster (filtered to the given node) +func NewWatcher( + client kubernetes.Interface, + resource Resource, + opts WatchOptions, + indexers cache.Indexers, + transformFunc cache.TransformFunc, +) (Watcher, error) { + return NewNamedWatcher("", client, resource, opts, indexers, transformFunc) +} + +// NewNamedWatcher initializes the watcher client to provide an events handler for +// resource from the cluster (filtered to the given node) and also allows to name the k8s +// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue +// metrics, if it is empty, its metrics will not be logged by the k8s client. +func NewNamedWatcher( + name string, + client kubernetes.Interface, + resource Resource, + opts WatchOptions, + indexers cache.Indexers, + transformFunc cache.TransformFunc, +) (Watcher, error) { + var store cache.Store + var queue workqueue.Interface //nolint:staticcheck // TODO: use the typed version + var cachedObject runtime.Object + informer, _, err := autodiscoverK8s.NewInformer(client, resource, opts, indexers) + if err != nil { + return nil, err + } + + store = informer.GetStore() + queue = workqueue.NewNamed(name) + + if opts.IsUpdated == nil { + opts.IsUpdated = func(o, n interface{}) bool { + oldVersion, _ := accessor.ResourceVersion(o.(runtime.Object)) + newVersion, _ := accessor.ResourceVersion(n.(runtime.Object)) + + // Only enqueue changes that have a different resource versions to avoid processing resyncs. 
+ return oldVersion != newVersion + } + } + + ctx, cancel := context.WithCancel(context.TODO()) + w := &watcher{ + client: client, + informer: informer, + store: store, + queue: queue, + ctx: ctx, + cachedObject: cachedObject, + stop: cancel, + logger: logp.NewLogger("kubernetes"), + handler: autodiscoverK8s.NoOpEventHandlerFuncs{}, + } + + _, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(o interface{}) { + w.enqueue(o, add) + }, + DeleteFunc: func(o interface{}) { + w.enqueue(o, delete) + }, + UpdateFunc: func(o, n interface{}) { + if opts.IsUpdated(o, n) { + w.enqueue(n, update) + } else if opts.HonorReSyncs { + // HonorReSyncs ensure that at the time when the kubernetes client does a "resync", i.e, a full list of all + // objects we make sure that autodiscover processes them. Why is this necessary? An effective control loop works + // based on two state changes, a list and a watch. A watch is triggered each time the state of the system changes. + // However, there is no guarantee that all events from a watch are processed by the receiver. To ensure that missed events + // are properly handled, a period re-list is done to ensure that every state within the system is effectively handled. + // In this case, we are making sure that we are enqueueing an "add" event because, an runner that is already in Running + // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. + w.enqueue(n, add) + } + + //We check the type of resource and only if it is namespace or node return the cacheObject + switch resource.(type) { + case *Namespace: + w.cacheObject(o) + case *Node: + w.cacheObject(o) + } + }, + }) + if err != nil { + return nil, err + } + + if transformFunc != nil { + err = informer.SetTransform(transformFunc) + if err != nil { + return nil, err + } + } + + return w, nil +} + +// AddEventHandler adds a resource handler to process each request that is coming into the watcher +func (w *watcher) AddEventHandler(h ResourceEventHandler) { + w.handler = h +} + +// GetEventHandler returns the watcher's event handler +func (w *watcher) GetEventHandler() ResourceEventHandler { + return w.handler +} + +// Store returns the store object for the resource that is being watched +func (w *watcher) Store() cache.Store { + return w.store +} + +// Client returns the kubernetes client object used by the watcher +func (w *watcher) Client() kubernetes.Interface { + return w.client +} + +// CachedObject returns the old object in cache during the last updated event +func (w *watcher) CachedObject() runtime.Object { + return w.cachedObject +} + +// Start watching pods +func (w *watcher) Start() error { + go w.informer.Run(w.ctx.Done()) + + if !cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced) { + return fmt.Errorf("kubernetes informer unable to sync cache") + } + + w.logger.Debugf("cache sync done") + + // Wrap the process function with wait.Until so that if the controller crashes, it starts up again after a second. + go wait.Until(func() { + for w.process(w.ctx) { + } + }, time.Second*1, w.ctx.Done()) + + return nil +} + +func (w *watcher) Stop() { + w.queue.ShutDown() + w.stop() +} + +// enqueue takes the most recent object that was received, figures out the namespace/name of the object +// and adds it to the work queue for processing. +func (w *watcher) enqueue(obj interface{}, state string) { + // DeletionHandlingMetaNamespaceKeyFunc that we get a key only if the resource's state is not Unknown. 
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok { + w.logger.Debugf("Enqueued DeletedFinalStateUnknown contained object: %+v", deleted.Obj) + obj = deleted.Obj + } + w.queue.Add(&item{key, obj, state}) +} + +// cacheObject updates watcher with the old version of cache objects before change during update events +func (w *watcher) cacheObject(o interface{}) { + if old, ok := o.(runtime.Object); !ok { + utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) + } else { + w.cachedObject = old + } +} + +// process gets the top of the work queue and processes the object that is received. +func (w *watcher) process(_ context.Context) bool { + obj, quit := w.queue.Get() + if quit { + return false + } + defer w.queue.Done(obj) + + var entry *item + var ok bool + if entry, ok = obj.(*item); !ok { + utilruntime.HandleError(fmt.Errorf("expected *item in workqueue but got %#v", obj)) + return true + } + + key, ok := entry.object.(string) + if !ok { + return false + } + + o, exists, err := w.store.GetByKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("getting object %#v from cache: %w", obj, err)) + return true + } + if !exists { + if entry.state == delete { + w.logger.Debugf("Object %+v was not found in the store, deleting anyway!", key) + // delete anyway in order to clean states + w.handler.OnDelete(entry.objectRaw) + } + return true + } + + switch entry.state { + case add: + w.handler.OnAdd(o) + case update: + w.handler.OnUpdate(o) + case delete: + w.handler.OnDelete(o) + } + + return true +} From 6412ce3e1b61d039568c6b4c9d66b65357f8a400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Tue, 24 Sep 2024 14:58:41 +0200 Subject: [PATCH 2/2] Use metadata informer for ReplicaSets --- .../providers/kubernetes/metagen.go | 123 ++++++++++++++++++ .../composable/providers/kubernetes/pod.go | 62 +++++++-- .../providers/kubernetes/watcher.go | 64 +++++---- 3 files changed, 207 insertions(+), 42 deletions(-) create mode 100644 internal/pkg/composable/providers/kubernetes/metagen.go diff --git a/internal/pkg/composable/providers/kubernetes/metagen.go b/internal/pkg/composable/providers/kubernetes/metagen.go new file mode 100644 index 00000000000..c99c2bc65a5 --- /dev/null +++ b/internal/pkg/composable/providers/kubernetes/metagen.go @@ -0,0 +1,123 @@ +package kubernetes + +import ( + "github.com/elastic/elastic-agent-autodiscover/kubernetes" + "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type ( + AddResourceMetadataConfig = metadata.AddResourceMetadataConfig + MetaGen = metadata.MetaGen + FieldOptions = metadata.FieldOptions +) + +// GetPodMetaGen is a wrapper function that creates a metaGen for pod resource and has embeeded +// nodeMetaGen and namespaceMetaGen +func GetPodMetaGen( + cfg *config.C, + podWatcher kubernetes.Watcher, + nodeWatcher kubernetes.Watcher, + namespaceWatcher kubernetes.Watcher, + replicasetWatcher kubernetes.Watcher, + jobWatcher kubernetes.Watcher, + metaConf *AddResourceMetadataConfig) MetaGen { + + var nodeMetaGen, namespaceMetaGen, rsMetaGen, jobMetaGen MetaGen + if nodeWatcher != nil && metaConf.Node.Enabled() { + nodeMetaGen = 
metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store(), nodeWatcher.Client()) + } + if namespaceWatcher != nil && metaConf.Namespace.Enabled() { + namespaceMetaGen = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), namespaceWatcher.Client()) + } + if replicasetWatcher != nil && metaConf.Deployment { + // use our own implementation of this generator, which can avoid tracking the full ReplicaSet resource + // TODO: Remove this after upstreaming the change to the autodiscovery lib + rsMetaGen = NewReplicasetMetadataGenerator(cfg, replicasetWatcher.Store(), replicasetWatcher.Client()) + } + if jobWatcher != nil && metaConf.CronJob { + jobMetaGen = metadata.NewJobMetadataGenerator(cfg, jobWatcher.Store(), jobWatcher.Client()) + } + metaGen := metadata.NewPodMetadataGenerator( + cfg, + podWatcher.Store(), + podWatcher.Client(), + nodeMetaGen, + namespaceMetaGen, + rsMetaGen, + jobMetaGen, + metaConf) + return metaGen +} + +const resourceType = "replicaset" + +type replicaset struct { + store cache.Store + resource *metadata.Resource +} + +// NewReplicasetMetadataGenerator creates a metagen for replicaset resources +func NewReplicasetMetadataGenerator(cfg *config.C, replicasets cache.Store, client k8s.Interface) MetaGen { + return &replicaset{ + resource: metadata.NewResourceMetadataGenerator(cfg, client), + store: replicasets, + } +} + +// Generate generates replicaset metadata from a resource object +// Metadata map is in the following form: +// +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } +// +// All Kubernetes fields that need to be stored under kuberentes. prefix are populetad by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (rs *replicaset) Generate(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M { + ecsFields := rs.GenerateECS(obj) + meta := mapstr.M{ + "kubernetes": rs.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates replicaset ECS metadata from a resource object +func (rs *replicaset) GenerateECS(obj kubernetes.Resource) mapstr.M { + return rs.resource.GenerateECS(obj) +} + +// GenerateK8s generates replicaset metadata from a resource object +func (rs *replicaset) GenerateK8s(obj kubernetes.Resource, opts ...FieldOptions) mapstr.M { + _, ok := obj.(metav1.Object) // one of the changes from upstream autodiscovery + if !ok { + return nil + } + + meta := rs.resource.GenerateK8s(resourceType, obj, opts...) + return meta +} + +// GenerateFromName generates replicaset metadata from a replicaset name +func (rs *replicaset) GenerateFromName(name string, opts ...FieldOptions) mapstr.M { + if rs.store == nil { + return nil + } + + if obj, ok, _ := rs.store.GetByKey(name); ok { + replicaSet, ok := obj.(kubernetes.Resource) // one of the changes from upstream autodiscovery + if !ok { + return nil + } + + return rs.GenerateK8s(replicaSet, opts...) 
+ } + + return nil +} diff --git a/internal/pkg/composable/providers/kubernetes/pod.go b/internal/pkg/composable/providers/kubernetes/pod.go index 37311845a7d..6562bc897d3 100644 --- a/internal/pkg/composable/providers/kubernetes/pod.go +++ b/internal/pkg/composable/providers/kubernetes/pod.go @@ -6,11 +6,11 @@ package kubernetes import ( "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "sync" "time" - v1 "k8s.io/api/apps/v1" - "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" "github.com/elastic/elastic-agent-autodiscover/utils" @@ -20,6 +20,7 @@ import ( "github.com/elastic/elastic-agent-libs/safemapstr" k8s "k8s.io/client-go/kubernetes" + clientgometa "k8s.io/client-go/metadata" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/composable" @@ -106,12 +107,21 @@ func NewPodEventer( // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { + metadataClient, err := GetKubernetesMetadataClient(cfg.KubeConfig, cfg.KubeClientOptions) + if err != nil { + logger.Errorf("Error creating metadata client for %T due to error %+v", &kubernetes.Namespace{}, err) + } // use a custom watcher here, so we can provide a transform function and limit the data we're storing - replicaSetWatcher, err = NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: cfg.SyncPeriod, - Namespace: cfg.Namespace, - HonorReSyncs: true, - }, nil, removeUnnecessaryReplicaSetData) + replicaSetWatcher, err = NewNamedMetaWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Namespace: cfg.Namespace, + HonorReSyncs: true, + }, nil, removeUnnecessaryReplicaSetData) if err != nil { logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.Namespace{}, err) } @@ -131,7 +141,7 @@ func NewPodEventer( if err != nil { return nil, errors.New(err, "failed to unpack configuration") } - metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf) + metaGen := GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, replicaSetWatcher, jobWatcher, metaConf) p := &pod{ logger: logger, @@ -546,15 +556,39 @@ func hintsCheck(annotations mapstr.M, container string, prefix string, validate // removeUnnecessaryReplicaSetData removes all data from a ReplicaSet resource, except what we need to compute // Pod metadata. Which is just the name and owner references. func removeUnnecessaryReplicaSetData(obj interface{}) (interface{}, error) { - old, ok := obj.(*v1.ReplicaSet) + old, ok := obj.(*metav1.PartialObjectMetadata) if !ok { return nil, fmt.Errorf("obj is not a ReplicaSet") } - transformed := v1.ReplicaSet{} - transformed.ObjectMeta = kubernetes.ObjectMeta{ - Name: old.GetName(), - Namespace: old.GetNamespace(), - OwnerReferences: old.GetOwnerReferences(), + transformed := &metav1.PartialObjectMetadata{ + ObjectMeta: kubernetes.ObjectMeta{ + Name: old.GetName(), + Namespace: old.GetNamespace(), + OwnerReferences: old.GetOwnerReferences(), + ResourceVersion: old.GetResourceVersion(), + }, } return transformed, nil } + +// GetKubernetesMetadataClient returns a kubernetes metadata client. 
If inCluster is true, it returns an +// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed, +// it parses the config file to get the config required to build a client. +func GetKubernetesMetadataClient(kubeconfig string, opt kubernetes.KubeClientOptions) (clientgometa.Interface, error) { + if kubeconfig == "" { + kubeconfig = kubernetes.GetKubeConfigEnvironmentVariable() + } + + cfg, err := kubernetes.BuildConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("unable to build kube config due to error: %w", err) + } + cfg.QPS = opt.QPS + cfg.Burst = opt.Burst + client, err := clientgometa.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("unable to build kubernetes clientset: %w", err) + } + + return client, nil +} diff --git a/internal/pkg/composable/providers/kubernetes/watcher.go b/internal/pkg/composable/providers/kubernetes/watcher.go index eda0af70705..70eba28562f 100644 --- a/internal/pkg/composable/providers/kubernetes/watcher.go +++ b/internal/pkg/composable/providers/kubernetes/watcher.go @@ -7,6 +7,10 @@ package kubernetes import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + clientgometa "k8s.io/client-go/metadata" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -65,24 +69,26 @@ type watcher struct { // NewWatcher initializes the watcher client to provide a events handler for // resource from the cluster (filtered to the given node) -func NewWatcher( +func NewMetaWatcher( client kubernetes.Interface, - resource Resource, + metadataClient clientgometa.Interface, + gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers, transformFunc cache.TransformFunc, ) (Watcher, error) { - return NewNamedWatcher("", client, resource, opts, indexers, transformFunc) + return NewNamedMetaWatcher("", client, metadataClient, gvr, opts, indexers, transformFunc) } -// NewNamedWatcher initializes the watcher client to provide an events handler for +// NewNamedMetaWatcher initializes the watcher client to provide an events handler for // resource from the cluster (filtered to the given node) and also allows to name the k8s // client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue // metrics, if it is empty, its metrics will not be logged by the k8s client. -func NewNamedWatcher( +func NewNamedMetaWatcher( name string, client kubernetes.Interface, - resource Resource, + metadataClient clientgometa.Interface, + gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers, transformFunc cache.TransformFunc, @@ -90,10 +96,7 @@ func NewNamedWatcher( var store cache.Store var queue workqueue.Interface //nolint:staticcheck // TODO: use the typed version var cachedObject runtime.Object - informer, _, err := autodiscoverK8s.NewInformer(client, resource, opts, indexers) - if err != nil { - return nil, err - } + informer := NewMetadataInformer(metadataClient, gvr, opts, indexers) store = informer.GetStore() queue = workqueue.NewNamed(name) @@ -121,7 +124,7 @@ func NewNamedWatcher( handler: autodiscoverK8s.NoOpEventHandlerFuncs{}, } - _, err = w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := w.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(o interface{}) { w.enqueue(o, add) }, @@ -141,14 +144,6 @@ func NewNamedWatcher( // state should just be deduped by autodiscover and not stop/started periodically as would be the case with an update. 
w.enqueue(n, add) } - - //We check the type of resource and only if it is namespace or node return the cacheObject - switch resource.(type) { - case *Namespace: - w.cacheObject(o) - case *Node: - w.cacheObject(o) - } }, }) if err != nil { @@ -229,15 +224,6 @@ func (w *watcher) enqueue(obj interface{}, state string) { w.queue.Add(&item{key, obj, state}) } -// cacheObject updates watcher with the old version of cache objects before change during update events -func (w *watcher) cacheObject(o interface{}) { - if old, ok := o.(runtime.Object); !ok { - utilruntime.HandleError(fmt.Errorf("expected object in cache got %#v", o)) - } else { - w.cachedObject = old - } -} - // process gets the top of the work queue and processes the object that is received. func (w *watcher) process(_ context.Context) bool { obj, quit := w.queue.Get() @@ -283,3 +269,25 @@ func (w *watcher) process(_ context.Context) bool { return true } + +// NewMetadataInformer creates an informer for a given resource that only tracks the resource metadata. +func NewMetadataInformer(client clientgometa.Interface, gvr schema.GroupVersionResource, opts WatchOptions, indexers cache.Indexers) cache.SharedInformer { + ctx := context.Background() + if indexers == nil { + indexers = cache.Indexers{} + } + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + return client.Resource(gvr).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return client.Resource(gvr).Watch(ctx, options) + }, + }, + &metav1.PartialObjectMetadata{}, + opts.SyncTimeout, + indexers, + ) + return informer +}
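
Note on the first patch's approach: the same cache-size reduction can be reproduced outside the Agent with plain client-go, because SharedInformer.SetTransform accepts a cache.TransformFunc that runs before objects are written to the informer's store. The sketch below is a minimal, self-contained illustration, not the Agent code itself; it assumes a kubeconfig path in $KUBECONFIG and keeps only the ObjectMeta fields the Pod enricher needs.

package main

import (
	"fmt"
	"os"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder config loading; an in-cluster config would work the same way.
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
	rsInformer := factory.Apps().V1().ReplicaSets().Informer()

	// The transform runs before objects land in the informer's store, so the
	// cache only ever holds the trimmed copies. Keep just what the Pod
	// enricher needs: name, namespace and owner references, plus the
	// resource version used by the IsUpdated comparison.
	if err := rsInformer.SetTransform(func(obj interface{}) (interface{}, error) {
		rs, ok := obj.(*appsv1.ReplicaSet)
		if !ok {
			// e.g. cache.DeletedFinalStateUnknown tombstones: pass through untouched.
			return obj, nil
		}
		return &appsv1.ReplicaSet{
			ObjectMeta: metav1.ObjectMeta{
				Name:            rs.Name,
				Namespace:       rs.Namespace,
				OwnerReferences: rs.OwnerReferences,
				ResourceVersion: rs.ResourceVersion,
			},
		}, nil
	}); err != nil {
		panic(err) // SetTransform must be called before the informer starts.
	}

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, rsInformer.HasSynced)
	fmt.Println("cached ReplicaSets:", len(rsInformer.GetStore().ListKeys()))
}

Keeping ResourceVersion in the trimmed object matters because the watcher's IsUpdated check compares resource versions to skip resync no-ops.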
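
The second patch goes further and avoids receiving the ReplicaSet spec and status from the API server at all by watching metadata only. The patch hand-rolls its informer with cache.NewSharedIndexInformer over the metadata client; the sketch below shows the same idea using client-go's metadatainformer factory instead, which also yields *metav1.PartialObjectMetadata objects. The kubeconfig handling and the example ReplicaSet key ("default/nginx-7bf8c77b5b") are placeholders.

package main

import (
	"fmt"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	metaClient := metadata.NewForConfigOrDie(cfg)

	// The metadata client serves metav1.PartialObjectMetadata for any resource,
	// so the API server never sends the ReplicaSet spec or status.
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
	factory := metadatainformer.NewSharedInformerFactory(metaClient, 10*time.Minute)
	rsInformer := factory.ForResource(gvr).Informer()

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, rsInformer.HasSynced)

	// Resolve the owning Deployment of one ReplicaSet from its owner references,
	// which is all the Pod metadata enrichment needs. The key is a placeholder.
	key := "default/nginx-7bf8c77b5b"
	obj, exists, err := rsInformer.GetStore().GetByKey(key)
	if err != nil || !exists {
		fmt.Println("replicaset not found in cache:", key)
		return
	}
	rsMeta := obj.(*metav1.PartialObjectMetadata)
	for _, ref := range rsMeta.OwnerReferences {
		if ref.Kind == "Deployment" {
			fmt.Printf("%s is owned by Deployment %q\n", key, ref.Name)
		}
	}
}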
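
For context on why only the name, namespace and owner references are kept: Pod metadata enrichment resolves the owning Deployment by following Pod -> ReplicaSet -> Deployment owner references. The helper below is hypothetical and not part of the patch; it sketches that lookup against a ReplicaSet store, assuming the store is keyed by "namespace/name" as with MetaNamespaceKeyFunc.

package kubernetes

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

// deploymentNameForPod follows Pod -> ReplicaSet -> Deployment owner references
// using only the metadata kept in the ReplicaSet store.
func deploymentNameForPod(pod *corev1.Pod, rsStore cache.Store) (string, bool) {
	for _, ref := range pod.OwnerReferences {
		if ref.Kind != "ReplicaSet" {
			continue
		}
		obj, exists, err := rsStore.GetByKey(pod.Namespace + "/" + ref.Name)
		if err != nil || !exists {
			continue
		}
		// Works for both *appsv1.ReplicaSet and *metav1.PartialObjectMetadata,
		// since both satisfy metav1.Object.
		rsMeta, ok := obj.(metav1.Object)
		if !ok {
			continue
		}
		for _, rsRef := range rsMeta.GetOwnerReferences() {
			if rsRef.Kind == "Deployment" {
				return rsRef.Name, true
			}
		}
	}
	return "", false
}

Because only the owner-reference chain is consulted, trimming everything else from the cached ReplicaSets (patch 1) or never fetching it (patch 2) does not change the enrichment result.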