From 7fb43e70f8c59e87c906ddcad2fb5db8f52d9f72 Mon Sep 17 00:00:00 2001 From: Yifei Zhang Date: Wed, 11 Jan 2023 19:38:42 +0800 Subject: [PATCH] Bugfix of pool-coordinator and enable unit test to pass (#1137) * initialize etcdClient of componentKeyCache Signed-off-by: Congrool --- go.mod | 1 + go.sum | 1 + pkg/profile/profile_test.go | 24 +-- .../certificate/hubself/cert_mgr_test.go | 2 +- pkg/yurthub/storage/disk/storage.go | 6 +- pkg/yurthub/storage/disk/storage_test.go | 4 + pkg/yurthub/storage/etcd/etcd_suite_test.go | 46 +++++- pkg/yurthub/storage/etcd/key_test.go | 15 ++ pkg/yurthub/storage/etcd/keycache.go | 135 ++++++++--------- pkg/yurthub/storage/etcd/keycache_test.go | 140 +++++++++++++++++- pkg/yurthub/storage/etcd/mock/kv.go | 60 ++++++++ pkg/yurthub/storage/etcd/storage.go | 51 ++++--- pkg/yurthub/storage/etcd/storage_test.go | 10 +- 13 files changed, 379 insertions(+), 116 deletions(-) create mode 100644 pkg/yurthub/storage/etcd/mock/kv.go diff --git a/go.mod b/go.mod index 8eb8abf7cfd..ccff9843592 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/vishvananda/netlink v1.1.1-0.20200603190939-5a869a71f0cb github.com/wI2L/jsondiff v0.3.0 + go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 diff --git a/go.sum b/go.sum index a8ea7a80f03..95025ae130e 100644 --- a/go.sum +++ b/go.sum @@ -612,6 +612,7 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/pkg/profile/profile_test.go b/pkg/profile/profile_test.go index 002af5275b3..cfb7f2e7e78 100644 --- a/pkg/profile/profile_test.go +++ b/pkg/profile/profile_test.go @@ -29,15 +29,17 @@ func fakeServer(h http.Handler) error { } func TestInstall(t *testing.T) { - m := mux.NewRouter() - Install(m) - go fakeServer(m) - r, err := http.Get("http://localhost:9090/debug/pprof/") - if err != nil { - t.Error(" failed to send request to fake server") - } - - if r.StatusCode != http.StatusOK { - t.Error(err) - } + t.Run("TestInstall", func(t *testing.T) { + m := mux.NewRouter() + Install(m) + go fakeServer(m) + r, err := http.Get("http://localhost:9090/debug/pprof/") + if err != nil { + t.Errorf("failed to send request to fake server, %v", err) + } + + if r.StatusCode != http.StatusOK { + t.Error(err) + } + }) } diff --git a/pkg/yurthub/certificate/hubself/cert_mgr_test.go b/pkg/yurthub/certificate/hubself/cert_mgr_test.go index 80e202498e6..208b92498fb 100644 --- a/pkg/yurthub/certificate/hubself/cert_mgr_test.go +++ b/pkg/yurthub/certificate/hubself/cert_mgr_test.go @@ -147,7 +147,7 @@ func Test_createInsecureRestClientConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := CreateInsecureRestClientConfig(tt.args.remoteServer) + _, err := createInsecureRestClientConfig(tt.args.remoteServer) if (err != nil) != tt.wantErr { t.Errorf("CreateInsecureRestClientConfig() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/yurthub/storage/disk/storage.go b/pkg/yurthub/storage/disk/storage.go index 794bff3b65f..b052f6d73d5 100644 --- a/pkg/yurthub/storage/disk/storage.go +++ b/pkg/yurthub/storage/disk/storage.go @@ -104,11 +104,15 @@ func (ds *diskStorage) Name() string { // Create will create a new file with content. key indicates the path of the file. func (ds *diskStorage) Create(key storage.Key, content []byte) error { - if err := utils.ValidateKV(key, content, storageKey{}); err != nil { + if err := utils.ValidateKey(key, storageKey{}); err != nil { return err } storageKey := key.(storageKey) + if !storageKey.isRootKey() && len(content) == 0 { + return storage.ErrKeyHasNoContent + } + if !ds.lockKey(storageKey) { return storage.ErrStorageAccessConflict } diff --git a/pkg/yurthub/storage/disk/storage_test.go b/pkg/yurthub/storage/disk/storage_test.go index 88ec1315f9c..4017fb2703d 100644 --- a/pkg/yurthub/storage/disk/storage_test.go +++ b/pkg/yurthub/storage/disk/storage_test.go @@ -332,6 +332,10 @@ var _ = Describe("Test DiskStorage Exposed Functions", func() { Expect(err).To(BeNil()) Expect(info.IsDir()).To(BeTrue()) }) + It("should return ErrKeyHasNoContent if it is not rootKey and has no content", func() { + err = store.Create(podKey, []byte{}) + Expect(err).To(Equal(storage.ErrKeyHasNoContent)) + }) It("should return ErrKeyIsEmpty if key is empty", func() { err = store.Create(storageKey{}, podBytes) Expect(err).To(Equal(storage.ErrKeyIsEmpty)) diff --git a/pkg/yurthub/storage/etcd/etcd_suite_test.go b/pkg/yurthub/storage/etcd/etcd_suite_test.go index 160132c3aa4..f508ee9c8ba 100644 --- a/pkg/yurthub/storage/etcd/etcd_suite_test.go +++ b/pkg/yurthub/storage/etcd/etcd_suite_test.go @@ -17,8 +17,10 @@ limitations under the License. package etcd import ( + "fmt" "os" "os/exec" + "path/filepath" "testing" . "github.com/onsi/ginkgo/v2" @@ -29,6 +31,9 @@ var keyCacheDir = "/tmp/etcd-test" var etcdDataDir = "/tmp/storagetest.etcd" var devNull *os.File var etcdCmd *exec.Cmd +var downloadURL = "https://github.com/etcd-io/etcd/releases/download" +var etcdVersion = "v3.5.0" +var etcdCmdPath = "/tmp/etcd/etcd" var _ = BeforeSuite(func() { Expect(os.RemoveAll(keyCacheDir)).To(BeNil()) @@ -38,7 +43,14 @@ var _ = BeforeSuite(func() { var err error devNull, err = os.OpenFile("/dev/null", os.O_RDWR, 0755) Expect(err).To(BeNil()) - etcdCmd = exec.Command("/usr/local/etcd/etcd", "--data-dir="+etcdDataDir) + + // It will check if etcd cmd can be found in PATH, otherwise + // it will be installed. + etcdCmdPath, err = ensureEtcdCmd() + Expect(err).To(BeNil()) + Expect(len(etcdCmdPath)).ShouldNot(BeZero()) + + etcdCmd = exec.Command(etcdCmdPath, "--data-dir="+etcdDataDir) etcdCmd.Stdout = devNull etcdCmd.Stderr = devNull Expect(etcdCmd.Start()).To(BeNil()) @@ -52,6 +64,38 @@ var _ = AfterSuite(func() { Expect(devNull.Close()).To(BeNil()) }) +func ensureEtcdCmd() (string, error) { + path, err := exec.LookPath("etcd") + if err == nil { + return path, nil + } + + return installEtcd() +} + +func installEtcd() (string, error) { + releaseURL := fmt.Sprintf("%s/%s/etcd-%s-linux-amd64.tar.gz", downloadURL, etcdVersion, etcdVersion) + downloadPath := fmt.Sprintf("/tmp/etcd/etcd-%s-linux-amd64.tar.gz", etcdVersion) + downloadDir := "/tmp/etcd" + if err := exec.Command("bash", "-c", "rm -rf "+downloadDir).Run(); err != nil { + return "", fmt.Errorf("failed to delete %s, %v", downloadDir, err) + } + + if err := exec.Command("bash", "-c", "mkdir "+downloadDir).Run(); err != nil { + return "", fmt.Errorf("failed to create dir %s, %v", downloadDir, err) + } + + if err := exec.Command("bash", "-c", "curl -L "+releaseURL+" -o "+downloadPath).Run(); err != nil { + return "", fmt.Errorf("failed to download etcd release %s at %s, %v", releaseURL, downloadPath, err) + } + + if err := exec.Command("tar", "-zxvf", downloadPath, "-C", downloadDir, "--strip-components=1").Run(); err != nil { + return "", fmt.Errorf("failed to extract tar at %s, %v", downloadPath, err) + } + + return filepath.Join(downloadDir, "etcd"), nil +} + func TestEtcd(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "ComponentKeyCache Test Suite") diff --git a/pkg/yurthub/storage/etcd/key_test.go b/pkg/yurthub/storage/etcd/key_test.go index bc59676282d..7eafaf9fe1a 100644 --- a/pkg/yurthub/storage/etcd/key_test.go +++ b/pkg/yurthub/storage/etcd/key_test.go @@ -36,6 +36,7 @@ func TestKeyFunc(t *testing.T) { }{ "core group normal case": { info: storage.KeyBuildInfo{ + Component: "kubelet", Group: "", Resources: "pods", Version: "v1", @@ -47,6 +48,7 @@ func TestKeyFunc(t *testing.T) { "special prefix for node resource": { info: storage.KeyBuildInfo{ + Component: "kubelet", Group: "", Resources: "nodes", Version: "v1", @@ -57,6 +59,7 @@ func TestKeyFunc(t *testing.T) { }, "not core group": { info: storage.KeyBuildInfo{ + Component: "kubelet", Group: "apps", Resources: "deployments", Version: "v1", @@ -67,6 +70,7 @@ func TestKeyFunc(t *testing.T) { }, "special prefix for service resource": { info: storage.KeyBuildInfo{ + Component: "kube-proxy", Group: "networking.k8s.io", Resources: "ingresses", Version: "v1", @@ -77,6 +81,7 @@ func TestKeyFunc(t *testing.T) { }, "empty resources": { info: storage.KeyBuildInfo{ + Component: "yurthub", Group: "", Resources: "", Version: "v1", @@ -85,6 +90,16 @@ func TestKeyFunc(t *testing.T) { }, err: storage.ErrEmptyResource, }, + "empty component": { + info: storage.KeyBuildInfo{ + Group: "", + Resources: "nodes", + Version: "v1", + Namespace: "", + Name: "test-node", + }, + err: storage.ErrEmptyComponent, + }, } for n, c := range cases { diff --git a/pkg/yurthub/storage/etcd/keycache.go b/pkg/yurthub/storage/etcd/keycache.go index 8122c7f7b66..df670ebca7e 100644 --- a/pkg/yurthub/storage/etcd/keycache.go +++ b/pkg/yurthub/storage/etcd/keycache.go @@ -69,12 +69,12 @@ func (s keySet) Difference(s2 keySet) []storageKey { // ... type componentKeyCache struct { sync.Mutex - ctx context.Context - cache map[string]keySet - filePath string - keyFunc func(storage.KeyBuildInfo) (storage.Key, error) - fsOperator fs.FileSystemOperator - getEtcdClient func() *clientv3.Client + ctx context.Context + cache map[string]keySet + filePath string + keyFunc func(storage.KeyBuildInfo) (storage.Key, error) + fsOperator fs.FileSystemOperator + etcdClient *clientv3.Client } func (c *componentKeyCache) Recover() error { @@ -84,27 +84,28 @@ func (c *componentKeyCache) Recover() error { if err := c.fsOperator.CreateFile(c.filePath, []byte{}); err != nil { return fmt.Errorf("failed to create cache file at %s, %v", c.filePath, err) } - return nil } else if err != nil { return fmt.Errorf("failed to recover key cache from %s, %v", c.filePath, err) } - // successfully read from file - if len(buf) == 0 { - return nil - } - lines := strings.Split(string(buf), "\n") - for i, l := range lines { - s := strings.Split(l, ":") - if len(s) != 2 { - return fmt.Errorf("failed to parse line %d, invalid format", i) - } - comp, keys := s[0], strings.Split(s[1], ",") - ks := keySet{m: map[storageKey]struct{}{}} - for _, key := range keys { - ks.m[storageKey{path: key}] = struct{}{} + if len(buf) != 0 { + // We've got content from file + lines := strings.Split(string(buf), "\n") + for i, l := range lines { + s := strings.Split(l, ":") + if len(s) != 2 { + return fmt.Errorf("failed to parse line %d, invalid format", i) + } + comp, keys := s[0], strings.Split(s[1], ",") + ks := keySet{m: map[storageKey]struct{}{}} + for _, key := range keys { + ks.m[storageKey{ + comp: comp, + path: key, + }] = struct{}{} + } + c.cache[comp] = ks } - c.cache[comp] = ks } poolScopedKeyset, err := c.getPoolScopedKeyset() @@ -120,49 +121,43 @@ func (c *componentKeyCache) Recover() error { } func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) { - // FIXME: now getEtcdClient would cause nil pointer - //client := c.getEtcdClient() - //if client == nil { - // return nil, fmt.Errorf("got empty etcd client") - //} - keys := &keySet{m: map[storageKey]struct{}{}} - //for gvr := range coordinatorconstants.PoolScopedResources { - //getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) - //defer cancel() - //rootKey, err := c.keyFunc(storage.KeyBuildInfo{ - // Component: coordinatorconstants.DefaultPoolScopedUserAgent, - // Group: gvr.Group, - // Version: gvr.Version, - // Resources: gvr.Resource, - //}) - //if err != nil { - // return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) - //} - //getResp, err := client.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) - //if err != nil { - // return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) - //} - // - //for _, kv := range getResp.Kvs { - // ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key)) - // if err != nil { - // return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key) - // } - // key, err := c.keyFunc(storage.KeyBuildInfo{ - // Component: coordinatorconstants.DefaultPoolScopedUserAgent, - // Group: gvr.Group, - // Version: gvr.Version, - // Resources: gvr.Resource, - // Namespace: ns, - // Name: name, - // }) - // if err != nil { - // return nil, fmt.Errorf("failed to create resource key for %v", kv.Key) - // } - // keys.m[key.(storageKey)] = struct{}{} - //} - //} + for gvr := range coordinatorconstants.PoolScopedResources { + getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout) + defer cancel() + rootKey, err := c.keyFunc(storage.KeyBuildInfo{ + Component: coordinatorconstants.DefaultPoolScopedUserAgent, + Group: gvr.Group, + Version: gvr.Version, + Resources: gvr.Resource, + }) + if err != nil { + return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err) + } + getResp, err := c.etcdClient.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly()) + if err != nil { + return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err) + } + + for _, kv := range getResp.Kvs { + ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key)) + if err != nil { + return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key) + } + key, err := c.keyFunc(storage.KeyBuildInfo{ + Component: coordinatorconstants.DefaultPoolScopedUserAgent, + Group: gvr.Group, + Version: gvr.Version, + Resources: gvr.Resource, + Namespace: ns, + Name: name, + }) + if err != nil { + return nil, fmt.Errorf("failed to create resource key for %v", kv.Key) + } + keys.m[key.(storageKey)] = struct{}{} + } + } return keys, nil } @@ -262,19 +257,11 @@ func (c *componentKeyCache) flush() error { return nil } -func newComponentKeyCache(filePath string) *componentKeyCache { - return &componentKeyCache{ - filePath: filePath, - cache: map[string]keySet{}, - fsOperator: fs.FileSystemOperator{}, - } -} - // We assume that path points to a namespaced resource. func getNamespaceAndNameFromKeyPath(path string) (string, string, error) { - elems := strings.Split(path, "/") + elems := strings.Split(strings.TrimPrefix(path, "/"), "/") if len(elems) < 2 { - return "", "", fmt.Errorf("unrecognized path: %v", path) + return "", "", fmt.Errorf("unrecognized path: %s", path) } return elems[len(elems)-2], elems[len(elems)-1], nil diff --git a/pkg/yurthub/storage/etcd/keycache_test.go b/pkg/yurthub/storage/etcd/keycache_test.go index c9512e221e3..8120ae1bf57 100644 --- a/pkg/yurthub/storage/etcd/keycache_test.go +++ b/pkg/yurthub/storage/etcd/keycache_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd import ( + "context" "os" "path/filepath" "reflect" @@ -26,7 +27,12 @@ import ( "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/stretchr/testify/mock" + mvccpb "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" + coordinatorconstants "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/constants" + etcdmock "github.com/openyurtio/openyurt/pkg/yurthub/storage/etcd/mock" "github.com/openyurtio/openyurt/pkg/yurthub/util/fs" ) @@ -34,9 +40,24 @@ var _ = Describe("Test componentKeyCache setup", func() { var cache *componentKeyCache var fileName string var f fs.FileSystemOperator + var mockedClient *clientv3.Client BeforeEach(func() { + kv := etcdmock.KV{} + kv.On("Get", "/registry/services/endpoints", mock.AnythingOfType("clientv3.OpOption"), mock.AnythingOfType("clientv3.OpOption")). + Return(&clientv3.GetResponse{}) + kv.On("Get", "/registry/endpointslices", mock.AnythingOfType("clientv3.OpOption"), mock.AnythingOfType("clientv3.OpOption")). + Return(&clientv3.GetResponse{}) + etcdStorage := &etcdStorage{prefix: "/registry"} + mockedClient = &clientv3.Client{KV: kv} fileName = uuid.New().String() - cache = newComponentKeyCache(filepath.Join(keyCacheDir, fileName)) + cache = &componentKeyCache{ + ctx: context.Background(), + filePath: filepath.Join(keyCacheDir, fileName), + cache: map[string]keySet{}, + fsOperator: fs.FileSystemOperator{}, + etcdClient: mockedClient, + keyFunc: etcdStorage.KeyFunc, + } }) AfterEach(func() { Expect(os.RemoveAll(filepath.Join(keyCacheDir, fileName))) @@ -44,13 +65,92 @@ var _ = Describe("Test componentKeyCache setup", func() { It("should recover when cache file does not exist", func() { Expect(cache.Recover()).To(BeNil()) - Expect(len(cache.cache)).To(BeZero()) + Expect(len(cache.cache)).To(Equal(1)) }) It("should recover when cache file is empty", func() { Expect(f.CreateFile(filepath.Join(keyCacheDir, fileName), []byte{})).To(BeNil()) Expect(cache.Recover()).To(BeNil()) - Expect(len(cache.cache)).To(BeZero()) + Expect(len(cache.cache)).To(Equal(1)) + }) + + Context("Test get pool-scoped resource keys from etcd", func() { + BeforeEach(func() { + kv := etcdmock.KV{} + kv.On("Get", "/registry/services/endpoints", mock.AnythingOfType("clientv3.OpOption"), mock.AnythingOfType("clientv3.OpOption")). + Return(&clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + {Key: []byte("/registry/services/endpoints/default/nginx")}, + {Key: []byte("/registry/services/endpoints/kube-system/kube-dns")}, + }, + }) + kv.On("Get", "/registry/endpointslices", mock.AnythingOfType("clientv3.OpOption"), mock.AnythingOfType("clientv3.OpOption")). + Return(&clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + {Key: []byte("/registry/endpointslices/default/nginx")}, + {Key: []byte("/registry/endpointslices/kube-system/kube-dns")}, + }, + }) + mockedClient.KV = kv + }) + + It("should recover leader-yurthub cache from etcd", func() { + Expect(cache.Recover()).To(BeNil()) + Expect(cache.cache[coordinatorconstants.DefaultPoolScopedUserAgent]).Should(Equal( + keySet{ + m: map[storageKey]struct{}{ + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/services/endpoints/default/nginx", + }: {}, + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/services/endpoints/kube-system/kube-dns", + }: {}, + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/endpointslices/default/nginx", + }: {}, + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/endpointslices/kube-system/kube-dns", + }: {}, + }, + }, + )) + }) + + It("should replace leader-yurthub cache read from local file with keys from etcd", func() { + Expect(f.CreateFile(filepath.Join(keyCacheDir, fileName), []byte( + "leader-yurthub:/registry/services/endpoints/default/nginx-local,"+ + "/registry/services/endpoints/kube-system/kube-dns-local,"+ + "/registry/endpointslices/default/nginx-local,"+ + "/registry/endpointslices/kube-system/kube-dns-local", + ))).To(BeNil()) + Expect(cache.Recover()).To(BeNil()) + Expect(cache.cache[coordinatorconstants.DefaultPoolScopedUserAgent]).Should(Equal( + keySet{ + m: map[storageKey]struct{}{ + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/services/endpoints/default/nginx", + }: {}, + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/services/endpoints/kube-system/kube-dns", + }: {}, + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/endpointslices/default/nginx", + }: {}, + { + comp: coordinatorconstants.DefaultPoolScopedUserAgent, + path: "/registry/endpointslices/kube-system/kube-dns", + }: {}, + }, + }, + )) + }) }) It("should recover when cache file exists and contains valid data", func() { @@ -62,15 +162,27 @@ var _ = Describe("Test componentKeyCache setup", func() { Expect(cache.cache).To(Equal(map[string]keySet{ "kubelet": { m: map[storageKey]struct{}{ - {path: "/registry/pods/default/pod1"}: {}, - {path: "/registry/pods/default/pod2"}: {}, + { + comp: "kubelet", + path: "/registry/pods/default/pod1", + }: {}, + { + comp: "kubelet", + path: "/registry/pods/default/pod2", + }: {}, }, }, "kube-proxy": { m: map[storageKey]struct{}{ - {path: "/registry/configmaps/kube-system/kube-proxy"}: {}, + { + comp: "kube-proxy", + path: "/registry/configmaps/kube-system/kube-proxy", + }: {}, }, }, + coordinatorconstants.DefaultPoolScopedUserAgent: { + m: map[storageKey]struct{}{}, + }, })) }) @@ -87,8 +199,22 @@ var _ = Describe("Test componentKeyCache function", func() { var fileName string var key1, key2, key3 storageKey BeforeEach(func() { + kv := etcdmock.KV{} + kv.On("Get", "/registry/services/endpoints", mock.AnythingOfType("clientv3.OpOption"), mock.AnythingOfType("clientv3.OpOption")). + Return(&clientv3.GetResponse{}) + kv.On("Get", "/registry/endpointslices", mock.AnythingOfType("clientv3.OpOption"), mock.AnythingOfType("clientv3.OpOption")). + Return(&clientv3.GetResponse{}) + mockedClient := &clientv3.Client{KV: kv} + etcdStorage := etcdStorage{prefix: "/registry"} fileName = uuid.New().String() - cache = newComponentKeyCache(filepath.Join(keyCacheDir, fileName)) + cache = &componentKeyCache{ + ctx: context.Background(), + filePath: filepath.Join(keyCacheDir, fileName), + cache: map[string]keySet{}, + fsOperator: fs.FileSystemOperator{}, + etcdClient: mockedClient, + keyFunc: etcdStorage.KeyFunc, + } key1 = storageKey{ path: "/registry/pods/default/pod1", } diff --git a/pkg/yurthub/storage/etcd/mock/kv.go b/pkg/yurthub/storage/etcd/mock/kv.go new file mode 100644 index 00000000000..3839aa01dfa --- /dev/null +++ b/pkg/yurthub/storage/etcd/mock/kv.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mock + +import ( + "context" + + "github.com/stretchr/testify/mock" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var _ clientv3.KV = KV{} + +type KV struct { + mock.Mock +} + +func (kv KV) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + return nil, nil +} + +func (kv KV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + interfaceOpts := []interface{}{key} + for _, opt := range opts { + interfaceOpts = append(interfaceOpts, opt) + } + args := kv.Called(interfaceOpts...) + resp := args.Get(0).(*clientv3.GetResponse) + return resp, nil +} + +func (kv KV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + return nil, nil +} + +func (kv KV) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { + return nil, nil +} + +func (kv KV) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { + return clientv3.OpResponse{}, nil +} + +func (kv KV) Txn(ctx context.Context) clientv3.Txn { + return nil +} diff --git a/pkg/yurthub/storage/etcd/storage.go b/pkg/yurthub/storage/etcd/storage.go index fe0b9503b1f..7e5df7b0b61 100644 --- a/pkg/yurthub/storage/etcd/storage.go +++ b/pkg/yurthub/storage/etcd/storage.go @@ -18,6 +18,7 @@ package etcd import ( "context" + "crypto/tls" "fmt" "path/filepath" "strings" @@ -33,6 +34,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/storage" "github.com/openyurtio/openyurt/pkg/yurthub/storage/utils" + "github.com/openyurtio/openyurt/pkg/yurthub/util/fs" ) const ( @@ -57,6 +59,7 @@ type EtcdStorageConfig struct { KeyFile string CaFile string LocalCacheDir string + UnSecure bool } // TODO: consider how to recover the work if it was interrupted because of restart, in @@ -86,21 +89,20 @@ type etcdStorage struct { } func NewStorage(ctx context.Context, cfg *EtcdStorageConfig) (storage.Store, error) { + var tlsConfig *tls.Config + var err error cacheFilePath := filepath.Join(cfg.LocalCacheDir, defaultComponentCacheFileName) - cache := newComponentKeyCache(cacheFilePath) - if err := cache.Recover(); err != nil { - return nil, fmt.Errorf("failed to recover component key cache from %s, %v", cacheFilePath, err) - } - - tlsInfo := transport.TLSInfo{ - CertFile: cfg.CertFile, - KeyFile: cfg.KeyFile, - TrustedCAFile: cfg.CaFile, - } + if !cfg.UnSecure { + tlsInfo := transport.TLSInfo{ + CertFile: cfg.CertFile, + KeyFile: cfg.KeyFile, + TrustedCAFile: cfg.CaFile, + } - tlsConfig, err := tlsInfo.ClientConfig() - if err != nil { - return nil, fmt.Errorf("failed to create tls config for etcd client, %v", err) + tlsConfig, err = tlsInfo.ClientConfig() + if err != nil { + return nil, fmt.Errorf("failed to create tls config for etcd client, %v", err) + } } clientConfig := clientv3.Config{ @@ -115,17 +117,30 @@ func NewStorage(ctx context.Context, cfg *EtcdStorageConfig) (storage.Store, err } s := &etcdStorage{ - ctx: ctx, - prefix: cfg.Prefix, - client: client, - clientConfig: clientConfig, - localComponentKeyCache: cache, + ctx: ctx, + prefix: cfg.Prefix, + client: client, + clientConfig: clientConfig, mirrorPrefixMap: map[pathType]string{ rvType: "/mirror/rv", }, } + cache := &componentKeyCache{ + ctx: ctx, + filePath: cacheFilePath, + cache: map[string]keySet{}, + fsOperator: fs.FileSystemOperator{}, + keyFunc: s.KeyFunc, + etcdClient: client, + } + if err := cache.Recover(); err != nil { + return nil, fmt.Errorf("failed to recover component key cache from %s, %v", cacheFilePath, err) + } + s.localComponentKeyCache = cache + go s.clientLifeCycleManagement() + return s, nil } diff --git a/pkg/yurthub/storage/etcd/storage_test.go b/pkg/yurthub/storage/etcd/storage_test.go index ea3d0e3952c..20e1ccbd79b 100644 --- a/pkg/yurthub/storage/etcd/storage_test.go +++ b/pkg/yurthub/storage/etcd/storage_test.go @@ -34,6 +34,9 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/storage" ) +// TODO: These tests should be integration tests instead of unit tests. +// Currently, we will install the etcd cmd BeforeSuite to make these tests work around. +// But they are better moved to integration test dir. var _ = Describe("Test EtcdStorage", func() { var etcdstore *etcdStorage var key1 storage.Key @@ -47,6 +50,7 @@ var _ = Describe("Test EtcdStorage", func() { Prefix: "/" + randomize, EtcdEndpoints: []string{"127.0.0.1:2379"}, LocalCacheDir: filepath.Join(keyCacheDir, randomize), + UnSecure: true, } s, err := NewStorage(context.Background(), cfg) Expect(err).To(BeNil()) @@ -75,8 +79,8 @@ var _ = Describe("Test EtcdStorage", func() { Expect(err).To(BeNil()) }) - Context("Test Lifecycle", Focus, func() { - It("should reconnect to etcd if connect once broken", Focus, func() { + Context("Test Lifecycle", func() { + It("should reconnect to etcd if connect once broken", func() { Expect(etcdstore.Create(key1, podJson)).Should(BeNil()) Expect(etcdCmd.Process.Kill()).To(BeNil()) By("waiting for the etcd exited") @@ -87,7 +91,7 @@ var _ = Describe("Test EtcdStorage", func() { devNull, err := os.OpenFile("/dev/null", os.O_RDWR, 0755) Expect(err).To(BeNil()) - etcdCmd = exec.Command("/usr/local/etcd/etcd", "--data-dir="+etcdDataDir) + etcdCmd = exec.Command(etcdCmdPath, "--data-dir="+etcdDataDir) etcdCmd.Stdout = devNull etcdCmd.Stderr = devNull Expect(etcdCmd.Start()).To(BeNil())