Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update go-template-utils for better permission handling #243

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions controllers/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var _ handler.EventHandler = &EnqueueRequestsFromMapFunc{}
Expand All @@ -24,34 +25,34 @@ type EnqueueRequestsFromMapFunc struct {

// Create implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Create(ctx context.Context, evt event.CreateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Update(ctx context.Context, evt event.UpdateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.ObjectNew)
}

// Delete implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Delete(ctx context.Context, evt event.DeleteEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler
func (e *EnqueueRequestsFromMapFunc) Generic(ctx context.Context, evt event.GenericEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *EnqueueRequestsFromMapFunc) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface,
object client.Object,
func (e *EnqueueRequestsFromMapFunc) mapAndEnqueue(
ctx context.Context, q workqueue.TypedRateLimitingInterface[reconcile.Request], object client.Object,
) {
for _, req := range e.ToRequests(ctx, object) {
q.Add(req)
Expand Down
8 changes: 4 additions & 4 deletions controllers/common/policyset_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func mapPolicySetToRequests(object client.Object) []reconcile.Request {

// Create implements EventHandler
func (e *EnqueueRequestsFromPolicySet) Create(_ context.Context, evt event.CreateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
for _, policy := range mapPolicySetToRequests(evt.Object) {
q.Add(policy)
Expand All @@ -53,7 +53,7 @@ func (e *EnqueueRequestsFromPolicySet) Create(_ context.Context, evt event.Creat
// Update implements EventHandler
// Enqueues the diff between the new and old policy sets in the UpdateEvent
func (e *EnqueueRequestsFromPolicySet) Update(_ context.Context, evt event.UpdateEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
//nolint:forcetypeassert
newPolicySet := evt.ObjectNew.(*policiesv1beta1.PolicySet)
Expand Down Expand Up @@ -97,7 +97,7 @@ func (e *EnqueueRequestsFromPolicySet) Update(_ context.Context, evt event.Updat

// Delete implements EventHandler
func (e *EnqueueRequestsFromPolicySet) Delete(_ context.Context, evt event.DeleteEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
for _, policy := range mapPolicySetToRequests(evt.Object) {
q.Add(policy)
Expand All @@ -106,7 +106,7 @@ func (e *EnqueueRequestsFromPolicySet) Delete(_ context.Context, evt event.Delet

// Generic implements EventHandler
func (e *EnqueueRequestsFromPolicySet) Generic(_ context.Context, evt event.GenericEvent,
q workqueue.RateLimitingInterface,
q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
for _, policy := range mapPolicySetToRequests(evt.Object) {
q.Add(policy)
Expand Down
27 changes: 12 additions & 15 deletions controllers/complianceeventsapi/complianceeventsapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ComplianceServerCtx struct {
// A read lock should be used when the DB is accessed.
Lock sync.RWMutex
DB *sql.DB
Queue workqueue.Interface
Queue workqueue.TypedInterface[types.NamespacedName]
needsMigration bool
// Required to run a migration after the database connection changed or the feature was enabled.
connectionURL string
Expand Down Expand Up @@ -103,7 +103,7 @@ func NewComplianceServerCtx(dbConnectionURL string, clusterID string) (*Complian

return &ComplianceServerCtx{
Lock: sync.RWMutex{},
Queue: workqueue.New(),
Queue: workqueue.NewTyped[types.NamespacedName](),
connectionURL: dbConnectionURL,
DB: db,
ClusterID: clusterID,
Expand Down Expand Up @@ -293,20 +293,17 @@ func MonitorDatabaseConnection(
for complianceServerCtx.Queue.Len() > 0 {
request, shutdown := complianceServerCtx.Queue.Get()

switch v := request.(type) {
case types.NamespacedName:
reconcileRequests <- event.GenericEvent{
Object: &common.GuttedObject{
TypeMeta: metav1.TypeMeta{
APIVersion: policyv1.GroupVersion.String(),
Kind: "Policy",
},
ObjectMeta: metav1.ObjectMeta{
Name: v.Name,
Namespace: v.Namespace,
},
reconcileRequests <- event.GenericEvent{
Object: &common.GuttedObject{
TypeMeta: metav1.TypeMeta{
APIVersion: policyv1.GroupVersion.String(),
Kind: "Policy",
},
}
ObjectMeta: metav1.ObjectMeta{
Name: request.Name,
Namespace: request.Namespace,
},
},
}

complianceServerCtx.Queue.Done(request)
Expand Down
2 changes: 1 addition & 1 deletion controllers/propagator/replicatedpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl

// Retry template errors due to permission issues. This isn't ideal, but there's no good event driven way to
// be notified when the permissions are given to the service account.
if k8serrors.IsForbidden(tmplErr) {
if k8serrors.IsForbidden(tmplErr) || k8serrors.IsUnauthorized(tmplErr) {
returnErr = tmplErr
}

Expand Down
9 changes: 4 additions & 5 deletions controllers/propagator/replicatedpolicy_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -33,10 +32,10 @@ func (r *ReplicatedPolicyReconciler) SetupWithManager(
For(
&policiesv1.Policy{},
builder.WithPredicates(replicatedPolicyPredicates(r.ResourceVersions))).
WatchesRawSource(dependenciesSource, &handler.EnqueueRequestForObject{}).
WatchesRawSource(updateSrc, &handler.EnqueueRequestForObject{}).
WatchesRawSource(templateSrc, &handler.EnqueueRequestForObject{}).
WatchesRawSource(saTemplateSrc, &handler.EnqueueRequestForObject{}).
WatchesRawSource(dependenciesSource).
WatchesRawSource(updateSrc).
WatchesRawSource(templateSrc).
WatchesRawSource(saTemplateSrc).
Watches(
&clusterv1beta1.PlacementDecision{},
HandlerForDecision(mgr.GetClient()),
Expand Down
10 changes: 5 additions & 5 deletions controllers/propagator/replicatepolicy_pb_eventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ type handlerForBinding struct {

// Create implements EventHandler.
func (e *handlerForBinding) Create(ctx context.Context,
evt event.CreateEvent, q workqueue.RateLimitingInterface,
evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler. Update only targeted(modified) objects
func (e *handlerForBinding) Update(ctx context.Context,
evt event.UpdateEvent, q workqueue.RateLimitingInterface,
evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
log.V(1).Info("Detect placementBinding and update targeted replicated-policies")
//nolint:forcetypeassert
Expand Down Expand Up @@ -73,20 +73,20 @@ func (e *handlerForBinding) Update(ctx context.Context,

// Delete implements EventHandler.
func (e *handlerForBinding) Delete(ctx context.Context,
evt event.DeleteEvent, q workqueue.RateLimitingInterface,
evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler.
func (e *handlerForBinding) Generic(ctx context.Context,
evt event.GenericEvent, q workqueue.RateLimitingInterface,
evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *handlerForBinding) mapAndEnqueue(ctx context.Context,
q workqueue.RateLimitingInterface, obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request], obj client.Object,
) {
pBinding := obj.(*policiesv1.PlacementBinding)
reqs := e.getMappedReplicatedPolicy(ctx, pBinding)
Expand Down
10 changes: 5 additions & 5 deletions controllers/propagator/replicatepolicy_pd_eventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ type handlerForDecision struct {

// Create implements EventHandler.
func (e *handlerForDecision) Create(ctx context.Context,
evt event.CreateEvent, q workqueue.RateLimitingInterface,
evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler. Update only targeted(modified) objects
func (e *handlerForDecision) Update(ctx context.Context,
evt event.UpdateEvent, q workqueue.RateLimitingInterface,
evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
log.V(1).Info("Detect placementDecision and update targeted replicated-policies")
//nolint:forcetypeassert
Expand Down Expand Up @@ -80,20 +80,20 @@ func (e *handlerForDecision) Update(ctx context.Context,

// Delete implements EventHandler.
func (e *handlerForDecision) Delete(ctx context.Context,
evt event.DeleteEvent, q workqueue.RateLimitingInterface,
evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler.
func (e *handlerForDecision) Generic(ctx context.Context,
evt event.GenericEvent, q workqueue.RateLimitingInterface,
evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *handlerForDecision) mapAndEnqueue(ctx context.Context,
q workqueue.RateLimitingInterface, obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request], obj client.Object,
) {
pDecision := obj.(*clusterv1beta1.PlacementDecision)
reqs := e.getMappedReplicatedPolicy(ctx, pDecision)
Expand Down
10 changes: 5 additions & 5 deletions controllers/propagator/replicatepolicy_pr_eventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type handlerForRule struct {

// Create implements EventHandler.
func (e *handlerForRule) Create(ctx context.Context,
evt event.CreateEvent, q workqueue.RateLimitingInterface,
evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Update implements EventHandler. Update only targeted(modified) objects
func (e *handlerForRule) Update(ctx context.Context,
evt event.UpdateEvent, q workqueue.RateLimitingInterface,
evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
log.Info("Detect placementDecision and update targeted replicated-policies")
//nolint:forcetypeassert
Expand Down Expand Up @@ -76,20 +76,20 @@ func (e *handlerForRule) Update(ctx context.Context,

// Delete implements EventHandler.
func (e *handlerForRule) Delete(ctx context.Context,
evt event.DeleteEvent, q workqueue.RateLimitingInterface,
evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

// Generic implements EventHandler.
func (e *handlerForRule) Generic(ctx context.Context,
evt event.GenericEvent, q workqueue.RateLimitingInterface,
evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
e.mapAndEnqueue(ctx, q, evt.Object)
}

func (e *handlerForRule) mapAndEnqueue(ctx context.Context,
q workqueue.RateLimitingInterface, obj client.Object,
q workqueue.TypedRateLimitingInterface[reconcile.Request], obj client.Object,
) {
pRule := obj.(*appsv1.PlacementRule)
reqs := e.getMappedReplicatedPolicy(ctx, pRule)
Expand Down
3 changes: 2 additions & 1 deletion controllers/propagator/template_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewTemplateResolvers(
mgrClient client.Client,
defaultTemplateResolver *templates.TemplateResolver,
replicatedPolicyUpdates chan event.GenericEvent,
) (*TemplateResolvers, *source.Channel) {
) (*TemplateResolvers, source.TypedSource[reconcile.Request]) {
dynamicWatcherReconciler, dynamicWatcherSource := k8sdepwatches.NewControllerRuntimeSource()

return &TemplateResolvers{
Expand Down
Loading