From edf81e4ac2f0f778314211188d60eff60a4f7e21 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 26 Jul 2017 15:56:20 -0700 Subject: [PATCH 1/7] First commit. --- transport/http2_server.go | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/transport/http2_server.go b/transport/http2_server.go index 0c946b127432..c3c190539a4f 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -65,9 +65,16 @@ type http2Server struct { // Blocking operations should select on shutdownChan to avoid // blocking forever after Close. shutdownChan chan struct{} - framer *framer - hBuf *bytes.Buffer // the buffer for HPACK encoding - hEnc *hpack.Encoder // HPACK encoder + // drainingBegun is set to true when the server writes out the first GoAway(with ID 2^31-1) frame out. + // After the first GoAway is sent an independent goroutine will be launched to + // later send the second GoAway. + // During this time we don't want to write another first GoAway(with ID 2^31 -1) to be written + // out. Thus call to Drain() will be a no-op if drainChan is already closed since draining is + // already underway. + drainingBegun bool + framer *framer + hBuf *bytes.Buffer // the buffer for HPACK encoding + hEnc *hpack.Encoder // HPACK encoder // The max number of concurrent streams. maxStreams uint32 @@ -632,7 +639,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")}) + t.drain(http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")) } } @@ -978,23 +985,18 @@ func (t *http2Server) keepalive() { continue } val := t.kp.MaxConnectionIdle - time.Since(idle) + t.mu.Unlock() if val <= 0 { // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. - t.state = draining - t.mu.Unlock() - t.Drain() + t.drain(http2.ErrCodeNo, []byte{}) // Reseting the timer so that the clean-up doesn't deadlock. maxIdle.Reset(infinity) return } - t.mu.Unlock() maxIdle.Reset(val) case <-maxAge.C: - t.mu.Lock() - t.state = draining - t.mu.Unlock() - t.Drain() + t.drain(http2.ErrCodeNo, []byte{}) maxAge.Reset(t.kp.MaxConnectionAgeGrace) select { case <-maxAge.C: @@ -1138,7 +1140,16 @@ func (t *http2Server) RemoteAddr() net.Addr { } func (t *http2Server) Drain() { - t.controlBuf.put(&goAway{code: http2.ErrCodeNo}) + t.drain(http2.ErrCodeNo, []byte{}) +} + +func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { + t.mu.Lock() + defer t.mu.Unlock() + if t.drainingBegun { + return + } + t.controlBuf.put(&goAway{code: code, debugData: debugData}) } var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) From d9a12a110d3278d6e5b949f2feb25ce3b07bec5b Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 27 Jul 2017 17:01:53 -0700 Subject: [PATCH 2/7] Basic implementation --- transport/control.go | 1 + transport/http2_server.go | 45 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/transport/control.go b/transport/control.go index de9420a393f2..92065c71dadc 100644 --- a/transport/control.go +++ b/transport/control.go @@ -74,6 +74,7 @@ func (*resetStream) item() {} type goAway struct { code http2.ErrCode debugData []byte + isSecond bool } func (*goAway) item() {} diff --git a/transport/http2_server.go b/transport/http2_server.go index c3c190539a4f..0f14c7863af9 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -600,6 +600,10 @@ const ( func (t *http2Server) handlePing(f *http2.PingFrame) { if f.IsAck() { + if f.Data == goAwayPing.data { + close(t.drainChan) + return + } // Maybe it's a BDP ping. if t.bdpEst != nil { t.bdpEst.calculate(f.Data) @@ -1028,6 +1032,8 @@ func (t *http2Server) keepalive() { } } +var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} + // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Server) controller() { @@ -1056,13 +1062,45 @@ func (t *http2Server) controller() { // The transport is closing. return } + ch := t.drainChan sid := t.maxStreamID + if i.isSecond { + t.mu.Unlock() + t.framer.writeGoAway(true, sid, i.code, i.debugData) + continue + } t.state = draining t.mu.Unlock() - t.framer.writeGoAway(true, sid, i.code, i.debugData) - if i.code == http2.ErrCodeEnhanceYourCalm { + if i.code == http2.ErrCodeEnhanceYourCalm && i.debuData == []byte("too_many_pings") { + // Abruptly close the connection following the first GoAway. + t.framer.writeGoAway(true, sid, i.code, i.debugData) t.Close() + continue } + // For a graceful close, send out a GoAway with stream ID of MaxUInt32, + // Follow that with a ping and wait for the ack to come back or a timer + // to expire. During this time accept new streams since they might have + // originated before the GoAway reaches the client. + // After getting the ack or timer expiration send out another GoAway this + // time with an ID of the max stream server intends to process. + t.framer.writeGoAway(true, math.MaxUint32, i.code, i.debug) + t.framer.writePing(true, false, goAwayPing) + go func() { + timer := time.NewTimer(time.Minute) + defer func() { + if !timer.Stop() { + <-timer.C + } + }() + select { + case <-ch: + case <-timer: + timer.Reset(infinity) + case <-shutdownChan: + return + } + t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData, isSecond: true}) + }() case *flushIO: t.framer.flushWrite() case *ping: @@ -1146,9 +1184,10 @@ func (t *http2Server) Drain() { func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { t.mu.Lock() defer t.mu.Unlock() - if t.drainingBegun { + if t.drainChan != nil { return } + t.drainChan = make(chan struct{}) t.controlBuf.put(&goAway{code: code, debugData: debugData}) } From fb273cd7e73eec130115a868ec59fa5d37a93f3c Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 28 Jul 2017 15:01:23 -0700 Subject: [PATCH 3/7] Server should send two GoAways to gracefully shutdown the connection. --- transport/control.go | 1 + transport/http2_server.go | 61 ++++++++++++++++++++------------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/transport/control.go b/transport/control.go index 92065c71dadc..31e8513e7a52 100644 --- a/transport/control.go +++ b/transport/control.go @@ -75,6 +75,7 @@ type goAway struct { code http2.ErrCode debugData []byte isSecond bool + closeConn bool } func (*goAway) item() {} diff --git a/transport/http2_server.go b/transport/http2_server.go index 0f14c7863af9..3d275ff77e6e 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -65,17 +65,9 @@ type http2Server struct { // Blocking operations should select on shutdownChan to avoid // blocking forever after Close. shutdownChan chan struct{} - // drainingBegun is set to true when the server writes out the first GoAway(with ID 2^31-1) frame out. - // After the first GoAway is sent an independent goroutine will be launched to - // later send the second GoAway. - // During this time we don't want to write another first GoAway(with ID 2^31 -1) to be written - // out. Thus call to Drain() will be a no-op if drainChan is already closed since draining is - // already underway. - drainingBegun bool - framer *framer - hBuf *bytes.Buffer // the buffer for HPACK encoding - hEnc *hpack.Encoder // HPACK encoder - + framer *framer + hBuf *bytes.Buffer // the buffer for HPACK encoding + hEnc *hpack.Encoder // HPACK encoder // The max number of concurrent streams. maxStreams uint32 // controlBuf delivers all the control related tasks (e.g., window @@ -84,9 +76,7 @@ type http2Server struct { fc *inFlow // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool - - stats stats.Handler - + stats stats.Handler // Flag to keep track of reading activity on transport. // 1 is true and 0 is false. activity uint32 // Accessed atomically. @@ -102,14 +92,22 @@ type http2Server struct { // Flag to signify that number of ping strikes should be reset to 0. // This is set whenever data or header frames are sent. // 1 means yes. - resetPingStrikes uint32 // Accessed atomically. - + resetPingStrikes uint32 // Accessed atomically. initialWindowSize int32 + bdpEst *bdpEstimator bdpEst *bdpEstimator outQuotaVersion uint32 - mu sync.Mutex // guard the following + mu sync.Mutex // guard the following + + // drainChan is initialized when drain(...) is called the first time. + // After which the server writes out the first GoAway(with ID 2^31-1) frame. + // Then an independent goroutine will be launched to later send the second GoAway. + // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. + // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is + // already underway. + drainChan chan struct{} state transportState activeStreams map[uint32]*Stream // the per-stream outbound flow control window size set by the peer. @@ -643,7 +641,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - t.drain(http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")) + t.drain(http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings"), true) } } @@ -993,14 +991,14 @@ func (t *http2Server) keepalive() { if val <= 0 { // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. - t.drain(http2.ErrCodeNo, []byte{}) + t.drain(http2.ErrCodeNo, []byte{}, false) // Reseting the timer so that the clean-up doesn't deadlock. maxIdle.Reset(infinity) return } maxIdle.Reset(val) case <-maxAge.C: - t.drain(http2.ErrCodeNo, []byte{}) + t.drain(http2.ErrCodeNo, []byte{}, false) maxAge.Reset(t.kp.MaxConnectionAgeGrace) select { case <-maxAge.C: @@ -1065,16 +1063,19 @@ func (t *http2Server) controller() { ch := t.drainChan sid := t.maxStreamID if i.isSecond { + // Stop accepting more stream now. + t.state = draining t.mu.Unlock() t.framer.writeGoAway(true, sid, i.code, i.debugData) + t.writableChan <- 0 continue } - t.state = draining t.mu.Unlock() - if i.code == http2.ErrCodeEnhanceYourCalm && i.debuData == []byte("too_many_pings") { + if i.closeConn { // Abruptly close the connection following the first GoAway. - t.framer.writeGoAway(true, sid, i.code, i.debugData) + t.framer.writeGoAway(true, 0, i.code, i.debugData) t.Close() + t.writableChan <- 0 continue } // For a graceful close, send out a GoAway with stream ID of MaxUInt32, @@ -1083,8 +1084,8 @@ func (t *http2Server) controller() { // originated before the GoAway reaches the client. // After getting the ack or timer expiration send out another GoAway this // time with an ID of the max stream server intends to process. - t.framer.writeGoAway(true, math.MaxUint32, i.code, i.debug) - t.framer.writePing(true, false, goAwayPing) + t.framer.writeGoAway(true, math.MaxUint32, i.code, i.debugData) + t.framer.writePing(true, false, goAwayPing.data) go func() { timer := time.NewTimer(time.Minute) defer func() { @@ -1094,9 +1095,9 @@ func (t *http2Server) controller() { }() select { case <-ch: - case <-timer: + case <-timer.C: timer.Reset(infinity) - case <-shutdownChan: + case <-t.shutdownChan: return } t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData, isSecond: true}) @@ -1178,17 +1179,17 @@ func (t *http2Server) RemoteAddr() net.Addr { } func (t *http2Server) Drain() { - t.drain(http2.ErrCodeNo, []byte{}) + t.drain(http2.ErrCodeNo, []byte{}, false) } -func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { +func (t *http2Server) drain(code http2.ErrCode, debugData []byte, closeConn bool) { t.mu.Lock() defer t.mu.Unlock() if t.drainChan != nil { return } t.drainChan = make(chan struct{}) - t.controlBuf.put(&goAway{code: code, debugData: debugData}) + t.controlBuf.put(&goAway{code: code, debugData: debugData, closeConn: closeConn}) } var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) From 6b2f810f5c60082dcaf2a4ed89ff4e7605ad57d0 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 28 Jul 2017 15:18:24 -0700 Subject: [PATCH 4/7] mend --- transport/http2_server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/http2_server.go b/transport/http2_server.go index 3d275ff77e6e..90d62d618c7e 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -1073,7 +1073,7 @@ func (t *http2Server) controller() { t.mu.Unlock() if i.closeConn { // Abruptly close the connection following the first GoAway. - t.framer.writeGoAway(true, 0, i.code, i.debugData) + t.framer.writeGoAway(true, sid, i.code, i.debugData) t.Close() t.writableChan <- 0 continue @@ -1084,7 +1084,7 @@ func (t *http2Server) controller() { // originated before the GoAway reaches the client. // After getting the ack or timer expiration send out another GoAway this // time with an ID of the max stream server intends to process. - t.framer.writeGoAway(true, math.MaxUint32, i.code, i.debugData) + t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{}) t.framer.writePing(true, false, goAwayPing.data) go func() { timer := time.NewTimer(time.Minute) From 113585b51857638c3e19b0cbcb1a7ede0aeb4b01 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 31 Jul 2017 11:44:02 -0700 Subject: [PATCH 5/7] Post-review updates --- transport/control.go | 2 +- transport/http2_server.go | 39 +++++++++++++++------------------------ 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/transport/control.go b/transport/control.go index 31e8513e7a52..501eb03c49f7 100644 --- a/transport/control.go +++ b/transport/control.go @@ -74,7 +74,7 @@ func (*resetStream) item() {} type goAway struct { code http2.ErrCode debugData []byte - isSecond bool + headsUp bool closeConn bool } diff --git a/transport/http2_server.go b/transport/http2_server.go index 90d62d618c7e..a9f637d95267 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -598,7 +598,7 @@ const ( func (t *http2Server) handlePing(f *http2.PingFrame) { if f.IsAck() { - if f.Data == goAwayPing.data { + if f.Data == goAwayPing.data && t.drainChan != nil { close(t.drainChan) return } @@ -641,7 +641,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - t.drain(http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings"), true) + t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) } } @@ -991,14 +991,14 @@ func (t *http2Server) keepalive() { if val <= 0 { // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. - t.drain(http2.ErrCodeNo, []byte{}, false) + t.drain(http2.ErrCodeNo, []byte{}) // Reseting the timer so that the clean-up doesn't deadlock. maxIdle.Reset(infinity) return } maxIdle.Reset(val) case <-maxAge.C: - t.drain(http2.ErrCodeNo, []byte{}, false) + t.drain(http2.ErrCodeNo, []byte{}) maxAge.Reset(t.kp.MaxConnectionAgeGrace) select { case <-maxAge.C: @@ -1060,24 +1060,20 @@ func (t *http2Server) controller() { // The transport is closing. return } - ch := t.drainChan sid := t.maxStreamID - if i.isSecond { + if !i.headsUp { // Stop accepting more stream now. t.state = draining t.mu.Unlock() t.framer.writeGoAway(true, sid, i.code, i.debugData) + if i.closeConn { + // Abruptly close the connection following the GoAway. + t.Close() + } t.writableChan <- 0 continue } t.mu.Unlock() - if i.closeConn { - // Abruptly close the connection following the first GoAway. - t.framer.writeGoAway(true, sid, i.code, i.debugData) - t.Close() - t.writableChan <- 0 - continue - } // For a graceful close, send out a GoAway with stream ID of MaxUInt32, // Follow that with a ping and wait for the ack to come back or a timer // to expire. During this time accept new streams since they might have @@ -1088,19 +1084,14 @@ func (t *http2Server) controller() { t.framer.writePing(true, false, goAwayPing.data) go func() { timer := time.NewTimer(time.Minute) - defer func() { - if !timer.Stop() { - <-timer.C - } - }() + defer timer.Stop() select { - case <-ch: + case <-t.drainChan: case <-timer.C: - timer.Reset(infinity) case <-t.shutdownChan: return } - t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData, isSecond: true}) + t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData}) }() case *flushIO: t.framer.flushWrite() @@ -1179,17 +1170,17 @@ func (t *http2Server) RemoteAddr() net.Addr { } func (t *http2Server) Drain() { - t.drain(http2.ErrCodeNo, []byte{}, false) + t.drain(http2.ErrCodeNo, []byte{}) } -func (t *http2Server) drain(code http2.ErrCode, debugData []byte, closeConn bool) { +func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { t.mu.Lock() defer t.mu.Unlock() if t.drainChan != nil { return } t.drainChan = make(chan struct{}) - t.controlBuf.put(&goAway{code: code, debugData: debugData, closeConn: closeConn}) + t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) } var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) From a3cf7e9c77a6a69ea19b0decd6aa72ba9a15f776 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 1 Aug 2017 11:49:35 -0700 Subject: [PATCH 6/7] Fixed issue after rebase --- transport/http2_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/transport/http2_server.go b/transport/http2_server.go index a9f637d95267..6c32460222ab 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -96,7 +96,6 @@ type http2Server struct { initialWindowSize int32 bdpEst *bdpEstimator - bdpEst *bdpEstimator outQuotaVersion uint32 mu sync.Mutex // guard the following From 309ac89132ad28a9aa999c73b9f7a5247903f668 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 9 Aug 2017 10:50:49 -0700 Subject: [PATCH 7/7] Fixing typo --- transport/http2_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http2_server.go b/transport/http2_server.go index 6c32460222ab..b6f93e3c0c9f 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -1061,7 +1061,7 @@ func (t *http2Server) controller() { } sid := t.maxStreamID if !i.headsUp { - // Stop accepting more stream now. + // Stop accepting more streams now. t.state = draining t.mu.Unlock() t.framer.writeGoAway(true, sid, i.code, i.debugData)