Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
client: do not timeout when wait is true
Browse files Browse the repository at this point in the history
Current V2 watch waits by encoding URL with wait=true.
When a client sets 'no-sync', it requests directly to
proxy and the proxy redirects it by cloning the request
object, which leads to cancel the original request when
it times out and the cloned request gets closed prematurely.

This fixes etcd-io#3894 by querying
the original client request in order to not use context timeout
when 'wait=true'.
  • Loading branch information
gyuho committed Jan 22, 2016
1 parent 1db0148 commit c595075
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 13 deletions.
17 changes: 16 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"reflect"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -123,6 +124,8 @@ type Config struct {
// watch start. But if server is behind some kind of proxy, the response
// header may be cached at proxy, and Client cannot rely on this behavior.
//
// Especially, wait request will ignore this timeout.
//
// One API call may send multiple requests to different etcd servers until it
// succeeds. Use context of the API to specify the overall timeout.
//
Expand Down Expand Up @@ -442,9 +445,21 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
return nil, nil, err
}

isWait := false
if req != nil && req.URL != nil {
ws := req.URL.Query().Get("wait")
if len(ws) != 0 {
var err error
isWait, err = strconv.ParseBool(ws)
if err != nil {
return nil, nil, fmt.Errorf("wrong wait value %s (%v for %+v)", ws, err, req)
}
}
}

var hctx context.Context
var hcancel context.CancelFunc
if c.headerTimeout > 0 {
if !isWait && c.headerTimeout > 0 {
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
} else {
hctx, hcancel = context.WithCancel(ctx)
Expand Down
194 changes: 182 additions & 12 deletions e2e/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package e2e
import (
"fmt"
"math/rand"
"net"
"net/url"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect"
"github.com/coreos/etcd/pkg/fileutil"
Expand All @@ -42,6 +45,7 @@ func TestBasicOpsNoTLS(t *testing.T) {
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort,
)
}

Expand All @@ -54,6 +58,7 @@ func TestBasicOpsAllTLS(t *testing.T) {
isPeerTLS: true,
initialToken: "new",
},
etcdProcessBasePort+10,
)
}

Expand All @@ -66,6 +71,7 @@ func TestBasicOpsPeerTLS(t *testing.T) {
isPeerTLS: true,
initialToken: "new",
},
etcdProcessBasePort+20,
)
}

Expand All @@ -78,11 +84,12 @@ func TestBasicOpsClientTLS(t *testing.T) {
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort+30,
)
}

func testProcessClusterPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
epc, err := newEtcdProcessCluster(cfg)
func testProcessClusterPutGet(t *testing.T, cfg *etcdProcessClusterConfig, basePort int) {
epc, err := newEtcdProcessCluster(cfg, basePort)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand All @@ -103,6 +110,107 @@ func testProcessClusterPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
}
}

func TestBasicOpsV2CtlWatchWithProxy(t *testing.T) {
testProcessClusterV2CtlWatch(
t,
&etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
isClientTLS: false,
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort+40,
false,
"foo", "bar",
)
}

func TestBasicOpsV2CtlWatchWithProxyNoSync(t *testing.T) {
testProcessClusterV2CtlWatch(
t,
&etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
isClientTLS: false,
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort+50,
true,
"foo", "bar",
)
}

func testProcessClusterV2CtlWatch(t *testing.T, cfg *etcdProcessClusterConfig, basePort int, noSync bool, key, value string) {
if fileutil.Exist("../bin/etcdctl") == false {
t.Fatalf("could not find etcdctl binary")
}

epc, errC := newEtcdProcessCluster(cfg, basePort)
if errC != nil {
t.Fatalf("could not start etcd process cluster (%v)", errC)
}
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

endpoint, be := "", ""
for _, p := range epc.procs {
if p.cfg.isProxy {
endpoint = p.cfg.acurl.String()
time.Sleep(3 * time.Second)
break
} else {
be = p.cfg.acurl.String()
}
}
if endpoint == "" {
endpoint = be
}

args := []string{"../bin/etcdctl", "--endpoint", endpoint}
if noSync {
args = append(args, "--no-sync")
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
args = append(args, "watch", key)
proc, err := spawnCmd(args)
if err != nil {
t.Fatalf("failed watch (%v)", err)
}
defer proc.Close()
s, _ := proc.ReadLine()
if strings.Contains(s, "client: etcd cluster is unavailable or misconfigured") {
t.Fatalf("failed watch (%v)", s)
}
}()

putArgs := []string{"../bin/etcdctl", "--endpoint", endpoint}
if noSync {
args = append(args, "--no-sync")
}
putArgs = append(putArgs, "set", key, value)
proc, err := spawnCmd(putArgs)
if err != nil {
t.Fatalf("failed put (%v)", err)
}
defer proc.Close()

s, _ := proc.ReadLine()
if !strings.Contains(s, value) {
t.Fatalf("failed put (%v)", s)
}

wg.Wait()
}

