Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move the disabled events recording outside of spec-sync #132

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions controllers/complianceeventssync/disabled_events_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package complianceeventssync

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
apiCache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/workqueue"
policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
ctrl "sigs.k8s.io/controller-runtime"

"open-cluster-management.io/governance-policy-framework-addon/controllers/utils"
)

var (
log = ctrl.Log.WithName("disabled-events-recorder")
GVRPolicy = schema.GroupVersionResource{
Group: "policy.open-cluster-management.io",
Version: "v1",
Resource: "policies",
}
)

// DisabledEventsRecorder watches for deleted policies in clusterNamespace and, for every policy template of a deleted
// policy that has the utils.PolicyDBIDAnnotation annotation set, adds a disabled compliance event to eventsQueue so
// that it gets recorded on the compliance history API. It blocks until ctx is canceled.
//
// Set initialResourceVersion to the resource version returned by a list query of the policies in the managed cluster
// namespace made before the spec-sync starts, to avoid missing events. If it is empty, the policies are listed here
// and the watch starts from the resource version of that list.
func DisabledEventsRecorder(
	ctx context.Context,
	managedClient dynamic.Interface,
	clusterNamespace string,
	// EventsQueue is a queue that accepts ComplianceAPIEventRequest to then be recorded in the compliance events
	// API by StartComplianceEventsSyncer.
	eventsQueue workqueue.RateLimitingInterface,
	initialResourceVersion string,
) {
	timeout := int64(30)
	var watcher *watch.RetryWatcher

	resourceVersion := initialResourceVersion

	// waitToRetry pauses for a second before a retry. It returns false if the context was canceled while waiting,
	// in which case the caller must return instead of retrying.
	waitToRetry := func() bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(time.Second):
			return true
		}
	}

	for {
		if watcher == nil {
			// With a canceled context every list and watch request fails, so retrying would never end.
			if ctx.Err() != nil {
				return
			}

			if resourceVersion == "" {
				listResult, err := managedClient.Resource(GVRPolicy).Namespace(clusterNamespace).List(
					ctx, metav1.ListOptions{TimeoutSeconds: &timeout},
				)
				if err != nil {
					log.Error(err, "Failed to list the policies for recording disabled events. Will retry.")

					if !waitToRetry() {
						return
					}

					continue
				}

				resourceVersion = listResult.GetResourceVersion()
			}

			watchFunc := func(options metav1.ListOptions) (apiWatch.Interface, error) {
				return managedClient.Resource(GVRPolicy).Namespace(clusterNamespace).Watch(ctx, options)
			}

			var err error

			watcher, err = watch.NewRetryWatcher(resourceVersion, &apiCache.ListWatch{WatchFunc: watchFunc})
			if err != nil {
				log.Error(err, "Failed to watch the policies for recording disabled events. Will retry.")

				// Drop the rejected resource version so that the next attempt lists the policies again instead of
				// retrying forever with the same value.
				resourceVersion = ""

				if !waitToRetry() {
					return
				}

				continue
			}

			// Set the resourceVersion to an empty string after a successful start of the watcher so that if the
			// watcher unexpectedly stops, it will just start from the latest.
			resourceVersion = ""
		}

		select {
		case <-ctx.Done():
			// Stop the retry watcher if the parent context is canceled. It likely already is stopped, but this is not
			// documented behavior.
			watcher.Stop()

			return
		case <-watcher.Done():
			// Restart the watcher on the next loop since the context wasn't closed which indicates it was not stopped
			// on purpose.
			watcher = nil
		case result, ok := <-watcher.ResultChan():
			if !ok {
				// A closed result channel only produces zero-value events. Treat it like a stopped watcher and
				// restart on the next loop rather than spinning on the closed channel.
				watcher = nil

				break
			}

			// Only deletions lead to disabled compliance events.
			if result.Type != apiWatch.Deleted {
				break
			}

			unstructuredPolicy, ok := result.Object.(*unstructured.Unstructured)
			if !ok {
				break
			}

			managedPolicy := &policyv1.Policy{}

			err := k8sruntime.DefaultUnstructuredConverter.FromUnstructured(unstructuredPolicy.Object, managedPolicy)
			if err != nil {
				log.Error(err, "Failed to convert the unstructured object to a typed policy")

				break
			}

			for _, tmplEntry := range managedPolicy.Spec.PolicyTemplates {
				tmpl := &unstructured.Unstructured{}

				// Templates that cannot be parsed are skipped.
				err := tmpl.UnmarshalJSON(tmplEntry.ObjectDefinition.Raw)
				if err != nil {
					continue
				}

				// Templates without the database ID annotation are skipped as well.
				if tmpl.GetAnnotations()[utils.PolicyDBIDAnnotation] == "" {
					continue
				}

				ce, err := utils.GenerateDisabledEvent(
					managedPolicy,
					tmpl,
					"The policy was removed because the parent policy no longer applies to this cluster",
				)
				if err != nil {
					log.Error(err, "Failed to generate a disabled compliance API event")

					continue
				}

				eventsQueue.Add(ce)
			}
		}
	}
}
51 changes: 0 additions & 51 deletions controllers/specsync/policy_spec_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
"open-cluster-management.io/governance-policy-propagator/controllers/common"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -61,9 +59,6 @@ type PolicyReconciler struct {
// The namespace that the replicated policies should be synced to.
TargetNamespace string
ConcurrentReconciles int
// EventsQueue is a queue that accepts ComplianceAPIEventRequest to then be recorded in the compliance events
// API by StartComplianceEventsSyncer. If the compliance events API is disabled, this will be nil.
EventsQueue workqueue.RateLimitingInterface
}

