From f13b34ea9bf745f2c8362edd6f46d4a32daed8b4 Mon Sep 17 00:00:00 2001 From: mmukhi Date: Wed, 9 Aug 2017 14:16:03 -0700 Subject: [PATCH] squashed Server should send 2 goaway messages to gracefully shutdown the connection. (#1403) * First commit. * Basic implementation * Server should send two GoAways to gracefully shutdown the connection. * mend * Post-review updates * Fixed issue after rebase * Fixing typo Add balancer, resolver and connectivity package. add balancer_v1_wrapper.go and remove grpclb.go all test passed, how? end2end passed, nil pointer failure, ac.wait return nil transport fix ac.wait return nil transport, races not fixed (accessing cc.balancer) end2end passed, TODO grpclb all test passed add grpclb back move building balancer out from a goroutine to avoid race Otherwise, Dial() and Close() may race. Mark new APIs experimental split resetAddrConn into newAddrConn and connect add acBalancerWrapper rename file to balancer_conn_wrappers.go remove grpclog.Print make TODO(bar) remove Print comments fixes add missing license fix build failure after merge fix race in grpclb Update comments for balancer and resolver, and rename SubConnection to SubConn Add balancer, resolver and connectivity package. all test passed, how? --- bar_tobeRemoved.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++ clientconn.go | 14 ++++----- doc.go | 6 ++++ 3 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 bar_tobeRemoved.go diff --git a/bar_tobeRemoved.go b/bar_tobeRemoved.go new file mode 100644 index 000000000000..8b6a4fadae69 --- /dev/null +++ b/bar_tobeRemoved.go @@ -0,0 +1,78 @@ +package grpc + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +////////////////////////////////////////////////////////////////////////////// +////////////////////// To b or not to b. ///////////////////////// +////////////////////////////////////////////////////////////////////////////// + +// newSubConnection adds a new addrConn to clientconn, and returns a wrapper +// subConn for balancer. +func (cc *ClientConn) newSubConnection(addrs []resolver.Address, opts balancer.NewSubConnectionOptions) (*addrConn, error) { + grpclog.Printf("creating new subconn, addrs[0]: %v\n", addrs[0]) + ac, err := cc.resetAddrConn(addrs[0], false, nil) + if err != nil { + // TODO what to do? + grpclog.Printf("neverrrrrrrr got err when resetting addrconn %v", err) + return nil, err + } + return ac, nil +} + +// removeSubConnection removes the addrConn in the subConn from clientConn. +// It also tears down the ac with the given error. +func (cc *ClientConn) removeSubConnection(ac *addrConn, err error) { + grpclog.Printf("removing new subconn") + cc.mu.Lock() + delete(cc.conns, ac) + cc.mu.Unlock() + ac.tearDown(err) +} + +func (cc *ClientConn) updatePicker(p balancer.Picker) { + grpclog.Printf("cc.updatePicker: %p", p) + // TODO add a goroutine and sync it. + cc.pmu.Lock() + cc.picker = p + cc.pmu.Unlock() +} + +// ccBalancerWrapper is a wrapper on top of cc for balancers. +// It implements balancer.ClientConnection interface. +type ccBalancerWrapper struct { + cc *ClientConn +} + +func (ccb *ccBalancerWrapper) NewSubConnection(addrs []resolver.Address, opts balancer.NewSubConnectionOptions) (balancer.SubConnection, error) { + return ccb.cc.newSubConnection(addrs, opts) +} + +func (ccb *ccBalancerWrapper) RemoveSubConnection(sc balancer.SubConnection) { + ac, ok := sc.(*addrConn) + if !ok { + // TODO log or error? + return + } + ccb.cc.removeSubConnection(ac, errConnClosing) +} + +func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + ccb.cc.updatePicker(p) +} + +func (ccb *ccBalancerWrapper) Target() string { + return ccb.cc.target +} + +////////////////////////////////////////////////////////////////////////////// +////////////////////// To a or not to a. ///////////////////////// +////////////////////////////////////////////////////////////////////////////// + +func (ac *addrConn) UpdateAddresses([]resolver.Address) {} + +func (ac *addrConn) Connect() {} diff --git a/clientconn.go b/clientconn.go index 383170908fb2..2aca4a2cb4c6 100644 --- a/clientconn.go +++ b/clientconn.go @@ -672,15 +672,15 @@ func (ac *addrConn) connect(block bool) error { } if !ac.dopts.insecure { if ac.dopts.copts.TransportCredentials == nil { - return errNoTransportSecurity + return nil, errNoTransportSecurity } } else { if ac.dopts.copts.TransportCredentials != nil { - return errCredentialsConflict + return nil, errCredentialsConflict } for _, cd := range ac.dopts.copts.PerRPCCredentials { if cd.RequireTransportSecurity() { - return errTransportCredentialsMissing + return nil, errTransportCredentialsMissing } } } @@ -691,9 +691,9 @@ func (ac *addrConn) connect(block bool) error { ac.tearDown(err) } if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { - return e.Origin() + return nil, e.Origin() } - return err + return nil, err } // Start to monitor the error status of transport. go ac.transportMonitor() @@ -711,7 +711,7 @@ func (ac *addrConn) connect(block bool) error { ac.transportMonitor() }() } - return nil + return ac, nil } // GetMethodConfig gets the method config of the input method. @@ -873,7 +873,7 @@ func (ac *addrConn) resetTransport(drain bool) error { } ac.errorf("transient failure") oldState := ac.state - ac.state = connectivity.Connecting + ac.state = connectivity.TransientFailure ac.csEvltr.recordTransition(oldState, ac.state) if ac.cc.balancer != nil { ac.cc.balancer.HandleSubConnStateChange(ac.acbw, ac.state) diff --git a/doc.go b/doc.go index 187adbb117f2..a5098bb33a7e 100644 --- a/doc.go +++ b/doc.go @@ -22,3 +22,9 @@ Package grpc implements an RPC system called gRPC. See grpc.io for more information about gRPC. */ package grpc // import "google.golang.org/grpc" + +import ( + _ "google.golang.org/grpc/balancer" // test build + _ "google.golang.org/grpc/connectivity" // test build + _ "google.golang.org/grpc/resolver" // test build +)