Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: add producer client connections to /stats #881

Merged
merged 3 commits into from
Aug 20, 2018

Conversation

sparklxb
Copy link
Contributor

add /producer_stats endpoint for nsqd. #879

@ploxiln
Copy link
Member

ploxiln commented Apr 10, 2017

I think the official client libraries either only produce or only consume on a single connection to nsqd, so if that requirement was formalized, it would remove the need to add another counter to the client struct ("messages" could be re-used).

It is completely valid, and not uncommon, to publish to multiple different topics from a single producer tcp client connection to nsqd. But often only a single topic will be published to by a process / connection. It would be a useful enhancement to associate a producer client tcp connection with the topic(s) it publishes to, so one can see producer stats for different topics differentiated. Perhaps the most practical way is to limit this association to one topic, and if more than one, then just consider it "multiple".

@sparklxb
Copy link
Contributor Author

@ploxiln I did consider what you had mentioned. If producer client publishs more than one topic, there is more work to do, such as providing separate client struct for topics published by one producer or changing client struct (but make it more dirty or complicated). So just wait for owners' comment.

@mreiferson
Copy link
Member

I think the official client libraries either only produce or only consume on a single connection to nsqd, so if that requirement was formalized, it would remove the need to add another counter to the client struct ("messages" could be re-used).

Yea, but it's not a real restriction so I don't think we can reasonably do that.

It is completely valid, and not uncommon, to publish to multiple different topics from a single producer tcp client connection to nsqd.

Feels like we can defer tracking stats at that granularity for this first pass.

@mreiferson mreiferson changed the title nsqd/nsqdmin: introspection of producer connections nsqd/nsqadmin: introspection of producer connections Apr 11, 2017
@@ -346,6 +349,10 @@ func (c *clientV2) SendingMessage() {
atomic.AddUint64(&c.MessageCount, 1)
}

