Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server should send 2 goaway messages to gracefully shutdown the connection. #1403

Merged
merged 7 commits into from
Aug 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions transport/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (*resetStream) item() {}
type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
}

func (*goAway) item() {}
Expand Down
87 changes: 64 additions & 23 deletions transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type http2Server struct {
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
hEnc *hpack.Encoder // HPACK encoder

// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
Expand All @@ -77,9 +76,7 @@ type http2Server struct {
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool

stats stats.Handler

stats stats.Handler
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
Expand All @@ -95,14 +92,21 @@ type http2Server struct {
// Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent.
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.

resetPingStrikes uint32 // Accessed atomically.
initialWindowSize int32
bdpEst *bdpEstimator

bdpEst *bdpEstimator
outQuotaVersion uint32

mu sync.Mutex // guard the following
mu sync.Mutex // guard the following

// drainChan is initialized when drain(...) is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
Expand Down Expand Up @@ -593,6 +597,10 @@ const (

func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() {
if f.Data == goAwayPing.data && t.drainChan != nil {
close(t.drainChan)
return
}
// Maybe it's a BDP ping.
if t.bdpEst != nil {
t.bdpEst.calculate(f.Data)
Expand Down Expand Up @@ -632,7 +640,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {

if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}

Expand Down Expand Up @@ -978,23 +986,18 @@ func (t *http2Server) keepalive() {
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.state = draining
t.mu.Unlock()
t.Drain()
t.drain(http2.ErrCodeNo, []byte{})
// Reseting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity)
return
}
t.mu.Unlock()
maxIdle.Reset(val)
case <-maxAge.C:
t.mu.Lock()
t.state = draining
t.mu.Unlock()
t.Drain()
t.drain(http2.ErrCodeNo, []byte{})
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-maxAge.C:
Expand Down Expand Up @@ -1026,6 +1029,8 @@ func (t *http2Server) keepalive() {
}
}

var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
Expand Down Expand Up @@ -1055,12 +1060,38 @@ func (t *http2Server) controller() {
return
}
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.code == http2.ErrCodeEnhanceYourCalm {
t.Close()
if !i.headsUp {
// Stop accepting more streams now.
t.state = draining
t.mu.Unlock()
t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.closeConn {
// Abruptly close the connection following the GoAway.
t.Close()
}
t.writableChan <- 0
continue
}
t.mu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
t.framer.writePing(true, false, goAwayPing.data)
go func() {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.drainChan:
case <-timer.C:
case <-t.shutdownChan:
return
}
t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
}()
case *flushIO:
t.framer.flushWrite()
case *ping:
Expand Down Expand Up @@ -1138,7 +1169,17 @@ func (t *http2Server) RemoteAddr() net.Addr {
}

func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
t.drain(http2.ErrCodeNo, []byte{})
}

func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
return
}
t.drainChan = make(chan struct{})
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}

var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down