transport: refactor to reduce lock contention and improve performance #1962

Merged (1 commit) on Apr 5, 2018
13 changes: 13 additions & 0 deletions clientconn.go
@@ -1241,7 +1241,20 @@ func (ac *addrConn) transportMonitor() {
// Block until we receive a goaway or an error occurs.
select {
case <-t.GoAway():
done := t.Error()
cleanup := t.Close
// Since this transport will be orphaned (won't have a transportMonitor)
// we need to launch a goroutine to keep track of clientConn.Close()
// happening since it might not be noticed by any other goroutine for a while.
go func() {
<-done
cleanup()
}()
case <-t.Error():
// In case this is triggered because clientConn.Close()
// was called, we want to immediately close the transport
// since no other goroutine might notice it for a while.
t.Close()
@jadekler note this new requirement for the grpc layer to manually close the transport. This will need to be done by the onError callback in your PR.

Thanks! I'll add it.

case <-cdeadline:
ac.mu.Lock()
// This implies that client received server preface.
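To make the review note above concrete, here is a minimal, self-contained sketch of an error hook that closes the transport. The names onError, clientTransport, and fakeTransport are illustrative assumptions, not the actual grpc-go API; the sketch only shows the new requirement that the grpc layer, rather than the transport itself, calls Close when an error is reported.

package main

import (
	"errors"
	"fmt"
)

// clientTransport stands in for the real transport; only Close matters here.
type clientTransport interface {
	Close() error
}

// fakeTransport lets the sketch run without any network machinery.
type fakeTransport struct{ closed bool }

func (f *fakeTransport) Close() error {
	f.closed = true
	return nil
}

// onError is a hypothetical error hook of the kind mentioned in the review
// comment. Since the transport no longer closes itself on error, the hook
// must call Close explicitly so the connection is torn down promptly.
func onError(t clientTransport, err error) {
	fmt.Printf("transport error: %v; closing transport\n", err)
	if cerr := t.Close(); cerr != nil {
		fmt.Printf("closing transport failed: %v\n", cerr)
	}
}

func main() {
	t := &fakeTransport{}
	onError(t, errors.New("connection reset by peer"))
	fmt.Println("transport closed:", t.closed) // prints: transport closed: true
}

The shape mirrors the two branches added in transportMonitor above: a transport that is about to be orphaned is closed from a goroutine watching t.Error(), while the error branch closes it immediately.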
83 changes: 40 additions & 43 deletions test/end2end_test.go
@@ -748,7 +748,6 @@ type lazyConn struct {

func (l *lazyConn) Write(b []byte) (int, error) {
if atomic.LoadInt32(&(l.beLazy)) == 1 {
// The sleep duration here needs to less than the leakCheck deadline.
time.Sleep(time.Second)
}
return l.Conn.Write(b)
@@ -963,7 +962,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
}
// The existing RPC should be still good to proceed.
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
@@ -3053,7 +3052,6 @@ func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
if !reflect.DeepEqual(header, expectedHeader) {
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
}

if err := stream.CloseSend(); err != nil {
t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
}
@@ -3156,44 +3154,6 @@ func testRetry(t *testing.T, e env) {
}
}

func TestRPCTimeout(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testRPCTimeout(t, e)
}
}

// TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
func testRPCTimeout(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 50 * time.Millisecond})
defer te.tearDown()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

const argSize = 2718
const respSize = 314

payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
if err != nil {
t.Fatal(err)
}

req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: respSize,
Payload: payload,
}
for i := -1; i <= 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
}
cancel()
}
}

func TestCancel(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
@@ -3687,7 +3647,7 @@ func testClientStreaming(t *testing.T, e env, sizes []int) {
Payload: payload,
}
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
}
sum += s
}
@@ -5078,7 +5038,7 @@ func TestTapTimeout(t *testing.T) {
ss := &stubServer{
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
<-ctx.Done()
return &testpb.Empty{}, nil
return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
},
}
if err := ss.Start(sopts); err != nil {
@@ -6218,3 +6178,40 @@ func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
}
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
}

func TestRPCTimeout(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
testRPCTimeout(t, e)
}
}

func testRPCTimeout(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
defer te.tearDown()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

const argSize = 2718
const respSize = 314

payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
if err != nil {
t.Fatal(err)
}

req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: respSize,
Payload: payload,
}
for i := -1; i <= 10; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
}
cancel()
}
}