func (c *clientV2) PublishMessage(msgNum uint64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Publish/Published

nsqd/http.go Outdated
@@ -52,6 +52,7 @@ func newHTTPServer(ctx *context, tlsEnabled bool, tlsRequired bool) *httpServer
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
router.Handle("GET", "/producer_stats", http_api.Decorate(s.doProducerStats, log, http_api.V1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still strongly in favor of keeping this all in /stats, perhaps with a flag to enable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A flag to return one or all?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he means "all", e.g. a flag/param like "with_producers=true" or "with_producers=1" or "producers=1" (brevity to taste ;).

If there was (minimal) channel information associated with the producers, then this new flag could combine with the existing "topic" flag in the logical way.

nsqd/nsqd.go Outdated
@@ -51,7 +55,8 @@ type NSQD struct {
errValue atomic.Value
startTime time.Time

topicMap map[string]*Topic
topicMap map[string]*Topic
producers map[int64]Producer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling this variable producers seems misleading because we can't really know or categorize for sure. I'd rather call this clients.

@@ -796,6 +797,9 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

p.ctx.nsqd.AddClient(client.ID, client)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we add the client every time here, can't we do this once when any client connects? This relates to my comments above about variable naming.

Copy link
Contributor Author

@sparklxb sparklxb Apr 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we can't make sure if a client is a producer, until it's first PUB or MPUB.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what mreiferson is suggesting is to keep a set of all clients (added to the set when they connect). Then, when generating publisher statistics, you can iterate over all clients and include them only if they have published any messages.

nsqd/nsqd.go Outdated
// AddClient adds a client to the producers map
func (n *NSQD) AddClient(clientID int64, client Producer) {
n.Lock()
defer n.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're really going to have to call AddClient() on every single PUB, we can make it a bit more efficient by not using defer

Copy link
Member

@ploxiln ploxiln Apr 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... but if there was another client property, from which it could be determined if the client connection had already published anything, AddClient() could usually be skipped

EDIT: possibly obsoleted by comments above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check if PublishCount property I added is greater than 0.

@ploxiln
Copy link
Member

ploxiln commented Apr 11, 2017

Just to be clear, @mreiferson, I proposed not tracking multiple topics per publisher, just one. Are you agreeing with that, or saying we should never track any topic for publisher stats (in this "first pass")?

@mreiferson
Copy link
Member

@ploxiln I'm suggesting that, for now, perhaps it's easiest to land something that doesn't track the destination topic at all (but iterate on that after).

But, as I'm writing this, I'm realizing that really wouldn't be all that helpful for the use case here.

@@ -346,6 +353,10 @@ func (c *clientV2) SendingMessage() {
atomic.AddUint64(&c.MessageCount, 1)
}

func (c *clientV2) MessagesPublished(msgNum uint64) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this PublishedMessage

nsqd/http.go Outdated
jsonFormat := formatString == "json"

var producersStats []ClientStats
if withProducers == "1" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 seems like an overly specific value... how about if it's a non-empty string instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's sometimes confusing if "0" and "no" and "false" are considered "true", and awkward if you need to override a parameter with a blank string e.g. in your own helper functions. So I humbly suggest checking against "1", "yes", and "true", perhaps case-insensitive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a fair point, 👍 to 1, yes, and true case-insensitive

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, with that, I would argue that the name of the param should be producers, not with_producers.

@sparklxb
Copy link
Contributor Author

Ready for review and follow-up for nsqadmin. By the way, certificates under the nsqd/test directory has expired, so some test cases fail. please make a update.

nsqd/nsqd.go Outdated

_, ok := n.clients[clientID]
if !ok {
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlock also needed here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

nsqd/nsqd.go Outdated
// AddClient adds a client to the producers map
func (n *NSQD) AddClient(clientID int64, client Client) {
n.Lock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace 👮

nsqd/nsqd.go Outdated
// RemoveClient removes a client from the producers map
func (n *NSQD) RemoveClient(clientID int64) {
n.Lock()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whitespace 👮

@mreiferson
Copy link
Member

This is looking good to me after those minor nitpicks.

@ploxiln
Copy link
Member

ploxiln commented Apr 17, 2017

mreiferson updated the certificates. Rebasing on master will fix those test failures.

mreiferson
mreiferson previously approved these changes Oct 2, 2017
@mreiferson
Copy link
Member

I've rebased this, waiting for tests to pass.

@mreiferson
Copy link
Member

mreiferson commented Oct 2, 2017

Given that we're marching towards an actual 1.0 release, I want to be really careful of the new interfaces we add prior to that, since we won't have as much of a chance to work and play with them in production.

In that vein, after thinking more about this, it's certainly a good start but it seems like what one would really want is to understand which topics a producer is publishing too, right?

@mreiferson mreiferson changed the title nsqd/nsqadmin: introspection of producer connections nsqd: add producer client connections to /stats Jul 31, 2018
@mreiferson
Copy link
Member

I've resurrected this PR, rebased, and implemented this from my last comment:

In that vein, after thinking more about this, it's certainly a good start but it seems like what one would really want is to understand which topics a producer is publishing too, right?

WDYT @sparklxb @ploxiln @jehiah ?

@mreiferson
Copy link
Member

/stats output

nsqd v1.1.0-rc1 (built w/go1.10.3)
start_time 2018-08-01T15:52:49-07:00
uptime 24.680560278s

Health: OK

Memory:
   heap_objects                 8393
   heap_idle_bytes              360448
   heap_in_use_bytes            2195456
   heap_released_bytes          0
   gc_pause_usec_100            0
   gc_pause_usec_99             0
   gc_pause_usec_95             0
   next_gc_bytes                4473924
   gc_total_runs                0

Topics:
   [test           ] depth: 4     be-depth: 4     msgs: 0        e2e%:

   [test2          ] depth: 2     be-depth: 1     msgs: 1        e2e%:

   [test3          ] depth: 2     be-depth: 1     msgs: 1        e2e%:

Producers:     
   [V2 PROSNAKES            ] msgs: 1        connected: 10s
      [test2          ] msgs: 1

   [V2 PROSNAKES            ] msgs: 1        connected: 8s
      [test3          ] msgs: 1

/stats?format=json output

{
  "version": "1.1.0-rc1",
  "health": "OK",
  "start_time": 1533163969,
  "topics": [
    {
      "topic_name": "test",
      "channels": [],
      "depth": 4,
      "backend_depth": 4,
      "message_count": 0,
      "paused": false,
      "e2e_processing_latency": {
        "count": 0,
        "percentiles": null
      }
    },
    {
      "topic_name": "test2",
      "channels": [],
      "depth": 2,
      "backend_depth": 1,
      "message_count": 1,
      "paused": false,
      "e2e_processing_latency": {
        "count": 0,
        "percentiles": null
      }
    },
    {
      "topic_name": "test3",
      "channels": [],
      "depth": 2,
      "backend_depth": 1,
      "message_count": 1,
      "paused": false,
      "e2e_processing_latency": {
        "count": 0,
        "percentiles": null
      }
    }
  ],
  "memory": {
    "heap_objects": 9671,
    "heap_idle_bytes": 311296,
    "heap_in_use_bytes": 2244608,
    "heap_released_bytes": 0,
    "gc_pause_usec_100": 0,
    "gc_pause_usec_99": 0,
    "gc_pause_usec_95": 0,
    "next_gc_bytes": 4473924,
    "gc_total_runs": 0
  },
  "producers": [
    {
      "client_id": "PROSNAKES",
      "hostname": "PROSNAKES.local",
      "version": "V2",
      "remote_address": "127.0.0.1:61367",
      "state": 0,
      "ready_count": 0,
      "in_flight_count": 0,
      "message_count": 0,
      "finish_count": 0,
      "requeue_count": 0,
      "connect_ts": 1533163986,
      "sample_rate": 0,
      "deflate": false,
      "snappy": false,
      "user_agent": "to_nsq/1.1.0-rc1 go-nsq/1.0.6",
      "pub_counts": [
        {
          "topic": "test3",
          "count": 1
        }
      ],
      "tls": false,
      "tls_cipher_suite": "",
      "tls_version": "",
      "tls_negotiated_protocol": "",
      "tls_negotiated_protocol_is_mutual": false
    }
  ]
}

@mreiferson mreiferson dismissed their stale review August 1, 2018 23:59

I am now the owner

func (c *clientV2) PublishedMessage(topic string, count uint64) {
c.metaLock.Lock()
counter, ok := c.pubCounts[topic]
if ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking up a missing key in a golang map returns the zero value ... it would work, but would it be bad style, to just:

c.pubCounts[topic] += count

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@ploxiln
Copy link
Member

ploxiln commented Aug 2, 2018

overall looks great

@mreiferson
Copy link
Member

mreiferson commented Aug 2, 2018

I'll squash this down, but I don't want to land this until we stamp stable 1.1.0.

@ploxiln
Copy link
Member

ploxiln commented Aug 2, 2018

you mean "stable 1.1.0" right :)
sounds good

@ploxiln
Copy link
Member

ploxiln commented Aug 2, 2018

there seems to be a possible data race

WARNING: DATA RACE
Read at 0x00c420320330 by goroutine 85:
  github.com/nsqio/nsq/nsqd.(*Channel).flush()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/channel.go:212 +0xb50
  github.com/nsqio/nsq/nsqd.(*Channel).exit()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/channel.go:182 +0x3dc
  github.com/nsqio/nsq/nsqd.(*Channel).Close()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/channel.go:147 +0x3d
  github.com/nsqio/nsq/nsqd.(*Topic).exit()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/topic.go:373 +0x4dd
  github.com/nsqio/nsq/nsqd.(*Topic).Close()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/topic.go:335 +0x3d
  github.com/nsqio/nsq/nsqd.(*NSQD).Exit()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/nsqd.go:497 +0x298
  github.com/nsqio/nsq/nsqd.TestTouch()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/protocol_v2_test.go:645 +0xdb5
  testing.tRunner()
      /home/travis/.gimme/versions/go1.8.7.linux.amd64/src/testing/testing.go:657 +0x107
Previous write at 0x00c420320330 by goroutine 49:
  runtime.mapdelete()
      /home/travis/.gimme/versions/go1.8.7.linux.amd64/src/runtime/hashmap.go:598 +0x0
  github.com/nsqio/nsq/nsqd.(*Channel).popInFlightMessage()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/channel.go:468 +0x202
  github.com/nsqio/nsq/nsqd.(*Channel).FinishMessage()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/channel.go:350 +0x50
  github.com/nsqio/nsq/nsqd.(*protocolV2).FIN()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/protocol_v2.go:687 +0x457
  github.com/nsqio/nsq/nsqd.(*protocolV2).Exec()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/protocol_v2.go:177 +0x13d2
  github.com/nsqio/nsq/nsqd.(*protocolV2).IOLoop()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/protocol_v2.go:84 +0x634
  github.com/nsqio/nsq/nsqd.(*tcpServer).Handle()
      /home/travis/gopath/src/github.com/nsqio/nsq/nsqd/tcp.go:43 +0x86f

@mreiferson
Copy link
Member

you mean "stable 1.1.0" right :)

yep, edited

there seems to be a possible data race

I'll take a look

@mreiferson
Copy link
Member

mreiferson commented Aug 2, 2018

It does look like there is indeed a race, but it's in an entirely unrelated code path. It's interesting that we've never observed it before, and that does indicate that this change might have made it more likely, but then again it did happen on an i386 and go 1.8.3 build.

IMO we can fix it separately if it continues to rear its ugly head.

@ploxiln
Copy link
Member

ploxiln commented Aug 2, 2018

agreed, we can fix separately if we ever see it again

@mreiferson mreiferson merged commit e0e08a2 into nsqio:master Aug 20, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants