Skip to content

Commit

Permalink
Remove the code to generate disabled compliance events for local-cluster
Browse files Browse the repository at this point in the history
This is now handled in the governance-policy-framework-addon:
open-cluster-management-io/governance-policy-framework-addon#132

Relates:
https://issues.redhat.com/browse/ACM-10418

Signed-off-by: mprahl <[email protected]>
  • Loading branch information
mprahl committed Apr 18, 2024
1 parent 6280a9b commit 0a78fa3
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 291 deletions.
79 changes: 0 additions & 79 deletions controllers/complianceeventsapi/complianceeventsapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
_ "github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/golang-migrate/migrate/v4/source"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/lib/pq"
k8sdepwatches "github.com/stolostron/kubernetes-dependency-watches/client"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -308,34 +307,6 @@ func MonitorDatabaseConnection(
},
},
}
case *EventDetailsQueued:
complianceEvent := v

err := RecordLocalClusterComplianceEvent(
ctx, complianceServerCtx, complianceEvent.EventDetails(),
)

requeue := errors.Is(err, ErrRetryable)

if requeue {
complianceServerCtx.Queue.Add(request)
}

if err != nil {
log.Info(
"Failed to record the queued compliance event",
"requeue", requeue,
"error", err.Error(),
"eventMessage", complianceEvent.Message,
"policyID", complianceEvent.PolicyID,
)
} else {
log.V(2).Info(
"Recorded the queued compliance event",
"eventMessage", complianceEvent.Message,
"policyID", complianceEvent.PolicyID,
)
}
}

complianceServerCtx.Queue.Done(request)
Expand All @@ -362,56 +333,6 @@ func MonitorDatabaseConnection(
}
}

// RecordLocalClusterComplianceEvent inserts complianceEvent into the compliance history database on behalf of
// the "local-cluster" cluster. The event's ClusterID field is overwritten with the cluster foreign key before
// the insert is executed.
//
// It returns nil when the insert succeeds or when the database reports a unique constraint violation (the event
// is already recorded). An error wrapping ErrRetryable is returned when the cluster foreign key lookup fails or
// when the insert fails and the database cannot be pinged; callers are expected to try again later in that case.
// Any other failure, including a foreign key violation, yields a non-retryable error.
func RecordLocalClusterComplianceEvent(
	ctx context.Context, complianceServerCtx *ComplianceServerCtx, complianceEvent *EventDetails,
) error {
	localCluster := Cluster{ClusterID: complianceServerCtx.ClusterID, Name: "local-cluster"}

	fk, fkErr := GetClusterForeignKey(ctx, complianceServerCtx.DB, localCluster)
	if fkErr != nil {
		// NOTE(review): there is no separator between the first %w and "failed" — confirm this is intended
		// given the message text of ErrRetryable.
		return fmt.Errorf(
			"%wfailed to get the cluster foreign key to generate a compliance event: %w", ErrRetryable, fkErr,
		)
	}

	complianceEvent.ClusterID = fk

	stmt, params := complianceEvent.InsertQuery()

	_, execErr := complianceServerCtx.DB.ExecContext(ctx, stmt, params...)
	if execErr == nil {
		return nil
	}

	// Classify PostgreSQL errors first: a unique violation means the event is a duplicate and can be ignored,
	// while a foreign key violation means the referenced rows are gone and the event can't be recorded.
	var pgErr *pq.Error
	if errors.As(execErr, &pgErr) {
		switch pgErr.Code {
		case postgresUniqueViolationCode:
			return nil
		case postgresForeignKeyViolationCode:
			return fmt.Errorf(
				"failed to record the compliance event because the foreign keys no longer apply: %w", execErr,
			)
		}
	}

	// A failed ping indicates the database is unreachable, so the event should be retried later.
	if pingErr := complianceServerCtx.DB.PingContext(ctx); pingErr != nil {
		return errors.Join(ErrRetryable, ErrDBConnectionFailed)
	}

	return fmt.Errorf("failed to record the compliance event: %w", execErr)
}

