Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsq: fix requeue accounting #805

Closed
wants to merge 1 commit into from
Closed

nsq: fix requeue accounting #805

wants to merge 1 commit into from

Conversation

sdbaiguanghe
Copy link

No description provided.

@mreiferson
Copy link
Member

I think this is correct!

@mreiferson mreiferson changed the title repair statistics nsq: fix requeue accounting Nov 3, 2016
@mreiferson mreiferson added the bug label Nov 3, 2016
@sdbaiguanghe
Copy link
Author

My point is correct or the original?
All of my colleagues think the Channel. Messages should means number of messages published to this channel(PUB & DPUB)
and Channel. Requeued should be the number of REQ received.

@mreiferson
Copy link
Member

Yes, I agree.

Thoughts @jehiah

@jehiah
Copy link
Member

jehiah commented Nov 6, 2016

👍 this means REQ counts are incremented immediately instead of when it's re-added to the queue for processing. That probably makes more intuitive sense anyway.

@@ -275,6 +275,7 @@ func (t *Topic) messagePump() {
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
atomic.AddUint64(&channel.messageCount, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want this change, though, because when the deferred message is finally published this will be incremented.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But when the defered message finally published the messageCount do not increase.

    if chanMsg.deferred != 0 {
        atomic.AddUint64(&channel.messageCount, 1)
        channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
        continue
    }

StartDeferredTimeout() will push the message into channel.deferredPQ and when the timeout message poped from channel.deferredPQ it will be published through doRequeue(). The messageCount hasn't changed in the process.
I reviewed the code so many times and verified it in practice

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mreiferson @jehiah Am i wrong?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh, I missed that this was an increment for the channel message count. Yes, it looks like you are correct. However, it doesn't make sense that the topic is now responsible for atomically incrementing this channel's counter, we should push this responsibility into the channel logic somewhere.

@mreiferson
Copy link
Member

@sdbaiguanghe given that these are regressions, would you mind adding test cases for each of these scenarios?

Thanks!

@mreiferson
Copy link
Member

see #832

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants