From 8197ae78661d7352ba002f6a9d9497f15c9c8b75 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 28 Jan 2016 12:08:05 -0800 Subject: [PATCH] e2e: etcdctl test for proxy no-sync For https://github.com/coreos/etcd/issues/3894. --- e2e/etcd_test.go | 132 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 124 insertions(+), 8 deletions(-) diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go index a8f180ba595c..6911cb04ada7 100644 --- a/e2e/etcd_test.go +++ b/e2e/etcd_test.go @@ -20,6 +20,7 @@ import ( "net/url" "os" "strings" + "sync" "testing" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect" @@ -145,13 +146,16 @@ type etcdProcessConfig struct { args []string dataDirPath string acurl url.URL + isProxy bool } type etcdProcessClusterConfig struct { - clusterSize int - isClientTLS bool - isPeerTLS bool - initialToken string + etcdProcessBasePort int + clusterSize int + proxySize int + isClientTLS bool + isPeerTLS bool + initialToken string } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -160,7 +164,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, etcdCfgs := cfg.etcdProcessConfigs() epc := &etcdProcessCluster{ cfg: cfg, - procs: make([]*etcdProcess, cfg.clusterSize), + procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize), } // launch etcd processes @@ -174,11 +178,15 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, } // wait for cluster to start - readyC := make(chan error, cfg.clusterSize) + readyC := make(chan error, cfg.clusterSize+cfg.proxySize) readyStr := "set the initial cluster version" for i := range etcdCfgs { go func(etcdp *etcdProcess) { - _, err := etcdp.proc.ExpectRegex(readyStr) + rs := readyStr + if etcdp.cfg.isProxy { + rs = "listening for client requests" + } + _, err := etcdp.proc.ExpectRegex(rs) readyC <- err etcdp.proc.ReadLine() etcdp.proc.Interact() // this blocks(leaks) if another goroutine is reading @@ -220,7 +228,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { peerScheme = "https" } - etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize) + etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize) initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { port := etcdProcessBasePort + 2*i @@ -262,6 +270,24 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { acurl: curl, } } + for i := 0; i < cfg.proxySize; i++ { + port := etcdProcessBasePort + 2*cfg.clusterSize + i + 1 + curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)} + name := fmt.Sprintf("testname-proxy%d", i) + dataDirPath := name + ".etcd" + args := []string{ + "--name", name, + "--proxy", "on", + "--listen-client-urls", curl.String(), + "--data-dir", dataDirPath, + } + etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{ + args: args, + dataDirPath: dataDirPath, + acurl: curl, + isProxy: true, + } + } initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} for i := range etcdCfgs { @@ -305,3 +331,93 @@ func spawnWithExpect(args []string, expected string) error { } return proc.Close() } + +func TestBasicOpsV2CtlWatchWithProxy(t *testing.T) { + testProcessClusterV2CtlWatch( + t, + &etcdProcessClusterConfig{ + clusterSize: 3, + proxySize: 1, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", + }, + false, + ) +} + +func TestBasicOpsV2CtlWatchWithProxyNoSync(t *testing.T) { + testProcessClusterV2CtlWatch( + t, + &etcdProcessClusterConfig{ + clusterSize: 3, + proxySize: 1, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", + }, + true, + ) +} + +func testProcessClusterV2CtlWatch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { + if fileutil.Exist("../bin/etcdctl") == false { + t.Fatalf("could not find etcdctl binary") + } + + epc, errC := newEtcdProcessCluster(cfg) + if errC != nil { + t.Fatalf("could not start etcd process cluster (%v)", errC) + } + defer func() { + if errC := epc.Close(); errC != nil { + if !strings.Contains(errC.Error(), "os: process already finished") { + t.Fatalf("error closing etcd processes (%v)", errC) + } + } + }() + + endpoint, be := "", "" + for _, p := range epc.procs { + if p.cfg.isProxy { + endpoint = p.cfg.acurl.String() + break + } else { + be = p.cfg.acurl.String() + } + } + if endpoint == "" { + endpoint = be + } + + key, value := "foo", "bar" + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + watchArgs := []string{"../bin/etcdctl", "--endpoint", endpoint} + if noSync { + watchArgs = append(watchArgs, "--no-sync") + } + watchArgs = append(watchArgs, "watch", key) + + if err := spawnWithExpect(watchArgs, value); err != nil { + t.Fatalf("failed watch (%v)", err) + } + }() + + putArgs := []string{"../bin/etcdctl", "--endpoint", endpoint} + if noSync { + putArgs = append(putArgs, "--no-sync") + } + putArgs = append(putArgs, "set", key, value) + + if err := spawnWithExpect(putArgs, value); err != nil { + t.Fatalf("failed set (%v)", err) + } + + wg.Wait() +}