From a9dfa796c88577e8dca1d4589704d656eafc3933 Mon Sep 17 00:00:00 2001 From: Per Goncalves da Silva Date: Tue, 21 Jan 2025 12:13:17 +0100 Subject: [PATCH] [CARRY] address gc queue problem Signed-off-by: Per Goncalves da Silva --- .../pkg/controller/operators/olm/operator.go | 11 ++-- .../pkg/lib/queueinformer/config.go | 2 - .../queueinformer/queueinformer_operator.go | 59 ++++++++++--------- .../pkg/lib/queueinformer/resourcequeue.go | 9 +++ .../pkg/controller/operators/olm/operator.go | 11 ++-- .../pkg/lib/queueinformer/config.go | 2 - .../queueinformer/queueinformer_operator.go | 59 ++++++++++--------- .../pkg/lib/queueinformer/resourcequeue.go | 9 +++ 8 files changed, 94 insertions(+), 68 deletions(-) diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go b/staging/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go index 3f05070719..f65cea2e85 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" "strings" "time" @@ -896,7 +897,7 @@ func (a *Operator) syncObject(obj interface{}) (syncError error) { } else { switch metaObj.(type) { case *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding, *admissionregistrationv1.MutatingWebhookConfiguration, *admissionregistrationv1.ValidatingWebhookConfiguration: - if syncError = a.objGCQueueSet.Requeue(metaObj.GetNamespace(), metaObj.GetName()); syncError != nil { + if syncError = a.objGCQueueSet.RequeueObject(metaObj.(client.Object)); syncError != nil { logger.WithError(syncError).Warnf("failed to requeue gc event: %s/%s", metaObj.GetNamespace(), metaObj.GetName()) } return @@ -1163,7 +1164,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list cluster role bindings") } for _, crb := range crbs { - if err := a.objGCQueueSet.Requeue(crb.GetNamespace(), crb.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(crb); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", crb) } } @@ -1173,7 +1174,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list cluster roles") } for _, cr := range crs { - if err := a.objGCQueueSet.Requeue(cr.GetNamespace(), cr.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(cr); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", cr) } } @@ -1185,7 +1186,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { } for _, webhook := range mWebhooks.Items { w := webhook - if err := a.objGCQueueSet.Requeue(w.GetNamespace(), w.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(&w); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook) } } @@ -1196,7 +1197,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { } for _, webhook := range vWebhooks.Items { w := webhook - if err := a.objGCQueueSet.Requeue(w.GetNamespace(), w.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(&w); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook) } } diff --git a/staging/operator-lifecycle-manager/pkg/lib/queueinformer/config.go b/staging/operator-lifecycle-manager/pkg/lib/queueinformer/config.go index e5882f6c7b..c47ad9e190 100644 --- a/staging/operator-lifecycle-manager/pkg/lib/queueinformer/config.go +++ b/staging/operator-lifecycle-manager/pkg/lib/queueinformer/config.go @@ -51,8 +51,6 @@ func (c *queueInformerConfig) validateQueueInformer() (err error) { err = newInvalidConfigError("nil logger") case config.queue == nil: err = newInvalidConfigError("nil queue") - case config.indexer == nil: - err = newInvalidConfigError("nil indexer") case config.syncer == nil: err = newInvalidConfigError("nil syncer") } diff --git a/staging/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go b/staging/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go index 323b1899d2..722579c6c7 100644 --- a/staging/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go +++ b/staging/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go @@ -281,36 +281,41 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - typedItem, ok := item.(types.NamespacedName) - if !ok { - panic(fmt.Sprintf("item %T is not a NamespacedName", item)) - } - key := keyForNamespacedName(typedItem) - logger = logger.WithField("cache-key", key) - - // Get the current cached version of the resource - var exists bool - var err error - resource, exists, err := loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true - } - obj, ok := resource.(client.Object) - if !ok { - logger.Warn("cached object is not a kubernetes resource (client.Object)") - queue.Forget(item) - return true + var obj client.Object + switch typedItem := item.(type) { + case types.NamespacedName: + key := keyForNamespacedName(typedItem) + logger = logger.WithField("cache-key", key) + + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err := loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") + queue.Forget(item) + return true + } + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + queue.Forget(item) + return true + } + var ok bool + obj, ok = resource.(client.Object) + if !ok { + logger.Warn("cached object is not a kubernetes resource (client.Object)") + queue.Forget(item) + return true + } + case client.Object: + obj = typedItem + default: + panic(fmt.Sprintf("unexpected item type %T", item)) } // Sync and requeue on error - err = loop.Sync(ctx, obj) + err := loop.Sync(ctx, obj) if requeues := queue.NumRequeues(item); err != nil && requeues < 8 { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) diff --git a/staging/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go b/staging/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go index 98067ab832..91dab76040 100644 --- a/staging/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go +++ b/staging/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go @@ -2,6 +2,7 @@ package queueinformer import ( "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" "k8s.io/apimachinery/pkg/types" "sync" "time" @@ -39,6 +40,14 @@ func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface r.queueSet[key] = queue } +// RequeueObject requeues a client object +func (r *ResourceQueueSet) RequeueObject(obj client.Object) error { + if queue, ok := r.queueSet[obj.GetNamespace()]; ok { + queue.Add(obj) + } + return nil +} + // Requeue requeues the resource in the set with the given name and namespace func (r *ResourceQueueSet) Requeue(namespace, name string) error { r.mutex.RLock() diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go index 3f05070719..f65cea2e85 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/olm/operator.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" "strings" "time" @@ -896,7 +897,7 @@ func (a *Operator) syncObject(obj interface{}) (syncError error) { } else { switch metaObj.(type) { case *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding, *admissionregistrationv1.MutatingWebhookConfiguration, *admissionregistrationv1.ValidatingWebhookConfiguration: - if syncError = a.objGCQueueSet.Requeue(metaObj.GetNamespace(), metaObj.GetName()); syncError != nil { + if syncError = a.objGCQueueSet.RequeueObject(metaObj.(client.Object)); syncError != nil { logger.WithError(syncError).Warnf("failed to requeue gc event: %s/%s", metaObj.GetNamespace(), metaObj.GetName()) } return @@ -1163,7 +1164,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list cluster role bindings") } for _, crb := range crbs { - if err := a.objGCQueueSet.Requeue(crb.GetNamespace(), crb.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(crb); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", crb) } } @@ -1173,7 +1174,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list cluster roles") } for _, cr := range crs { - if err := a.objGCQueueSet.Requeue(cr.GetNamespace(), cr.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(cr); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", cr) } } @@ -1185,7 +1186,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { } for _, webhook := range mWebhooks.Items { w := webhook - if err := a.objGCQueueSet.Requeue(w.GetNamespace(), w.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(&w); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook) } } @@ -1196,7 +1197,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { } for _, webhook := range vWebhooks.Items { w := webhook - if err := a.objGCQueueSet.Requeue(w.GetNamespace(), w.GetName()); err != nil { + if err := a.objGCQueueSet.RequeueObject(&w); err != nil { logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook) } } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/config.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/config.go index e5882f6c7b..c47ad9e190 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/config.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/config.go @@ -51,8 +51,6 @@ func (c *queueInformerConfig) validateQueueInformer() (err error) { err = newInvalidConfigError("nil logger") case config.queue == nil: err = newInvalidConfigError("nil queue") - case config.indexer == nil: - err = newInvalidConfigError("nil indexer") case config.syncer == nil: err = newInvalidConfigError("nil syncer") } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go index 323b1899d2..722579c6c7 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/queueinformer_operator.go @@ -281,36 +281,41 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - typedItem, ok := item.(types.NamespacedName) - if !ok { - panic(fmt.Sprintf("item %T is not a NamespacedName", item)) - } - key := keyForNamespacedName(typedItem) - logger = logger.WithField("cache-key", key) - - // Get the current cached version of the resource - var exists bool - var err error - resource, exists, err := loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true - } - obj, ok := resource.(client.Object) - if !ok { - logger.Warn("cached object is not a kubernetes resource (client.Object)") - queue.Forget(item) - return true + var obj client.Object + switch typedItem := item.(type) { + case types.NamespacedName: + key := keyForNamespacedName(typedItem) + logger = logger.WithField("cache-key", key) + + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err := loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") + queue.Forget(item) + return true + } + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + queue.Forget(item) + return true + } + var ok bool + obj, ok = resource.(client.Object) + if !ok { + logger.Warn("cached object is not a kubernetes resource (client.Object)") + queue.Forget(item) + return true + } + case client.Object: + obj = typedItem + default: + panic(fmt.Sprintf("unexpected item type %T", item)) } // Sync and requeue on error - err = loop.Sync(ctx, obj) + err := loop.Sync(ctx, obj) if requeues := queue.NumRequeues(item); err != nil && requeues < 8 { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go index 98067ab832..91dab76040 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer/resourcequeue.go @@ -2,6 +2,7 @@ package queueinformer import ( "fmt" + "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client" "k8s.io/apimachinery/pkg/types" "sync" "time" @@ -39,6 +40,14 @@ func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface r.queueSet[key] = queue } +// RequeueObject requeues a client object +func (r *ResourceQueueSet) RequeueObject(obj client.Object) error { + if queue, ok := r.queueSet[obj.GetNamespace()]; ok { + queue.Add(obj) + } + return nil +} + // Requeue requeues the resource in the set with the given name and namespace func (r *ResourceQueueSet) Requeue(namespace, name string) error { r.mutex.RLock()