diff --git a/nsqd/channel.go b/nsqd/channel.go index 0db68ee84..11a8af492 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -85,30 +85,30 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions, notifier Notifier, deleteCallback func(*Channel)) *Channel { // backend names, for uniqueness, automatically include the topic... : backendName := topicName + ":" + channelName - pqSize := int(math.Max(1, float64(options.memQueueSize)/10)) c := &Channel{ - topicName: topicName, - name: channelName, - incomingMsgChan: make(chan *nsq.Message, 1), - memoryMsgChan: make(chan *nsq.Message, options.memQueueSize), - clientMsgChan: make(chan *nsq.Message), - exitChan: make(chan int), - clients: make([]Consumer, 0, 5), - inFlightMessages: make(map[nsq.MessageID]*pqueue.Item), - inFlightPQ: pqueue.New(pqSize), - deferredMessages: make(map[nsq.MessageID]*pqueue.Item), - deferredPQ: pqueue.New(pqSize), - deleteCallback: deleteCallback, - notifier: notifier, - options: options, + topicName: topicName, + name: channelName, + incomingMsgChan: make(chan *nsq.Message, 1), + memoryMsgChan: make(chan *nsq.Message, options.memQueueSize), + clientMsgChan: make(chan *nsq.Message), + exitChan: make(chan int), + clients: make([]Consumer, 0, 5), + deleteCallback: deleteCallback, + notifier: notifier, + options: options, } + + c.initPQ() + if strings.HasSuffix(channelName, "#ephemeral") { c.ephemeralChannel = true c.backend = NewDummyBackendQueue() } else { c.backend = NewDiskQueue(backendName, options.dataPath, options.maxBytesPerFile, options.syncEvery) } + go c.messagePump() + c.waitGroup.Wrap(func() { c.router() }) c.waitGroup.Wrap(func() { c.deferredWorker() }) c.waitGroup.Wrap(func() { c.inFlightWorker() }) @@ -118,6 +118,21 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions, return c } +func (c *Channel) initPQ() { + pqSize := int(math.Max(1, float64(c.options.memQueueSize)/10)) + + c.inFlightMessages = make(map[nsq.MessageID]*pqueue.Item) + c.deferredMessages = make(map[nsq.MessageID]*pqueue.Item) + + c.inFlightMutex.Lock() + c.inFlightPQ = pqueue.New(pqSize) + c.inFlightMutex.Unlock() + + c.deferredMutex.Lock() + c.deferredPQ = pqueue.New(pqSize) + c.deferredMutex.Unlock() +} + // Exiting returns a boolean indicating if this channel is closed/exiting func (c *Channel) Exiting() bool { return atomic.LoadInt32(&c.exitFlag) == 1 @@ -166,44 +181,80 @@ func (c *Channel) exit(deleted bool) error { if deleted { // empty the queue (deletes the backend files, too) - EmptyQueue(c) + c.Empty() } else { // messagePump is responsible for closing the channel it writes to // this will read until its closed (exited) for msg := range c.clientMsgChan { log.Printf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name) - WriteMessageToBackend(&msgBuf, msg, c) + WriteMessageToBackend(&msgBuf, msg, c.backend) } // write anything leftover to disk - if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { - log.Printf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend", - c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages)) - } - FlushQueue(c) + c.flush() } return c.backend.Close() } -// MemoryChan implements the Queue interface -func (c *Channel) MemoryChan() chan *nsq.Message { - return c.memoryMsgChan -} +func (c *Channel) Empty() error { + c.Lock() + defer c.Unlock() -// BackendQueue implements the Queue interface -func (c *Channel) BackendQueue() BackendQueue { - return c.backend -} + c.initPQ() -// InFlight implements the Queue interface -func (c *Channel) InFlight() map[nsq.MessageID]*pqueue.Item { - return c.inFlightMessages + for { + select { + case <-c.memoryMsgChan: + default: + goto finish + } + } + +finish: + return c.backend.Empty() } -// Deferred implements the Queue interface -func (c *Channel) Deferred() map[nsq.MessageID]*pqueue.Item { - return c.deferredMessages +// flush persists all the messages in internal memory buffers to the backend +// it does not drain inflight/deferred because it is only called in Close() +func (c *Channel) flush() error { + var msgBuf bytes.Buffer + + if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { + log.Printf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend", + c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages)) + } + + for { + select { + case msg := <-c.memoryMsgChan: + err := WriteMessageToBackend(&msgBuf, msg, c.backend) + if err != nil { + log.Printf("ERROR: failed to write message to backend - %s", err.Error()) + } + default: + goto finish + } + } + +finish: + for _, item := range c.inFlightMessages { + msg := item.Value.(*inFlightMessage).msg + err := WriteMessageToBackend(&msgBuf, msg, c.backend) + if err != nil { + log.Printf("ERROR: failed to write message to backend - %s", err.Error()) + } + } + + for _, item := range c.deferredMessages { + msg := item.Value.(*nsq.Message) + err := WriteMessageToBackend(&msgBuf, msg, c.backend) + if err != nil { + log.Printf("ERROR: failed to write message to backend - %s", err.Error()) + } + } + + return nil } func (c *Channel) Depth() int64 { @@ -476,7 +527,7 @@ func (c *Channel) router() { select { case c.memoryMsgChan <- msg: default: - err := WriteMessageToBackend(&msgBuf, msg, c) + err := WriteMessageToBackend(&msgBuf, msg, c.backend) if err != nil { log.Printf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err.Error()) // theres not really much we can do at this point, you're certainly diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index cb8970b12..c8f2b1cba 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -67,7 +67,8 @@ func TestInFlightWorker(t *testing.T) { nsqd := NewNSQd(1, options) defer nsqd.Exit() - topic := nsqd.GetTopic("topic") + topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") for i := 0; i < 1000; i++ { @@ -83,3 +84,37 @@ func TestInFlightWorker(t *testing.T) { assert.Equal(t, len(channel.inFlightMessages), 0) assert.Equal(t, len(channel.inFlightPQ), 0) } + +func TestChannelEmpty(t *testing.T) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stdout) + + nsqd := NewNSQd(1, NewNsqdOptions()) + defer nsqd.Exit() + + topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) + topic := nsqd.GetTopic(topicName) + channel := topic.GetChannel("channel") + client := NewClientV2(nil) + + msgs := make([]*nsq.Message, 0, 25) + for i := 0; i < 25; i++ { + msg := nsq.NewMessage(<-nsqd.idChan, []byte("test")) + channel.StartInFlightTimeout(msg, client) + msgs = append(msgs, msg) + } + + channel.RequeueMessage(client, msgs[len(msgs)-1].Id, 100*time.Millisecond) + assert.Equal(t, len(channel.inFlightMessages), 24) + assert.Equal(t, len(channel.inFlightPQ), 24) + assert.Equal(t, len(channel.deferredMessages), 1) + assert.Equal(t, len(channel.deferredPQ), 1) + + channel.Empty() + + assert.Equal(t, len(channel.inFlightMessages), 0) + assert.Equal(t, len(channel.inFlightPQ), 0) + assert.Equal(t, len(channel.deferredMessages), 0) + assert.Equal(t, len(channel.deferredPQ), 0) + assert.Equal(t, channel.Depth(), int64(0)) +} diff --git a/nsqd/http.go b/nsqd/http.go index 96a81f98a..8f782cf5a 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -248,7 +248,7 @@ func emptyChannelHandler(w http.ResponseWriter, req *http.Request) { return } - err = EmptyQueue(channel) + err = channel.Empty() if err != nil { util.ApiResponse(w, 500, "INTERNAL_ERROR", nil) return diff --git a/nsqd/queue.go b/nsqd/queue.go index df773e174..c14ccf151 100644 --- a/nsqd/queue.go +++ b/nsqd/queue.go @@ -2,9 +2,7 @@ package main import ( "../nsq" - "../util/pqueue" "bytes" - "log" ) // BackendQueue represents the behavior for the secondary message @@ -17,13 +15,6 @@ type BackendQueue interface { Empty() error } -type Queue interface { - MemoryChan() chan *nsq.Message - BackendQueue() BackendQueue - InFlight() map[nsq.MessageID]*pqueue.Item - Deferred() map[nsq.MessageID]*pqueue.Item -} - type DummyBackendQueue struct { readChan chan []byte } @@ -52,61 +43,13 @@ func (d *DummyBackendQueue) Empty() error { return nil } -func EmptyQueue(q Queue) error { - for { - select { - case <-q.MemoryChan(): - default: - goto disk - } - } - -disk: - return q.BackendQueue().Empty() -} - -func FlushQueue(q Queue) error { - var msgBuf bytes.Buffer - - for { - select { - case msg := <-q.MemoryChan(): - err := WriteMessageToBackend(&msgBuf, msg, q) - if err != nil { - log.Printf("ERROR: failed to write message to backend - %s", err.Error()) - } - default: - goto finish - } - } - -finish: - for _, item := range q.InFlight() { - msg := item.Value.(*inFlightMessage).msg - err := WriteMessageToBackend(&msgBuf, msg, q) - if err != nil { - log.Printf("ERROR: failed to write message to backend - %s", err.Error()) - } - } - - for _, item := range q.Deferred() { - msg := item.Value.(*nsq.Message) - err := WriteMessageToBackend(&msgBuf, msg, q) - if err != nil { - log.Printf("ERROR: failed to write message to backend - %s", err.Error()) - } - } - - return nil -} - -func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, q Queue) error { +func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, bq BackendQueue) error { buf.Reset() err := msg.Write(buf) if err != nil { return err } - err = q.BackendQueue().Put(buf.Bytes()) + err = bq.Put(buf.Bytes()) if err != nil { return err } diff --git a/nsqd/topic.go b/nsqd/topic.go index 4344040f4..e0718f260 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -3,7 +3,6 @@ package main import ( "../nsq" "../util" - "../util/pqueue" "bytes" "errors" "log" @@ -48,22 +47,6 @@ func NewTopic(topicName string, options *nsqdOptions, notifier Notifier) *Topic return topic } -func (t *Topic) MemoryChan() chan *nsq.Message { - return t.memoryMsgChan -} - -func (t *Topic) BackendQueue() BackendQueue { - return t.backend -} - -func (t *Topic) InFlight() map[nsq.MessageID]*pqueue.Item { - return nil -} - -func (t *Topic) Deferred() map[nsq.MessageID]*pqueue.Item { - return nil -} - // Exiting returns a boolean indicating if this topic is closed/exiting func (t *Topic) Exiting() bool { return atomic.LoadInt32(&t.exitFlag) == 1 @@ -222,7 +205,7 @@ func (t *Topic) router() { select { case t.memoryMsgChan <- msg: default: - err := WriteMessageToBackend(&msgBuf, msg, t) + err := WriteMessageToBackend(&msgBuf, msg, t.backend) if err != nil { log.Printf("ERROR: failed to write message to backend - %s", err.Error()) // theres not really much we can do at this point, you're certainly @@ -267,7 +250,7 @@ func (t *Topic) exit(deleted bool) error { if deleted { // empty the queue (deletes the backend files, too) - EmptyQueue(t) + t.Empty() t.Lock() for _, channel := range t.channelMap { @@ -286,11 +269,44 @@ func (t *Topic) exit(deleted bool) error { } // write anything leftover to disk - if len(t.memoryMsgChan) > 0 { - log.Printf("TOPIC(%s): flushing %d memory messages to backend", t.name, len(t.memoryMsgChan)) - } - FlushQueue(t) + t.flush() } return t.backend.Close() } + +func (t *Topic) Empty() error { + for { + select { + case <-t.memoryMsgChan: + default: + goto finish + } + } + +finish: + return t.backend.Empty() +} + +func (t *Topic) flush() error { + var msgBuf bytes.Buffer + + if len(t.memoryMsgChan) > 0 { + log.Printf("TOPIC(%s): flushing %d memory messages to backend", t.name, len(t.memoryMsgChan)) + } + + for { + select { + case msg := <-t.memoryMsgChan: + err := WriteMessageToBackend(&msgBuf, msg, t.backend) + if err != nil { + log.Printf("ERROR: failed to write message to backend - %s", err.Error()) + } + default: + goto finish + } + } + +finish: + return nil +}