From 5f2153fb301af5293d6c9bb993432316bd9ab310 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Tue, 15 Oct 2019 02:32:25 -0400 Subject: [PATCH] nsqd: fix stall in Exit() due to tcp-protocol producer connections Consumer connections are closed when topics are closed, but tcp-protocol publish connections are not, so add tcpServer.CloseAll(). problem introduced by https://github.com/nsqio/nsq/pull/1190 --- nsqd/nsqd.go | 11 +++++++++-- nsqd/tcp.go | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 285ae8713..6fbf35d22 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -61,6 +61,7 @@ type NSQD struct { lookupPeers atomic.Value + tcpServer *tcpServer tcpListener net.Listener httpListener net.Listener httpsListener net.Listener @@ -154,6 +155,7 @@ func New(opts *Options) (*NSQD, error) { n.logf(LOG_INFO, version.String("nsqd")) n.logf(LOG_INFO, "ID: %d", opts.ID) + n.tcpServer = &tcpServer{} n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) if err != nil { return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) @@ -255,14 +257,16 @@ func (n *NSQD) Main() error { }) } - tcpServer := &tcpServer{ctx: ctx} + n.tcpServer.ctx = ctx n.waitGroup.Wrap(func() { - exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) + exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) + httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) + if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { @@ -423,6 +427,9 @@ func (n *NSQD) Exit() { if n.tcpListener != nil { n.tcpListener.Close() } + if n.tcpServer != nil { + n.tcpServer.CloseAll() + } if n.httpListener != nil { n.httpListener.Close() diff --git a/nsqd/tcp.go b/nsqd/tcp.go index cbb9de203..527d66ccd 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -3,12 +3,14 @@ package nsqd import ( "io" "net" + "sync" "github.com/nsqio/nsq/internal/protocol" ) type tcpServer struct { - ctx *context + ctx *context + conns sync.Map } func (p *tcpServer) Handle(clientConn net.Conn) { @@ -41,9 +43,19 @@ func (p *tcpServer) Handle(clientConn net.Conn) { return } + p.conns.Store(clientConn.RemoteAddr(), clientConn) + err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) - return } + + p.conns.Delete(clientConn.RemoteAddr()) +} + +func (p *tcpServer) CloseAll() { + p.conns.Range(func(k, v interface{}) bool { + v.(net.Conn).Close() + return true + }) }