From 8b92dd7dddedfde4662d72582727fc5e96b0c649 Mon Sep 17 00:00:00 2001 From: Ryan Zhang Date: Mon, 1 Apr 2024 17:59:51 -0700 Subject: [PATCH] rebase --- cmd/hubagent/main.go | 5 +- cmd/hubagent/workload/setup.go | 6 +- go.mod | 43 +++--- go.sum | 20 +++ .../resource_selector.go | 12 +- .../resourcechange_controller.go | 2 +- pkg/utils/informer/informermanager.go | 137 +++++++++++++----- 7 files changed, 156 insertions(+), 69 deletions(-) diff --git a/cmd/hubagent/main.go b/cmd/hubagent/main.go index 4f297ed05..4d688b847 100644 --- a/cmd/hubagent/main.go +++ b/cmd/hubagent/main.go @@ -17,6 +17,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -27,6 +28,7 @@ import ( workv1alpha1 "sigs.k8s.io/work-api/pkg/apis/v1alpha1" fleetnetworkingv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" + clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" placementv1alpha1 "go.goms.io/fleet/apis/placement/v1alpha1" placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" @@ -66,6 +68,7 @@ func init() { utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) utilruntime.Must(fleetnetworkingv1alpha1.AddToScheme(scheme)) utilruntime.Must(placementv1alpha1.AddToScheme(scheme)) + utilruntime.Must(apiregistrationv1.AddToScheme(scheme)) // +kubebuilder:scaffold:scheme klog.InitFlags(nil) @@ -156,7 +159,7 @@ func main() { } ctx := ctrl.SetupSignalHandler() - if err := workload.SetupControllers(ctx, &wg, mgr, config, opts); err != nil { + if err := workload.SetupControllers(ctx, scheme, &wg, mgr, config, opts); err != nil { klog.ErrorS(err, "unable to set up ready check") exitWithErrorFunc() } diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go index 50625db0b..3014f4656 100644 --- a/cmd/hubagent/workload/setup.go +++ b/cmd/hubagent/workload/setup.go @@ -11,6 +11,8 @@ import ( "strings" "sync" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" @@ -76,7 +78,7 @@ var ( ) // SetupControllers set up the customized controllers we developed -func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error { +func SetupControllers(ctx context.Context, scheme *runtime.Scheme, wg *sync.WaitGroup, mgr ctrl.Manager, config *rest.Config, opts *options.Options) error { dynamicClient, err := dynamic.NewForConfig(config) if err != nil { klog.ErrorS(err, "unable to create the dynamic client") @@ -132,7 +134,7 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, } // the manager for all the dynamically created informers - dynamicInformerManager := informer.NewInformerManager(dynamicClient, opts.ResyncPeriod.Duration, ctx.Done()) + dynamicInformerManager := informer.NewInformerManager(dynamicClient, scheme, opts.ResyncPeriod.Duration, ctx.Done()) validator.ResourceInformer = dynamicInformerManager // webhook needs this to check resource scope // Set up a custom controller to reconcile cluster resource placement diff --git a/go.mod b/go.mod index 9d847c966..debd3d42a 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,18 @@ module go.goms.io/fleet -go 1.20 +go 1.21 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 github.com/Azure/karpenter v0.2.0 github.com/crossplane/crossplane-runtime v0.20.1 - github.com/go-logr/logr v1.3.0 + github.com/go-logr/logr v1.4.1 github.com/google/go-cmp v0.6.0 - github.com/onsi/ginkgo/v2 v2.13.0 - github.com/onsi/gomega v1.29.0 + github.com/onsi/ginkgo/v2 v2.14.0 + github.com/onsi/gomega v1.30.0 github.com/openkruise/kruise v1.4.0 - github.com/prometheus/client_golang v1.17.0 + github.com/prometheus/client_golang v1.18.0 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 @@ -22,15 +22,16 @@ require ( golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/sync v0.5.0 golang.org/x/time v0.4.0 - k8s.io/api v0.28.3 - k8s.io/apiextensions-apiserver v0.28.3 - k8s.io/apimachinery v0.28.3 - k8s.io/client-go v0.28.3 - k8s.io/component-base v0.28.3 + k8s.io/api v0.29.0 + k8s.io/apiextensions-apiserver v0.29.0 + k8s.io/apimachinery v0.29.0 + k8s.io/client-go v0.29.0 + k8s.io/component-base v0.29.0 k8s.io/klog/v2 v2.110.1 + k8s.io/kube-aggregator v0.29.0 k8s.io/metrics v0.25.2 k8s.io/utils v0.0.0-20230726121419-3b25d923346b - sigs.k8s.io/controller-runtime v0.16.3 + sigs.k8s.io/controller-runtime v0.17.2 sigs.k8s.io/work-api v0.0.0-20220407021756-586d707fdb2c ) @@ -45,8 +46,8 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect - github.com/evanphx/json-patch/v5 v5.7.0 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/evanphx/json-patch/v5 v5.8.0 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect @@ -76,17 +77,17 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.44.0 // indirect + github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/samber/lo v1.38.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.14.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/crypto v0.16.0 // indirect + golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/term v0.13.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/term v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.14.0 // indirect + golang.org/x/tools v0.16.1 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/protobuf v1.33.0 // indirect @@ -96,8 +97,8 @@ require ( k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect knative.dev/pkg v0.0.0-20231010144348-ca8c009405dd // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect - sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) replace ( diff --git a/go.sum b/go.sum index 800959f40..875ad7c2d 100644 --- a/go.sum +++ b/go.sum @@ -54,11 +54,14 @@ github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCv github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc= github.com/evanphx/json-patch/v5 v5.7.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= +github.com/evanphx/json-patch/v5 v5.8.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= @@ -135,8 +138,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= +github.com/onsi/ginkgo/v2 v2.14.0/go.mod h1:JkUdW7JkN0V6rFvsHcJ478egV3XH9NxpD27Hal/PhZw= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= +github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ= github.com/openkruise/kruise v1.4.0 h1:1OTosvEat+2sE59jaj3sqicdkHqMkTrafq0mSaHBLYs= github.com/openkruise/kruise v1.4.0/go.mod h1:b6DHo7c3d8pDyIoT13/dCDiuAn6F8fZR1644c9Dui1I= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= @@ -150,10 +155,12 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -209,6 +216,7 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.13.0 h1:jDDenyj+WgFtmV3zYVoi8aE2BwtXFLWOA67ZfNWftiY= golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -229,10 +237,12 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -249,6 +259,7 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -274,19 +285,25 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.28.3 h1:Gj1HtbSdB4P08C8rs9AR94MfSGpRhJgsS+GF9V26xMM= k8s.io/api v0.28.3/go.mod h1:MRCV/jr1dW87/qJnZ57U5Pak65LGmQVkKTzf3AtKFHc= +k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA= k8s.io/apiextensions-apiserver v0.28.3 h1:Od7DEnhXHnHPZG+W9I97/fSQkVpVPQx2diy+2EtmY08= k8s.io/apiextensions-apiserver v0.28.3/go.mod h1:NE1XJZ4On0hS11aWWJUTNkmVB03j9LM7gJSisbRt8Lc= +k8s.io/apiextensions-apiserver v0.29.0/go.mod h1:TKmpy3bTS0mr9pylH0nOt/QzQRrW7/h7yLdRForMZwc= k8s.io/apimachinery v0.28.3 h1:B1wYx8txOaCQG0HmYF6nbpU8dg6HvA06x5tEffvOe7A= k8s.io/apimachinery v0.28.3/go.mod h1:uQTKmIqs+rAYaq+DFaoD2X7pcjLOqbQX2AOiO0nIpb8= +k8s.io/apimachinery v0.29.0/go.mod h1:eVBxQ/cwiJxH58eK/jd/vAk4mrxmVlnpBH5J2GbMeis= k8s.io/client-go v0.28.3 h1:2OqNb72ZuTZPKCl+4gTKvqao0AMOl9f3o2ijbAj3LI4= k8s.io/client-go v0.28.3/go.mod h1:LTykbBp9gsA7SwqirlCXBWtK0guzfhpoW4qSm7i9dxo= +k8s.io/client-go v0.29.0/go.mod h1:yLkXH4HKMAywcrD82KMSmfYg2DlE8mepPR4JGSo5n38= k8s.io/cloud-provider v0.28.3 h1:9u+JjA3zIn0nqLOOa8tWnprFkffguSAhfBvo8p7LhBQ= k8s.io/component-base v0.28.3 h1:rDy68eHKxq/80RiMb2Ld/tbH8uAE75JdCqJyi6lXMzI= k8s.io/component-base v0.28.3/go.mod h1:fDJ6vpVNSk6cRo5wmDa6eKIG7UlIQkaFmZN2fYgIUD8= +k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M= k8s.io/component-helpers v0.28.3 h1:te9ieTGzcztVktUs92X53P6BamAoP73MK0qQP0WmDqc= k8s.io/csi-translation-lib v0.28.3 h1:7deV+HZjV418AGikSDPW8dyzTpm4K3tNbQUp3KmR7cs= k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/kube-aggregator v0.29.0/go.mod h1:bjatII63ORkFg5yUFP2qm2OC49R0wwxZhRVIyJ4Z4X0= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/metrics v0.25.2 h1:105TuPaIFfr4EHzN56WwZJO7r1UesuDytNTzeMqGySo= @@ -298,9 +315,12 @@ knative.dev/pkg v0.0.0-20231010144348-ca8c009405dd/go.mod h1:36cYnaOVHkzmhgybmYX sigs.k8s.io/cloud-provider-azure v1.28.2 h1:KKrWdC1+p2xXdT1VRmSkT57MhKNzPXk3yPcrwUDIr5I= sigs.k8s.io/controller-runtime v0.16.3 h1:2TuvuokmfXvDUamSx1SuAOO3eTyye+47mJCigwG62c4= sigs.k8s.io/controller-runtime v0.16.3/go.mod h1:j7bialYoSn142nv9sCOJmQgDXQXxnroFU4VnX/brVJ0= +sigs.k8s.io/controller-runtime v0.17.2/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.3.0 h1:UZbZAZfX0wV2zr7YZorDz6GXROfDFj6LvqCRm4VUVKk= sigs.k8s.io/structured-merge-diff/v4 v4.3.0/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/controllers/clusterresourceplacement/resource_selector.go b/pkg/controllers/clusterresourceplacement/resource_selector.go index 38b041b9a..81744e886 100644 --- a/pkg/controllers/clusterresourceplacement/resource_selector.go +++ b/pkg/controllers/clusterresourceplacement/resource_selector.go @@ -143,10 +143,10 @@ func (r *Reconciler) fetchClusterScopedResources(selector fleetv1beta1.ClusterRe Kind: selector.Kind, } if !r.InformerManager.IsClusterScopedResources(gvk) { - return nil, controller.NewUserError(fmt.Errorf("invalid placement %s: %+v is not a cluster scoped resource", placeName, restMapping.Resource)) + return nil, controller.NewUserError(fmt.Errorf("invalid placement %s: %+v is not a cluster scoped resource", placeName, gvk)) } - if !r.InformerManager.IsInformerSynced(gvr) { - return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource)) + if !r.InformerManager.IsInformerSynced(gvk) { + return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvk)) } lister := r.InformerManager.Lister(gvr) @@ -273,9 +273,9 @@ func (r *Reconciler) fetchAllResourcesInOneNamespace(namespaceName string, place if !r.shouldSelectResource(gvr) { continue } - if !r.InformerManager.IsInformerSynced(gvr) { - return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvr)) - } + //if !r.InformerManager.IsInformerSynced(gvr) { + // return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvr)) + //} lister := r.InformerManager.Lister(gvr) objs, err := lister.ByNamespace(namespaceName).List(labels.Everything()) if err != nil { diff --git a/pkg/controllers/resourcechange/resourcechange_controller.go b/pkg/controllers/resourcechange/resourcechange_controller.go index 81d1964e5..9c5b01643 100644 --- a/pkg/controllers/resourcechange/resourcechange_controller.go +++ b/pkg/controllers/resourcechange/resourcechange_controller.go @@ -188,7 +188,7 @@ func (r *Reconciler) getUnstructuredObject(objectKey keys.ClusterWideKey) (runti } gvr := restMapping.Resource isClusterScoped := r.InformerManager.IsClusterScopedResources(objectKey.GroupVersionKind()) - if !r.InformerManager.IsInformerSynced(gvr) { + if !r.InformerManager.IsInformerSynced(objectKey.GroupVersionKind()) { return nil, isClusterScoped, fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource) } var obj runtime.Object diff --git a/pkg/utils/informer/informermanager.go b/pkg/utils/informer/informermanager.go index e9bf107c4..81075cc74 100644 --- a/pkg/utils/informer/informermanager.go +++ b/pkg/utils/informer/informermanager.go @@ -8,14 +8,20 @@ package informer import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime" "sync" "time" + "sigs.k8s.io/controller-runtime/pkg/client/config" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + + Cache "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" ) // InformerManager manages dynamic shared informer for all resources, include Kubernetes resource and @@ -33,7 +39,7 @@ type Manager interface { AddStaticResource(resource APIResourceMeta, handler cache.ResourceEventHandler) // IsInformerSynced checks if the resource's informer is synced. - IsInformerSynced(resource schema.GroupVersionResource) bool + IsInformerSynced(resource schema.GroupVersionKind) bool // Start will run all informers, the informers will keep running until the channel closed. // It is intended to be called after create new informer(s), and it's safe to call multi times. @@ -61,15 +67,23 @@ type Manager interface { // NewInformerManager constructs a new instance of informerManagerImpl. // defaultResync with value '0' means no re-sync. -func NewInformerManager(client dynamic.Interface, defaultResync time.Duration, parentCh <-chan struct{}) Manager { +func NewInformerManager(client dynamic.Interface, scheme *runtime.Scheme, defaultResync time.Duration, parentCh <-chan struct{}) Manager { // TODO: replace this with plain context ctx, cancel := ContextForChannel(parentCh) + + // Get the rest config + cfg := config.GetConfigOrDie() + c, err := Cache.New(cfg, Cache.Options{Scheme: scheme, SyncPeriod: &defaultResync}) + if err != nil { + klog.V(2).ErrorS(err, "Failed to create informer cache") + } return &informerManagerImpl{ - dynamicClient: client, - ctx: ctx, - cancel: cancel, - informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, defaultResync), - apiResources: make(map[schema.GroupVersionKind]*APIResourceMeta), + dynamicClient: client, + ctx: ctx, + cancel: cancel, + cache: c, + scheme: scheme, + apiResources: make(map[schema.GroupVersionKind]*APIResourceMeta), } } @@ -87,9 +101,8 @@ type APIResourceMeta struct { // isStaticResource indicates if the resource is a static resource that won't be deleted. isStaticResource bool - // isPresent indicates if the resource is still present in the system. We need this because - // the dynamicInformerFactory does not support a good way to remove/stop an informer. - isPresent bool + // Registration is the resource event handler registration after it has been added. + Registration cache.ResourceEventHandlerRegistration } // informerManagerImpl implements the InformerManager interface @@ -101,9 +114,8 @@ type informerManagerImpl struct { ctx context.Context cancel context.CancelFunc - // informerFactory is the client-go built-in informer factory that can create an informer given a gvr. - informerFactory dynamicinformer.DynamicSharedInformerFactory - + cache Cache.Cache + scheme *runtime.Scheme // the apiResources map collects all the api resources we watch apiResources map[schema.GroupVersionKind]*APIResourceMeta resourcesLock sync.RWMutex @@ -111,22 +123,27 @@ type informerManagerImpl struct { func (s *informerManagerImpl) AddDynamicResources(dynResources []APIResourceMeta, handler cache.ResourceEventHandler, listComplete bool) { newGVKs := make(map[schema.GroupVersionKind]bool, len(dynResources)) - addInformerFunc := func(newRes APIResourceMeta) { - dynRes, exist := s.apiResources[newRes.GroupVersionKind] + _, exist := s.apiResources[newRes.GroupVersionKind] if !exist { - newRes.isPresent = true + // TODO: how to add GVK to scheme? + informer, err := s.cache.GetInformerForKind(s.ctx, newRes.GroupVersionKind) + if err != nil { + klog.ErrorS(err, "Failed to create informer for resource", "gvk", newRes.GroupVersionKind, "err", err) + return + } + + // if AddEventHandler returns an error, it is because the informer has stopped and cannot be restarted + if newRes.Registration, err = informer.AddEventHandler(handler); err != nil { + if s.ctx.Err() != nil { + // context is done, so the error is expected + return + } + panic(err) + } + s.apiResources[newRes.GroupVersionKind] = &newRes - // TODO (rzhang): remember the ResourceEventHandlerRegistration and remove it when the resource is deleted - // TODO: handle error which only happens if the informer is stopped - _, _ = s.informerFactory.ForResource(newRes.GroupVersionResource).Informer().AddEventHandler(handler) - klog.InfoS("Added an informer for a new resource", "res", newRes) - } else if !dynRes.isPresent { - // we just mark it as enabled as we should not add another eventhandler to the informer as it's still - // in the informerFactory - // TODO: add the Event handler back - dynRes.isPresent = true - klog.InfoS("Reactivated an informer for a reappeared resource", "res", dynRes) + klog.V(2).InfoS("Added an informer for a new resource", "res", s.apiResources[newRes.GroupVersionKind]) } } @@ -145,13 +162,37 @@ func (s *informerManagerImpl) AddDynamicResources(dynResources []APIResourceMeta } // mark the disappeared dynResources from the handler map + var keysToDelete []schema.GroupVersionKind for gvk, dynRes := range s.apiResources { - if !newGVKs[gvk] && !dynRes.isStaticResource && dynRes.isPresent { - // TODO: Remove the Event handler from the informer using the resourceEventHandlerRegistration during creat - dynRes.isPresent = false - klog.InfoS("Disabled an informer for a disappeared resource", "res", dynRes) + if !newGVKs[gvk] && !dynRes.isStaticResource { + informer, err := s.cache.GetInformerForKind(s.ctx, dynRes.GroupVersionKind) + if err != nil { + klog.ErrorS(err, "Failed to get informer for resource", "gvk", dynRes.GroupVersionKind, "err", err) + return + } + if err := informer.RemoveEventHandler(dynRes.Registration); err != nil { + if s.ctx.Err() == nil { + // context is not done, so the error is unexpected + panic(err) + } + return + } + obj, err := s.scheme.New(gvk) + if err != nil { + klog.V(2).ErrorS(err, "Could not get object of a disappeared resource", "res", dynRes) + } + err = s.cache.RemoveInformer(s.ctx, obj.(client.Object)) + if err != nil { + klog.V(2).ErrorS(err, "Could not remove informer manager for resource", "res", dynRes) + } + klog.V(2).InfoS("Disabled an informer for a disappeared resource", "res", dynRes) + keysToDelete = append(keysToDelete, gvk) } } + // delete the disappeared resources from the map + for _, gvk := range keysToDelete { + delete(s.apiResources, gvk) + } } func (s *informerManagerImpl) AddStaticResource(resource APIResourceMeta, handler cache.ResourceEventHandler) { @@ -165,20 +206,40 @@ func (s *informerManagerImpl) AddStaticResource(resource APIResourceMeta, handle resource.isStaticResource = true s.apiResources[resource.GroupVersionKind] = &resource - _, _ = s.informerFactory.ForResource(resource.GroupVersionResource).Informer().AddEventHandler(handler) + informer, err := s.cache.GetInformerForKind(s.ctx, resource.GroupVersionKind) + if err != nil { + klog.V(2).ErrorS(err, "Failed to create informer for resource", "gvk", resource.GroupVersionKind) + } + resource.Registration, err = informer.AddEventHandler(handler) + if err != nil { + klog.V(2).ErrorS(err, "Failed to add event handler for resource", "gvk", resource.GroupVersionKind) + } } -func (s *informerManagerImpl) IsInformerSynced(resource schema.GroupVersionResource) bool { +func (s *informerManagerImpl) IsInformerSynced(resource schema.GroupVersionKind) bool { // TODO: use a lazy initialized sync map to reduce the number of informer sync look ups - return s.informerFactory.ForResource(resource).Informer().HasSynced() + informer, err := s.cache.GetInformerForKind(s.ctx, resource) + if err != nil { + klog.V(2).ErrorS(err, "Failed to get informer for resource", "gvk", resource) + } + return informer.HasSynced() } func (s *informerManagerImpl) Lister(resource schema.GroupVersionResource) cache.GenericLister { - return s.informerFactory.ForResource(resource).Lister() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ + resource.String(): func(obj interface{}) ([]string, error) { + return []string{obj.(*unstructured.Unstructured).GetKind()}, nil + }, + }) + gr := schema.GroupResource{Resource: resource.Resource, Group: resource.Group} + return cache.NewGenericLister(indexer, gr) } func (s *informerManagerImpl) Start() { - s.informerFactory.Start(s.ctx.Done()) + err := s.cache.Start(s.ctx) + if err != nil { + klog.V(2).ErrorS(err, "Failed to start informer manager") + } } func (s *informerManagerImpl) GetClient() dynamic.Interface { @@ -186,7 +247,7 @@ func (s *informerManagerImpl) GetClient() dynamic.Interface { } func (s *informerManagerImpl) WaitForCacheSync() { - s.informerFactory.WaitForCacheSync(s.ctx.Done()) + s.cache.WaitForCacheSync(s.ctx) } func (s *informerManagerImpl) GetNameSpaceScopedResources() []schema.GroupVersionResource { @@ -195,7 +256,7 @@ func (s *informerManagerImpl) GetNameSpaceScopedResources() []schema.GroupVersio res := make([]schema.GroupVersionResource, 0, len(s.apiResources)) for _, resource := range s.apiResources { - if resource.isPresent && !resource.isStaticResource && !resource.IsClusterScoped { + if !resource.isStaticResource && !resource.IsClusterScoped { res = append(res, resource.GroupVersionResource) } }