diff --git a/nsqd/channel.go b/nsqd/channel.go index 73c40d23b..649ccaa7c 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -632,7 +632,9 @@ func (c *Channel) inFlightWorker() { break } atomic.AddUint64(&c.timeoutCount, 1) + c.RLock() client, ok := c.clients[msg.clientID] + c.RUnlock() if ok { client.TimedOutMessage() } diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index de55d5a67..ed6b56d02 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -156,6 +156,8 @@ func (c *clientV2) String() string { } func (c *clientV2) Identify(data identifyDataV2) error { + log.Printf("[%s] IDENTIFY: %+v", c, data) + // TODO: for backwards compatibility, remove in 1.0 hostname := data.Hostname if hostname == "" { diff --git a/nsqd/in_flight_pqueue.go b/nsqd/in_flight_pqueue.go index 6af803067..4ad1b491b 100644 --- a/nsqd/in_flight_pqueue.go +++ b/nsqd/in_flight_pqueue.go @@ -43,13 +43,16 @@ func (pq *inFlightPqueue) Pop() *Message { } func (pq *inFlightPqueue) Remove(i int) *Message { - n := len(*pq) - 1 - if n != i { - pq.Swap(i, n) - pq.down(i, n) + n := len(*pq) + if n-1 != i { + pq.Swap(i, n-1) + pq.down(i, n-1) pq.up(i) } - return pq.Pop() + x := (*pq)[n-1] + x.index = -1 + *pq = (*pq)[0 : n-1] + return x } func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) { diff --git a/nsqd/in_flight_pqueue_test.go b/nsqd/in_flight_pqueue_test.go index 91a0399f7..97a2d0bc1 100644 --- a/nsqd/in_flight_pqueue_test.go +++ b/nsqd/in_flight_pqueue_test.go @@ -1,6 +1,7 @@ package nsqd import ( + "fmt" "math/rand" "sort" "testing" @@ -50,19 +51,31 @@ func TestRemove(t *testing.T) { c := 100 pq := newInFlightPqueue(c) + msgs := make(map[MessageID]*Message) for i := 0; i < c; i++ { - v := rand.Int() - pq.Push(&Message{pri: int64(v)}) + m := &Message{pri: int64(rand.Intn(100000000))} + copy(m.ID[:], fmt.Sprintf("%016d", m.pri)) + msgs[m.ID] = m + pq.Push(m) } for i := 0; i < 10; i++ { - pq.Remove(rand.Intn((c - 1) - i)) + idx := rand.Intn((c - 1) - i) + var fm *Message + for _, m := range msgs { + if m.index == idx { + fm = m + break + } + } + rm := pq.Remove(idx) + assert.Equal(t, fmt.Sprintf("%s", fm.ID), fmt.Sprintf("%s", rm.ID)) } lastPriority := pq.Pop().pri for i := 0; i < (c - 10 - 1); i++ { msg := pq.Pop() - assert.Equal(t, lastPriority < msg.pri, true) + assert.Equal(t, lastPriority <= msg.pri, true) lastPriority = msg.pri } }