Skip to content
This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Infinite reconciliation loop rawstatuscollection #1378

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/go-logr/logr v0.3.0 // indirect
github.com/go-logr/zapr v0.2.0 // indirect
github.com/google/go-cmp v0.5.2
github.com/json-iterator/go v1.1.10
github.com/onsi/ginkgo v1.14.2
github.com/onsi/gomega v1.10.3
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) ut
// Enable raw resource status collection if the statusCollection is enabled for that type
// and the feature is also enabled.
enableRawResourceStatusCollection := s.typeConfig.GetStatusEnabled() && s.rawResourceStatusCollection
klog.V(4).Infof("typeconfigstatus for %v enabled: %v, rawresstatus: %v", fedResource.FederatedName(), s.typeConfig.GetStatusEnabled(), enableRawResourceStatusCollection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll rephrase this entry to "federated status for '%q' enabled '%b' with resource status collection enabled: '%b'"


clusters, err := s.informer.GetClusters()
if err != nil {
Expand Down Expand Up @@ -387,7 +388,8 @@ func (s *KubeFedSyncController) syncToClusters(fedResource FederatedResource) ut
}

collectedStatus, collectedResourceStatus := dispatcher.CollectedStatus()
klog.V(4).Infof("Setting the federated status '%v'", collectedResourceStatus)
klog.V(4).Infof("Setting the federated status '%v' for %s %q", collectedResourceStatus, kind, key)
klog.V(4).Infof("Collected status '%v' for %s %q", collectedStatus, kind, key)
return s.setFederatedStatus(fedResource, status.AggregateSuccess, &collectedStatus, &collectedResourceStatus, enableRawResourceStatusCollection)
}

Expand Down
50 changes: 42 additions & 8 deletions pkg/controller/sync/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"reflect"
"time"

"github.com/google/go-cmp/cmp"

"github.com/pkg/errors"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -125,15 +127,23 @@ type CollectedResourceStatus struct {
// whether status should be written to the API.
func SetFederatedStatus(fedObject *unstructured.Unstructured, reason AggregateReason, collectedStatus CollectedPropagationStatus, collectedResourceStatus CollectedResourceStatus, resourceStatusCollection bool) (bool, error) {
resource := &GenericFederatedResource{}

err := util.UnstructuredToInterface(fedObject, resource)
if err != nil {
return false, errors.Wrapf(err, "Failed to unmarshall to generic resource")
}

normalizedCollectedResourceStatus, err := normalizeStatus(collectedResourceStatus)
if err != nil {
return false, errors.Wrap(err, "Failed to normalize status")
}

if resource.Status == nil {
resource.Status = &GenericFederatedStatus{}
}

changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, collectedResourceStatus, resourceStatusCollection)
changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, *normalizedCollectedResourceStatus, resourceStatusCollection)
//changed := resource.Status.update(fedObject.GetGeneration(), reason, collectedStatus, collectedResourceStatus, resourceStatusCollection)
if !changed {
return false, nil
}
Expand Down Expand Up @@ -177,7 +187,7 @@ func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason
}
}

clustersChanged := s.setClusters(collectedStatus.StatusMap, collectedResourceStatus.StatusMap)
clustersChanged := s.setClusters(collectedStatus.StatusMap, collectedResourceStatus.StatusMap, resourceStatusCollection)

