Skip to content

Commit

Permalink
Merge pull request #294 from elubow/sampling_admin
Browse files Browse the repository at this point in the history
nsqadmin: add client attributes
  • Loading branch information
mreiferson committed Jan 25, 2014
2 parents d82453c + 1514a6a commit b442615
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 2 deletions.
15 changes: 15 additions & 0 deletions nsqadmin/templates/channel.html.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func init() {
<tr>
<th>Client Host</th>
<th>Protocol</th>
<th>Attributes</th>
<th>NSQd Host</th>
<th>In-Flight</th>
<th>Ready Count</th>
Expand All @@ -201,6 +202,20 @@ func init() {
<tr>
<td>{{.ClientIdentifier}}</td>
<td>{{.ClientVersion}}</td>
<td>
{{if gt .SampleRate 0}}
<span class="label label-info">Sampled {{.SampleRate}}%</span>
{{end}}
{{if .TLS}}
<span class="label label-warning">TLS</span>
{{end}}
{{if .Deflate}}
<span class="label label-default">Delfate</span>
{{end}}
{{if .Snappy}}
<span class="label label-primary">Snappy</span>
{{end}}
</td>
<td><a href="/node/{{.HostAddress}}">{{.HostAddress}}</a></td>
<td>{{.InFlightCount | commafy}}</td>
<td>{{.ReadyCount | commafy}}</td>
Expand Down
18 changes: 16 additions & 2 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ type ClientV2 struct {
SampleRate int32
SampleRateUpdateChan chan int32

// states for exposing to nsqadmin
TLS int32
Snappy int32
Deflate int32

// re-usable buffer for reading the 4-byte lengths off the wire
lenBuf [4]byte
lenSlice []byte
Expand Down Expand Up @@ -157,7 +162,10 @@ func (c *ClientV2) Stats() ClientStats {
FinishCount: atomic.LoadUint64(&c.FinishCount),
RequeueCount: atomic.LoadUint64(&c.RequeueCount),
ConnectTime: c.ConnectTime.Unix(),
SampleRate: c.SampleRate,
SampleRate: atomic.LoadInt32(&c.SampleRate),
TLS: atomic.LoadInt32(&c.TLS) == 1,
Deflate: atomic.LoadInt32(&c.Deflate) == 1,
Snappy: atomic.LoadInt32(&c.Snappy) == 1,
}
}

Expand Down Expand Up @@ -330,7 +338,7 @@ func (c *ClientV2) SetSampleRate(sampleRate int32) error {
}

if sampleRate != 0 {
c.SampleRate = sampleRate
atomic.StoreInt32(&c.SampleRate, sampleRate)
select {
case c.SampleRateUpdateChan <- sampleRate:
default:
Expand All @@ -354,6 +362,8 @@ func (c *ClientV2) UpgradeTLS() error {
c.Reader = bufio.NewReaderSize(c.tlsConn, DefaultBufferSize)
c.Writer = bufio.NewWriterSize(c.tlsConn, c.OutputBufferSize)

atomic.StoreInt32(&c.TLS, 1)

return nil
}

Expand All @@ -372,6 +382,8 @@ func (c *ClientV2) UpgradeDeflate(level int) error {
c.flateWriter = fw
c.Writer = bufio.NewWriterSize(fw, c.OutputBufferSize)

atomic.StoreInt32(&c.Deflate, 1)

return nil
}

Expand All @@ -387,6 +399,8 @@ func (c *ClientV2) UpgradeSnappy() error {
c.Reader = bufio.NewReaderSize(snappystream.NewReader(conn, snappystream.SkipVerifyChecksum), DefaultBufferSize)
c.Writer = bufio.NewWriterSize(snappystream.NewWriter(conn), c.OutputBufferSize)

atomic.StoreInt32(&c.Snappy, 1)

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type ClientStats struct {
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
TLS bool `json:"tls"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
}

type Topics []*Topic
Expand Down
4 changes: 4 additions & 0 deletions util/lookupd/lookupd.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,10 @@ func GetNSQDStats(nsqdHTTPAddrs []string, selectedTopic string) ([]*TopicStats,
FinishCount: client.Get("finish_count").MustInt64(),
RequeueCount: client.Get("requeue_count").MustInt64(),
MessageCount: client.Get("message_count").MustInt64(),
SampleRate: int32(client.Get("sample_rate").MustInt()),
TLS: client.Get("tls").MustBool(),
Deflate: client.Get("deflate").MustBool(),
Snappy: client.Get("snappy").MustBool(),
}
hostChannelStats.Clients = append(hostChannelStats.Clients, clientInfo)
channelStats.Clients = append(channelStats.Clients, clientInfo)
Expand Down
4 changes: 4 additions & 0 deletions util/lookupd/statsinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ type ClientInfo struct {
FinishCount int64
RequeueCount int64
MessageCount int64
SampleRate int32
TLS bool
Deflate bool
Snappy bool
}

type ChannelStatsList []*ChannelStats
Expand Down

0 comments on commit b442615

Please sign in to comment.