// MigrateDB will perform a database migration if required and send Kubernetes events if the migration fails.
// ErrDBConnectionFailed will be returned if the database connection failed. Obtain a write lock before calling
// this method if multiple goroutines use this ComplianceServerCtx instance.
Expand Down
1 change: 0 additions & 1 deletion controllers/complianceeventsapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func init() {

// PostgreSQL SQLSTATE error codes used to classify database errors.
const (
// postgresForeignKeyViolationCode is the SQLSTATE code PostgreSQL reports for a foreign key violation.
postgresForeignKeyViolationCode = "23503"
// postgresUniqueViolationCode is the SQLSTATE code PostgreSQL reports for a unique constraint violation.
postgresUniqueViolationCode = "23505"
)

var (
Expand Down
29 changes: 0 additions & 29 deletions controllers/complianceeventsapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,35 +210,6 @@ func (c *Cluster) GetOrCreate(ctx context.Context, db *sql.DB) error {
return getOrCreate(ctx, db, c)
}

// EventDetailsQueued is the work-queue-friendly form of EventDetails. A client-go work queue refuses
// EventDetails because its Metadata field uses the JSONMap type, which is not hashable, so this type carries
// only plain comparable values.
type EventDetailsQueued struct {
	// Database identifiers of the cluster, the policy, and the parent policy.
	ClusterID, PolicyID, ParentPolicyID int32
	// Compliance is the compliance state and Message is the accompanying event message.
	Compliance, Message string
	// Timestamp is the time associated with the event.
	Timestamp time.Time
	// ReportedBy names the component that reported the event.
	ReportedBy string
}

// EventDetails expands the queued event into a new EventDetails value. Only the fields present on
// EventDetailsQueued are populated; everything else keeps its zero value. ParentPolicyID and ReportedBy point
// at the receiver's own fields rather than at copies.
func (e *EventDetailsQueued) EventDetails() *EventDetails {
	details := new(EventDetails)

	details.ClusterID = e.ClusterID
	details.PolicyID = e.PolicyID
	details.ParentPolicyID = &e.ParentPolicyID
	details.Compliance = e.Compliance
	details.Message = e.Message
	details.Timestamp = e.Timestamp
	details.ReportedBy = &e.ReportedBy

	return details
}

// InsertQuery returns the insert statement and its arguments for this queued event by delegating to the
// InsertQuery method of the EventDetails value produced by e.EventDetails().
func (e *EventDetailsQueued) InsertQuery() (string, []any) {
	details := e.EventDetails()

	return details.InsertQuery()
}

type EventDetails struct {
KeyID int32 `db:"id" json:"-"`
ClusterID int32 `db:"cluster_id" json:"-"`
Expand Down
101 changes: 0 additions & 101 deletions controllers/propagator/replicatedpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"sync"
"time"

templates "github.com/stolostron/go-template-utils/v4/pkg/templates"
k8sdepwatches "github.com/stolostron/kubernetes-dependency-watches/client"
Expand Down Expand Up @@ -627,111 +626,11 @@ func (r *ReplicatedPolicyReconciler) cleanUpReplicated(ctx context.Context, repl
}
} else {
version.resourceVersion = "deleted"

// Normally the spec-sync controller handles this, however, if it's a self-managed hub policy, the spec-sync
// controller does not run. So this is a special case.
if replicatedPolicy.Namespace == "local-cluster" && r.ComplianceServerCtx.DB != nil {
r.recordDisabledEvents(ctx, replicatedPolicy)
}
}

return errors.Join(watcherErr, deleteErr)
}

// recordDisabledEvents writes one "Disabled" compliance event per policy template of replicatedPolicy to the
// compliance history database. Nothing is recorded when the replicated policy has no parent policy ID annotation
// or when that annotation is not a valid 32-bit integer. Templates that cannot be parsed, that have no policy ID
// annotation, or whose policy ID is invalid are skipped.
//
// Events that fail with a retryable error are added to the ComplianceServerCtx queue. Once a database connection
// failure has been seen, the remaining events are queued directly without attempting to record them.
func (r *ReplicatedPolicyReconciler) recordDisabledEvents(
	ctx context.Context, replicatedPolicy *policiesv1.Policy,
) {
	logger := log.WithValues("policy", replicatedPolicy.Name)

	rawParentID := replicatedPolicy.Annotations[ParentPolicyIDAnnotation]
	if rawParentID == "" {
		return
	}

	parentID, parseErr := strconv.ParseInt(rawParentID, 10, 32)
	if parseErr != nil {
		logger.Error(parseErr, "Failed to record a disabled compliance event due to an invalid parent policy ID")

		return
	}

	// Set after the first connection failure so that later events skip straight to the queue.
	dbDown := false

	for _, tmpl := range replicatedPolicy.Spec.PolicyTemplates {
		tmplObj := &unstructured.Unstructured{}

		if unmarshalErr := tmplObj.UnmarshalJSON(tmpl.ObjectDefinition.Raw); unmarshalErr != nil {
			continue
		}

		rawPolicyID := tmplObj.GetAnnotations()[PolicyIDAnnotation]
		if rawPolicyID == "" {
			continue
		}

		policyID, idErr := strconv.ParseInt(rawPolicyID, 10, 32)
		if idErr != nil {
			logger.Error(idErr, "Failed to record a disabled compliance event due to an invalid policy ID")

			continue
		}

		event := &complianceeventsapi.EventDetailsQueued{
			ParentPolicyID: int32(parentID),
			PolicyID:       int32(policyID),
			Compliance:     "Disabled",
			Message:        "The policy was removed because the parent policy no longer applies to this cluster",
			Timestamp:      time.Now().UTC(),
			ReportedBy:     "governance-policy-framework",
		}

		if dbDown {
			logger.Info(
				"Failed to record the compliance event. Will requeue.",
				"error", complianceeventsapi.ErrDBConnectionFailed.Error(),
				"eventMessage", event.Message,
				"policyID", event.PolicyID,
			)

			r.ComplianceServerCtx.Queue.Add(event)

			continue
		}

		recordErr := complianceeventsapi.RecordLocalClusterComplianceEvent(
			ctx, r.ComplianceServerCtx, event.EventDetails(),
		)

		retry := errors.Is(recordErr, complianceeventsapi.ErrRetryable)
		if retry {
			r.ComplianceServerCtx.Queue.Add(event)
		}

		if errors.Is(recordErr, complianceeventsapi.ErrDBConnectionFailed) {
			dbDown = true
		}

		if recordErr != nil {
			logger.Info(
				"Failed to record the compliance event",
				"requeue", retry,
				"error", recordErr.Error(),
				"eventMessage", event.Message,
				"policyID", event.PolicyID,
			)

			continue
		}

		logger.V(2).Info(
			"Recorded the compliance event",
			"eventMessage", event.Message,
			"policyID", event.PolicyID,
		)
	}
}

func (r *ReplicatedPolicyReconciler) singleClusterDecision(
ctx context.Context, rootPlc *policiesv1.Policy, clusterName string,
) (decision clusterDecision, err error) {
Expand Down
82 changes: 1 addition & 81 deletions test/e2e/case20_compliance_api_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"open-cluster-management.io/governance-policy-propagator/test/utils"
)

var _ = Describe("Test governance-policy-database secret changes, DB annotations, and events", Serial, Ordered, func() {
var _ = Describe("Test governance-policy-database secret changes and DB annotations", Serial, Ordered, func() {
const (
case20PolicyName string = "case20-policy"
case20PolicyYAML string = "../resources/case20_compliance_api_controller/policy.yaml"
Expand Down Expand Up @@ -66,48 +66,6 @@ var _ = Describe("Test governance-policy-database secret changes, DB annotations
ExpectWithOffset(1, replicatedPolicy).NotTo(BeNil())
}

// waitForDisabledEvent polls the compliance events API until exactly one "Disabled" compliance event for
// case20PolicyName on local-cluster with a timestamp after the given time is reported. The spec fails if that
// does not happen within twice the default timeout.
waitForDisabledEvent := func(ctx context.Context, after time.Time) {
	afterStr := after.Format(time.RFC3339Nano)

	By("Waiting for the disabled compliance event after " + afterStr)
	EventuallyWithOffset(1, func(g Gomega) {
		endpoint := fmt.Sprintf(
			"https://localhost:%d/api/v1/compliance-events?cluster.name=local-cluster&event.compliance=Disabled"+
				"&event.timestamp_after=%s&policy.name=%s",
			complianceAPIPort,
			afterStr,
			case20PolicyName,
		)

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
		g.Expect(err).ToNot(HaveOccurred())

		req.Header.Set("Authorization", "Bearer "+clientToken)

		resp, err := httpClient.Do(req)
		// A request error must fail this attempt so that Eventually polls again. Returning without a failed
		// assertion would make Should(Succeed()) pass without ever checking for the event.
		g.Expect(err).ToNot(HaveOccurred())

		defer resp.Body.Close()

		g.Expect(resp.StatusCode).To(Equal(http.StatusOK))

		body, err := io.ReadAll(resp.Body)
		g.Expect(err).ToNot(HaveOccurred())

		result := map[string]any{}

		err = json.Unmarshal(body, &result)
		g.Expect(err).ToNot(HaveOccurred())

		metadata, ok := result["metadata"].(map[string]any)
		g.Expect(ok).To(BeTrue(), "The metadata key was the wrong type")

		g.Expect(metadata["total"]).To(BeEquivalentTo(1))
	}, defaultTimeoutSeconds*2, 1).Should(Succeed())
}

BeforeAll(func(ctx context.Context) {
Expect(clientToken).ToNot(BeEmpty(), "Ensure you use the service account kubeconfig (kubeconfig_hub)")

Expand Down Expand Up @@ -183,44 +141,6 @@ var _ = Describe("Test governance-policy-database secret changes, DB annotations
g.Expect(policyID).ToNot(BeEmpty())
}, defaultTimeoutSeconds, 1).Should(Succeed())
})

// Deleting the root policy is expected to produce a Disabled compliance event for local-cluster.
It("Creates a disabled event for local-cluster", func(ctx context.Context) {
	// The search window starts one second in the past — presumably to tolerate timestamp skew.
	cutoff := time.Now().UTC().Add(-time.Second)

	By("Deleting " + case20PolicyName)
	utils.Kubectl("delete", "-f", case20PolicyYAML, "-n", nsName, "--kubeconfig="+kubeconfigHub)

	rootPolicy := utils.GetWithTimeout(
		clientHubDynamic, gvrPolicy, case20PolicyName, nsName, false, defaultTimeoutSeconds,
	)
	Expect(rootPolicy).To(BeNil())

	waitForDisabledEvent(ctx, cutoff)
})

// With the database connection down, deleting the policy is still expected to produce a Disabled compliance
// event once the connection has been restored.
It("Creates a disabled event for local-cluster when the database is down and restored", func(ctx context.Context) {
	createCase20Policy(ctx)

	bringDownDBConnection(ctx)

	// The search window starts one second in the past — presumably to tolerate timestamp skew.
	cutoff := time.Now().UTC().Add(-time.Second)

	By("Deleting " + case20PolicyName)
	utils.Kubectl("delete", "-f", case20PolicyYAML, "-n", nsName, "--kubeconfig="+kubeconfigHub)

	rootPolicy := utils.GetWithTimeout(
		clientHubDynamic, gvrPolicy, case20PolicyName, nsName, false, defaultTimeoutSeconds,
	)
	Expect(rootPolicy).To(BeNil())

	By("Waiting for the replicated policy to be deleted")
	// NOTE(review): this lookup uses the same name and namespace as the root policy lookup above — confirm
	// whether the replicated policy's own name and namespace were intended.
	replicated := utils.GetWithTimeout(
		clientHubDynamic, gvrPolicy, case20PolicyName, nsName, false, defaultTimeoutSeconds,
	)
	Expect(replicated).To(BeNil())

	restoreDBConnection(ctx)

	waitForDisabledEvent(ctx, cutoff)
})
})

func bringDownDBConnection(ctx context.Context) {
Expand Down

0 comments on commit 0a78fa3

Please sign in to comment.