Skip to content

Commit

Permalink
client: fix Connect to handle channel idleness properly (#6331)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored May 31, 2023
1 parent 3ea58ce commit 1f23f6c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 12 deletions.
6 changes: 5 additions & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (cc *ClientConn) exitIdleMode() error {
return errConnClosing
}
if cc.idlenessState != ccIdlenessStateIdle {
logger.Error("ClientConn asked to exit idle mode when not in idle mode")
cc.mu.Unlock()
logger.Info("ClientConn asked to exit idle mode when not in idle mode")
return nil
}

Expand Down Expand Up @@ -706,6 +707,9 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
cc.exitIdleMode()
// If the ClientConn was not in idle mode, we need to call ExitIdle on the
// LB policy so that connections can be created.
cc.balancerWrapper.exitIdleMode()
}

Expand Down
58 changes: 47 additions & 11 deletions test/idleness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Veirfy that the ClientConn moves to READY.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)

// Veirfy that the ClientConn stay in READY.
// Verify that the ClientConn stay in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
Expand Down Expand Up @@ -152,12 +152,12 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Veirfy that the ClientConn moves to READY.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)

// Veirfy that the ClientConn moves to IDLE as there is no activity.
// Verify that the ClientConn moves to IDLE as there is no activity.
awaitState(ctx, t, cc, connectivity.Idle)

// Verify idleness related channelz events.
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Veirfy that the ClientConn moves to READY.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
Expand All @@ -213,7 +213,7 @@ func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
// the server RPC handler and the unary call below.
errCh := make(chan error, 1)
go func() {
// Veirfy that the ClientConn stay in READY.
// Verify that the ClientConn stay in READY.
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
defer sCancel()
awaitNoStateChange(sCtx, t, cc, connectivity.Ready)
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
t.Cleanup(backend.Stop)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Veirfy that the ClientConn moves to READY.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
Expand All @@ -302,7 +302,7 @@ func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
}
}()

// Veirfy that the ClientConn stay in READY.
// Verify that the ClientConn stay in READY.
awaitNoStateChange(sCtx, t, cc, connectivity.Ready)

// Verify that there are no idleness related channelz events.
Expand Down Expand Up @@ -343,12 +343,12 @@ func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
}
t.Cleanup(func() { cc.Close() })

// Veirfy that the ClientConn moves to READY.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)

// Veirfy that the ClientConn moves to IDLE as there is no activity.
// Verify that the ClientConn moves to IDLE as there is no activity.
awaitState(ctx, t, cc, connectivity.Idle)

// Verify idleness related channelz events.
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
}
t.Cleanup(func() { cc.Close() })

// Veirfy that the ClientConn moves to READY.
// Verify that the ClientConn moves to READY.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
awaitState(ctx, t, cc, connectivity.Ready)
Expand All @@ -421,3 +421,39 @@ func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
}
}
}

// Tests the case where the channel is IDLE and we call cc.Connect.
func (s) TestChannelIdleness_Connect(t *testing.T) {
// Start a test backend and set the bootstrap state of the resolver to
// include this address. This will ensure that when the resolver is
// restarted when exiting idle, it will push the same address to grpc again.
r := manual.NewBuilderWithScheme("whatever")
backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Create a ClientConn with a short idle_timeout.
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

// Verify that the ClientConn moves to IDLE.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

awaitState(ctx, t, cc, connectivity.Idle)

// Connect should exit channel idleness.
cc.Connect()

// Verify that the ClientConn moves back to READY.
awaitState(ctx, t, cc, connectivity.Ready)
}

0 comments on commit 1f23f6c

Please sign in to comment.