// Indicate that changes were propagated if either status.clusters
// was changed or if existing resources were updated (which could
Expand All @@ -196,8 +206,8 @@ func (s *GenericFederatedStatus) update(generation int64, reason AggregateReason
// setClusters sets the status.clusters slice from propagation and resource status
// maps. Returns a boolean indication of whether the status.clusters was
// modified.
func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}) bool {
if !s.clustersDiffer(statusMap, resourceStatusMap) {
func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}, resourceStatusCollection bool) bool {
if !s.clustersDiffer(statusMap, resourceStatusMap, resourceStatusCollection) {
return false
}
s.Clusters = []GenericClusterStatus{}
Expand All @@ -214,17 +224,19 @@ func (s *GenericFederatedStatus) setClusters(statusMap PropagationStatusMap, res

// clustersDiffer checks whether `status.clusters` differs from the
// given status map.
func (s *GenericFederatedStatus) clustersDiffer(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}) bool {
if len(s.Clusters) != len(statusMap) || len(s.Clusters) != len(resourceStatusMap) {
klog.V(4).Info("Clusters differs from the size")
func (s *GenericFederatedStatus) clustersDiffer(statusMap PropagationStatusMap, resourceStatusMap map[string]interface{}, resourceStatusCollection bool) bool {
if len(s.Clusters) != len(statusMap) || resourceStatusCollection && (len(s.Clusters) != len(resourceStatusMap)) {
klog.V(4).Infof("Clusters differs from the size: clusters = %v, statusMap = %v, resourceStatusMap = %v", s.Clusters, statusMap, resourceStatusMap)
return true
}
for _, status := range s.Clusters {
if statusMap[status.Name] != status.Status {
return true
}
if !reflect.DeepEqual(resourceStatusMap[status.Name], status.RemoteStatus) {
klog.V(4).Infof("Clusters resource status differ: %v VS %v", resourceStatusMap[status.Name], status.RemoteStatus)
diff := cmp.Diff(resourceStatusMap[status.Name], status.RemoteStatus)
klog.V(4).Infof("Clusters resource status differ: %#v VS %#v", resourceStatusMap[status.Name], status.RemoteStatus)
klog.V(4).Infof("Diff between clusters resources: %s", diff)
return true
}
}
Expand Down Expand Up @@ -278,3 +290,25 @@ func (s *GenericFederatedStatus) setPropagationCondition(reason AggregateReason,

return updateRequired
}

func normalizeStatus(collectedResourceStatus CollectedResourceStatus) (*CollectedResourceStatus, error) {
cleanedStatus := CollectedResourceStatus{
StatusMap: map[string]interface{}{},
ResourcesUpdated: collectedResourceStatus.ResourcesUpdated,
}

for key, value := range collectedResourceStatus.StatusMap {
content, err := json.Marshal(value)
if err != nil {
return nil, errors.Wrapf(err, "Failed to marshall collected resource status for cluster %s", key)
}
var status interface{}
err = json.Unmarshal(content, &status)
if err != nil {
return nil, errors.Wrapf(err, "Failed to unmarshall collected resource status as interface for cluster %s", key)
}
cleanedStatus.StatusMap[key] = status
}

return &cleanedStatus, nil
}
175 changes: 147 additions & 28 deletions pkg/controller/sync/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,60 +24,179 @@ import (

func TestGenericPropagationStatusUpdateChanged(t *testing.T) {
testCases := map[string]struct {
generation int64
reason AggregateReason
statusMap PropagationStatusMap
resourceStatusMap map[string]interface{}
resourcesUpdated bool
expectedChanged bool
generation int64
reason AggregateReason
statusMap PropagationStatusMap
resourceStatusMap map[string]interface{}
remoteStatus interface{}
resourcesUpdated bool
expectedChanged bool
resourceStatusCollection bool
}{
"No change in clusters indicates unchanged": {
"Cluster not propagated indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterNotReady,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"Cluster not propagated indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterNotReady,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"Cluster status not retrieved indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"Cluster status not retrieved indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"No collected remote status indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"No collected remote status indicates unchanged with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
reason: AggregateSuccess,
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: false,
},
"No change in clusters indicates unchanged with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: true,
},
"No change in clusters indicates unchanged with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"cluster1": map[string]interface{}{},
},
resourcesUpdated: false,
resourceStatusCollection: false,
expectedChanged: true,
},
"No change in clusters with update indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": false,
"stage": "absent",
"cluster1": map[string]interface{}{},
},
resourcesUpdated: false,
expectedChanged: true,
resourcesUpdated: true,
resourceStatusCollection: true,
expectedChanged: true,
},
"No change in clusters with update indicates changed": {
"No change in clusters with update indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": false,
"stage": "absent",
"cluster1": map[string]interface{}{},
},
resourcesUpdated: true,
expectedChanged: true,
resourcesUpdated: true,
resourceStatusCollection: false,
expectedChanged: true,
},
"Change in clusters indicates changed": {
"No change in remote status indicates unchanged with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
},
resourceStatusMap: map[string]interface{}{
"ready": true,
"stage": "deployed",
"cluster1": map[string]interface{}{
"status": map[string]interface{}{
"status": "remoteStatus",
},
},
},
expectedChanged: true,
remoteStatus: map[string]interface{}{
"status": map[string]interface{}{
"status": "remoteStatus",
},
},
resourcesUpdated: false,
resourceStatusCollection: true,
expectedChanged: false,
},
"Change in clusters indicates changed with status collected enabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
"cluster2": ClusterPropagationOK,
},
resourceStatusCollection: true,
expectedChanged: true,
},
"Change in clusters indicates changed with status collected disabled": {
statusMap: PropagationStatusMap{
"cluster1": ClusterPropagationOK,
"cluster2": ClusterPropagationOK,
},
resourceStatusCollection: false,
expectedChanged: true,
},
"Transition indicates changed with remote status collection enabled": {
reason: NamespaceNotFederated,
resourceStatusCollection: true,
expectedChanged: true,
},
"Transition indicates changed with remote status collection disabled": {
reason: NamespaceNotFederated,
resourceStatusCollection: false,
expectedChanged: true,
},
"Transition indicates changed": {
reason: NamespaceNotFederated,
expectedChanged: true,
"Changed generation indicates changed with remote status collection enabled": {
generation: 1,
resourceStatusCollection: true,
expectedChanged: true,
},
"Changed generation indicates changed": {
generation: 1,
expectedChanged: true,
"Changed generation indicates changed with remote status collection disabled": {
generation: 1,
resourceStatusCollection: false,
expectedChanged: true,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
fedStatus := &GenericFederatedStatus{
Clusters: []GenericClusterStatus{
{
Name: "cluster1",
Name: "cluster1",
RemoteStatus: tc.remoteStatus,
},
},
Conditions: []*GenericCondition{
Expand All @@ -95,7 +214,7 @@ func TestGenericPropagationStatusUpdateChanged(t *testing.T) {
StatusMap: tc.resourceStatusMap,
ResourcesUpdated: tc.resourcesUpdated,
}
changed := fedStatus.update(tc.generation, tc.reason, collectedStatus, collectedResourceStatus, true)
changed := fedStatus.update(tc.generation, tc.reason, collectedStatus, collectedResourceStatus, tc.resourceStatusCollection)
if tc.expectedChanged != changed {
t.Fatalf("Expected changed to be %v, got %v", tc.expectedChanged, changed)
}
Expand Down