Skip to content

Commit

Permalink
Merge pull request #9860 from gyuho/new-balancer-april-2018
Browse files Browse the repository at this point in the history
clientv3: Upgrade to round robin balancer based on gRPC 1.12 balancer API
  • Loading branch information
gyuho authored Jun 16, 2018
2 parents aa03eba + cb6e9d2 commit d866cf8
Show file tree
Hide file tree
Showing 125 changed files with 11,570 additions and 4,073 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [

### Improved

- Rewrite [client balancer](TODO) with [new gRPC balancer interface](TODO).
- Rewrite [client balancer](https://github.com/coreos/etcd/pull/9860) with [new gRPC balancer interface](https://github.com/coreos/etcd/issues/9106).
- Add [backoff on watch retries on transient errors](https://github.com/coreos/etcd/pull/9840).
- Add [jitter to watch progress notify](https://github.com/coreos/etcd/pull/9278) to prevent [spikes in `etcd_network_client_grpc_sent_bytes_total`](https://github.com/coreos/etcd/issues/9246).
- Improve [slow request apply warning log](https://github.com/coreos/etcd/pull/9288).
Expand Down
19 changes: 16 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions bill-of-materials.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@
}
]
},
{
"project": "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils",
"licenses": [
{
"type": "Apache License 2.0",
"confidence": 1
}
]
},
{
"project": "github.com/grpc-ecosystem/go-grpc-prometheus",
"licenses": [
Expand Down
5 changes: 2 additions & 3 deletions clientv3/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/coreos/etcd/auth/authpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"google.golang.org/grpc"
)

Expand Down Expand Up @@ -216,8 +215,8 @@ func (auth *authenticator) close() {
auth.conn.Close()
}

func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.Dial(endpoint, opts...)
func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}
Expand Down
275 changes: 275 additions & 0 deletions clientv3/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/coreos/etcd/clientv3/balancer/picker"

"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)

// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
bb := &builder{cfg}
balancer.Register(bb)

bb.cfg.Logger.Info(
"registered balancer",
zap.String("policy", bb.cfg.Policy.String()),
zap.String("name", bb.cfg.Name),
)
}

type builder struct {
cfg Config
}

// Build is called initially when creating "ccBalancerWrapper".
// "grpc.Dial" is called to this client connection.
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
bb := &baseBalancer{
id: strconv.FormatInt(time.Now().UnixNano(), 36),
policy: b.cfg.Policy,
name: b.cfg.Policy.String(),
lg: b.cfg.Logger,

addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),

currentConn: nil,
csEvltr: &connectivityStateEvaluator{},

// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if b.cfg.Name != "" {
bb.name = b.cfg.Name
}
if bb.lg == nil {
bb.lg = zap.NewNop()
}

// TODO: support multiple connections
bb.mu.Lock()
bb.currentConn = cc
bb.mu.Unlock()

bb.lg.Info(
"built balancer",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.String("resolver-target", cc.Target()),
)
return bb
}

// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }

// Balancer defines client balancer interface.
type Balancer interface {
// Balancer is called on specified client connection. Client initiates gRPC
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
// changes, thus requires failover logic in this method.
balancer.Balancer

// Picker calls "Pick" for every client request.
picker.Picker
}

type baseBalancer struct {
id string
policy picker.Policy
name string
lg *zap.Logger

mu sync.RWMutex

addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]connectivity.State

currentConn balancer.ClientConn
currentState connectivity.State
csEvltr *connectivityStateEvaluator

picker.Picker
}

// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC sends initial or updated resolved addresses from "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))

bb.mu.Lock()
defer bb.mu.Unlock()

resolved := make(map[resolver.Address]struct{})
for _, addr := range addrs {
resolved[addr] = struct{}{}
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = connectivity.Idle
sc.Connect()
}
}

for addr, sc := range bb.addrToSc {
if _, ok := resolved[addr]; !ok {
// was removed by resolver or failed to create subconn
bb.currentConn.RemoveSubConn(sc)
delete(bb.addrToSc, addr)

bb.lg.Info(
"removed subconn",
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
)

// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
// (DO NOT) delete(bb.scToAddr, sc)
// (DO NOT) delete(bb.scToSt, sc)
}
}
}

// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()

old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.String("state", s.String()),
)
return
}

bb.lg.Info(
"state changed",
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == connectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)

bb.scToSt[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}

oldAggrState := bb.currentState
bb.currentState = bb.csEvltr.recordTransition(old, s)

// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (old == connectivity.Ready) ||
(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
bb.regeneratePicker()
}

bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
return
}

func (bb *baseBalancer) regeneratePicker() {
if bb.currentState == connectivity.TransientFailure {
bb.lg.Info(
"generated transient error picker",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}

// only pass ready subconns to picker
scs := make([]balancer.SubConn, 0)
addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
scs = append(scs, sc)
addrToSc[addr] = sc
scToAddr[sc] = addr
}
}

switch bb.policy {
case picker.RoundrobinBalanced:
bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)

default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
}

bb.lg.Info(
"generated picker",
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(addrToSc)),
zap.Int("subconn-size", len(addrToSc)),
)
}

// Close implements "grpc/balancer.Balancer" interface.
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
// TODO
}
Loading

0 comments on commit d866cf8

Please sign in to comment.