Skip to content

Commit

Permalink
Bugfix of pool-coordinator and enable unit test to pass (#1137)
Browse files Browse the repository at this point in the history
* initialize etcdClient of componentKeyCache

Signed-off-by: Congrool <[email protected]>
  • Loading branch information
Congrool authored Jan 11, 2023
1 parent 07fc159 commit 7fb43e7
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 116 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/vishvananda/netlink v1.1.1-0.20200603190939-5a869a71f0cb
github.com/wI2L/jsondiff v0.3.0
go.etcd.io/etcd/api/v3 v3.5.0
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down
24 changes: 13 additions & 11 deletions pkg/profile/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ func fakeServer(h http.Handler) error {
}

func TestInstall(t *testing.T) {
m := mux.NewRouter()
Install(m)
go fakeServer(m)
r, err := http.Get("http://localhost:9090/debug/pprof/")
if err != nil {
t.Error(" failed to send request to fake server")
}

if r.StatusCode != http.StatusOK {
t.Error(err)
}
t.Run("TestInstall", func(t *testing.T) {
m := mux.NewRouter()
Install(m)
go fakeServer(m)
r, err := http.Get("http://localhost:9090/debug/pprof/")
if err != nil {
t.Errorf("failed to send request to fake server, %v", err)
}

if r.StatusCode != http.StatusOK {
t.Error(err)
}
})
}
2 changes: 1 addition & 1 deletion pkg/yurthub/certificate/hubself/cert_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func Test_createInsecureRestClientConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := CreateInsecureRestClientConfig(tt.args.remoteServer)
_, err := createInsecureRestClientConfig(tt.args.remoteServer)
if (err != nil) != tt.wantErr {
t.Errorf("CreateInsecureRestClientConfig() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
6 changes: 5 additions & 1 deletion pkg/yurthub/storage/disk/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,15 @@ func (ds *diskStorage) Name() string {

// Create will create a new file with content. key indicates the path of the file.
func (ds *diskStorage) Create(key storage.Key, content []byte) error {
if err := utils.ValidateKV(key, content, storageKey{}); err != nil {
if err := utils.ValidateKey(key, storageKey{}); err != nil {
return err
}
storageKey := key.(storageKey)

if !storageKey.isRootKey() && len(content) == 0 {
return storage.ErrKeyHasNoContent
}

if !ds.lockKey(storageKey) {
return storage.ErrStorageAccessConflict
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/yurthub/storage/disk/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ var _ = Describe("Test DiskStorage Exposed Functions", func() {
Expect(err).To(BeNil())
Expect(info.IsDir()).To(BeTrue())
})
It("should return ErrKeyHasNoContent if it is not rootKey and has no content", func() {
err = store.Create(podKey, []byte{})
Expect(err).To(Equal(storage.ErrKeyHasNoContent))
})
It("should return ErrKeyIsEmpty if key is empty", func() {
err = store.Create(storageKey{}, podBytes)
Expect(err).To(Equal(storage.ErrKeyIsEmpty))
Expand Down
46 changes: 45 additions & 1 deletion pkg/yurthub/storage/etcd/etcd_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package etcd

import (
"fmt"
"os"
"os/exec"
"path/filepath"
"testing"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -29,6 +31,9 @@ var keyCacheDir = "/tmp/etcd-test"
var etcdDataDir = "/tmp/storagetest.etcd"
var devNull *os.File
var etcdCmd *exec.Cmd
var downloadURL = "https://github.com/etcd-io/etcd/releases/download"
var etcdVersion = "v3.5.0"
var etcdCmdPath = "/tmp/etcd/etcd"

var _ = BeforeSuite(func() {
Expect(os.RemoveAll(keyCacheDir)).To(BeNil())
Expand All @@ -38,7 +43,14 @@ var _ = BeforeSuite(func() {
var err error
devNull, err = os.OpenFile("/dev/null", os.O_RDWR, 0755)
Expect(err).To(BeNil())
etcdCmd = exec.Command("/usr/local/etcd/etcd", "--data-dir="+etcdDataDir)

// It will check if etcd cmd can be found in PATH, otherwise
// it will be installed.
etcdCmdPath, err = ensureEtcdCmd()
Expect(err).To(BeNil())
Expect(len(etcdCmdPath)).ShouldNot(BeZero())

etcdCmd = exec.Command(etcdCmdPath, "--data-dir="+etcdDataDir)
etcdCmd.Stdout = devNull
etcdCmd.Stderr = devNull
Expect(etcdCmd.Start()).To(BeNil())
Expand All @@ -52,6 +64,38 @@ var _ = AfterSuite(func() {
Expect(devNull.Close()).To(BeNil())
})

func ensureEtcdCmd() (string, error) {
path, err := exec.LookPath("etcd")
if err == nil {
return path, nil
}

return installEtcd()
}

func installEtcd() (string, error) {
releaseURL := fmt.Sprintf("%s/%s/etcd-%s-linux-amd64.tar.gz", downloadURL, etcdVersion, etcdVersion)
downloadPath := fmt.Sprintf("/tmp/etcd/etcd-%s-linux-amd64.tar.gz", etcdVersion)
downloadDir := "/tmp/etcd"
if err := exec.Command("bash", "-c", "rm -rf "+downloadDir).Run(); err != nil {
return "", fmt.Errorf("failed to delete %s, %v", downloadDir, err)
}

if err := exec.Command("bash", "-c", "mkdir "+downloadDir).Run(); err != nil {
return "", fmt.Errorf("failed to create dir %s, %v", downloadDir, err)
}

if err := exec.Command("bash", "-c", "curl -L "+releaseURL+" -o "+downloadPath).Run(); err != nil {
return "", fmt.Errorf("failed to download etcd release %s at %s, %v", releaseURL, downloadPath, err)
}

if err := exec.Command("tar", "-zxvf", downloadPath, "-C", downloadDir, "--strip-components=1").Run(); err != nil {
return "", fmt.Errorf("failed to extract tar at %s, %v", downloadPath, err)
}

return filepath.Join(downloadDir, "etcd"), nil
}

func TestEtcd(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ComponentKeyCache Test Suite")
Expand Down
15 changes: 15 additions & 0 deletions pkg/yurthub/storage/etcd/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestKeyFunc(t *testing.T) {
}{
"core group normal case": {
info: storage.KeyBuildInfo{
Component: "kubelet",
Group: "",
Resources: "pods",
Version: "v1",
Expand All @@ -47,6 +48,7 @@ func TestKeyFunc(t *testing.T) {

"special prefix for node resource": {
info: storage.KeyBuildInfo{
Component: "kubelet",
Group: "",
Resources: "nodes",
Version: "v1",
Expand All @@ -57,6 +59,7 @@ func TestKeyFunc(t *testing.T) {
},
"not core group": {
info: storage.KeyBuildInfo{
Component: "kubelet",
Group: "apps",
Resources: "deployments",
Version: "v1",
Expand All @@ -67,6 +70,7 @@ func TestKeyFunc(t *testing.T) {
},
"special prefix for service resource": {
info: storage.KeyBuildInfo{
Component: "kube-proxy",
Group: "networking.k8s.io",
Resources: "ingresses",
Version: "v1",
Expand All @@ -77,6 +81,7 @@ func TestKeyFunc(t *testing.T) {
},
"empty resources": {
info: storage.KeyBuildInfo{
Component: "yurthub",
Group: "",
Resources: "",
Version: "v1",
Expand All @@ -85,6 +90,16 @@ func TestKeyFunc(t *testing.T) {
},
err: storage.ErrEmptyResource,
},
"empty component": {
info: storage.KeyBuildInfo{
Group: "",
Resources: "nodes",
Version: "v1",
Namespace: "",
Name: "test-node",
},
err: storage.ErrEmptyComponent,
},
}

for n, c := range cases {
Expand Down
135 changes: 61 additions & 74 deletions pkg/yurthub/storage/etcd/keycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func (s keySet) Difference(s2 keySet) []storageKey {
// ...
type componentKeyCache struct {
sync.Mutex
ctx context.Context
cache map[string]keySet
filePath string
keyFunc func(storage.KeyBuildInfo) (storage.Key, error)
fsOperator fs.FileSystemOperator
getEtcdClient func() *clientv3.Client
ctx context.Context
cache map[string]keySet
filePath string
keyFunc func(storage.KeyBuildInfo) (storage.Key, error)
fsOperator fs.FileSystemOperator
etcdClient *clientv3.Client
}

func (c *componentKeyCache) Recover() error {
Expand All @@ -84,27 +84,28 @@ func (c *componentKeyCache) Recover() error {
if err := c.fsOperator.CreateFile(c.filePath, []byte{}); err != nil {
return fmt.Errorf("failed to create cache file at %s, %v", c.filePath, err)
}
return nil
} else if err != nil {
return fmt.Errorf("failed to recover key cache from %s, %v", c.filePath, err)
}

// successfully read from file
if len(buf) == 0 {
return nil
}
lines := strings.Split(string(buf), "\n")
for i, l := range lines {
s := strings.Split(l, ":")
if len(s) != 2 {
return fmt.Errorf("failed to parse line %d, invalid format", i)
}
comp, keys := s[0], strings.Split(s[1], ",")
ks := keySet{m: map[storageKey]struct{}{}}
for _, key := range keys {
ks.m[storageKey{path: key}] = struct{}{}
if len(buf) != 0 {
// We've got content from file
lines := strings.Split(string(buf), "\n")
for i, l := range lines {
s := strings.Split(l, ":")
if len(s) != 2 {
return fmt.Errorf("failed to parse line %d, invalid format", i)
}
comp, keys := s[0], strings.Split(s[1], ",")
ks := keySet{m: map[storageKey]struct{}{}}
for _, key := range keys {
ks.m[storageKey{
comp: comp,
path: key,
}] = struct{}{}
}
c.cache[comp] = ks
}
c.cache[comp] = ks
}

poolScopedKeyset, err := c.getPoolScopedKeyset()
Expand All @@ -120,49 +121,43 @@ func (c *componentKeyCache) Recover() error {
}

func (c *componentKeyCache) getPoolScopedKeyset() (*keySet, error) {
// FIXME: now getEtcdClient would cause nil pointer
//client := c.getEtcdClient()
//if client == nil {
// return nil, fmt.Errorf("got empty etcd client")
//}

keys := &keySet{m: map[storageKey]struct{}{}}
//for gvr := range coordinatorconstants.PoolScopedResources {
//getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout)
//defer cancel()
//rootKey, err := c.keyFunc(storage.KeyBuildInfo{
// Component: coordinatorconstants.DefaultPoolScopedUserAgent,
// Group: gvr.Group,
// Version: gvr.Version,
// Resources: gvr.Resource,
//})
//if err != nil {
// return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err)
//}
//getResp, err := client.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly())
//if err != nil {
// return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err)
//}
//
//for _, kv := range getResp.Kvs {
// ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key))
// if err != nil {
// return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key)
// }
// key, err := c.keyFunc(storage.KeyBuildInfo{
// Component: coordinatorconstants.DefaultPoolScopedUserAgent,
// Group: gvr.Group,
// Version: gvr.Version,
// Resources: gvr.Resource,
// Namespace: ns,
// Name: name,
// })
// if err != nil {
// return nil, fmt.Errorf("failed to create resource key for %v", kv.Key)
// }
// keys.m[key.(storageKey)] = struct{}{}
//}
//}
for gvr := range coordinatorconstants.PoolScopedResources {
getCtx, cancel := context.WithTimeout(c.ctx, defaultTimeout)
defer cancel()
rootKey, err := c.keyFunc(storage.KeyBuildInfo{
Component: coordinatorconstants.DefaultPoolScopedUserAgent,
Group: gvr.Group,
Version: gvr.Version,
Resources: gvr.Resource,
})
if err != nil {
return nil, fmt.Errorf("failed to generate keys for %s, %v", gvr.String(), err)
}
getResp, err := c.etcdClient.Get(getCtx, rootKey.Key(), clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, fmt.Errorf("failed to get from etcd for %s, %v", gvr.String(), err)
}

for _, kv := range getResp.Kvs {
ns, name, err := getNamespaceAndNameFromKeyPath(string(kv.Key))
if err != nil {
return nil, fmt.Errorf("failed to parse namespace and name of %s", kv.Key)
}
key, err := c.keyFunc(storage.KeyBuildInfo{
Component: coordinatorconstants.DefaultPoolScopedUserAgent,
Group: gvr.Group,
Version: gvr.Version,
Resources: gvr.Resource,
Namespace: ns,
Name: name,
})
if err != nil {
return nil, fmt.Errorf("failed to create resource key for %v", kv.Key)
}
keys.m[key.(storageKey)] = struct{}{}
}
}
return keys, nil
}

Expand Down Expand Up @@ -262,19 +257,11 @@ func (c *componentKeyCache) flush() error {
return nil
}

func newComponentKeyCache(filePath string) *componentKeyCache {
return &componentKeyCache{
filePath: filePath,
cache: map[string]keySet{},
fsOperator: fs.FileSystemOperator{},
}
}

// We assume that path points to a namespaced resource.
func getNamespaceAndNameFromKeyPath(path string) (string, string, error) {
elems := strings.Split(path, "/")
elems := strings.Split(strings.TrimPrefix(path, "/"), "/")
if len(elems) < 2 {
return "", "", fmt.Errorf("unrecognized path: %v", path)
return "", "", fmt.Errorf("unrecognized path: %s", path)
}

return elems[len(elems)-2], elems[len(elems)-1], nil
Expand Down
Loading

0 comments on commit 7fb43e7

Please sign in to comment.