Skip to content

Commit

Permalink
Merge pull request #390 from mreiferson/stuck_in_flight_390
Browse files Browse the repository at this point in the history
nsqd: messages stuck in-flight
  • Loading branch information
jehiah committed Jul 17, 2014
2 parents e62dc51 + 9412895 commit 0c33ce3
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
2 changes: 2 additions & 0 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,9 @@ func (c *Channel) inFlightWorker() {
break
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
Expand Down
2 changes: 2 additions & 0 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func (c *clientV2) String() string {
}

func (c *clientV2) Identify(data identifyDataV2) error {
log.Printf("[%s] IDENTIFY: %+v", c, data)

// TODO: for backwards compatibility, remove in 1.0
hostname := data.Hostname
if hostname == "" {
Expand Down
13 changes: 8 additions & 5 deletions nsqd/in_flight_pqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ func (pq *inFlightPqueue) Pop() *Message {
}

func (pq *inFlightPqueue) Remove(i int) *Message {
n := len(*pq) - 1
if n != i {
pq.Swap(i, n)
pq.down(i, n)
n := len(*pq)
if n-1 != i {
pq.Swap(i, n-1)
pq.down(i, n-1)
pq.up(i)
}
return pq.Pop()
x := (*pq)[n-1]
x.index = -1
*pq = (*pq)[0 : n-1]
return x
}

func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
Expand Down
21 changes: 17 additions & 4 deletions nsqd/in_flight_pqueue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"fmt"
"math/rand"
"sort"
"testing"
Expand Down Expand Up @@ -50,19 +51,31 @@ func TestRemove(t *testing.T) {
c := 100
pq := newInFlightPqueue(c)

msgs := make(map[MessageID]*Message)
for i := 0; i < c; i++ {
v := rand.Int()
pq.Push(&Message{pri: int64(v)})
m := &Message{pri: int64(rand.Intn(100000000))}
copy(m.ID[:], fmt.Sprintf("%016d", m.pri))
msgs[m.ID] = m
pq.Push(m)
}

for i := 0; i < 10; i++ {
pq.Remove(rand.Intn((c - 1) - i))
idx := rand.Intn((c - 1) - i)
var fm *Message
for _, m := range msgs {
if m.index == idx {
fm = m
break
}
}
rm := pq.Remove(idx)
assert.Equal(t, fmt.Sprintf("%s", fm.ID), fmt.Sprintf("%s", rm.ID))
}

lastPriority := pq.Pop().pri
for i := 0; i < (c - 10 - 1); i++ {
msg := pq.Pop()
assert.Equal(t, lastPriority < msg.pri, true)
assert.Equal(t, lastPriority <= msg.pri, true)
lastPriority = msg.pri
}
}

0 comments on commit 0c33ce3

Please sign in to comment.