-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
nsqadmin/nsqd: optimize /stats #925
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -251,7 +251,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h | |
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | ||
messages = append(messages, pe.Error()) | ||
} | ||
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName) | ||
topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "") | ||
if err != nil { | ||
pe, ok := err.(clusterinfo.PartialErr) | ||
if !ok { | ||
|
@@ -291,7 +291,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps | |
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | ||
messages = append(messages, pe.Error()) | ||
} | ||
_, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName) | ||
_, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName, channelName) | ||
if err != nil { | ||
pe, ok := err.(clusterinfo.PartialErr) | ||
if !ok { | ||
|
@@ -349,7 +349,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht | |
return nil, http_api.Err{404, "NODE_NOT_FOUND"} | ||
} | ||
|
||
topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "") | ||
topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "") | ||
if err != nil { | ||
s.ctx.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err) | ||
return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)} | ||
|
@@ -359,7 +359,11 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht | |
var totalMessages int64 | ||
for _, ts := range topicStats { | ||
for _, cs := range ts.Channels { | ||
totalClients += int64(len(cs.Clients)) | ||
if len(cs.Clients) > 0 { | ||
totalClients += int64(len(cs.Clients)) | ||
} else { | ||
totalClients += int64(cs.ClientCount) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rather than baking this logic into the client each time, we should just always return |
||
} | ||
totalMessages += ts.MessageCount | ||
} | ||
|
@@ -631,7 +635,7 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps | |
s.ctx.nsqadmin.logf(LOG_WARN, "%s", err) | ||
messages = append(messages, pe.Error()) | ||
} | ||
_, channelStats, err := s.ci.GetNSQDStats(producers, "") | ||
_, channelStats, err := s.ci.GetNSQDStats(producers, "", "") | ||
if err != nil { | ||
pe, ok := err.(clusterinfo.PartialErr) | ||
if !ok { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -494,7 +494,24 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro | |
channelName, _ := reqParams.Get("channel") | ||
jsonFormat := formatString == "json" | ||
|
||
stats := s.ctx.nsqd.GetStats() | ||
var ( | ||
state TopicStats | ||
stats []TopicStats | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the |
||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: we don't typically declare variables like this, we instead prefer |
||
if topicName == "" { | ||
stats = s.ctx.nsqd.GetStats() | ||
} else if channelName == "" { | ||
state, err = s.ctx.nsqd.GetTopicStats(topicName) | ||
} else { | ||
state, err = s.ctx.nsqd.GetChannelStats(topicName, channelName) | ||
} | ||
if nil != err { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: we typically prefer the variable in question to be on the left side of the comparison, e.g. |
||
s.ctx.nsqd.logf(LOG_ERROR, "failed to get stats - %s", err) | ||
return nil, http_api.Err{404, "topic/channel is not found"} | ||
} else if nil == stats { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't need to be an |
||
stats = []TopicStats{state} | ||
} | ||
|
||
health := s.ctx.nsqd.GetHealth() | ||
startTime := s.ctx.nsqd.GetStartTime() | ||
uptime := time.Since(startTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After these changes above, Is any of the topic/channel looping code necessary below this line? |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -387,7 +387,7 @@ func TestReconfigure(t *testing.T) { | |
nsqd.triggerOptsNotification() | ||
test.Equal(t, 2, len(nsqd.getOpts().NSQLookupdTCPAddresses)) | ||
|
||
time.Sleep(200 * time.Millisecond) | ||
time.Sleep(300 * time.Millisecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
var lookupPeers []string | ||
for _, lp := range nsqd.lookupPeers.Load().([]*lookupPeer) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package nsqd | ||
|
||
import ( | ||
"errors" | ||
"runtime" | ||
"sort" | ||
"sync/atomic" | ||
|
@@ -40,6 +41,7 @@ type ChannelStats struct { | |
DeferredCount int `json:"deferred_count"` | ||
MessageCount uint64 `json:"message_count"` | ||
RequeueCount uint64 `json:"requeue_count"` | ||
ClientCount int `json:"client_count"` | ||
TimeoutCount uint64 `json:"timeout_count"` | ||
Clients []ClientStats `json:"clients"` | ||
Paused bool `json:"paused"` | ||
|
@@ -56,6 +58,7 @@ func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats { | |
DeferredCount: len(c.deferredMessages), | ||
MessageCount: atomic.LoadUint64(&c.messageCount), | ||
RequeueCount: atomic.LoadUint64(&c.requeueCount), | ||
ClientCount: len(c.clients), | ||
TimeoutCount: atomic.LoadUint64(&c.timeoutCount), | ||
Clients: clients, | ||
Paused: c.IsPaused(), | ||
|
@@ -145,6 +148,75 @@ func (n *NSQD) GetStats() []TopicStats { | |
return topics | ||
} | ||
|
||
func (n *NSQD) GetTopicStats(topic string) (TopicStats, error) { | ||
n.RLock() | ||
var realTopic *Topic | ||
for _, t := range n.topicMap { | ||
if topic == t.name { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same applies to function below |
||
realTopic = t | ||
break | ||
} | ||
} | ||
n.RUnlock() | ||
|
||
if nil == realTopic { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same applies to function below |
||
return TopicStats{}, errors.New("the topic is not found.") | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since the above block returns, we can drop the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same applies to function below |
||
realTopic.RLock() | ||
realChannels := make([]*Channel, 0, len(realTopic.channelMap)) | ||
for _, c := range realTopic.channelMap { | ||
realChannels = append(realChannels, c) | ||
} | ||
realTopic.RUnlock() | ||
sort.Sort(ChannelsByName{realChannels}) | ||
channels := make([]ChannelStats, 0, len(realChannels)) | ||
for _, c := range realChannels { | ||
channels = append(channels, NewChannelStats(c, nil)) | ||
} | ||
topics := NewTopicStats(realTopic, channels) | ||
return topics, nil | ||
} | ||
} | ||
|
||
func (n *NSQD) GetChannelStats(topic, channel string) (TopicStats, error) { | ||
n.RLock() | ||
var realTopic *Topic | ||
for _, t := range n.topicMap { | ||
if topic == t.name { | ||
realTopic = t | ||
break | ||
} | ||
} | ||
n.RUnlock() | ||
|
||
if nil == realTopic { | ||
return TopicStats{}, errors.New("the topic is not found.") | ||
} else { | ||
realTopic.RLock() | ||
realChannels := make([]*Channel, 0, len(realTopic.channelMap)) | ||
for _, c := range realTopic.channelMap { | ||
realChannels = append(realChannels, c) | ||
} | ||
realTopic.RUnlock() | ||
sort.Sort(ChannelsByName{realChannels}) | ||
channels := make([]ChannelStats, 0, 1) | ||
for _, c := range realChannels { | ||
if c.name == channel { | ||
c.RLock() | ||
clients := make([]ClientStats, 0, len(c.clients)) | ||
for _, client := range c.clients { | ||
clients = append(clients, client.Stats()) | ||
} | ||
c.RUnlock() | ||
channels = append(channels, NewChannelStats(c, clients)) | ||
break | ||
} | ||
} | ||
topics := NewTopicStats(realTopic, channels) | ||
return topics, nil | ||
} | ||
} | ||
|
||
type memStats struct { | ||
HeapObjects uint64 `json:"heap_objects"` | ||
HeapIdleBytes uint64 `json:"heap_idle_bytes"` | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/selectChannel/selectedChannel/