// cURLPrefixArgs builds the beginning of a curl command for a given key
// addressed to a random URL in the given cluster.
func cURLPrefixArgs(clus *etcdProcessCluster, key string) []string {
Expand Down Expand Up @@ -140,22 +248,24 @@ type etcdProcessConfig struct {
args []string
dataDirPath string
acurl url.URL
isProxy bool
}

type etcdProcessClusterConfig struct {
clusterSize int
proxySize int
isClientTLS bool
isPeerTLS bool
initialToken string
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
etcdCfgs := cfg.etcdProcessConfigs()
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig, basePort int) (*etcdProcessCluster, error) {
etcdCfgs := cfg.etcdProcessConfigs(basePort)
epc := &etcdProcessCluster{
cfg: cfg,
procs: make([]*etcdProcess, cfg.clusterSize),
procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
}

// launch etcd processes
Expand All @@ -169,14 +279,25 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
}

// wait for cluster to start
readyC := make(chan error, cfg.clusterSize)
readyStr := "set the initial cluster version"
readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
readyStr := "etcdserver: published"
readyStrProxy := "etcdmain: proxy: listening for client requests on"
for i := range etcdCfgs {
go func(etcdp *etcdProcess) {
_, err := etcdp.proc.ExpectRegex(readyStr)
var err error
checkSt := readyStr
if etcdp.cfg.isProxy {
checkSt = readyStrProxy
}
ready := false
for !ready {
var l string
l, err = etcdp.proc.ReadLine()
if strings.Contains(l, checkSt) {
ready = true
}
}
readyC <- err
etcdp.proc.ReadUntil('\n') // don't display rest of line
etcdp.proc.Interact()
close(etcdp.donec)
}(epc.procs[i])
}
Expand Down Expand Up @@ -204,7 +325,19 @@ func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
}

func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
func freePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
return l.Addr().(*net.TCPAddr).Port, nil
}

func (cfg *etcdProcessClusterConfig) etcdProcessConfigs(basePort int) []*etcdProcessConfig {
clientScheme := "http"
if cfg.isClientTLS {
clientScheme = "https"
Expand All @@ -217,7 +350,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize)
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
port := etcdProcessBasePort + 2*i
port := basePort + 2*i
curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
name := fmt.Sprintf("testname%d", i)
Expand Down Expand Up @@ -257,6 +390,43 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
}
}

for i := 0; i < cfg.proxySize; i++ {
port := basePort + 2*cfg.clusterSize + i
curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
name := fmt.Sprintf("testname-proxy%d", i)
dataDirPath := name + ".etcd"

args := []string{
"--name", name,
"--proxy", "on",
"--listen-client-urls", curl.String(),
"--data-dir", dataDirPath,
}
if cfg.isClientTLS {
tlsClientArgs := []string{
"--cert-file", certPath,
"--key-file", privateKeyPath,
"--ca-file", caPath,
}
args = append(args, tlsClientArgs...)
}
if cfg.isPeerTLS {
tlsPeerArgs := []string{
"--peer-cert-file", certPath,
"--peer-key-file", privateKeyPath,
"--peer-ca-file", caPath,
}
args = append(args, tlsPeerArgs...)
}

etcdCfgs = append(etcdCfgs, &etcdProcessConfig{
args: args,
dataDirPath: dataDirPath,
acurl: curl,
isProxy: true,
})
}

initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
for i := range etcdCfgs {
etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
Expand Down

0 comments on commit c595075

Please sign in to comment.