diff --git a/clientconn_test.go b/clientconn_test.go index 281c9618606f..c742e144a94c 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -1143,18 +1143,10 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T } type stateRecordingBalancer struct { - notifier chan<- connectivity.State balancer.Balancer } -func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - b.notifier <- s.ConnectivityState - b.Balancer.UpdateSubConnState(sc, s) -} - -func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { - b.notifier = r -} +func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {} func (b *stateRecordingBalancer) Close() { b.Balancer.Close() @@ -1179,8 +1171,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan b.notifier = stateNotifications b.mu.Unlock() return &stateRecordingBalancer{ - notifier: stateNotifications, - Balancer: balancer.Get("pick_first").Build(cc, opts), + Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts), } } @@ -1192,6 +1183,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity. return ret } +type stateRecordingCCWrapper struct { + balancer.ClientConn + notifier chan<- connectivity.State +} + +func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + oldListener := opts.StateListener + opts.StateListener = func(s balancer.SubConnState) { + ccw.notifier <- s.ConnectivityState + oldListener(s) + } + return ccw.ClientConn.NewSubConn(addrs, opts) +} + // Keep reading until something causes the connection to die (EOF, server // closed, etc). Useful as a tool for mindlessly keeping the connection // healthy, since the client will error if things like client prefaces are not diff --git a/pickfirst.go b/pickfirst.go index d41949f1c8a5..00dd7633ebf1 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -146,7 +146,12 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState return nil } - subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) + var subConn balancer.SubConn + subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{ + StateListener: func(state balancer.SubConnState) { + b.updateSubConnState(subConn, state) + }, + }) if err != nil { if b.logger.V(2) { b.logger.Infof("Failed to create new SubConn: %v", err) @@ -168,7 +173,13 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState return nil } +// UpdateSubConnState is unused as a StateListener is always registered when +// creating SubConns. func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { + b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state) +} + +func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { if b.logger.V(2) { b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state) } diff --git a/test/clientconn_state_transition_test.go b/test/clientconn_state_transition_test.go index a14ff4588a0f..545e01246ab6 100644 --- a/test/clientconn_state_transition_test.go +++ b/test/clientconn_state_transition_test.go @@ -444,19 +444,9 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { } type stateRecordingBalancer struct { - notifier chan<- connectivity.State balancer.Balancer } -func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - b.notifier <- s.ConnectivityState - b.Balancer.UpdateSubConnState(sc, s) -} - -func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { - b.notifier = r -} - func (b *stateRecordingBalancer) Close() { b.Balancer.Close() } @@ -480,8 +470,7 @@ func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balan b.notifier = stateNotifications b.mu.Unlock() return &stateRecordingBalancer{ - notifier: stateNotifications, - Balancer: balancer.Get("pick_first").Build(cc, opts), + Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts), } } @@ -493,6 +482,20 @@ func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity. return ret } +type stateRecordingCCWrapper struct { + balancer.ClientConn + notifier chan<- connectivity.State +} + +func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + oldListener := opts.StateListener + opts.StateListener = func(s balancer.SubConnState) { + ccw.notifier <- s.ConnectivityState + oldListener(s) + } + return ccw.ClientConn.NewSubConn(addrs, opts) +} + // Keep reading until something causes the connection to die (EOF, server // closed, etc). Useful as a tool for mindlessly keeping the connection // healthy, since the client will error if things like client prefaces are not diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index dec07b240c0b..f330613541d9 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -703,7 +703,7 @@ func TestClusterGracefulSwitch(t *testing.T) { // Update the pick first balancers SubConn as CONNECTING. This will cause // the pick first balancer to UpdateState() with CONNECTING, which shouldn't send // a Picker update back, as the Graceful Switch process is not complete. - rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer cancel() select { @@ -716,7 +716,7 @@ func TestClusterGracefulSwitch(t *testing.T) { // the pick first balancer to UpdateState() with READY, which should send a // Picker update back, as the Graceful Switch process is complete. This // Picker should always pick the pick first's created SubConn. - rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) p2 := <-cc.NewPickerCh testPick(t, p2, pi, sc2, nil) // The Graceful Switch process completing for the child should cause the