Skip to content

Commit

Permalink
Server should send 2 goaway messages to gracefully shutdown the conne…
Browse files Browse the repository at this point in the history
…ction. (#1403)

* First commit.

* Basic implementation

* Server should send two GoAways to gracefully shutdown the connection.

* mend

* Post-review updates

* Fixed issue after rebase

* Fixing typo
  • Loading branch information
MakMukhi authored and menghanl committed Aug 14, 2017
1 parent e63eb64 commit db24830
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 23 deletions.
2 changes: 2 additions & 0 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (*resetStream) item() {}
type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
}

func (*goAway) item() {}
Expand Down
86 changes: 63 additions & 23 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type http2Server struct {
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
hEnc *hpack.Encoder // HPACK encoder

// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
Expand All @@ -77,9 +76,7 @@ type http2Server struct {
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool

stats stats.Handler

stats stats.Handler
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
Expand All @@ -95,13 +92,19 @@ type http2Server struct {
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.

resetPingStrikes uint32 // Accessed atomically.
initialWindowSize int32
bdpEst *bdpEstimator

bdpEst *bdpEstimator
mu sync.Mutex // guard the following

mu sync.Mutex // guard the following
// drainChan is initialized when drain(...) is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
Expand Down Expand Up @@ -592,6 +595,10 @@ const (

func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() {
if f.Data == goAwayPing.data && t.drainChan != nil {
close(t.drainChan)
return
}
// Maybe it's a BDP ping.
if t.bdpEst != nil {
t.bdpEst.calculate(f.Data)
Expand Down Expand Up @@ -631,7 +638,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}

Expand Down Expand Up @@ -956,23 +963,18 @@ func (t *http2Server) keepalive() {
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.state = draining
t.mu.Unlock()
t.Drain()
t.drain(http2.ErrCodeNo, []byte{})
// Reseting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity)
return
}
t.mu.Unlock()
maxIdle.Reset(val)
case <-maxAge.C:
t.mu.Lock()
t.state = draining
t.mu.Unlock()
t.Drain()
t.drain(http2.ErrCodeNo, []byte{})
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-maxAge.C:
Expand Down Expand Up @@ -1004,6 +1006,8 @@ func (t *http2Server) keepalive() {
}
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
Expand Down Expand Up @@ -1033,12 +1037,38 @@ func (t *http2Server) controller() {
return
}
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.code == http2.ErrCodeEnhanceYourCalm {
t.Close()
if !i.headsUp {
// Stop accepting more streams now.
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.closeConn {
// Abruptly close the connection following the GoAway.
t.Close()
}
t.writableChan <- 0
continue
}
t.mu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
t.framer.writePing(true, false, goAwayPing.data)
go func() {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.drainChan:
case <-timer.C:
case <-t.shutdownChan:
return
}
t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
}()
case *flushIO:
t.framer.flushWrite()
case *ping:
Expand Down Expand Up @@ -1116,7 +1146,17 @@ func (t *http2Server) RemoteAddr() net.Addr {
}

func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
t.drain(http2.ErrCodeNo, []byte{})
}

func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
return
}
t.drainChan = make(chan struct{})
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}

var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down

0 comments on commit db24830

Please sign in to comment.