clientv3: Upgrade to round robin balancer based on gRPC 1.12 balancer API #9860

Merged 39 commits on Jun 16, 2018
6e2bf40
vendor: upgrade "grpc/grpc-go" to v1.11.1
gyuho Mar 19, 2018
7fe4a08
clientv3/balancer: initial commit
gyuho Mar 19, 2018
657c2e1
*: introduce mock server for testing load balancing and add a simple …
jpbetz Mar 28, 2018
f1aa428
pkg/mock/mockserver: support restart
gyuho Apr 2, 2018
7c92185
clientv3/balancer: use new mock server in tests
gyuho Apr 2, 2018
370761d
clientv3/balancer: add more failover tests with resolver
gyuho Apr 2, 2018
9867210
clientv3/balancer: add "TestRoundRobinBalancedPassthrough" (WIP)
gyuho Apr 2, 2018
4d2a25b
clientv3/balancer: add endpoints resolver
jpbetz Apr 3, 2018
ed6bc2b
clientv3: add load balancer unix socket test
jpbetz Apr 4, 2018
6080fa1
clientv3: Integrate new grpc load balancer interface with etcd client
jpbetz Apr 6, 2018
f20a117
clientv3: Fix new load balancer integration issues
jpbetz Apr 6, 2018
12acfc0
vendor: upgrade grpc/grpc-go to v1.11.3
jpbetz Apr 9, 2018
309208d
clientv3: Split out grpc balancer builder to ensure there is a balanc…
jpbetz Apr 10, 2018
7ac2a2d
clientv3: Fix dialer for new balancer to correctly handle first are a…
jpbetz Apr 10, 2018
037d7b4
clientv3: dial with context when creating authenticator
gyuho Apr 13, 2018
994a569
clientv3: pass "grpc.WithBlock" on "TestDialTimeout"
gyuho Apr 13, 2018
bb032f3
clientv3: deprecate "grpc.WithTimeout" in favor of "grpc.DialContext"
gyuho Apr 13, 2018
66e65cd
clientv3: Avoid timeouts in ordering test
jpbetz Apr 16, 2018
f84f554
clientv3: Fix auth client to use endpoints instead of host when diali…
jpbetz Apr 20, 2018
ee2747e
clientv3: Fix dial calls to consistently use endpoint resolver, attem…
jpbetz Apr 26, 2018
9304d1a
clientv3: Fix TLS test failures by returning DeadlineExceeded error f…
jpbetz Apr 26, 2018
8569b9c
clientv3: Fix endpoint resolver to create a new resolver for each grp…
jpbetz May 2, 2018
1f6548b
clientv3: Stop expecting retry in integration tests with new grpc bal…
jpbetz May 2, 2018
4065735
clientv3: remove unused "dialerrc"
gyuho May 7, 2018
a5b2fb5
clientv3: Introduce custom retry interceptor based on go-grpc-middlew…
jpbetz May 11, 2018
3130e4d
vendor: add "go-grpc-middleware/util/backoffutils"
gyuho May 22, 2018
55ef9cc
clientv3: Add auth retry to retry interceptor
jpbetz May 22, 2018
05c57a0
integration: Fix unit test failures from new grpc LB changes, fix bom
jpbetz May 24, 2018
3b84117
clientv3/integration: Add err check to TestDialTLSNoConfig to prevent…
jpbetz May 25, 2018
a3032d3
*: fix fmt tests, reenable "testEmbedEtcdGracefulStop"
gyuho Jun 8, 2018
dd520ce
clientv3: put "defaultCallOpts" back to "Client" object
gyuho Jun 8, 2018
08da08b
clientv3: clarify retry function names, do not retry on dial error
gyuho Jun 8, 2018
a766810
clientv3: add "zap.Config" to replace global logger
gyuho Jun 14, 2018
6e521d2
clientv3: add "IsConnCanceled", deprecate "grpc.ErrClientConnClosing"
gyuho Jun 15, 2018
d922069
grpcproxy: fix "grpc.ErrClientConnClosing" handling
gyuho Jun 15, 2018
6572d60
vendor: Bump to grpc v1.12.2
jpbetz Jun 15, 2018
6309e4b
docs: Add client architecture doc
jpbetz Jun 15, 2018
8451a17
clientv3: Enable balancer logging if ETCD_CLIENT_DEBUG environment va…
jpbetz Jun 15, 2018
cb6e9d2
CHANGELOG: Add PR and issue links for new client balancer
jpbetz Jun 15, 2018
2 changes: 1 addition & 1 deletion CHANGELOG-3.4.md
@@ -9,7 +9,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [

### Improved

-- Rewrite [client balancer](TODO) with [new gRPC balancer interface](TODO).
+- Rewrite [client balancer](https://github.com/coreos/etcd/pull/9860) with [new gRPC balancer interface](https://github.com/coreos/etcd/issues/9106).
- Add [backoff on watch retries on transient errors](https://github.com/coreos/etcd/pull/9840).
- Add [jitter to watch progress notify](https://github.com/coreos/etcd/pull/9278) to prevent [spikes in `etcd_network_client_grpc_sent_bytes_total`](https://github.com/coreos/etcd/issues/9246).
- Improve [slow request apply warning log](https://github.com/coreos/etcd/pull/9288).
19 changes: 16 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions bill-of-materials.json
@@ -134,6 +134,15 @@
 			}
 		]
 	},
+	{
+		"project": "github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils",
+		"licenses": [
+			{
+				"type": "Apache License 2.0",
+				"confidence": 1
+			}
+		]
+	},
 	{
 		"project": "github.com/grpc-ecosystem/go-grpc-prometheus",
 		"licenses": [
5 changes: 2 additions & 3 deletions clientv3/auth.go
@@ -21,7 +21,6 @@ import (

"github.com/coreos/etcd/auth/authpb"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

"google.golang.org/grpc"
)

@@ -216,8 +215,8 @@ func (auth *authenticator) close() {
 	auth.conn.Close()
 }

-func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
-	conn, err := grpc.Dial(endpoint, opts...)
+func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
+	conn, err := grpc.DialContext(ctx, target, opts...)
 	if err != nil {
 		return nil, err
 	}
275 changes: 275 additions & 0 deletions clientv3/balancer/balancer.go
@@ -0,0 +1,275 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package balancer

import (
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/coreos/etcd/clientv3/balancer/picker"

	"go.uber.org/zap"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/resolver"
	_ "google.golang.org/grpc/resolver/dns"         // register DNS resolver
	_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)

// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
	bb := &builder{cfg}
	balancer.Register(bb)

	bb.cfg.Logger.Info(
		"registered balancer",
		zap.String("policy", bb.cfg.Policy.String()),
		zap.String("name", bb.cfg.Name),
	)
}

type builder struct {
	cfg Config
}

// Build is called once when gRPC creates its "ccBalancerWrapper" for a client
// connection established with "grpc.Dial". Resolved addresses are then
// delivered via "HandleResolvedAddrs".
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	bb := &baseBalancer{
		id:     strconv.FormatInt(time.Now().UnixNano(), 36),
		policy: b.cfg.Policy,
		name:   b.cfg.Policy.String(),
		lg:     b.cfg.Logger,

		addrToSc: make(map[resolver.Address]balancer.SubConn),
		scToAddr: make(map[balancer.SubConn]resolver.Address),
		scToSt:   make(map[balancer.SubConn]connectivity.State),

		currentConn: nil,
		csEvltr:     &connectivityStateEvaluator{},

		// the initial picker always returns "ErrNoSubConnAvailable"
		Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
	}
	if b.cfg.Name != "" {
		bb.name = b.cfg.Name
	}
	if bb.lg == nil {
		bb.lg = zap.NewNop()
	}

	// TODO: support multiple connections
	bb.mu.Lock()
	bb.currentConn = cc
	bb.mu.Unlock()

	bb.lg.Info(
		"built balancer",
		zap.String("balancer-id", bb.id),
		zap.String("policy", bb.policy.String()),
		zap.String("resolver-target", cc.Target()),
	)
	return bb
}

// Name implements "grpc/balancer.Builder" interface.
func (b *builder) Name() string { return b.cfg.Name }

// Balancer defines the client balancer interface.
type Balancer interface {
	// Balancer is invoked for a given client connection. The client initiates
	// the gRPC connection with "grpc.Dial(addr, grpc.WithBalancerName)", and the
	// resolved addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
	// For each resolved address, the balancer calls "balancer.ClientConn.NewSubConn".
	// "grpc/balancer.Balancer.HandleSubConnStateChange" is called whenever a
	// connectivity state changes, so failover logic belongs in that method.
	balancer.Balancer

	// Picker's "Pick" is called for every client request.
	picker.Picker
}

type baseBalancer struct {
	id     string
	policy picker.Policy
	name   string
	lg     *zap.Logger

	mu sync.RWMutex

	addrToSc map[resolver.Address]balancer.SubConn
	scToAddr map[balancer.SubConn]resolver.Address
	scToSt   map[balancer.SubConn]connectivity.State

	currentConn  balancer.ClientConn
	currentState connectivity.State
	csEvltr      *connectivityStateEvaluator

	picker.Picker
}

// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
// gRPC delivers the initial and any updated resolved addresses to the
// balancer created by "Build".
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
	if err != nil {
		bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
		return
	}
	bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))

	bb.mu.Lock()
	defer bb.mu.Unlock()

	resolved := make(map[resolver.Address]struct{})
	for _, addr := range addrs {
		resolved[addr] = struct{}{}
		if _, ok := bb.addrToSc[addr]; !ok {
			sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
			if err != nil {
				bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
				continue
			}
			bb.addrToSc[addr] = sc
			bb.scToAddr[sc] = addr
			bb.scToSt[sc] = connectivity.Idle
			sc.Connect()
		}
	}

	for addr, sc := range bb.addrToSc {
		if _, ok := resolved[addr]; !ok {
			// was removed by resolver or failed to create subconn
			bb.currentConn.RemoveSubConn(sc)
			delete(bb.addrToSc, addr)

			bb.lg.Info(
				"removed subconn",
				zap.String("balancer-id", bb.id),
				zap.String("address", addr.Addr),
				zap.String("subconn", scToString(sc)),
			)

			// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
			// The entry will be deleted in HandleSubConnStateChange.
			// (DO NOT) delete(bb.scToAddr, sc)
			// (DO NOT) delete(bb.scToSt, sc)
		}
	}
}

// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	bb.mu.Lock()
	defer bb.mu.Unlock()

	old, ok := bb.scToSt[sc]
	if !ok {
		bb.lg.Warn(
			"state change for an unknown subconn",
			zap.String("balancer-id", bb.id),
			zap.String("subconn", scToString(sc)),
			zap.String("state", s.String()),
		)
		return
	}

	bb.lg.Info(
		"state changed",
		zap.String("balancer-id", bb.id),
		zap.Bool("connected", s == connectivity.Ready),
		zap.String("subconn", scToString(sc)),
		zap.String("address", bb.scToAddr[sc].Addr),
		zap.String("old-state", old.String()),
		zap.String("new-state", s.String()),
	)

	bb.scToSt[sc] = s
	switch s {
	case connectivity.Idle:
		sc.Connect()
	case connectivity.Shutdown:
		// When an address was removed by the resolver, bb called RemoveSubConn
		// but kept the sc's state in scToSt. Remove the state for this sc here.
		delete(bb.scToAddr, sc)
		delete(bb.scToSt, sc)
	}

	oldAggrState := bb.currentState
	bb.currentState = bb.csEvltr.recordTransition(old, s)

	// Regenerate the picker when one of the following happens:
	// - this sc became ready from not-ready
	// - this sc became not-ready from ready
	// - the aggregated state of the balancer became TransientFailure from non-TransientFailure
	// - the aggregated state of the balancer became non-TransientFailure from TransientFailure
	if (s == connectivity.Ready) != (old == connectivity.Ready) ||
		(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
		bb.regeneratePicker()
	}

	bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
	return
}

func (bb *baseBalancer) regeneratePicker() {
	if bb.currentState == connectivity.TransientFailure {
		bb.lg.Info(
			"generated transient error picker",
			zap.String("balancer-id", bb.id),
			zap.String("policy", bb.policy.String()),
		)
		bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
		return
	}

	// only pass ready subconns to picker
	scs := make([]balancer.SubConn, 0)
	addrToSc := make(map[resolver.Address]balancer.SubConn)
	scToAddr := make(map[balancer.SubConn]resolver.Address)
	for addr, sc := range bb.addrToSc {
		if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
			scs = append(scs, sc)
			addrToSc[addr] = sc
			scToAddr[sc] = addr
		}
	}

	switch bb.policy {
	case picker.RoundrobinBalanced:
		bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)

	default:
		panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
	}

	bb.lg.Info(
		"generated picker",
		zap.String("balancer-id", bb.id),
		zap.String("policy", bb.policy.String()),
		zap.Strings("subconn-ready", scsToStrings(addrToSc)),
		zap.Int("subconn-size", len(addrToSc)),
	)
}

// Close implements "grpc/balancer.Balancer" interface.
// Close is a no-op because the base balancer has no internal state to clean up,
// and it does not need to call RemoveSubConn for the SubConns.
func (bb *baseBalancer) Close() {
	// TODO
}
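
The address handling in `HandleResolvedAddrs` above is a two-pass set reconciliation: create a sub-connection for every resolved address that is not yet tracked, then drop every tracked address that the resolver no longer reports. The following is a minimal standalone sketch of that idea using plain strings in place of `resolver.Address` and `balancer.SubConn`. The `reconcile` function and its signature are illustrative assumptions, not part of etcd or gRPC.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile is a hypothetical helper. It updates the tracked set in place so
// that it matches the resolved list, and reports which addresses would need
// a new sub-connection and which ones would have theirs removed.
func reconcile(tracked map[string]bool, resolved []string) (added, removed []string) {
	seen := make(map[string]struct{}, len(resolved))

	// Pass 1: start tracking addresses that are new in this update.
	for _, addr := range resolved {
		seen[addr] = struct{}{}
		if !tracked[addr] {
			tracked[addr] = true
			added = append(added, addr)
		}
	}

	// Pass 2: stop tracking addresses the resolver no longer reports.
	// Deleting from a map while ranging over it is safe in Go.
	for addr := range tracked {
		if _, ok := seen[addr]; !ok {
			delete(tracked, addr)
			removed = append(removed, addr)
		}
	}

	// Sort for deterministic output; map iteration order is random.
	sort.Strings(added)
	sort.Strings(removed)
	return added, removed
}

func main() {
	tracked := map[string]bool{"a:2379": true, "b:2379": true}
	added, removed := reconcile(tracked, []string{"b:2379", "c:2379"})
	fmt.Println("added:", added, "removed:", removed)
}
```

Unlike this sketch, the balancer above deliberately keeps per-connection state for a removed sub-connection until gRPC reports it as `Shutdown`.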
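
`HandleSubConnStateChange` relies on `connectivityStateEvaluator.recordTransition`, which is not part of this file. The sketch below shows one plausible way such an evaluator can work, modeled on the counting approach of gRPC's base balancer: that it matches etcd's actual implementation is an assumption. It also restates the picker-regeneration condition as a standalone predicate. `State`, `evaluator`, and `needsNewPicker` are hypothetical names that stand in for the gRPC `connectivity` types.

```go
package main

import "fmt"

// State is a local stand-in for gRPC's connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

// evaluator counts sub-connections per state to derive an aggregated state.
type evaluator struct {
	numReady, numConnecting, numTransientFailure int
}

// recordTransition moves one sub-connection from oldState to newState and
// returns the aggregated state: Ready if any sub-connection is ready,
// otherwise Connecting if any is connecting, otherwise TransientFailure.
func (e *evaluator) recordTransition(oldState, newState State) State {
	for i, s := range []State{oldState, newState} {
		delta := 2*i - 1 // -1 for the old state, +1 for the new state
		switch s {
		case Ready:
			e.numReady += delta
		case Connecting:
			e.numConnecting += delta
		case TransientFailure:
			e.numTransientFailure += delta
		}
	}
	if e.numReady > 0 {
		return Ready
	}
	if e.numConnecting > 0 {
		return Connecting
	}
	return TransientFailure
}

// needsNewPicker mirrors the regeneration condition: rebuild the picker when
// the sub-connection crossed the ready boundary, or when the aggregated state
// crossed the TransientFailure boundary.
func needsNewPicker(oldSc, newSc, oldAggr, newAggr State) bool {
	return (newSc == Ready) != (oldSc == Ready) ||
		(newAggr == TransientFailure) != (oldAggr == TransientFailure)
}

func main() {
	e := &evaluator{}
	aggr := e.recordTransition(Idle, Connecting)
	fmt.Println("aggregated is Connecting:", aggr == Connecting)

	next := e.recordTransition(Connecting, Ready)
	fmt.Println("regenerate picker:", needsNewPicker(Connecting, Ready, aggr, next))
}
```

Comparing boolean "is ready" and "is transient failure" flags, rather than raw states, avoids rebuilding the picker on transitions such as Idle to Connecting that do not change the set of usable sub-connections.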