Skip to content

Commit

Permalink
clientv3: Fix TLS test failures by returning DeadlineExceeded error f…
Browse files Browse the repository at this point in the history
…rom dial without any additional wrapping
  • Loading branch information
jpbetz authored and gyuho committed Jun 15, 2018
1 parent ee2747e commit 9304d1a
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 92 deletions.
5 changes: 2 additions & 3 deletions clientv3/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/coreos/etcd/auth/authpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"google.golang.org/grpc"
)

Expand Down Expand Up @@ -216,8 +215,8 @@ func (auth *authenticator) close() {
auth.conn.Close()
}

func newAuthenticator(ctx context.Context, endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}
Expand Down
40 changes: 20 additions & 20 deletions clientv3/balancer/resolver/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// resolves to etcd entpoints for grpc targets of the form 'endpoint://<cluster-name>/<endpoint>'.
// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint://<clientId>/<endpoint>'.
package endpoint

import (
Expand All @@ -36,13 +36,13 @@ var (

func init() {
bldr = &builder{
clusterResolvers: make(map[string]*Resolver),
clientResolvers: make(map[string]*Resolver),
}
resolver.Register(bldr)
}

type builder struct {
clusterResolvers map[string]*Resolver
clientResolvers map[string]*Resolver
sync.RWMutex
}

Expand All @@ -59,30 +59,30 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
return r, nil
}

func (b *builder) getResolver(clusterName string) *Resolver {
func (b *builder) getResolver(clientId string) *Resolver {
b.RLock()
r, ok := b.clusterResolvers[clusterName]
r, ok := b.clientResolvers[clientId]
b.RUnlock()
if !ok {
r = &Resolver{
clusterName: clusterName,
clientId: clientId,
}
b.Lock()
b.clusterResolvers[clusterName] = r
b.clientResolvers[clientId] = r
b.Unlock()
}
return r
}

func (b *builder) addResolver(r *Resolver) {
bldr.Lock()
bldr.clusterResolvers[r.clusterName] = r
bldr.clientResolvers[r.clientId] = r
bldr.Unlock()
}

func (b *builder) removeResolver(r *Resolver) {
bldr.Lock()
delete(bldr.clusterResolvers, r.clusterName)
delete(bldr.clientResolvers, r.clientId)
bldr.Unlock()
}

Expand All @@ -91,15 +91,15 @@ func (r *builder) Scheme() string {
}

// EndpointResolver gets the resolver for given etcd cluster name.
func EndpointResolver(clusterName string) *Resolver {
return bldr.getResolver(clusterName)
func EndpointResolver(clientId string) *Resolver {
return bldr.getResolver(clientId)
}

// Resolver provides a resolver for a single etcd cluster, identified by name.
type Resolver struct {
clusterName string
cc resolver.ClientConn
addrs []resolver.Address
clientId string
cc resolver.ClientConn
addrs []resolver.Address
sync.RWMutex
}

Expand Down Expand Up @@ -146,14 +146,14 @@ func (r *Resolver) Close() {
bldr.removeResolver(r)
}

// Target constructs a endpoint target with current resolver's clusterName.
// Target constructs a endpoint target with current resolver's clientId.
func (r *Resolver) Target(endpoint string) string {
return Target(r.clusterName, endpoint)
return Target(r.clientId, endpoint)
}

// Target constructs a endpoint resolver target.
func Target(clusterName, endpoint string) string {
return fmt.Sprintf("%s://%s/%s", scheme, clusterName, endpoint)
func Target(clientId, endpoint string) string {
return fmt.Sprintf("%s://%s/%s", scheme, clientId, endpoint)
}

// IsTarget checks if a given target string in an endpoint resolver target.
Expand Down Expand Up @@ -185,7 +185,7 @@ func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
return proto, host, scheme
}

// ParseTarget parses a endpoint://<clusterName>/<endpoint> string and returns the parsed clusterName and endpoint.
// ParseTarget parses a endpoint://<clientId>/<endpoint> string and returns the parsed clientId and endpoint.
// If the target is malformed, an error is returned.
func ParseTarget(target string) (string, string, error) {
noPrefix := strings.TrimPrefix(target, targetPrefix)
Expand All @@ -194,7 +194,7 @@ func ParseTarget(target string) (string, string, error) {
}
parts := strings.SplitN(noPrefix, "/", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("malformed target, expected %s://<clusterName>/<endpoint>, but got %s", scheme, target)
return "", "", fmt.Errorf("malformed target, expected %s://<clientId>/<endpoint>, but got %s", scheme, target)
}
return parts[0], parts[1], nil
}
12 changes: 7 additions & 5 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,17 @@ func (c *Client) getToken(ctx context.Context) error {
var auth *authenticator

for i := 0; i < len(c.cfg.Endpoints); i++ {
endpoint := c.cfg.Endpoints[i]
ep := c.cfg.Endpoints[i]
// use dial options without dopts to avoid reusing the client balancer
var dOpts []grpc.DialOption
dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint), c.cfg.DialOptions...)
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolver.Target(host)
dOpts, err = c.dialSetupOpts(target, c.cfg.DialOptions...)
if err != nil {
err = fmt.Errorf("failed to configure auth dialer: %v", err)
continue
}
auth, err = newAuthenticator(ctx, endpoint, dOpts, c)
auth, err = newAuthenticator(ctx, target, dOpts, c)
if err != nil {
continue
}
Expand Down Expand Up @@ -369,7 +371,7 @@ func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, er
if c.cfg.DialTimeout > 0 {
var cancel context.CancelFunc
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel()
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

conn, err := grpc.DialContext(dctx, target, opts...)
Expand Down Expand Up @@ -456,7 +458,7 @@ func newClient(cfg *Config) (*Client, error) {
if err != nil {
client.cancel()
client.resolver.Close()
return nil, fmt.Errorf("failed to dial initial client connection: %v", err)
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
Expand Down
15 changes: 9 additions & 6 deletions clientv3/integration/black_hole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"google.golang.org/grpc"
)

// TestBalancerUnderBlackholeKeepAliveWatch tests when watch discovers it cannot talk to
Expand All @@ -44,6 +45,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
ccfg := clientv3.Config{
Endpoints: []string{eps[0]},
DialTimeout: 1 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
DialKeepAliveTime: 1 * time.Second,
DialKeepAliveTimeout: 500 * time.Millisecond,
}
Expand Down Expand Up @@ -106,7 +108,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
func TestBalancerUnderBlackholeNoKeepAlivePut(t *testing.T) {
testBalancerUnderBlackholeNoKeepAlive(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Put(ctx, "foo", "bar")
if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isClientTimeout(err) || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
return errExpected
}
return err
Expand All @@ -116,7 +118,7 @@ func TestBalancerUnderBlackholeNoKeepAlivePut(t *testing.T) {
func TestBalancerUnderBlackholeNoKeepAliveDelete(t *testing.T) {
testBalancerUnderBlackholeNoKeepAlive(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Delete(ctx, "foo")
if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isClientTimeout(err) || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
return errExpected
}
return err
Expand All @@ -129,7 +131,7 @@ func TestBalancerUnderBlackholeNoKeepAliveTxn(t *testing.T) {
If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
Then(clientv3.OpPut("foo", "bar")).
Else(clientv3.OpPut("foo", "baz")).Commit()
if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isClientTimeout(err) || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
return errExpected
}
return err
Expand All @@ -139,7 +141,7 @@ func TestBalancerUnderBlackholeNoKeepAliveTxn(t *testing.T) {
func TestBalancerUnderBlackholeNoKeepAliveLinearizableGet(t *testing.T) {
testBalancerUnderBlackholeNoKeepAlive(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Get(ctx, "a")
if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
if isClientTimeout(err) || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
return errExpected
}
return err
Expand All @@ -149,7 +151,7 @@ func TestBalancerUnderBlackholeNoKeepAliveLinearizableGet(t *testing.T) {
func TestBalancerUnderBlackholeNoKeepAliveSerializableGet(t *testing.T) {
testBalancerUnderBlackholeNoKeepAlive(t, func(cli *clientv3.Client, ctx context.Context) error {
_, err := cli.Get(ctx, "a", clientv3.WithSerializable())
if err == context.DeadlineExceeded || isServerCtxTimeout(err) {
if isClientTimeout(err) || isServerCtxTimeout(err) {
return errExpected
}
return err
Expand All @@ -172,6 +174,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
ccfg := clientv3.Config{
Endpoints: []string{eps[0]},
DialTimeout: 1 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
cli, err := clientv3.New(ccfg)
if err != nil {
Expand All @@ -193,7 +196,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
// TODO: first operation can succeed
// when gRPC supports better retry on non-delivered request
for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
err = op(cli, ctx)
cancel()
if err == nil {
Expand Down
27 changes: 21 additions & 6 deletions clientv3/integration/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"
)

var (
Expand Down Expand Up @@ -58,10 +59,11 @@ func TestDialTLSExpired(t *testing.T) {
_, err = clientv3.New(clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: 3 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
TLS: tls,
})
if err != context.DeadlineExceeded {
t.Fatalf("expected %v, got %v", context.DeadlineExceeded, err)
if !isClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
}
}

Expand All @@ -72,12 +74,19 @@ func TestDialTLSNoConfig(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, ClientTLS: &testTLSInfo, SkipCreatingClient: true})
defer clus.Terminate(t)
// expect "signed by unknown authority"
_, err := clientv3.New(clientv3.Config{
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr()},
DialTimeout: time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != context.DeadlineExceeded {
t.Fatalf("expected %v, got %v", context.DeadlineExceeded, err)
defer c.Close()

// TODO: this should not be required when we set grpc.WithBlock()
if c != nil {
_, err = c.KV.Get(context.Background(), "/")
}
if !isClientTimeout(err) {
t.Fatalf("expected dial timeout error, got %v", err)
}
}

Expand All @@ -104,7 +113,11 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
}
toKill := rand.Intn(len(eps))

cfg := clientv3.Config{Endpoints: []string{eps[toKill]}, DialTimeout: 1 * time.Second}
cfg := clientv3.Config{
Endpoints: []string{eps[toKill]},
DialTimeout: 1 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -121,6 +134,7 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
if !setBefore {
cli.SetEndpoints(eps[toKill%3], eps[(toKill+1)%3])
}
time.Sleep(time.Second * 2)
ctx, cancel := context.WithTimeout(context.Background(), integration.RequestWaitTimeout)
if _, err = cli.Get(ctx, "foo", clientv3.WithSerializable()); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -158,6 +172,7 @@ func TestRejectOldCluster(t *testing.T) {
cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
RejectOldCluster: true,
}
cli, err := clientv3.New(cfg)
Expand Down
14 changes: 8 additions & 6 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func TestKVGetRetry(t *testing.T) {

time.Sleep(100 * time.Millisecond)
clus.Members[fIdx].Restart(t)
clus.Members[fIdx].WaitOK(t)

select {
case <-time.After(5 * time.Second):
Expand Down Expand Up @@ -792,7 +793,7 @@ func TestKVGetStoppedServerAndClose(t *testing.T) {
// this Get fails and triggers an asynchronous connection retry
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && err != context.DeadlineExceeded {
if err != nil && !isServerUnavailable(err) {
t.Fatal(err)
}
}
Expand All @@ -814,14 +815,15 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// grpc finds out the original connection is down due to the member shutdown.
_, err := cli.Get(ctx, "abc")
cancel()
if err != nil && err != context.DeadlineExceeded {
if err != nil && !isServerUnavailable(err) {
t.Fatal(err)
}

ctx, cancel = context.WithTimeout(context.TODO(), time.Second) // TODO: How was this test not consistently failing with context canceled errors?
// this Put fails and triggers an asynchronous connection retry
_, err = cli.Put(ctx, "abc", "123")
cancel()
if err != nil && err != context.DeadlineExceeded {
if err != nil && !isServerUnavailable(err) {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -906,7 +908,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
Expand All @@ -920,7 +922,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max "),
expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
}
for i, test := range tests {
Expand All @@ -940,7 +942,7 @@ func TestKVLargeRequests(t *testing.T) {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
}
} else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) {
t.Errorf("#%d: expected %v, got %v", i, test.expectError, err)
t.Errorf("#%d: expected error starting with '%s', got '%s'", i, test.expectError.Error(), err.Error())
}

// put request went through, now expects large response back
Expand Down
Loading

0 comments on commit 9304d1a

Please sign in to comment.