Skip to content

Commit

Permalink
Merge pull request nsqio#93 from zulily/nsqlookupd_error_on_connect
Browse files Browse the repository at this point in the history
consumer: expose a way to retrieve runtime stats
  • Loading branch information
jehiah committed Nov 24, 2014
2 parents faa636a + 0381505 commit a61f4f4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
19 changes: 19 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ type FailedMessageLogger interface {
LogFailedMessage(message *Message)
}

// ConsumerStats represents a snapshot of the state of a Consumer's connections and the messages
// it has seen
type ConsumerStats struct {
MessagesReceived uint64
MessagesFinished uint64
MessagesRequeued uint64
Connections int
}

var instCount int64

// Consumer is a high-level type to consume from NSQ.
Expand Down Expand Up @@ -168,6 +177,16 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error
return r, nil
}

// Stats retrieves the current connection and message statistics for a Consumer
func (r *Consumer) Stats() *ConsumerStats {
return &ConsumerStats{
MessagesReceived: atomic.LoadUint64(&r.messagesReceived),
MessagesFinished: atomic.LoadUint64(&r.messagesFinished),
MessagesRequeued: atomic.LoadUint64(&r.messagesRequeued),
Connections: len(r.conns()),
}
}

func (r *Consumer) conns() []*Conn {
r.mtx.RLock()
conns := make([]*Conn, 0, len(r.connections))
Expand Down
17 changes: 17 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ func consumerTest(t *testing.T, cb func(c *Config)) {
t.Fatal(err)
}

stats := q.Stats()
if stats.Connections == 0 {
t.Fatal("stats report 0 connections (should be > 0)")
}

err = q.ConnectToNSQD(addr)
if err == nil {
t.Fatal("should not be able to connect to the same NSQ twice")
Expand All @@ -199,6 +204,18 @@ func consumerTest(t *testing.T, cb func(c *Config)) {

<-q.StopChan

stats = q.Stats()
if stats.Connections != 0 {
t.Fatalf("stats report %d active connections (should be 0)", stats.Connections)
}

stats = q.Stats()
if stats.MessagesReceived != uint64(h.messagesReceived+h.messagesFailed) {
t.Fatalf("stats report %d messages received (should be %d)",
stats.MessagesReceived,
h.messagesReceived+h.messagesFailed)
}

if h.messagesReceived != 8 || h.messagesSent != 4 {
t.Fatalf("end of test. should have handled a diff number of messages (got %d, sent %d)", h.messagesReceived, h.messagesSent)
}
Expand Down

0 comments on commit a61f4f4

Please sign in to comment.