Skip to content

Commit

Permalink
[CARRY] address gc queue problem
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 29, 2025
1 parent 10e0bf9 commit a9dfa79
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"strings"
"time"

Expand Down Expand Up @@ -896,7 +897,7 @@ func (a *Operator) syncObject(obj interface{}) (syncError error) {
} else {
switch metaObj.(type) {
case *rbacv1.ClusterRole, *rbacv1.ClusterRoleBinding, *admissionregistrationv1.MutatingWebhookConfiguration, *admissionregistrationv1.ValidatingWebhookConfiguration:
if syncError = a.objGCQueueSet.Requeue(metaObj.GetNamespace(), metaObj.GetName()); syncError != nil {
if syncError = a.objGCQueueSet.RequeueObject(metaObj.(client.Object)); syncError != nil {
logger.WithError(syncError).Warnf("failed to requeue gc event: %s/%s", metaObj.GetNamespace(), metaObj.GetName())
}
return
Expand Down Expand Up @@ -1163,7 +1164,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
logger.WithError(err).Warn("cannot list cluster role bindings")
}
for _, crb := range crbs {
if err := a.objGCQueueSet.Requeue(crb.GetNamespace(), crb.GetName()); err != nil {
if err := a.objGCQueueSet.RequeueObject(crb); err != nil {
logger.WithError(err).Warnf("failed to requeue gc event: %v", crb)
}
}
Expand All @@ -1173,7 +1174,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
logger.WithError(err).Warn("cannot list cluster roles")
}
for _, cr := range crs {
if err := a.objGCQueueSet.Requeue(cr.GetNamespace(), cr.GetName()); err != nil {
if err := a.objGCQueueSet.RequeueObject(cr); err != nil {
logger.WithError(err).Warnf("failed to requeue gc event: %v", cr)
}
}
Expand All @@ -1185,7 +1186,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
}
for _, webhook := range mWebhooks.Items {
w := webhook
if err := a.objGCQueueSet.Requeue(w.GetNamespace(), w.GetName()); err != nil {
if err := a.objGCQueueSet.RequeueObject(&w); err != nil {
logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook)
}
}
Expand All @@ -1196,7 +1197,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
}
for _, webhook := range vWebhooks.Items {
w := webhook
if err := a.objGCQueueSet.Requeue(w.GetNamespace(), w.GetName()); err != nil {
if err := a.objGCQueueSet.RequeueObject(&w); err != nil {
logger.WithError(err).Warnf("failed to requeue gc event: %v", webhook)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func (c *queueInformerConfig) validateQueueInformer() (err error) {
err = newInvalidConfigError("nil logger")
case config.queue == nil:
err = newInvalidConfigError("nil queue")
case config.indexer == nil:
err = newInvalidConfigError("nil indexer")
case config.syncer == nil:
err = newInvalidConfigError("nil syncer")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,36 +281,41 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
logger := o.logger.WithField("item", item)
logger.WithField("queue-length", queue.Len()).Trace("popped queue")

typedItem, ok := item.(types.NamespacedName)
if !ok {
panic(fmt.Sprintf("item %T is not a NamespacedName", item))
}
key := keyForNamespacedName(typedItem)
logger = logger.WithField("cache-key", key)

// Get the current cached version of the resource
var exists bool
var err error
resource, exists, err := loop.indexer.GetByKey(key)
if err != nil {
logger.WithError(err).Error("cache get failed")
queue.Forget(item)
return true
}
if !exists {
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
queue.Forget(item)
return true
}
obj, ok := resource.(client.Object)
if !ok {
logger.Warn("cached object is not a kubernetes resource (client.Object)")
queue.Forget(item)
return true
var obj client.Object
switch typedItem := item.(type) {
case types.NamespacedName:
key := keyForNamespacedName(typedItem)
logger = logger.WithField("cache-key", key)

// Get the current cached version of the resource
var exists bool
var err error
resource, exists, err := loop.indexer.GetByKey(key)
if err != nil {
logger.WithError(err).Error("cache get failed")
queue.Forget(item)
return true
}
if !exists {
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
queue.Forget(item)
return true
}
var ok bool
obj, ok = resource.(client.Object)
if !ok {
logger.Warn("cached object is not a kubernetes resource (client.Object)")
queue.Forget(item)
return true
}
case client.Object:
obj = typedItem
default:
panic(fmt.Sprintf("unexpected item type %T", item))
}

// Sync and requeue on error
err = loop.Sync(ctx, obj)
err := loop.Sync(ctx, obj)
if requeues := queue.NumRequeues(item); err != nil && requeues < 8 {
logger.WithField("requeues", requeues).Trace("requeuing with rate limiting")
utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queueinformer

import (
"fmt"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
"k8s.io/apimachinery/pkg/types"
"sync"
"time"
Expand Down Expand Up @@ -39,6 +40,14 @@ func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface
r.queueSet[key] = queue
}

// RequeueObject requeues a client object
func (r *ResourceQueueSet) RequeueObject(obj client.Object) error {
if queue, ok := r.queueSet[obj.GetNamespace()]; ok {
queue.Add(obj)
}
return nil
}

// Requeue requeues the resource in the set with the given name and namespace
func (r *ResourceQueueSet) Requeue(namespace, name string) error {
r.mutex.RLock()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a9dfa79

Please sign in to comment.