//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=create;delete;get;list;patch;update;watch
Expand Down Expand Up @@ -100,23 +95,6 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
// replicated policy on hub was deleted, remove policy on managed cluster
reqLogger.Info("Policy was deleted, removing on managed cluster...")

managedPolicy := &policiesv1.Policy{}
if r.EventsQueue != nil {
err := r.ManagedClient.Get(
ctx, types.NamespacedName{Namespace: r.TargetNamespace, Name: request.Name}, managedPolicy,
)
if errors.IsNotFound(err) {
// The policy is already deleted on the managed cluster so there is nothing to delete
return reconcile.Result{}, nil
}

if err != nil {
reqLogger.Error(err, "Failed to get the replicated policy on the managed cluster")

return reconcile.Result{}, err
}
}

err = r.ManagedClient.Delete(ctx, &policiesv1.Policy{
TypeMeta: metav1.TypeMeta{
Kind: policiesv1.Kind,
Expand All @@ -134,35 +112,6 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, request reconcile.Requ
return reconcile.Result{}, err
}

// Only record disabled compliance events if the deletion timestamp is nil. The reason is that if there
// is a finalizer that prevents immediate deletion and there is another reconcile, the disabled events
// shouldn't be recorded again.
if r.EventsQueue != nil && managedPolicy.GetDeletionTimestamp() == nil {
for _, tmplEntry := range managedPolicy.Spec.PolicyTemplates {
tmpl := &unstructured.Unstructured{}

err := tmpl.UnmarshalJSON(tmplEntry.ObjectDefinition.Raw)
if err != nil {
continue
}

if tmpl.GetAnnotations()[utils.PolicyDBIDAnnotation] == "" {
continue
}

ce, err := utils.GenerateDisabledEvent(
managedPolicy,
tmpl,
"The policy was removed because the parent policy no longer applies to this cluster",
)
if err != nil {
log.Error(err, "Failed to generate a disabled compliance API event")
} else {
r.EventsQueue.Add(ce)
}
}
}

reqLogger.Info("Policy has been removed from managed cluster...Reconciliation complete.")

return reconcile.Result{}, nil
Expand Down
7 changes: 1 addition & 6 deletions controllers/statussync/policy_status_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func StartComplianceEventsSyncer(
ctx context.Context,
clusterName string,
hubCfg *rest.Config,
managedCfg *rest.Config,
managedClient *dynamic.DynamicClient,
apiURL string,
events workqueue.RateLimitingInterface,
) error {
Expand All @@ -578,11 +578,6 @@ func StartComplianceEventsSyncer(
hubToken = hubCfg.BearerToken
}

managedClient, err := dynamic.NewForConfig(managedCfg)
if err != nil {
return err
}

var clusterID string

idClusterClaim, err := managedClient.Resource(clusterClaimGVR).Get(ctx, "id.k8s.io", metav1.GetOptions{})
Expand Down
31 changes: 29 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/source"

"open-cluster-management.io/governance-policy-framework-addon/controllers/complianceeventssync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/gatekeepersync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/secretsync"
"open-cluster-management.io/governance-policy-framework-addon/controllers/specsync"
Expand Down Expand Up @@ -334,11 +335,29 @@ func main() {

var queue workqueue.RateLimitingInterface

var dynamicClient *dynamic.DynamicClient

var policyListResourceVersion string

if tool.Options.ComplianceAPIURL != "" {
queue = workqueue.NewRateLimitingQueueWithConfig(
workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{Name: "compliance-api-events"},
)

dynamicClient = dynamic.NewForConfigOrDie(managedCfg)

gvr := complianceeventssync.GVRPolicy

listResult, err := dynamicClient.Resource(gvr).Namespace(tool.Options.ClusterNamespace).List(
mgrCtx, metav1.ListOptions{},
)
if err != nil {
log.Error(err, "Failed to list the policies for recording disabled events")
os.Exit(1)
}

policyListResourceVersion = listResult.GetResourceVersion()
}

addControllers(mgrCtx, hubCfg, hubMgr, mgr, queue)
Expand All @@ -362,7 +381,7 @@ func main() {

go func() {
err := statussync.StartComplianceEventsSyncer(
mgrCtx, tool.Options.ClusterNamespace, hubCfg, managedCfg, tool.Options.ComplianceAPIURL, queue,
mgrCtx, tool.Options.ClusterNamespace, hubCfg, dynamicClient, tool.Options.ComplianceAPIURL, queue,
)
if err != nil {
log.Error(err, "Failed to start the compliance events API syncer")
Expand All @@ -372,6 +391,15 @@ func main() {

wg.Done()
}()

wg.Add(1)

go func() {
complianceeventssync.DisabledEventsRecorder(
mgrCtx, dynamicClient, tool.Options.ClusterNamespace, queue, policyListResourceVersion,
)
wg.Done()
}()
}

var errorExit bool
Expand Down Expand Up @@ -881,7 +909,6 @@ func addControllers(
Scheme: hubMgr.GetScheme(),
TargetNamespace: tool.Options.ClusterNamespace,
ConcurrentReconciles: int(tool.Options.EvaluationConcurrency),
EventsQueue: queue,
}).SetupWithManager(hubMgr, specSyncRequestsSource); err != nil {
log.Error(err, "Unable to create the controller", "controller", specsync.ControllerName)
os.Exit(1)
Expand Down