Skip to content

Commit

Permalink
tests: Test separate http port connection multiplexing
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Mar 30, 2023
1 parent c0421c7 commit 6637aee
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 45 deletions.
6 changes: 4 additions & 2 deletions tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error

func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }

func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsV2() []string { return p.EndpointsHTTP() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.EndpointsGRPC() }
func (p *proxyEtcdProcess) EndpointsHTTP() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsGRPC() []string { return p.proxyV3.endpoints() }
func (p *proxyEtcdProcess) EndpointsMetrics() []string {
panic("not implemented; proxy doesn't provide health information")
}
Expand Down
58 changes: 35 additions & 23 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,17 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
var curls []string
var curl, curltls string
var curl string
port := cfg.basePort + 5*i
clientPort := port
clientHttpPort := port + 4
curlHost := fmt.Sprintf("localhost:%d", port)

switch cfg.clientTLS {
case clientNonTLS, clientTLS:
curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
if cfg.clientTLS == clientTLSAndNonTLS {
curl = clientURL(clientPort, clientNonTLS)
curls = []string{curl, clientURL(clientPort, clientTLS)}
} else {
curl = clientURL(clientPort, cfg.clientTLS)
curls = []string{curl}
case clientTLSAndNonTLS:
curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
curls = []string{curl, curltls}
}

purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
Expand All @@ -279,9 +277,10 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
}
var clientHttpUrl string
if cfg.clientHttpSeparate {
clientHttpUrl := url.URL{Scheme: cfg.clientScheme(), Host: fmt.Sprintf("localhost:%d", clientHttpPort)}
args = append(args, "--listen-client-http-urls", clientHttpUrl.String())
clientHttpUrl = clientURL(clientHttpPort, cfg.clientTLS)
args = append(args, "--listen-client-http-urls", clientHttpUrl)
}
args = addV2Args(args)
if cfg.forceNewCluster {
Expand Down Expand Up @@ -342,18 +341,19 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
}

etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
lg: lg,
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
name: name,
purl: purl,
acurl: curl,
murl: murl,
initialToken: cfg.initialToken,
clientHttpUrl: clientHttpUrl,
}
}

Expand All @@ -366,6 +366,18 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
return etcdCfgs
}

func clientURL(port int, connType clientConnType) string {
curlHost := fmt.Sprintf("localhost:%d", port)
switch connType {
case clientNonTLS:
return (&url.URL{Scheme: "http", Host: curlHost}).String()
case clientTLS:
return (&url.URL{Scheme: "https", Host: curlHost}).String()
default:
panic(fmt.Sprintf("Unsupported connection type %v", connType))
}
}

func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
if cfg.clientTLS != clientNonTLS {
if cfg.isClientAutoTLS {
Expand Down
45 changes: 29 additions & 16 deletions tests/e2e/cmux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ import (
func TestConnectionMultiplexing(t *testing.T) {
BeforeTest(t)
for _, tc := range []struct {
name string
serverTLS clientConnType
name string
serverTLS clientConnType
separateHttpPort bool
}{
{
name: "ServerTLS",
Expand All @@ -52,10 +53,20 @@ func TestConnectionMultiplexing(t *testing.T) {
name: "ServerTLSAndNonTLS",
serverTLS: clientTLSAndNonTLS,
},
{
name: "SeparateHTTP/ServerTLS",
serverTLS: clientTLS,
separateHttpPort: true,
},
{
name: "SeparateHTTP/ServerNonTLS",
serverTLS: clientNonTLS,
separateHttpPort: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true}
cfg := etcdProcessClusterConfig{clusterSize: 1, clientTLS: tc.serverTLS, enableV2: true, clientHttpSeparate: tc.separateHttpPort}
clus, err := newEtcdProcessCluster(t, &cfg)
require.NoError(t, err)
defer clus.Close()
Expand All @@ -76,43 +87,45 @@ func TestConnectionMultiplexing(t *testing.T) {
name = "ClientTLS"
}
t.Run(name, func(t *testing.T) {
testConnectionMultiplexing(ctx, t, clus.EndpointsV3()[0], connType)
testConnectionMultiplexing(ctx, t, clus.procs[0], connType)
})
}
})
}

}

func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint string, connType clientConnType) {
func testConnectionMultiplexing(ctx context.Context, t *testing.T, member etcdProcess, connType clientConnType) {
httpEndpoint := member.EndpointsHTTP()[0]
grpcEndpoint := member.EndpointsGRPC()[0]
switch connType {
case clientTLS:
endpoint = toTLS(endpoint)
httpEndpoint = toTLS(httpEndpoint)
grpcEndpoint = toTLS(grpcEndpoint)
case clientNonTLS:
default:
panic(fmt.Sprintf("Unsupported conn type %v", connType))
}
t.Run("etcdctl", func(t *testing.T) {
t.Run("v2", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, true)
etcdctl := NewEtcdctl([]string{httpEndpoint}, connType, false, true)
err := etcdctl.Set("a", "1")
assert.NoError(t, err)
})
t.Run("v3", func(t *testing.T) {
etcdctl := NewEtcdctl([]string{endpoint}, connType, false, false)
etcdctl := NewEtcdctl([]string{grpcEndpoint}, connType, false, false)
err := etcdctl.Put("a", "1")
assert.NoError(t, err)
})
})
t.Run("clientv2", func(t *testing.T) {
c, err := newClientV2(t, []string{endpoint}, connType, false)
c, err := newClientV2(t, []string{httpEndpoint}, connType, false)
require.NoError(t, err)
kv := clientv2.NewKeysAPI(c)
_, err = kv.Set(ctx, "a", "1", nil)
assert.NoError(t, err)
})
t.Run("clientv3", func(t *testing.T) {
c := newClient(t, []string{endpoint}, connType, false)
c := newClient(t, []string{grpcEndpoint}, connType, false)
_, err := c.Get(ctx, "a")
assert.NoError(t, err)
})
Expand All @@ -123,11 +136,11 @@ func testConnectionMultiplexing(ctx context.Context, t *testing.T, endpoint stri
tname = "default"
}
t.Run(tname, func(t *testing.T) {
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
assert.NoError(t, fetchGrpcGateway(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchMetrics(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchVersion(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchHealth(httpEndpoint, httpVersion, connType))
assert.NoError(t, fetchDebugVars(httpEndpoint, httpVersion, connType))
})
}
})
Expand Down
18 changes: 14 additions & 4 deletions tests/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
type etcdProcess interface {
EndpointsV2() []string
EndpointsV3() []string
EndpointsGRPC() []string
EndpointsHTTP() []string
EndpointsMetrics() []string

Start() error
Expand Down Expand Up @@ -72,8 +74,9 @@ type etcdServerProcessConfig struct {

purl url.URL

acurl string
murl string
acurl string
murl string
clientHttpUrl string

initialToken string
initialCluster string
Expand All @@ -91,8 +94,15 @@ func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, err
return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
}

func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *etcdServerProcess) EndpointsV2() []string { return ep.EndpointsHTTP() }
func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsGRPC() }
func (ep *etcdServerProcess) EndpointsGRPC() []string { return []string{ep.cfg.acurl} }
func (ep *etcdServerProcess) EndpointsHTTP() []string {
if ep.cfg.clientHttpUrl == "" {
return []string{ep.cfg.acurl}
}
return []string{ep.cfg.clientHttpUrl}
}
func (ep *etcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.murl} }

func (ep *etcdServerProcess) Start() error {
Expand Down

0 comments on commit 6637aee

Please sign in to comment.