From f3b1d300b37cdbfd1f71322fa0cec2081b345c93 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Thu, 28 Jan 2016 12:08:05 -0800 Subject: [PATCH] e2e: etcdctl test for proxy no-sync For https://github.com/coreos/etcd/issues/3894. --- e2e/etcd_test.go | 172 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 147 insertions(+), 25 deletions(-) diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go index a8f180ba595c..718c0a6d4c97 100644 --- a/e2e/etcd_test.go +++ b/e2e/etcd_test.go @@ -20,6 +20,7 @@ import ( "net/url" "os" "strings" + "sync" "testing" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect" @@ -39,10 +40,11 @@ func TestBasicOpsNoTLS(t *testing.T) { testProcessClusterPutGet( t, &etcdProcessClusterConfig{ - clusterSize: 3, - isClientTLS: false, - isPeerTLS: false, - initialToken: "new", + etcdProcessBasePort: etcdProcessBasePort, + clusterSize: 3, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", }, ) } @@ -52,10 +54,11 @@ func TestBasicOpsAllTLS(t *testing.T) { testProcessClusterPutGet( t, &etcdProcessClusterConfig{ - clusterSize: 3, - isClientTLS: true, - isPeerTLS: true, - initialToken: "new", + etcdProcessBasePort: etcdProcessBasePort, + clusterSize: 3, + isClientTLS: true, + isPeerTLS: true, + initialToken: "new", }, ) } @@ -65,10 +68,11 @@ func TestBasicOpsPeerTLS(t *testing.T) { testProcessClusterPutGet( t, &etcdProcessClusterConfig{ - clusterSize: 3, - isClientTLS: false, - isPeerTLS: true, - initialToken: "new", + etcdProcessBasePort: etcdProcessBasePort, + clusterSize: 3, + isClientTLS: false, + isPeerTLS: true, + initialToken: "new", }, ) } @@ -78,10 +82,11 @@ func TestBasicOpsClientTLS(t *testing.T) { testProcessClusterPutGet( t, &etcdProcessClusterConfig{ - clusterSize: 3, - isClientTLS: true, - isPeerTLS: false, - initialToken: "new", + etcdProcessBasePort: etcdProcessBasePort, + clusterSize: 3, + isClientTLS: true, + isPeerTLS: false, + initialToken: "new", }, ) } @@ -145,13 +150,16 @@ type etcdProcessConfig struct { args []string dataDirPath string acurl url.URL + isProxy bool } type etcdProcessClusterConfig struct { - clusterSize int - isClientTLS bool - isPeerTLS bool - initialToken string + etcdProcessBasePort int + clusterSize int + proxySize int + isClientTLS bool + isPeerTLS bool + initialToken string } // newEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -160,7 +168,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, etcdCfgs := cfg.etcdProcessConfigs() epc := &etcdProcessCluster{ cfg: cfg, - procs: make([]*etcdProcess, cfg.clusterSize), + procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize), } // launch etcd processes @@ -174,11 +182,15 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, } // wait for cluster to start - readyC := make(chan error, cfg.clusterSize) + readyC := make(chan error, cfg.clusterSize+cfg.proxySize) readyStr := "set the initial cluster version" for i := range etcdCfgs { go func(etcdp *etcdProcess) { - _, err := etcdp.proc.ExpectRegex(readyStr) + rs := readyStr + if etcdp.cfg.isProxy { + rs = "listening to" + } + _, err := etcdp.proc.ExpectRegex(rs) readyC <- err etcdp.proc.ReadLine() etcdp.proc.Interact() // this blocks(leaks) if another goroutine is reading @@ -220,10 +232,10 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { peerScheme = "https" } - etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize) + etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize) initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { - port := etcdProcessBasePort + 2*i + port := cfg.etcdProcessBasePort + 2*i curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)} purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)} name := fmt.Sprintf("testname%d", i) @@ -262,6 +274,24 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { acurl: curl, } } + for i := 0; i < cfg.proxySize; i++ { + port := cfg.etcdProcessBasePort + 2*cfg.clusterSize + i + 1 + curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)} + name := fmt.Sprintf("testname-proxy%d", i) + dataDirPath := name + ".etcd" + args := []string{ + "--name", name, + "--proxy", "on", + "--listen-client-urls", curl.String(), + "--data-dir", dataDirPath, + } + etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{ + args: args, + dataDirPath: dataDirPath, + acurl: curl, + isProxy: true, + } + } initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} for i := range etcdCfgs { @@ -305,3 +335,95 @@ func spawnWithExpect(args []string, expected string) error { } return proc.Close() } + +func TestBasicOpsV2CtlWatchWithProxy(t *testing.T) { + testProcessClusterV2CtlWatch( + t, + &etcdProcessClusterConfig{ + etcdProcessBasePort: etcdProcessBasePort + 10, + clusterSize: 3, + proxySize: 1, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", + }, + false, + ) +} + +func TestBasicOpsV2CtlWatchWithProxyNoSync(t *testing.T) { + testProcessClusterV2CtlWatch( + t, + &etcdProcessClusterConfig{ + etcdProcessBasePort: etcdProcessBasePort + 20, + clusterSize: 3, + proxySize: 1, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", + }, + true, + ) +} + +func testProcessClusterV2CtlWatch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { + if fileutil.Exist("../bin/etcdctl") == false { + t.Fatalf("could not find etcdctl binary") + } + + epc, errC := newEtcdProcessCluster(cfg) + if errC != nil { + t.Fatalf("could not start etcd process cluster (%v)", errC) + } + defer func() { + if errC := epc.Close(); errC != nil { + if !strings.Contains(errC.Error(), "os: process already finished") { + t.Fatalf("error closing etcd processes (%v)", errC) + } + } + }() + + endpoint, be := "", "" + for _, p := range epc.procs { + if p.cfg.isProxy { + endpoint = p.cfg.acurl.String() + break + } else { + be = p.cfg.acurl.String() + } + } + if endpoint == "" { + endpoint = be + } + + key, value := "foo", "bar" + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + watchArgs := []string{"../bin/etcdctl", "--endpoint", endpoint} + if noSync { + watchArgs = append(watchArgs, "--no-sync") + } + watchArgs = append(watchArgs, "watch", key) + + if err := spawnWithExpect(watchArgs, value); err != nil { + t.Fatalf("failed watch (%v)", err) + } + }() + + putArgs := []string{"../bin/etcdctl", "--endpoint", endpoint} + if noSync { + putArgs = append(putArgs, "--no-sync") + } + putArgs = append(putArgs, "set", key, value) + + if err := spawnWithExpect(putArgs, value); err != nil { + t.Fatalf("failed set (%v)", err) + } + + wg.Wait() +}