Skip to content

Commit

Permalink
refactor stats query for nsqadmin
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Aug 17, 2017
1 parent e8e1040 commit d0ae961
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 20 deletions.
4 changes: 2 additions & 2 deletions apps/nsq_stat/nsq_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeo
log.Fatalf("ERROR: failed to get topic producers - %s", err)
}

_, allChannelStats, err := ci.GetNSQDStats(producers, topic)
_, channelStats, err := ci.GetNSQDStats(producers, topic, channel)
if err != nil {
log.Fatalf("ERROR: failed to get nsqd stats - %s", err)
}

c, ok := allChannelStats[channel]
c, ok := channelStats[channel]
if !ok {
log.Fatalf("ERROR: failed to find channel(%s) in stats metadata for topic(%s)", channel, topic)
}
Expand Down
16 changes: 13 additions & 3 deletions internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string
go func(addr string) {
defer wg.Done()

endpoint := fmt.Sprintf("http://%s/stats?format=json", addr)
endpoint := fmt.Sprintf("http://%s/stats?format=json&topic=%s", addr, topic)
c.logf("CI: querying nsqd %s", endpoint)

var statsResp statsRespType
Expand Down Expand Up @@ -528,9 +528,10 @@ func (c *ClusterInfo) GetNSQDTopicProducers(topic string, nsqdHTTPAddrs []string

// GetNSQDStats returns aggregate topic and channel stats from the given Producers
//
// if selectedChannel is empty, this will return stats for topic/channel
// if selectedTopic is empty, this will return stats for *all* topic/channels
// and the ChannelStats dict will be keyed by topic + ':' + channel
func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string) ([]*TopicStats, map[string]*ChannelStats, error) {
func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string, selectedChannel string) ([]*TopicStats, map[string]*ChannelStats, error) {
var lock sync.Mutex
var wg sync.WaitGroup
var topicStatsList TopicStatsList
Expand All @@ -548,7 +549,16 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers, selectedTopic string) ([
defer wg.Done()

addr := p.HTTPAddress()
endpoint := fmt.Sprintf("http://%s/stats?format=json", addr)

var endpoint string
if len(selectedChannel) != 0 {
endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&channel=%s", addr, selectedTopic, selectedChannel)
} else if len(selectedTopic) != 0 {
endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s", addr, selectedTopic)
} else {
endpoint = fmt.Sprintf("http://%s/stats?format=json", addr)
}

c.logf("CI: querying nsqd %s", endpoint)

var resp respType
Expand Down
10 changes: 5 additions & 5 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error())
}
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName)
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "")
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -291,7 +291,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error())
}
_, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName)
_, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelName)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand All @@ -305,7 +305,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
return struct {
*clusterinfo.ChannelStats
Message string `json:"message"`
}{allChannelStats[channelName], maybeWarnMsg(messages)}, nil
}{channelStats[channelName], maybeWarnMsg(messages)}, nil
}

func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht
return nil, http_api.Err{404, "NODE_NOT_FOUND"}
}

topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "")
topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "")
if err != nil {
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err)
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
Expand Down Expand Up @@ -631,7 +631,7 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err)
messages = append(messages, pe.Error())
}
_, channelStats, err := s.ci.GetNSQDStats(producers, "")
_, channelStats, err := s.ci.GetNSQDStats(producers, "", "")
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
channelName, _ := reqParams.Get("channel")
jsonFormat := formatString == "json"

stats := s.ctx.nsqd.GetStats()
stats := s.ctx.nsqd.GetStats(topicName, channelName)
health := s.ctx.nsqd.GetHealth()
startTime := s.ctx.nsqd.GetStartTime()
uptime := time.Since(startTime)
Expand Down
26 changes: 19 additions & 7 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,32 @@ type ChannelsByName struct {

func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Channels[j].name }

func (n *NSQD) GetStats() []TopicStats {
func (n *NSQD) GetStats(topic string, channel string) []TopicStats {
n.RLock()
realTopics := make([]*Topic, 0, len(n.topicMap))
for _, t := range n.topicMap {
realTopics = append(realTopics, t)
var realTopics []*Topic
if len(topic) == 0 {
realTopics = make([]*Topic, 0, len(n.topicMap))
for _, t := range n.topicMap {
realTopics = append(realTopics, t)
}
} else {
realTopics = make([]*Topic, 0, 1)
realTopics = append(realTopics, n.topicMap[topic])
}
n.RUnlock()
sort.Sort(TopicsByName{realTopics})
topics := make([]TopicStats, 0, len(realTopics))
for _, t := range realTopics {
t.RLock()
realChannels := make([]*Channel, 0, len(t.channelMap))
for _, c := range t.channelMap {
realChannels = append(realChannels, c)
var realChannels []*Channel
if len(channel) == 0 {
realChannels = make([]*Channel, 0, len(t.channelMap))
for _, c := range t.channelMap {
realChannels = append(realChannels, c)
}
} else {
realChannels = make([]*Channel, 0, 1)
realChannels = append(realChannels, t.channelMap[channel])
}
t.RUnlock()
sort.Sort(ChannelsByName{realChannels})
Expand Down
2 changes: 1 addition & 1 deletion nsqd/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestStats(t *testing.T) {
identify(t, conn, nil, frameTypeResponse)
sub(t, conn, topicName, "ch")

stats := nsqd.GetStats()
stats := nsqd.GetStats(topicName, "ch")
t.Logf("stats: %+v", stats)

test.Equal(t, 1, len(stats))
Expand Down
2 changes: 1 addition & 1 deletion nsqd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (n *NSQD) statsdLoop() {

n.logf(LOG_INFO, "STATSD: pushing stats to %s", client)

stats := n.GetStats()
stats := n.GetStats("", "")
for _, topic := range stats {
// try to find the topic in the last collection
lastTopic := TopicStats{}
Expand Down

0 comments on commit d0ae961

Please sign in to comment.