Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: receive state updates via callback instead of UpdateSubConnState #6495

Merged
merged 2 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,18 +1143,10 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T
}

type stateRecordingBalancer struct {
notifier chan<- connectivity.State
balancer.Balancer
}

func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
}

func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.notifier = r
}
func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {}

func (b *stateRecordingBalancer) Close() {
b.Balancer.Close()
Expand All @@ -1179,8 +1171,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan
b.notifier = stateNotifications
b.mu.Unlock()
return &stateRecordingBalancer{
notifier: stateNotifications,
Balancer: balancer.Get("pick_first").Build(cc, opts),
Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
}
}

Expand All @@ -1192,6 +1183,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.
return ret
}

type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}

func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}

// Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not
Expand Down
13 changes: 12 additions & 1 deletion pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return nil
}

subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
var subConn balancer.SubConn
subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
b.updateSubConnState(subConn, state)
},
})
if err != nil {
if b.logger.V(2) {
b.logger.Infof("Failed to create new SubConn: %v", err)
Expand All @@ -168,7 +173,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
return nil
}

// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we throw an error log here, just to ensure that it is caught by our tests if by any chance this ends up happening?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
if b.logger.V(2) {
b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
}
Expand Down
27 changes: 15 additions & 12 deletions test/clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,9 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
}

type stateRecordingBalancer struct {
notifier chan<- connectivity.State
balancer.Balancer
}

func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
}

func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.notifier = r
}

func (b *stateRecordingBalancer) Close() {
b.Balancer.Close()
}
Expand All @@ -480,8 +470,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan
b.notifier = stateNotifications
b.mu.Unlock()
return &stateRecordingBalancer{
notifier: stateNotifications,
Balancer: balancer.Get("pick_first").Build(cc, opts),
Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
}
}

Expand All @@ -493,6 +482,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.
return ret
}

type stateRecordingCCWrapper struct {
balancer.ClientConn
notifier chan<- connectivity.State
}

func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
oldListener := opts.StateListener
opts.StateListener = func(s balancer.SubConnState) {
ccw.notifier <- s.ConnectivityState
oldListener(s)
}
return ccw.ClientConn.NewSubConn(addrs, opts)
}

// Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
Expand All @@ -716,7 +716,7 @@ func TestClusterGracefulSwitch(t *testing.T) {
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
testPick(t, p2, pi, sc2, nil)
// The Graceful Switch process completing for the child should cause the
Expand Down