Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: add ExitIdle optional interface #4673

Merged
merged 6 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,26 @@ func Get(name string) Builder {
return nil
}

// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
// A SubConn represents a single connection to a gRPC backend service.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
// Each SubConn contains a list of addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
// All SubConns start in IDLE, and will not try to connect. To trigger the
// connecting, Balancers must call Connect. If a connection re-enters IDLE,
// Balancers must call Connect again to trigger a new connection attempt.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
// gRPC will try to connect to the addresses in sequence, and stop trying the
// remainder once the first connection is successful. If an attempt to connect
// to all addresses encounters an error, the SubConn will enter
// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE.
//
// Once established, if a connection is lost, the SubConn will transition
// directly to IDLE.
//
// This interface is to be implemented by gRPC. Users should not need their own
// implementation of this interface. For situations like testing, any
// implementations should embed this interface. This allows gRPC to add new
// methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
Expand Down Expand Up @@ -326,6 +328,20 @@ type Balancer interface {
Close()
}

// ExitIdler is an optional interface for balancers to implement. If
// implemented, ExitIdle will be called when ClientConn.Connect is called, if
// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
// all SubConns to connect.
//
// Notice: it will be required for all balancers to implement this in a future
// release.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}

// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
Expand Down
5 changes: 5 additions & 0 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
func (b *baseBalancer) Close() {
}

// ExitIdle is a nop because the base balancer attempts to stay connected to
// all SubConns at all times.
func (b *baseBalancer) ExitIdle() {
}

// NewErrPicker returns a Picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
Expand Down
2 changes: 2 additions & 0 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,5 @@ func (lb *lbBalancer) Close() {
}
lb.cc.close()
}

func (lb *lbBalancer) ExitIdle() {}
4 changes: 4 additions & 0 deletions balancer/rls/internal/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (lb *rlsBalancer) Close() {
}
}

func (lb *rlsBalancer) ExitIdle() {
// TODO: are we 100% sure this should be a nop?
}

// updateControlChannel updates the RLS client if required.
// Caller must hold lb.mu.
func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) {
Expand Down
39 changes: 32 additions & 7 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,20 @@ type scStateUpdate struct {
err error
}

// exitIdle contains no data and is just a signal sent on the updateCh in
// ccBalancerWrapper to instruct the balancer to exit idle.
type exitIdle struct{}

// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
hasExitIdle bool
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
Expand All @@ -61,6 +66,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
return ccb
}

Expand All @@ -86,6 +92,17 @@ func (ccb *ccBalancerWrapper) watcher() {
ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
}
ccb.mu.Unlock()
case exitIdle:
if ccb.cc.GetState() == connectivity.Idle {
if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
// We already checked that the balancer implements
// ExitIdle before pushing the event to updateCh, but
// check conditionally again as defensive programming.
ccb.balancerMu.Lock()
ei.ExitIdle()
ccb.balancerMu.Unlock()
}
}
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
}
Expand Down Expand Up @@ -118,6 +135,14 @@ func (ccb *ccBalancerWrapper) close() {
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) exitIdle() bool {
if !ccb.hasExitIdle {
return false
}
ccb.updateCh.Put(exitIdle{})
return true
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
Expand All @@ -144,8 +169,8 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat

func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
ccb.balancerMu.Unlock()
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down
2 changes: 2 additions & 0 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {

func (b *magicalLB) Close() {}

func (b *magicalLB) ExitIdle() {}

func init() {
balancer.Register(&magicalLB{})
}
Expand Down
14 changes: 7 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,13 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
if cc.GetState() == connectivity.Idle {
cc.mu.Lock()
for ac := range cc.conns {
// TODO: should this be a signal to the LB policy instead?
go ac.connect()
}
cc.mu.Unlock()
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/balancer/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type BalancerFuncs struct {
ResolverError func(*BalancerData, error)
UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState)
Close func(*BalancerData)
ExitIdle func(*BalancerData)
}

// BalancerData contains data relevant to a stub balancer.
Expand Down Expand Up @@ -75,6 +76,12 @@ func (b *bal) Close() {
}
}

func (b *bal) ExitIdle() {
if b.bf.ExitIdle != nil {
b.bf.ExitIdle(b.bd)
}
}

type bb struct {
name string
bf BalancerFuncs
Expand Down
6 changes: 6 additions & 0 deletions pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
func (b *pickfirstBalancer) Close() {
}

func (b *pickfirstBalancer) ExitIdle() {
if b.state == connectivity.Idle {
b.sc.Connect()
}
}

type picker struct {
result balancer.PickResult
err error
Expand Down
7 changes: 5 additions & 2 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func (b *testBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubCon

func (b *testBalancer) Close() {}

func (b *testBalancer) ExitIdle() {}

type picker struct {
err error
sc balancer.SubConn
Expand Down Expand Up @@ -373,8 +375,9 @@ func (testBalancerKeepAddresses) UpdateSubConnState(sc balancer.SubConn, s balan
panic("not used")
}

func (testBalancerKeepAddresses) Close() {
}
func (testBalancerKeepAddresses) Close() {}

func (testBalancerKeepAddresses) ExitIdle() {}

// Make sure that non-grpclb balancers don't get grpclb addresses even if name
// resolver sends them
Expand Down
27 changes: 27 additions & 0 deletions xds/internal/balancer/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ func (sbc *subBalancerWrapper) startBalancer() {
}
}

func (sbc *subBalancerWrapper) exitIdle() {
b := sbc.balancer
if b == nil {
return
}
if ei, ok := b.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
for sc, b := range sbc.group.scToSubBalancer {
if b == sbc {
sc.Connect()
}
}
}

func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b := sbc.balancer
if b == nil {
Expand Down Expand Up @@ -493,6 +509,17 @@ func (bg *BalancerGroup) Close() {
bg.outgoingMu.Unlock()
}

// ExitIdle should be invoked when the parent LB policy's ExitIdle is invoked.
// It will trigger this on all sub-balancers, or reconnect their subconns if
// not supported.
func (bg *BalancerGroup) ExitIdle() {
bg.outgoingMu.Lock()
for _, config := range bg.idToBalancerConfig {
config.exitIdle()
}
bg.outgoingMu.Unlock()
}

const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
Expand Down
18 changes: 18 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ type scUpdate struct {
state balancer.SubConnState
}

type exitIdle struct{}

// cdsBalancer implements a CDS based LB policy. It instantiates a
// cluster_resolver balancer to further resolve the serviceName received from
// CDS, into localities and endpoints. Implements the balancer.Balancer
Expand Down Expand Up @@ -376,6 +378,18 @@ func (b *cdsBalancer) run() {
break
}
b.childLB.UpdateSubConnState(update.subConn, update.state)
case exitIdle:
if b.childLB == nil {
b.logger.Errorf("xds: received ExitIdle with no child balancer")
break
}
// This implementation assumes the child balancer supports
// ExitIdle (but still checks for the interface's existence to
// avoid a panic if not). If the child does not, no subconns
// will be connected.
if ei, ok := b.childLB.(balancer.ExitIdler); ok {
ei.ExitIdle()
}
menghanl marked this conversation as resolved.
Show resolved Hide resolved
}
case u := <-b.clusterHandler.updateChannel:
b.handleWatchUpdate(u)
Expand Down Expand Up @@ -494,6 +508,10 @@ func (b *cdsBalancer) Close() {
<-b.done.Done()
}

func (b *cdsBalancer) ExitIdle() {
b.updateCh.Put(exitIdle{})
}

// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at
// creation and intercepts the NewSubConn() and UpdateAddresses() call from the
// child policy to add security configuration required by xDS credentials.
Expand Down
Loading