Skip to content

Commit

Permalink
Update go-template-utils for better permission handling
Browse files Browse the repository at this point in the history
This will stop the RetryWatcher if the service account used for
templates no longer has access to watch a resource.

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl authored and openshift-merge-bot[bot] committed Sep 20, 2024
1 parent 166702f commit ee2ea8f
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 172 deletions.
13 changes: 7 additions & 6 deletions controllers/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var _ handler.EventHandler = &EnqueueRequestsFromMapFunc{}
Expand All @@ -24,34 +25,34 @@ type EnqueueRequestsFromMapFunc struct {

// Create implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Create(ctx context.Context, evt event.CreateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Update(ctx context.Context, evt event.UpdateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.ObjectNew)
}

// Delete implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Delete(ctx context.Context, evt event.DeleteEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Generic(ctx context.Context, evt event.GenericEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *EnqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface,
object client.Object,
func (e *EnqueueRequestsFromMapFunc) mapAndEnqueue(
ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request], object client.Object,
) {
for _, req := range e.ToRequests(ctx, object) {
q.Add(req)
Expand Down
8 changes: 4 additions & 4 deletions controllers/common/policyset_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func mapPolicySetToRequests(object client.Object) []reconcile.Request {

// Create implements EventHandler
func (e *EnqueueRequestsFromPolicySet) Create(_ context.Context, evt event.CreateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
for _, policy := range mapPolicySetToRequests(evt.Object) {
q.Add(policy)
Expand All @@ -53,7 +53,7 @@ func (e *EnqueueRequestsFromPolicySet) Create(_ context.Context, evt event.Creat
// Update implements EventHandler
// Enqueues the diff between the new and old policy sets in the UpdateEvent
func (e *EnqueueRequestsFromPolicySet) Update(_ context.Context, evt event.UpdateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
//nolint:forcetypeassert
newPolicySet := evt.ObjectNew.(*policiesv1beta1.PolicySet)
Expand Down Expand Up @@ -97,7 +97,7 @@ func (e *EnqueueRequestsFromPolicySet) Update(_ context.Context, evt event.Updat

// Delete implements EventHandler
func (e *EnqueueRequestsFromPolicySet) Delete(_ context.Context, evt event.DeleteEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
for _, policy := range mapPolicySetToRequests(evt.Object) {
q.Add(policy)
Expand All @@ -106,7 +106,7 @@ func (e *EnqueueRequestsFromPolicySet) Delete(_ context.Context, evt event.Delet

// Generic implements EventHandler
func (e *EnqueueRequestsFromPolicySet) Generic(_ context.Context, evt event.GenericEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
for _, policy := range mapPolicySetToRequests(evt.Object) {
q.Add(policy)
Expand Down
27 changes: 12 additions & 15 deletions controllers/complianceeventsapi/complianceeventsapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ComplianceServerCtx struct {
// A read lock should be used when the DB is accessed.
Lock sync.RWMutex
DB *sql.DB
Queue workqueue.Interface
Queue workqueue.TypedInterface[types.NamespacedName]
needsMigration bool
// Required to run a migration after the database connection changed or the feature was enabled.
connectionURL string
Expand Down Expand Up @@ -103,7 +103,7 @@ func NewComplianceServerCtx(dbConnectionURL string, clusterID string) (*Complian

return &ComplianceServerCtx{
Lock: sync.RWMutex{},
Queue: workqueue.New(),
Queue: workqueue.NewTyped[types.NamespacedName](),
connectionURL: dbConnectionURL,
DB: db,
ClusterID: clusterID,
Expand Down Expand Up @@ -293,20 +293,17 @@ func MonitorDatabaseConnection(
for complianceServerCtx.Queue.Len() > 0 {
request, shutdown := complianceServerCtx.Queue.Get()

switch v := request.(type) {
case types.NamespacedName:
reconcileRequests <- event.GenericEvent{
Object: &common.GuttedObject{
TypeMeta: metav1.TypeMeta{
APIVersion: policyv1.GroupVersion.String(),
Kind: "Policy",
},
ObjectMeta: metav1.ObjectMeta{
Name: v.Name,
Namespace: v.Namespace,
},
reconcileRequests <- event.GenericEvent{
Object: &common.GuttedObject{
TypeMeta: metav1.TypeMeta{
APIVersion: policyv1.GroupVersion.String(),
Kind: "Policy",
},
}
ObjectMeta: metav1.ObjectMeta{
Name: request.Name,
Namespace: request.Namespace,
},
},
}

complianceServerCtx.Queue.Done(request)
Expand Down
2 changes: 1 addition & 1 deletion controllers/propagator/replicatedpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl

// Retry template errors due to permission issues. This isn't ideal, but there's no good event driven way to
// be notified when the permissions are given to the service account.
if k8serrors.IsForbidden(tmplErr) {
if k8serrors.IsForbidden(tmplErr) || k8serrors.IsUnauthorized(tmplErr) {
returnErr = tmplErr
}

Expand Down
9 changes: 4 additions & 5 deletions controllers/propagator/replicatedpolicy_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -33,10 +32,10 @@ func (r *ReplicatedPolicyReconciler) SetupWithManager(
For(
&policiesv1.Policy{},
builder.WithPredicates(replicatedPolicyPredicates(r.ResourceVersions))).
WatchesRawSource(dependenciesSource, &handler.EnqueueRequestForObject{}).
WatchesRawSource(updateSrc, &handler.EnqueueRequestForObject{}).
WatchesRawSource(templateSrc, &handler.EnqueueRequestForObject{}).
WatchesRawSource(saTemplateSrc, &handler.EnqueueRequestForObject{}).
WatchesRawSource(dependenciesSource).
WatchesRawSource(updateSrc).
WatchesRawSource(templateSrc).
WatchesRawSource(saTemplateSrc).
Watches(
&clusterv1beta1.PlacementDecision{},
HandlerForDecision(mgr.GetClient()),
Expand Down
10 changes: 5 additions & 5 deletions controllers/propagator/replicatepolicy_pb_eventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ type handlerForBinding struct {

// Create implements EventHandler.
func (e *handlerForBinding) Create(ctx context.Context,
evt event.CreateEvent, q workqueue.RateLimitingInterface,
evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler. Update only targeted(modified) objects
func (e *handlerForBinding) Update(ctx context.Context,
evt event.UpdateEvent, q workqueue.RateLimitingInterface,
evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
log.V(1).Info("Detect placementBinding and update targeted replicated-policies")
//nolint:forcetypeassert
Expand Down Expand Up @@ -73,20 +73,20 @@ func (e *handlerForBinding) Update(ctx context.Context,

// Delete implements EventHandler.
func (e *handlerForBinding) Delete(ctx context.Context,
evt event.DeleteEvent, q workqueue.RateLimitingInterface,
evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler.
func (e *handlerForBinding) Generic(ctx context.Context,
evt event.GenericEvent, q workqueue.RateLimitingInterface,
evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *handlerForBinding) mapAndEnqueue(ctx context.Context,
q workqueue.RateLimitingInterface, obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request], obj client.Object,
) {
pBinding := obj.(*policiesv1.PlacementBinding)
reqs := e.getMappedReplicatedPolicy(ctx, pBinding)
Expand Down
10 changes: 5 additions & 5 deletions controllers/propagator/replicatepolicy_pd_eventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type handlerForDecision struct {

// Create implements EventHandler.
func (e *handlerForDecision) Create(ctx context.Context,
evt event.CreateEvent, q workqueue.RateLimitingInterface,
evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler. Update only targeted(modified) objects
func (e *handlerForDecision) Update(ctx context.Context,
evt event.UpdateEvent, q workqueue.RateLimitingInterface,
evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
log.V(1).Info("Detect placementDecision and update targeted replicated-policies")
//nolint:forcetypeassert
Expand Down Expand Up @@ -80,20 +80,20 @@ func (e *handlerForDecision) Update(ctx context.Context,

// Delete implements EventHandler.
func (e *handlerForDecision) Delete(ctx context.Context,
evt event.DeleteEvent, q workqueue.RateLimitingInterface,
evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler.
func (e *handlerForDecision) Generic(ctx context.Context,
evt event.GenericEvent, q workqueue.RateLimitingInterface,
evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *handlerForDecision) mapAndEnqueue(ctx context.Context,
q workqueue.RateLimitingInterface, obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request], obj client.Object,
) {
pDecision := obj.(*clusterv1beta1.PlacementDecision)
reqs := e.getMappedReplicatedPolicy(ctx, pDecision)
Expand Down
10 changes: 5 additions & 5 deletions controllers/propagator/replicatepolicy_pr_eventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type handlerForRule struct {

// Create implements EventHandler.
func (e *handlerForRule) Create(ctx context.Context,
evt event.CreateEvent, q workqueue.RateLimitingInterface,
evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler. Update only targeted(modified) objects
func (e *handlerForRule) Update(ctx context.Context,
evt event.UpdateEvent, q workqueue.RateLimitingInterface,
evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
log.Info("Detect placementDecision and update targeted replicated-policies")
//nolint:forcetypeassert
Expand Down Expand Up @@ -76,20 +76,20 @@ func (e *handlerForRule) Update(ctx context.Context,

// Delete implements EventHandler.
func (e *handlerForRule) Delete(ctx context.Context,
evt event.DeleteEvent, q workqueue.RateLimitingInterface,
evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler.
func (e *handlerForRule) Generic(ctx context.Context,
evt event.GenericEvent, q workqueue.RateLimitingInterface,
evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *handlerForRule) mapAndEnqueue(ctx context.Context,
q workqueue.RateLimitingInterface, obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request], obj client.Object,
) {
pRule := obj.(*appsv1.PlacementRule)
reqs := e.getMappedReplicatedPolicy(ctx, pRule)
Expand Down
3 changes: 2 additions & 1 deletion controllers/propagator/template_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewTemplateResolvers(
mgrClient client.Client,
defaultTemplateResolver *templates.TemplateResolver,
replicatedPolicyUpdates chan event.GenericEvent,
) (*TemplateResolvers, *source.Channel) {
) (*TemplateResolvers, source.TypedSource[reconcile.Request]) {
dynamicWatcherReconciler, dynamicWatcherSource := k8sdepwatches.NewControllerRuntimeSource()

return &TemplateResolvers{
Expand Down
Loading

0 comments on commit ee2ea8f

Please sign in to comment.