Move the disabled events recording outside of spec-sync

There were issues with duplicated compliance events and with having a
different code path in the governance-policy-propagator for a
self-managed hub. This unifies the approaches.

It takes the approach of using a retry watcher and generating the
disabled events when the policy is deleted. The controller-runtime
library cannot be used because a reconcile request for a deleted
object does not provide access to the object as it was right before
deletion.
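
To illustrate that limitation, here is a minimal editorial sketch (not
part of this commit) assuming the usual controller-runtime imports and
a Reconciler that embeds client.Client:

	// Reconcile only receives the object's name and namespace, so a policy
	// deleted before this runs can no longer be fetched, and its policy
	// template database IDs are lost.
	func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
		policy := &policyv1.Policy{}

		if err := r.Get(ctx, req.NamespacedName, policy); err != nil {
			// For an already-deleted policy this is a not-found error.
			return reconcile.Result{}, client.IgnoreNotFound(err)
		}

		return reconcile.Result{}, nil
	}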

Relates:
https://issues.redhat.com/browse/ACM-10418

Signed-off-by: mprahl <[email protected]>
mprahl committed Apr 16, 2024
1 parent 9340840 commit 7dace50
Showing 4 changed files with 170 additions and 59 deletions.
140 changes: 140 additions & 0 deletions controllers/complianceeventssync/disabled_events_sync.go
@@ -0,0 +1,140 @@
package complianceeventssync

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	k8sruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	apiWatch "k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	apiCache "k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/workqueue"
	policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
	ctrl "sigs.k8s.io/controller-runtime"

	"open-cluster-management.io/governance-policy-framework-addon/controllers/utils"
)

var (
	log = ctrl.Log.WithName("disabled-events-recorder")
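	// GVRPolicy identifies the Policy custom resource that the recorder watches for deletions.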
	GVRPolicy = schema.GroupVersionResource{
		Group:    "policy.open-cluster-management.io",
		Version:  "v1",
		Resource: "policies",
	}
)

// DisabledEventsRecorder watches for deleted policies and records disabled compliance events on the compliance
// history API. Set initialResourceVersion to the resource version from a list of policies in the managed cluster
// namespace taken before the spec-sync controller starts, so that no deletion events are missed.
func DisabledEventsRecorder(
	ctx context.Context,
	managedClient dynamic.Interface,
	clusterNamespace string,
	// eventsQueue accepts ComplianceAPIEventRequest objects that StartComplianceEventsSyncer then records on the
	// compliance events API.
	eventsQueue workqueue.RateLimitingInterface,
	initialResourceVersion string,
) {
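	// Server-side timeout, in seconds, for the initial list requests below.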
	timeout := int64(30)
	var watcher *watch.RetryWatcher

	resourceVersion := initialResourceVersion

	for {
		if watcher == nil {
			if resourceVersion == "" {
				listResult, err := managedClient.Resource(GVRPolicy).Namespace(clusterNamespace).List(
					ctx, metav1.ListOptions{TimeoutSeconds: &timeout},
				)
				if err != nil {
					log.Error(err, "Failed to list the policies for recording disabled events. Will retry.")

					time.Sleep(time.Second)

					continue
				}

				resourceVersion = listResult.GetResourceVersion()
			}

			watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
				return managedClient.Resource(GVRPolicy).Namespace(clusterNamespace).Watch(ctx, options)
			}

			var err error

			watcher, err = watch.NewRetryWatcher(resourceVersion, &apiCache.ListWatch{WatchFunc: watchFunc})
			if err != nil {
				log.Error(err, "Failed to watch the policies for recording disabled events. Will retry.")

				time.Sleep(time.Second)

				continue
			}

			// Set the resourceVersion to an empty string after a successful start of the watcher so that if the
			// watcher unexpectedly stops, it will just start from the latest.
			resourceVersion = ""
		}

		select {
		case <-ctx.Done():
			// Stop the retry watcher if the parent context is canceled. It likely already is stopped, but this
			// is not documented behavior.
			watcher.Stop()

			return
		case <-watcher.Done():
			// Restart the watcher on the next loop since the context wasn't closed which indicates it was not
			// stopped on purpose.
			watcher = nil
		case result := <-watcher.ResultChan():
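			// Only deletions matter here; each one may produce disabled events for the deleted policy's templates.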
			if result.Type != apiWatch.Deleted {
				break
			}

			unstructuredPolicy, ok := result.Object.(*unstructured.Unstructured)
			if !ok {
				break
			}

			managedPolicy := &policyv1.Policy{}

			err := k8sruntime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPolicy.Object, managedPolicy)
			if err != nil {
				log.Error(err, "Failed to convert the unstructured object to a typed policy")

				break
			}

			for _, tmplEntry := range managedPolicy.Spec.PolicyTemplates {
				tmpl := &unstructured.Unstructured{}

				err := tmpl.UnmarshalJSON(tmplEntry.ObjectDefinition.Raw)
				if err != nil {
					continue
				}

				if tmpl.GetAnnotations()[utils.PolicyDBIDAnnotation] == "" {
					continue
				}

				ce, err := utils.GenerateDisabledEvent(
					managedPolicy,
					tmpl,
					"The policy was removed because the parent policy no longer applies to this cluster",
				)
				if err != nil {
					log.Error(err, "Failed to generate a disabled compliance API event")
				} else {
					eventsQueue.Add(ce)
				}
			}
		}
	}
}
51 changes: 0 additions & 51 deletions controllers/specsync/policy_spec_sync.go
@@ -10,11 +10,9 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
"open-cluster-management.io/governance-policy-propagator/controllers/common"
ctrl "sigs.k8s.io/controller-runtime"
@@ -61,9 +59,6 @@ type PolicyReconciler struct {
	// The namespace that the replicated policies should be synced to.
	TargetNamespace      string
	ConcurrentReconciles int
	// EventsQueue is a queue that accepts ComplianceAPIEventRequest to then be recorded in the compliance events
	// API by StartComplianceEventsSyncer. If the compliance events API is disabled, this will be nil.
	EventsQueue workqueue.RateLimitingInterface
}

//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=create;delete;get;list;patch;update;watch
@@ -100,23 +95,6 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
		// replicated policy on hub was deleted, remove policy on managed cluster
		reqLogger.Info("Policy was deleted, removing on managed cluster...")

		managedPolicy := &policiesv1.Policy{}
		if r.EventsQueue != nil {
			err := r.ManagedClient.Get(
				ctx, types.NamespacedName{Namespace: r.TargetNamespace, Name: request.Name}, managedPolicy,
			)
			if errors.IsNotFound(err) {
				// The policy is already deleted on the managed cluster so there is nothing to delete
				return reconcile.Result{}, nil
			}

			if err != nil {
				reqLogger.Error(err, "Failed to get the replicated policy on the managed cluster")

				return reconcile.Result{}, err
			}
		}

		err = r.ManagedClient.Delete(ctx, &policiesv1.Policy{
			TypeMeta: metav1.TypeMeta{
				Kind: policiesv1.Kind,
@@ -134,35 +112,6 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
			return reconcile.Result{}, err
		}

		// Only record disabled compliance events if the deletion timestamp is nil. The reason is that if there
		// is a finalizer that prevents immediate deletion and there is another reconcile, the disabled events
		// shouldn't be recorded again.
		if r.EventsQueue != nil && managedPolicy.GetDeletionTimestamp() == nil {
			for _, tmplEntry := range managedPolicy.Spec.PolicyTemplates {
				tmpl := &unstructured.Unstructured{}

				err := tmpl.UnmarshalJSON(tmplEntry.ObjectDefinition.Raw)
				if err != nil {
					continue
				}

				if tmpl.GetAnnotations()[utils.PolicyDBIDAnnotation] == "" {
					continue
				}

				ce, err := utils.GenerateDisabledEvent(
					managedPolicy,
					tmpl,
					"The policy was removed because the parent policy no longer applies to this cluster",
				)
				if err != nil {
					log.Error(err, "Failed to generate a disabled compliance API event")
				} else {
					r.EventsQueue.Add(ce)
				}
			}
		}

		reqLogger.Info("Policy has been removed from managed cluster...Reconciliation complete.")

		return reconcile.Result{}, nil
7 changes: 1 addition & 6 deletions controllers/statussync/policy_status_sync.go
@@ -568,7 +568,7 @@ func StartComplianceEventsSyncer(
	ctx context.Context,
	clusterName string,
	hubCfg *rest.Config,
	managedCfg *rest.Config,
	managedClient *dynamic.DynamicClient,
	apiURL string,
	events workqueue.RateLimitingInterface,
) error {
@@ -578,11 +578,6 @@
		hubToken = hubCfg.BearerToken
	}

	managedClient, err := dynamic.NewForConfig(managedCfg)
	if err != nil {
		return err
	}

	var clusterID string

	idClusterClaim, err := managedClient.Resource(clusterClaimGVR).Get(ctx, "id.k8s.io", metav1.GetOptions{})
31 changes: 29 additions & 2 deletions main.go
@@ -62,6 +62,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/source"

"open-cluster-management.io/governance-policy-framework-addon/controllers/complianceeventssync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/gatekeepersync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/secretsync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/specsync"
@@ -334,11 +335,29 @@ func main() {

	var queue workqueue.RateLimitingInterface

	var dynamicClient *dynamic.DynamicClient

	var policyListResourceVersion string

	if tool.Options.ComplianceAPIURL != "" {
		queue = workqueue.NewRateLimitingQueueWithConfig(
			workqueue.DefaultControllerRateLimiter(),
			workqueue.RateLimitingQueueConfig{Name: "compliance-api-events"},
		)

		dynamicClient = dynamic.NewForConfigOrDie(managedCfg)

		gvr := complianceeventssync.GVRPolicy

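		// List the policies before the controllers start so that the captured resource version
		// predates any deletions the spec-sync controller could perform.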
		listResult, err := dynamicClient.Resource(gvr).Namespace(tool.Options.ClusterNamespace).List(
			mgrCtx, metav1.ListOptions{},
		)
		if err != nil {
			log.Error(err, "Failed to list the policies for recording disabled events")
			os.Exit(1)
		}

		policyListResourceVersion = listResult.GetResourceVersion()
	}

	addControllers(mgrCtx, hubCfg, hubMgr, mgr, queue)
@@ -362,7 +381,7 @@

		go func() {
			err := statussync.StartComplianceEventsSyncer(
				mgrCtx, tool.Options.ClusterNamespace, hubCfg, managedCfg, tool.Options.ComplianceAPIURL, queue,
				mgrCtx, tool.Options.ClusterNamespace, hubCfg, dynamicClient, tool.Options.ComplianceAPIURL, queue,
			)
			if err != nil {
				log.Error(err, "Failed to start the compliance events API syncer")
@@ -372,6 +391,15 @@

			wg.Done()
		}()

		wg.Add(1)

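		// Watch for policy deletions and queue disabled compliance events for them.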
		go func() {
			complianceeventssync.DisabledEventsRecorder(
				mgrCtx, dynamicClient, tool.Options.ClusterNamespace, queue, policyListResourceVersion,
			)
			wg.Done()
		}()
	}

	var errorExit bool
@@ -881,7 +909,6 @@
		Scheme:               hubMgr.GetScheme(),
		TargetNamespace:      tool.Options.ClusterNamespace,
		ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
		EventsQueue:          queue,
	}).SetupWithManager(hubMgr, specSyncRequestsSource); err != nil {
		log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
		os.Exit(1)