Skip to content

Commit

Permalink
clientv3: notify active addresses to balancer with keepalive
Browse files Browse the repository at this point in the history
Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Aug 19, 2017
1 parent 9023ba2 commit 35e9a88
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
68 changes: 58 additions & 10 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net/url"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -32,8 +33,10 @@ var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address avai
// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
// addrs are the client's endpoints for grpc
addrs []grpc.Address
keepAlive bool
// addrs are the client's endpoints for grpc,
// mapped to connection activity status
addrs map[grpc.Address]addrConn
// notifyCh notifies grpc of the set of addresses for connecting
notifyCh chan []grpc.Address

Expand Down Expand Up @@ -73,9 +76,9 @@ type simpleBalancer struct {

func newSimpleBalancer(eps []string) *simpleBalancer {
notifyCh := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
addrs := make(map[grpc.Address]addrConn, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()}
}
sb := &simpleBalancer{
addrs: addrs,
Expand Down Expand Up @@ -136,9 +139,9 @@ func (b *simpleBalancer) updateAddrs(eps []string) {

b.host2ep = np

addrs := make([]grpc.Address, 0, len(eps))
addrs := make(map[grpc.Address]addrConn, len(eps))
for i := range eps {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
addrs[grpc.Address{Addr: getHost(eps[i])}] = addrConn{active: true, last: time.Now()}
}
b.addrs = addrs

Expand All @@ -156,15 +159,38 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
}
}

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
func hasAddr(addrs map[grpc.Address]addrConn, targetAddr string) bool {
for addr := range addrs {
if targetAddr == addr.Addr {
return true
}
}
return false
}

// addrConn is the per-endpoint connection status stored as the value of the
// balancer's address map.
type addrConn struct {
// active is the endpoint's current activity flag. Entries are created with
// active set to true; setActive overwrites it with the value it is given.
active bool
// last is set to time.Now() when the entry is created and refreshed each
// time setActive marks the entry active. It is left unchanged when the
// entry is marked inactive.
last time.Time
}

// setActive stores a new activity flag on the first entry of addrs whose
// Addr equals targetAddr and reports whether that entry was rewritten.
//
// It returns false, leaving addrs untouched, when no entry matches or when
// the matching entry is currently inactive and its last timestamp is less
// than one minute old. Otherwise the entry is replaced: the active flag is
// set to the requested value, and the last timestamp is refreshed to now
// only when the entry is being marked active (it is carried over unchanged
// when marking inactive).
func setActive(addrs map[grpc.Address]addrConn, targetAddr string, active bool) (down bool) {
	// TODO: configure interval
	const holdDown = time.Minute

	for key, cur := range addrs {
		if key.Addr != targetAddr {
			continue
		}

		// An inactive entry is frozen until the hold-down window has passed.
		if !cur.active && time.Since(cur.last) < holdDown {
			return false
		}

		next := addrConn{active: active, last: cur.last}
		if active {
			next.last = time.Now()
		}
		addrs[key] = next
		return true
	}
	return false
}

func (b *simpleBalancer) updateNotifyLoop() {
defer close(b.donec)

Expand Down Expand Up @@ -221,14 +247,24 @@ func (b *simpleBalancer) updateNotifyLoop() {

func (b *simpleBalancer) notifyAddrs() {
b.mu.RLock()
addrs := b.addrs
multi := len(b.addrs) > 1 // if single, retry the only endpoint
addrs := make([]grpc.Address, 0, len(b.addrs))
for addr, ac := range b.addrs {
if b.keepAlive && multi && !ac.active {
continue
}
addrs = append(addrs, addr)
}
b.mu.RUnlock()
select {
case b.notifyCh <- addrs:
case <-b.stopc:
}
}

// Up is called by gRPC client after address connection state
// becomes connectivity.Ready. This is after HTTP/2 client
// establishes the transport.
func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
Expand All @@ -244,6 +280,11 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
}
if !setActive(b.addrs, addr.Addr, true) { // mark connectivity state as active
// it is possible that Up is called before gRPC receives Notify()
// and tears down keepalive timed-out endpoints
return func(err error) {}
}
if b.pinAddr != "" {
return func(err error) {}
}
Expand All @@ -255,8 +296,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
if b.keepAlive &&
(err.Error() == "grpc: failed with network I/O error" ||
err.Error() == "grpc: the connection is drained") {
// set as connectivity.TransientFailure until next Up
// TODO: undo this when connection is up
setActive(b.addrs, addr.Addr, false)
}
b.upc = make(chan struct{})
close(b.downc)
close(b.downc) // trigger notifyAddrs
b.pinAddr = ""
b.mu.Unlock()
}
Expand Down
4 changes: 4 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ func newClient(cfg *Config) (*Client, error) {
}

client.balancer = newSimpleBalancer(cfg.Endpoints)
client.balancer.mu.Lock()
client.balancer.keepAlive = cfg.DialKeepAliveTime > 0
client.balancer.mu.Unlock()

// use Endpoints[0] so that for https:// without any tls config given, then
// grpc will assume the ServerName is in the endpoint.
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
Expand Down

0 comments on commit 35e9a88

Please sign in to comment.