diff --git a/apps/nsq_stat/nsq_stat.go b/apps/nsq_stat/nsq_stat.go index b84ebf9db..ceb14703e 100644 --- a/apps/nsq_stat/nsq_stat.go +++ b/apps/nsq_stat/nsq_stat.go @@ -73,12 +73,12 @@ func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeo log.Fatalf("ERROR: failed to get topic producers - %s", err) } - _, allChannelStats, err := ci.GetNSQDStats(producers, topic) + _, channelStats, err := ci.GetNSQDStats(producers, topic, channel) if err != nil { log.Fatalf("ERROR: failed to get nsqd stats - %s", err) } - c, ok := allChannelStats[channel] + c, ok := channelStats[channel] if !ok { log.Fatalf("ERROR: failed to find channel(%s) in stats metadata for topic(%s)", channel, topic) } diff --git a/internal/clusterinfo/data.go b/internal/clusterinfo/data.go index 0cea856a1..b1bb9bafe 100644 --- a/internal/clusterinfo/data.go +++ b/internal/clusterinfo/data.go @@ -450,7 +450,7 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string go func(addr string) { defer wg.Done() - endpoint := fmt.Sprintf("http://%s/stats?format=json", addr) + endpoint := fmt.Sprintf("http://%s/stats?format=json&topic=%s", addr, topic) c.logf("CI: querying nsqd %s", endpoint) var statsResp statsRespType @@ -528,9 +528,10 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string // GetNSQDStats returns aggregate topic and channel stats from the given Producers // +// if selectedChannel is empty, this will return stats for topic/channel // if selectedTopic is empty, this will return stats for *all* topic/channels // and the ChannelStats dict will be keyed by topic + ':' + channel -func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string) ([]*TopicStats, map[string]*ChannelStats, error) { +func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, selectedChannel string) ([]*TopicStats, map[string]*ChannelStats, error) { var lock sync.Mutex var wg sync.WaitGroup var topicStatsList TopicStatsList @@ -548,7 +549,16 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string) ([ defer wg.Done() addr := p.HTTPAddress() - endpoint := fmt.Sprintf("http://%s/stats?format=json", addr) + + var endpoint string + if len(selectedChannel) != 0 { + endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&channel=%s", addr, selectedTopic, selectedChannel) + } else if len(selectedTopic) != 0 { + endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s", addr, selectedTopic) + } else { + endpoint = fmt.Sprintf("http://%s/stats?format=json", addr) + } + c.logf("CI: querying nsqd %s", endpoint) var resp respType diff --git a/nsqadmin/http.go b/nsqadmin/http.go index e1fd2b137..8c60e740d 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -251,7 +251,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } - topicStats, _, err := s.ci.GetNSQDStats(producers, topicName) + topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "") if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { @@ -291,7 +291,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } - _, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName) + _, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelName) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { @@ -305,7 +305,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps return struct { *clusterinfo.ChannelStats Message string `json:"message"` - }{allChannelStats[channelName], maybeWarnMsg(messages)}, nil + }{channelStats[channelName], maybeWarnMsg(messages)}, nil } func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { @@ -349,7 +349,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht return nil, http_api.Err{404, "NODE_NOT_FOUND"} } - topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "") + topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "") if err != nil { s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} @@ -631,7 +631,7 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) messages = append(messages, pe.Error()) } - _, channelStats, err := s.ci.GetNSQDStats(producers, "") + _, channelStats, err := s.ci.GetNSQDStats(producers, "", "") if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { diff --git a/nsqd/http.go b/nsqd/http.go index 5e38cab09..48d1745ad 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -494,7 +494,7 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro channelName, _ := reqParams.Get("channel") jsonFormat := formatString == "json" - stats := s.ctx.nsqd.GetStats() + stats := s.ctx.nsqd.GetStats(topicName, channelName) health := s.ctx.nsqd.GetHealth() startTime := s.ctx.nsqd.GetStartTime() uptime := time.Since(startTime) diff --git a/nsqd/stats.go b/nsqd/stats.go index 53dd18fa4..57d7cfa7a 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -113,20 +113,32 @@ type ChannelsByName struct { func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Channels[j].name } -func (n *NSQD) GetStats() []TopicStats { +func (n *NSQD) GetStats(topic string, channel string) []TopicStats { n.RLock() - realTopics := make([]*Topic, 0, len(n.topicMap)) - for _, t := range n.topicMap { - realTopics = append(realTopics, t) + var realTopics []*Topic + if len(topic) == 0 { + realTopics = make([]*Topic, 0, len(n.topicMap)) + for _, t := range n.topicMap { + realTopics = append(realTopics, t) + } + } else { + realTopics = make([]*Topic, 0, 1) + realTopics = append(realTopics, n.topicMap[topic]) } n.RUnlock() sort.Sort(TopicsByName{realTopics}) topics := make([]TopicStats, 0, len(realTopics)) for _, t := range realTopics { t.RLock() - realChannels := make([]*Channel, 0, len(t.channelMap)) - for _, c := range t.channelMap { - realChannels = append(realChannels, c) + var realChannels []*Channel + if len(channel) == 0 { + realChannels = make([]*Channel, 0, len(t.channelMap)) + for _, c := range t.channelMap { + realChannels = append(realChannels, c) + } + } else { + realChannels = make([]*Channel, 0, 1) + realChannels = append(realChannels, t.channelMap[channel]) } t.RUnlock() sort.Sort(ChannelsByName{realChannels}) diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 5e5939baa..878f4a282 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -32,7 +32,7 @@ func TestStats(t *testing.T) { identify(t, conn, nil, frameTypeResponse) sub(t, conn, topicName, "ch") - stats := nsqd.GetStats() + stats := nsqd.GetStats(topicName, "ch") t.Logf("stats: %+v", stats) test.Equal(t, 1, len(stats)) diff --git a/nsqd/statsd.go b/nsqd/statsd.go index f532221be..f6c3b492b 100644 --- a/nsqd/statsd.go +++ b/nsqd/statsd.go @@ -40,7 +40,7 @@ func (n *NSQD) statsdLoop() { n.logf(LOG_INFO, "STATSD: pushing stats to %s", client) - stats := n.GetStats() + stats := n.GetStats("", "") for _, topic := range stats { // try to find the topic in the last collection lastTopic := TopicStats{}