From d3d0bbf01e22943700bb61b94fade60f99216f49 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sat, 13 Jun 2020 21:09:12 -0700 Subject: [PATCH 1/4] Revert "nsqd: fix stall in Exit() due to tcp-protocol producer connections" This reverts commit 5f2153fb301af5293d6c9bb993432316bd9ab310. --- nsqd/nsqd.go | 10 ++-------- nsqd/tcp.go | 15 +-------------- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 20792e9bd..9a46d4a0e 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -65,7 +65,6 @@ type NSQD struct { lookupPeers atomic.Value - tcpServer *tcpServer tcpListener net.Listener httpListener net.Listener httpsListener net.Listener @@ -146,7 +145,6 @@ func New(opts *Options) (*NSQD, error) { n.logf(LOG_INFO, version.String("nsqd")) n.logf(LOG_INFO, "ID: %d", opts.ID) - n.tcpServer = &tcpServer{} n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) if err != nil { return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) @@ -264,16 +262,15 @@ func (n *NSQD) Main() error { }) } - n.tcpServer.nsqd = n + tcpServer := &tcpServer{nsqd: n} n.waitGroup.Wrap(func() { - exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) + exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) }) httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) - if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(n, true, true) n.waitGroup.Wrap(func() { @@ -440,9 +437,6 @@ func (n *NSQD) Exit() { if n.tcpListener != nil { n.tcpListener.Close() } - if n.tcpServer != nil { - n.tcpServer.CloseAll() - } if n.httpListener != nil { n.httpListener.Close() diff --git a/nsqd/tcp.go b/nsqd/tcp.go index 7279b6f3f..26d9c16ef 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -3,14 +3,12 @@ package nsqd import ( "io" "net" - "sync" "github.com/nsqio/nsq/internal/protocol" ) type tcpServer struct { - nsqd *NSQD - conns sync.Map + nsqd *NSQD } func (p *tcpServer) Handle(clientConn net.Conn) { @@ -43,19 +41,8 @@ func (p *tcpServer) Handle(clientConn net.Conn) { return } - p.conns.Store(clientConn.RemoteAddr(), clientConn) - err = prot.IOLoop(clientConn) if err != nil { p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) } - - p.conns.Delete(clientConn.RemoteAddr()) -} - -func (p *tcpServer) CloseAll() { - p.conns.Range(func(k, v interface{}) bool { - v.(net.Conn).Close() - return true - }) } From ccb19eac36ed9001de84701dfa0570b6e202afea Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sat, 13 Jun 2020 21:09:53 -0700 Subject: [PATCH 2/4] nsqd: close all clients on exit --- internal/protocol/protocol.go | 7 ++++- nsqd/nsqd.go | 35 +++++-------------------- nsqd/protocol_v2.go | 13 +++++----- nsqd/protocol_v2_test.go | 3 ++- nsqd/stats.go | 12 +++++---- nsqd/tcp.go | 37 +++++++++++++++++++-------- nsqlookupd/lookup_protocol_v1.go | 14 +++++++--- nsqlookupd/lookup_protocol_v1_test.go | 3 ++- nsqlookupd/nsqlookupd.go | 4 +-- nsqlookupd/tcp.go | 30 ++++++++++++---------- 10 files changed, 85 insertions(+), 73 deletions(-) diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index 3051e5d65..6ff7f59ad 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -6,9 +6,14 @@ import ( "net" ) +type Client interface { + Close() error +} + // Protocol describes the basic behavior of any protocol in the system type Protocol interface { - IOLoop(conn net.Conn) error + NewClient(net.Conn) Client + IOLoop(Client) error } // SendResponse is a server side utility function to prefix data with a length header diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 9a46d4a0e..550647d6e 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -37,11 +37,6 @@ type errStore struct { err error } -type Client interface { - Stats() ClientStats - IsProducer() bool -} - type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 @@ -60,11 +55,9 @@ type NSQD struct { topicMap map[string]*Topic - clientLock sync.RWMutex - clients map[int64]Client - lookupPeers atomic.Value + tcpServer *tcpServer tcpListener net.Listener httpListener net.Listener httpsListener net.Listener @@ -95,7 +88,6 @@ func New(opts *Options) (*NSQD, error) { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), - clients: make(map[int64]Client), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -145,6 +137,7 @@ func New(opts *Options) (*NSQD, error) { n.logf(LOG_INFO, version.String("nsqd")) n.logf(LOG_INFO, "ID: %d", opts.ID) + n.tcpServer = &tcpServer{nsqd: n} n.tcpListener, err = net.Listen("tcp", opts.TCPAddress) if err != nil { return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) @@ -233,23 +226,6 @@ func (n *NSQD) GetStartTime() time.Time { return n.startTime } -func (n *NSQD) AddClient(clientID int64, client Client) { - n.clientLock.Lock() - n.clients[clientID] = client - n.clientLock.Unlock() -} - -func (n *NSQD) RemoveClient(clientID int64) { - n.clientLock.Lock() - _, ok := n.clients[clientID] - if !ok { - n.clientLock.Unlock() - return - } - delete(n.clients, clientID) - n.clientLock.Unlock() -} - func (n *NSQD) Main() error { exitCh := make(chan error) var once sync.Once @@ -262,9 +238,8 @@ func (n *NSQD) Main() error { }) } - tcpServer := &tcpServer{nsqd: n} n.waitGroup.Wrap(func() { - exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) + exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired) @@ -438,6 +413,10 @@ func (n *NSQD) Exit() { n.tcpListener.Close() } + if n.tcpServer != nil { + n.tcpServer.Close() + } + if n.httpListener != nil { n.httpListener.Close() } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c95aeba6d..ccac1b0f1 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -33,14 +33,17 @@ type protocolV2 struct { nsqd *NSQD } -func (p *protocolV2) IOLoop(conn net.Conn) error { +func (p *protocolV2) NewClient(conn net.Conn) protocol.Client { + clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1) + return newClientV2(clientID, conn, p.nsqd) +} + +func (p *protocolV2) IOLoop(c protocol.Client) error { var err error var line []byte var zeroTime time.Time - clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1) - client := newClientV2(clientID, conn, p.nsqd) - p.nsqd.AddClient(client.ID, client) + client := c.(*clientV2) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize @@ -112,13 +115,11 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { } p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) - conn.Close() close(client.ExitChan) if client.Channel != nil { client.Channel.RemoveClient(client.ID) } - p.nsqd.RemoveClient(client.ID) return err } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 5ff8ad9d5..84803f007 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1608,7 +1608,8 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { prot := &protocolV2{nsqd: nsqd} defer prot.nsqd.Exit() - err = prot.IOLoop(fakeConn) + client := prot.NewClient(fakeConn) + err = prot.IOLoop(client) test.NotNil(t, err) test.Equal(t, "E_INVALID invalid command INVALID_COMMAND", err.Error()) test.NotNil(t, err.(*protocol.FatalClientErr)) diff --git a/nsqd/stats.go b/nsqd/stats.go index 351667814..c739a1ee3 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -185,14 +185,16 @@ func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []Top } func (n *NSQD) GetProducerStats() []ClientStats { - n.clientLock.RLock() - var producers []Client - for _, c := range n.clients { + var producers []*clientV2 + + n.tcpServer.conns.Range(func(k, v interface{}) bool { + c := v.(*clientV2) if c.IsProducer() { producers = append(producers, c) } - } - n.clientLock.RUnlock() + return true + }) + producerStats := make([]ClientStats, 0, len(producers)) for _, p := range producers { producerStats = append(producerStats, p.Stats()) diff --git a/nsqd/tcp.go b/nsqd/tcp.go index 26d9c16ef..0241472ca 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -3,46 +3,61 @@ package nsqd import ( "io" "net" + "sync" "github.com/nsqio/nsq/internal/protocol" ) type tcpServer struct { - nsqd *NSQD + nsqd *NSQD + conns sync.Map } -func (p *tcpServer) Handle(clientConn net.Conn) { - p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) +func (p *tcpServer) Handle(conn net.Conn) { + p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us // to gracefully upgrade the protocol away from text/line oriented to whatever... buf := make([]byte, 4) - _, err := io.ReadFull(clientConn, buf) + _, err := io.ReadFull(conn, buf) if err != nil { p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) - clientConn.Close() + conn.Close() return } protocolMagic := string(buf) p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", - clientConn.RemoteAddr(), protocolMagic) + conn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{nsqd: p.nsqd} default: - protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) - clientConn.Close() + protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_PROTOCOL")) + conn.Close() p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", - clientConn.RemoteAddr(), protocolMagic) + conn.RemoteAddr(), protocolMagic) return } - err = prot.IOLoop(clientConn) + client := prot.NewClient(conn) + p.conns.Store(conn.RemoteAddr(), client) + + err = prot.IOLoop(client) if err != nil { - p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) + p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err) } + + p.conns.Delete(conn.RemoteAddr()) + client.Close() +} + +func (p *tcpServer) Close() { + p.conns.Range(func(k, v interface{}) bool { + v.(protocol.Client).Close() + return true + }) } diff --git a/nsqlookupd/lookup_protocol_v1.go b/nsqlookupd/lookup_protocol_v1.go index 1236833ce..af698535a 100644 --- a/nsqlookupd/lookup_protocol_v1.go +++ b/nsqlookupd/lookup_protocol_v1.go @@ -21,11 +21,16 @@ type LookupProtocolV1 struct { nsqlookupd *NSQLookupd } -func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { +func (p *LookupProtocolV1) NewClient(conn net.Conn) protocol.Client { + return NewClientV1(conn) +} + +func (p *LookupProtocolV1) IOLoop(c protocol.Client) error { var err error var line string - client := NewClientV1(conn) + client := c.(*ClientV1) + reader := bufio.NewReader(client) for { line, err = reader.ReadString('\n') @@ -66,8 +71,8 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { } } - conn.Close() - p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): closing", client) + p.nsqlookupd.logf(LOG_INFO, "PROTOCOL(V1): [%s] exiting ioloop", client) + if client.peerInfo != nil { registrations := p.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { @@ -77,6 +82,7 @@ func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { } } } + return err } diff --git a/nsqlookupd/lookup_protocol_v1_test.go b/nsqlookupd/lookup_protocol_v1_test.go index 86aadefc7..8ce0d3ce0 100644 --- a/nsqlookupd/lookup_protocol_v1_test.go +++ b/nsqlookupd/lookup_protocol_v1_test.go @@ -44,7 +44,8 @@ func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) { errChan := make(chan error) testIOLoop := func() { - errChan <- prot.IOLoop(fakeConn) + client := prot.NewClient(fakeConn) + errChan <- prot.IOLoop(client) defer prot.nsqlookupd.Exit() } go testIOLoop() diff --git a/nsqlookupd/nsqlookupd.go b/nsqlookupd/nsqlookupd.go index d56426f57..0996d087a 100644 --- a/nsqlookupd/nsqlookupd.go +++ b/nsqlookupd/nsqlookupd.go @@ -36,6 +36,7 @@ func New(opts *Options) (*NSQLookupd, error) { l.logf(LOG_INFO, version.String("nsqlookupd")) + l.tcpServer = &tcpServer{nsqlookupd: l} l.tcpListener, err = net.Listen("tcp", opts.TCPAddress) if err != nil { return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) @@ -62,7 +63,6 @@ func (l *NSQLookupd) Main() error { }) } - l.tcpServer = &tcpServer{nsqlookupd: l} l.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf)) }) @@ -89,7 +89,7 @@ func (l *NSQLookupd) Exit() { } if l.tcpServer != nil { - l.tcpServer.CloseAll() + l.tcpServer.Close() } if l.httpListener != nil { diff --git a/nsqlookupd/tcp.go b/nsqlookupd/tcp.go index 405d6c6cb..d57a7b353 100644 --- a/nsqlookupd/tcp.go +++ b/nsqlookupd/tcp.go @@ -13,49 +13,51 @@ type tcpServer struct { conns sync.Map } -func (p *tcpServer) Handle(clientConn net.Conn) { - p.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) +func (p *tcpServer) Handle(conn net.Conn) { + p.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us // to gracefully upgrade the protocol away from text/line oriented to whatever... buf := make([]byte, 4) - _, err := io.ReadFull(clientConn, buf) + _, err := io.ReadFull(conn, buf) if err != nil { p.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err) - clientConn.Close() + conn.Close() return } protocolMagic := string(buf) p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", - clientConn.RemoteAddr(), protocolMagic) + conn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V1": prot = &LookupProtocolV1{nsqlookupd: p.nsqlookupd} default: - protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL")) - clientConn.Close() + protocol.SendResponse(conn, []byte("E_BAD_PROTOCOL")) + conn.Close() p.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", - clientConn.RemoteAddr(), protocolMagic) + conn.RemoteAddr(), protocolMagic) return } - p.conns.Store(clientConn.RemoteAddr(), clientConn) + client := prot.NewClient(conn) + p.conns.Store(conn.RemoteAddr(), client) - err = prot.IOLoop(clientConn) + err = prot.IOLoop(client) if err != nil { - p.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) + p.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err) } - p.conns.Delete(clientConn.RemoteAddr()) + p.conns.Delete(conn.RemoteAddr()) + client.Close() } -func (p *tcpServer) CloseAll() { +func (p *tcpServer) Close() { p.conns.Range(func(k, v interface{}) bool { - v.(net.Conn).Close() + v.(protocol.Client).Close() return true }) } From bf28ae3753f4671dc4fd7c3ffbebbf2cab67486d Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Thu, 31 Dec 2020 17:02:10 -0800 Subject: [PATCH 3/4] nsqd: more client/stats interface refactoring --- nsqd/channel.go | 2 +- nsqd/channel_test.go | 4 +- nsqd/client_v2.go | 91 +++++++++++++++++++++++++++++++++++---- nsqd/http.go | 100 +++++-------------------------------------- nsqd/stats.go | 85 +++++++++++++----------------------- nsqd/stats_test.go | 10 ++--- nsqd/statsd.go | 6 +-- nsqd/tcp.go | 10 +++++ 8 files changed, 144 insertions(+), 164 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 3fc931c38..8838c1e3d 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -21,7 +21,7 @@ type Consumer interface { Pause() Close() error TimedOutMessage() - Stats() ClientStats + Stats(string) ClientStats Empty() } diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 5d4d786d5..96386f4cc 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -162,14 +162,14 @@ func TestChannelEmptyConsumer(t *testing.T) { } for _, cl := range channel.clients { - stats := cl.Stats() + stats := cl.Stats("").(ClientV2Stats) test.Equal(t, int64(25), stats.InFlightCount) } channel.Empty() for _, cl := range channel.clients { - stats := cl.Stats() + stats := cl.Stats("").(ClientV2Stats) test.Equal(t, int64(0), stats.InFlightCount) } } diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index f81b760e0..ee38439f1 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "fmt" "net" + "strings" "sync" "sync/atomic" "time" @@ -47,6 +48,71 @@ type identifyEvent struct { MsgTimeout time.Duration } +type PubCount struct { + Topic string `json:"topic"` + Count uint64 `json:"count"` +} + +type ClientV2Stats struct { + ClientID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int32 `json:"state"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + MessageCount uint64 `json:"message_count"` + FinishCount uint64 `json:"finish_count"` + RequeueCount uint64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int32 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + UserAgent string `json:"user_agent"` + Authed bool `json:"authed,omitempty"` + AuthIdentity string `json:"auth_identity,omitempty"` + AuthIdentityURL string `json:"auth_identity_url,omitempty"` + + PubCounts []PubCount `json:"pub_counts,omitempty"` + + TLS bool `json:"tls"` + CipherSuite string `json:"tls_cipher_suite"` + TLSVersion string `json:"tls_version"` + TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` + TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` +} + +func (s ClientV2Stats) String() string { + connectTime := time.Unix(s.ConnectTime, 0) + duration := time.Since(connectTime).Truncate(time.Second) + if len(s.PubCounts) > 0 { + var total uint64 + var topicOut []string + for _, v := range s.PubCounts { + total += v.Count + topicOut = append(topicOut, fmt.Sprintf("%s=%d", v.Topic, v.Count)) + } + return fmt.Sprintf("[%s %-21s] msgs: %-8d topics: %s connected: %s", + s.Version, + s.ClientID, + total, + strings.Join(topicOut, ","), + duration, + ) + } + return fmt.Sprintf("[%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s", + s.Version, + s.ClientID, + s.State, + s.InFlightCount, + s.ReadyCount, + s.FinishCount, + s.RequeueCount, + s.MessageCount, + duration, + ) +} + type clientV2 struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms ReadyCount int64 @@ -154,6 +220,16 @@ func (c *clientV2) String() string { return c.RemoteAddr().String() } +func (c *clientV2) Type() int { + c.metaLock.RLock() + hasPublished := len(c.pubCounts) > 0 + c.metaLock.RUnlock() + if hasPublished { + return typeProducer + } + return typeConsumer +} + func (c *clientV2) Identify(data identifyDataV2) error { c.nsqd.logf(LOG_INFO, "[%s] IDENTIFY: %+v", c, data) @@ -199,7 +275,7 @@ func (c *clientV2) Identify(data identifyDataV2) error { return nil } -func (c *clientV2) Stats() ClientStats { +func (c *clientV2) Stats(topicName string) ClientStats { c.metaLock.RLock() clientID := c.ClientID hostname := c.Hostname @@ -212,13 +288,17 @@ func (c *clientV2) Stats() ClientStats { } pubCounts := make([]PubCount, 0, len(c.pubCounts)) for topic, count := range c.pubCounts { + if len(topicName) > 0 && topic != topicName { + continue + } pubCounts = append(pubCounts, PubCount{ Topic: topic, Count: count, }) + break } c.metaLock.RUnlock() - stats := ClientStats{ + stats := ClientV2Stats{ Version: "V2", RemoteAddress: c.RemoteAddr().String(), ClientID: clientID, @@ -250,13 +330,6 @@ func (c *clientV2) Stats() ClientStats { return stats } -func (c *clientV2) IsProducer() bool { - c.metaLock.RLock() - retval := len(c.pubCounts) > 0 - c.metaLock.RUnlock() - return retval -} - // struct to convert from integers to the human readable strings type prettyConnectionState struct { tls.ConnectionState diff --git a/nsqd/http.go b/nsqd/http.go index ffdec549d..f0db91a2c 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -471,8 +471,6 @@ func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps } func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { - var producerStats []ClientStats - reqParams, err := http_api.NewReqParams(req) if err != nil { s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err) @@ -493,65 +491,22 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro if !ok { includeMem = true } - if includeClients { - producerStats = s.nsqd.GetProducerStats() - } stats := s.nsqd.GetStats(topicName, channelName, includeClients) health := s.nsqd.GetHealth() startTime := s.nsqd.GetStartTime() uptime := time.Since(startTime) - // filter by topic (if specified) - if len(topicName) > 0 { - for _, topicStats := range stats { - if topicStats.TopicName == topicName { - // filter by channel (if specified) - if len(channelName) > 0 { - for _, channelStats := range topicStats.Channels { - if channelStats.ChannelName == channelName { - topicStats.Channels = []ChannelStats{channelStats} - break - } - } - } - stats = []TopicStats{topicStats} - break - } - } - - filteredProducerStats := make([]ClientStats, 0) - for _, clientStat := range producerStats { - var found bool - var count uint64 - for _, v := range clientStat.PubCounts { - if v.Topic == topicName { - count = v.Count - found = true - break - } - } - if !found { - continue - } - clientStat.PubCounts = []PubCount{PubCount{ - Topic: topicName, - Count: count, - }} - filteredProducerStats = append(filteredProducerStats, clientStat) - } - producerStats = filteredProducerStats - } - var ms *memStats if includeMem { m := getMemStats() ms = &m } if !jsonFormat { - return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil + return s.printStats(stats, ms, health, startTime, uptime), nil } + // TODO: should producer stats be hung off topics? return struct { Version string `json:"version"` Health string `json:"health"` @@ -559,15 +514,13 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro Topics []TopicStats `json:"topics"` Memory *memStats `json:"memory,omitempty"` Producers []ClientStats `json:"producers"` - }{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, nil + }{version.Binary, health, startTime.Unix(), stats.Topics, ms, stats.Producers}, nil } -func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte { +func (s *httpServer) printStats(stats Stats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte { var buf bytes.Buffer w := &buf - now := time.Now() - fmt.Fprintf(w, "%s\n", version.String("nsqd")) fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339)) fmt.Fprintf(w, "uptime %s\n", uptime) @@ -587,13 +540,13 @@ func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns) } - if len(stats) == 0 { + if len(stats.Topics) == 0 { fmt.Fprintf(w, "\nTopics: None\n") } else { fmt.Fprintf(w, "\nTopics:") } - for _, t := range stats { + for _, t := range stats.Topics { var pausedPrefix string if t.Paused { pausedPrefix = "*P " @@ -627,48 +580,17 @@ func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, c.E2eProcessingLatency, ) for _, client := range c.Clients { - connectTime := time.Unix(client.ConnectTime, 0) - // truncate to the second - duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second - fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n", - client.Version, - client.ClientID, - client.State, - client.InFlightCount, - client.ReadyCount, - client.FinishCount, - client.RequeueCount, - client.MessageCount, - duration, - ) + fmt.Fprintf(w, " %s\n", client) } } } - if len(producerStats) == 0 { + if len(stats.Producers) == 0 { fmt.Fprintf(w, "\nProducers: None\n") } else { - fmt.Fprintf(w, "\nProducers:") - for _, client := range producerStats { - connectTime := time.Unix(client.ConnectTime, 0) - // truncate to the second - duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second - var totalPubCount uint64 - for _, v := range client.PubCounts { - totalPubCount += v.Count - } - fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\n", - client.Version, - client.ClientID, - totalPubCount, - duration, - ) - for _, v := range client.PubCounts { - fmt.Fprintf(w, " [%-15s] msgs: %-8d\n", - v.Topic, - v.Count, - ) - } + fmt.Fprintf(w, "\nProducers:\n") + for _, client := range stats.Producers { + fmt.Fprintf(w, " %s\n", client) } } diff --git a/nsqd/stats.go b/nsqd/stats.go index c739a1ee3..94a7be7d6 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -8,6 +8,15 @@ import ( "github.com/nsqio/nsq/internal/quantile" ) +type Stats struct { + Topics []TopicStats + Producers []ClientStats +} + +type ClientStats interface { + String() string +} + type TopicStats struct { TopicName string `json:"topic_name"` Channels []ChannelStats `json:"channels"` @@ -75,40 +84,6 @@ func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) Channel } } -type PubCount struct { - Topic string `json:"topic"` - Count uint64 `json:"count"` -} - -type ClientStats struct { - ClientID string `json:"client_id"` - Hostname string `json:"hostname"` - Version string `json:"version"` - RemoteAddress string `json:"remote_address"` - State int32 `json:"state"` - ReadyCount int64 `json:"ready_count"` - InFlightCount int64 `json:"in_flight_count"` - MessageCount uint64 `json:"message_count"` - FinishCount uint64 `json:"finish_count"` - RequeueCount uint64 `json:"requeue_count"` - ConnectTime int64 `json:"connect_ts"` - SampleRate int32 `json:"sample_rate"` - Deflate bool `json:"deflate"` - Snappy bool `json:"snappy"` - UserAgent string `json:"user_agent"` - Authed bool `json:"authed,omitempty"` - AuthIdentity string `json:"auth_identity,omitempty"` - AuthIdentityURL string `json:"auth_identity_url,omitempty"` - - PubCounts []PubCount `json:"pub_counts,omitempty"` - - TLS bool `json:"tls"` - CipherSuite string `json:"tls_cipher_suite"` - TLSVersion string `json:"tls_version"` - TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` - TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` -} - type Topics []*Topic func (t Topics) Len() int { return len(t) } @@ -131,7 +106,9 @@ type ChannelsByName struct { func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Channels[j].name } -func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []TopicStats { +func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats { + var stats Stats + n.RLock() var realTopics []*Topic if topic == "" { @@ -143,11 +120,13 @@ func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []Top realTopics = []*Topic{val} } else { n.RUnlock() - return []TopicStats{} + return stats } n.RUnlock() sort.Sort(TopicsByName{realTopics}) + topics := make([]TopicStats, 0, len(realTopics)) + for _, t := range realTopics { t.RLock() var realChannels []*Channel @@ -172,7 +151,7 @@ func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []Top if includeClients { clients = make([]ClientStats, 0, len(c.clients)) for _, client := range c.clients { - clients = append(clients, client.Stats()) + clients = append(clients, client.Stats(topic)) } } clientCount = len(c.clients) @@ -181,25 +160,21 @@ func (n *NSQD) GetStats(topic string, channel string, includeClients bool) []Top } topics = append(topics, NewTopicStats(t, channels)) } - return topics -} - -func (n *NSQD) GetProducerStats() []ClientStats { - var producers []*clientV2 - - n.tcpServer.conns.Range(func(k, v interface{}) bool { - c := v.(*clientV2) - if c.IsProducer() { - producers = append(producers, c) - } - return true - }) - - producerStats := make([]ClientStats, 0, len(producers)) - for _, p := range producers { - producerStats = append(producerStats, p.Stats()) + stats.Topics = topics + + if includeClients { + var producerStats []ClientStats + n.tcpServer.conns.Range(func(k, v interface{}) bool { + c := v.(Client) + if c.Type() == typeProducer { + producerStats = append(producerStats, c.Stats(topic)) + } + return true + }) + stats.Producers = producerStats } - return producerStats + + return stats } type memStats struct { diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 7ec8a461f..065dfef6f 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -38,7 +38,7 @@ func TestStats(t *testing.T) { identify(t, conn, nil, frameTypeResponse) sub(t, conn, topicName, "ch") - stats := nsqd.GetStats(topicName, "ch", true) + stats := nsqd.GetStats(topicName, "ch", true).Topics t.Logf("stats: %+v", stats) test.Equal(t, 1, len(stats)) @@ -46,7 +46,7 @@ func TestStats(t *testing.T) { test.Equal(t, 1, len(stats[0].Channels[0].Clients)) test.Equal(t, 1, stats[0].Channels[0].ClientCount) - stats = nsqd.GetStats(topicName, "ch", false) + stats = nsqd.GetStats(topicName, "ch", false).Topics t.Logf("stats: %+v", stats) test.Equal(t, 1, len(stats)) @@ -54,12 +54,12 @@ func TestStats(t *testing.T) { test.Equal(t, 0, len(stats[0].Channels[0].Clients)) test.Equal(t, 1, stats[0].Channels[0].ClientCount) - stats = nsqd.GetStats(topicName, "none_exist_channel", false) + stats = nsqd.GetStats(topicName, "none_exist_channel", false).Topics t.Logf("stats: %+v", stats) test.Equal(t, 0, len(stats)) - stats = nsqd.GetStats("none_exist_topic", "none_exist_channel", false) + stats = nsqd.GetStats("none_exist_topic", "none_exist_channel", false).Topics t.Logf("stats: %+v", stats) test.Equal(t, 0, len(stats)) @@ -150,7 +150,7 @@ func TestStatsChannelLocking(t *testing.T) { wg.Wait() - stats := nsqd.GetStats(topicName, "channel", false) + stats := nsqd.GetStats(topicName, "channel", false).Topics t.Logf("stats: %+v", stats) test.Equal(t, 1, len(stats)) diff --git a/nsqd/statsd.go b/nsqd/statsd.go index 582bf773c..9133dc8ba 100644 --- a/nsqd/statsd.go +++ b/nsqd/statsd.go @@ -26,7 +26,7 @@ func (s Uint64Slice) Less(i, j int) bool { func (n *NSQD) statsdLoop() { var lastMemStats memStats - var lastStats []TopicStats + var lastStats Stats interval := n.getOpts().StatsdInterval ticker := time.NewTicker(interval) for { @@ -48,10 +48,10 @@ func (n *NSQD) statsdLoop() { n.logf(LOG_INFO, "STATSD: pushing stats to %s", addr) stats := n.GetStats("", "", false) - for _, topic := range stats { + for _, topic := range stats.Topics { // try to find the topic in the last collection lastTopic := TopicStats{} - for _, checkTopic := range lastStats { + for _, checkTopic := range lastStats.Topics { if topic.TopicName == checkTopic.TopicName { lastTopic = checkTopic break diff --git a/nsqd/tcp.go b/nsqd/tcp.go index 0241472ca..80637d4df 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -8,6 +8,16 @@ import ( "github.com/nsqio/nsq/internal/protocol" ) +const ( + typeConsumer = iota + typeProducer +) + +type Client interface { + Type() int + Stats(string) ClientStats +} + type tcpServer struct { nsqd *NSQD conns sync.Map From 11b3d89bfdf208bea21b699edfbd81c23f107d4c Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Thu, 31 Dec 2020 17:27:32 -0800 Subject: [PATCH 4/4] nsqd: hostname/user_agent text stats for clients --- bench/bench_reader/bench_reader.go | 5 ++++- bench/bench_writer/bench_writer.go | 9 +++++++++ nsqd/client_v2.go | 11 +++++++++-- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/bench/bench_reader/bench_reader.go b/bench/bench_reader/bench_reader.go index 4eeffd2b5..7febaba65 100644 --- a/bench/bench_reader/bench_reader.go +++ b/bench/bench_reader/bench_reader.go @@ -3,6 +3,7 @@ package main import ( "bufio" "flag" + "fmt" "log" "net" "runtime" @@ -75,7 +76,9 @@ func subWorker(td time.Duration, workers int, tcpAddr string, topic string, chan conn.Write(nsq.MagicV2) rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) ci := make(map[string]interface{}) - ci["client_id"] = "test" + ci["client_id"] = "reader" + ci["hostname"] = "reader" + ci["user_agent"] = fmt.Sprintf("bench_reader/%s", nsq.VERSION) cmd, _ := nsq.Identify(ci) cmd.WriteTo(rw) nsq.Subscribe(topic, channel).WriteTo(rw) diff --git a/bench/bench_writer/bench_writer.go b/bench/bench_writer/bench_writer.go index 249495967..f3cd30ec8 100644 --- a/bench/bench_writer/bench_writer.go +++ b/bench/bench_writer/bench_writer.go @@ -3,6 +3,7 @@ package main import ( "bufio" "flag" + "fmt" "log" "net" "runtime" @@ -77,8 +78,16 @@ func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte, } conn.Write(nsq.MagicV2) rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + ci := make(map[string]interface{}) + ci["client_id"] = "writer" + ci["hostname"] = "writer" + ci["user_agent"] = fmt.Sprintf("bench_writer/%s", nsq.VERSION) + cmd, _ := nsq.Identify(ci) + cmd.WriteTo(rw) rdyChan <- 1 <-goChan + rw.Flush() + nsq.ReadResponse(rw) var msgCount int64 endTime := time.Now().Add(td) for { diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index ee38439f1..c1abc4dab 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -85,6 +85,11 @@ type ClientV2Stats struct { func (s ClientV2Stats) String() string { connectTime := time.Unix(s.ConnectTime, 0) duration := time.Since(connectTime).Truncate(time.Second) + + _, port, _ := net.SplitHostPort(s.RemoteAddress) + id := fmt.Sprintf("%s:%s %s", s.Hostname, port, s.UserAgent) + + // producer if len(s.PubCounts) > 0 { var total uint64 var topicOut []string @@ -94,15 +99,17 @@ func (s ClientV2Stats) String() string { } return fmt.Sprintf("[%s %-21s] msgs: %-8d topics: %s connected: %s", s.Version, - s.ClientID, + id, total, strings.Join(topicOut, ","), duration, ) } + + // consumer return fmt.Sprintf("[%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s", s.Version, - s.ClientID, + id, s.State, s.InFlightCount, s.ReadyCount,