Skip to content

Commit

Permalink
Merge pull request #144 from mreiferson/empty_144
Browse files Browse the repository at this point in the history
nsqd: empty queue does not empty deferred/inflight
  • Loading branch information
jehiah committed Feb 5, 2013
2 parents 504f1d3 + f929bf7 commit a054ba5
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 121 deletions.
125 changes: 88 additions & 37 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,30 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions,
notifier Notifier, deleteCallback func(*Channel)) *Channel {
// backend names, for uniqueness, automatically include the topic... <topic>:<channel>
backendName := topicName + ":" + channelName
pqSize := int(math.Max(1, float64(options.memQueueSize)/10))
c := &Channel{
topicName: topicName,
name: channelName,
incomingMsgChan: make(chan *nsq.Message, 1),
memoryMsgChan: make(chan *nsq.Message, options.memQueueSize),
clientMsgChan: make(chan *nsq.Message),
exitChan: make(chan int),
clients: make([]Consumer, 0, 5),
inFlightMessages: make(map[nsq.MessageID]*pqueue.Item),
inFlightPQ: pqueue.New(pqSize),
deferredMessages: make(map[nsq.MessageID]*pqueue.Item),
deferredPQ: pqueue.New(pqSize),
deleteCallback: deleteCallback,
notifier: notifier,
options: options,
topicName: topicName,
name: channelName,
incomingMsgChan: make(chan *nsq.Message, 1),
memoryMsgChan: make(chan *nsq.Message, options.memQueueSize),
clientMsgChan: make(chan *nsq.Message),
exitChan: make(chan int),
clients: make([]Consumer, 0, 5),
deleteCallback: deleteCallback,
notifier: notifier,
options: options,
}

c.initPQ()

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
c.backend = NewDummyBackendQueue()
} else {
c.backend = NewDiskQueue(backendName, options.dataPath, options.maxBytesPerFile, options.syncEvery)
}

go c.messagePump()

c.waitGroup.Wrap(func() { c.router() })
c.waitGroup.Wrap(func() { c.deferredWorker() })
c.waitGroup.Wrap(func() { c.inFlightWorker() })
Expand All @@ -118,6 +118,21 @@ func NewChannel(topicName string, channelName string, options *nsqdOptions,
return c
}

func (c *Channel) initPQ() {
pqSize := int(math.Max(1, float64(c.options.memQueueSize)/10))

c.inFlightMessages = make(map[nsq.MessageID]*pqueue.Item)
c.deferredMessages = make(map[nsq.MessageID]*pqueue.Item)

c.inFlightMutex.Lock()
c.inFlightPQ = pqueue.New(pqSize)
c.inFlightMutex.Unlock()

c.deferredMutex.Lock()
c.deferredPQ = pqueue.New(pqSize)
c.deferredMutex.Unlock()
}

// Exiting returns a boolean indicating if this channel is closed/exiting
func (c *Channel) Exiting() bool {
return atomic.LoadInt32(&c.exitFlag) == 1
Expand Down Expand Up @@ -166,44 +181,80 @@ func (c *Channel) exit(deleted bool) error {

if deleted {
// empty the queue (deletes the backend files, too)
EmptyQueue(c)
c.Empty()
} else {
// messagePump is responsible for closing the channel it writes to
// this will read until its closed (exited)
for msg := range c.clientMsgChan {
log.Printf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name)
WriteMessageToBackend(&msgBuf, msg, c)
WriteMessageToBackend(&msgBuf, msg, c.backend)
}

// write anything leftover to disk
if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
log.Printf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}
FlushQueue(c)
c.flush()
}

return c.backend.Close()
}

// MemoryChan implements the Queue interface
func (c *Channel) MemoryChan() chan *nsq.Message {
return c.memoryMsgChan
}
func (c *Channel) Empty() error {
c.Lock()
defer c.Unlock()

// BackendQueue implements the Queue interface
func (c *Channel) BackendQueue() BackendQueue {
return c.backend
}
c.initPQ()

// InFlight implements the Queue interface
func (c *Channel) InFlight() map[nsq.MessageID]*pqueue.Item {
return c.inFlightMessages
for {
select {
case <-c.memoryMsgChan:
default:
goto finish
}
}

finish:
return c.backend.Empty()
}

// Deferred implements the Queue interface
func (c *Channel) Deferred() map[nsq.MessageID]*pqueue.Item {
return c.deferredMessages
// flush persists all the messages in internal memory buffers to the backend
// it does not drain inflight/deferred because it is only called in Close()
func (c *Channel) flush() error {
var msgBuf bytes.Buffer

if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
log.Printf("CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
}

for {
select {
case msg := <-c.memoryMsgChan:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
default:
goto finish
}
}

finish:
for _, item := range c.inFlightMessages {
msg := item.Value.(*inFlightMessage).msg
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

for _, item := range c.deferredMessages {
msg := item.Value.(*nsq.Message)
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

return nil
}

func (c *Channel) Depth() int64 {
Expand Down Expand Up @@ -476,7 +527,7 @@ func (c *Channel) router() {
select {
case c.memoryMsgChan <- msg:
default:
err := WriteMessageToBackend(&msgBuf, msg, c)
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err.Error())
// theres not really much we can do at this point, you're certainly
Expand Down
37 changes: 36 additions & 1 deletion nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestInFlightWorker(t *testing.T) {
nsqd := NewNSQd(1, options)
defer nsqd.Exit()

topic := nsqd.GetTopic("topic")
topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")

for i := 0; i < 1000; i++ {
Expand All @@ -83,3 +84,37 @@ func TestInFlightWorker(t *testing.T) {
assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
}

func TestChannelEmpty(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

nsqd := NewNSQd(1, NewNsqdOptions())
defer nsqd.Exit()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(nil)

msgs := make([]*nsq.Message, 0, 25)
for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, client)
msgs = append(msgs, msg)
}

channel.RequeueMessage(client, msgs[len(msgs)-1].Id, 100*time.Millisecond)
assert.Equal(t, len(channel.inFlightMessages), 24)
assert.Equal(t, len(channel.inFlightPQ), 24)
assert.Equal(t, len(channel.deferredMessages), 1)
assert.Equal(t, len(channel.deferredPQ), 1)

channel.Empty()

assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
assert.Equal(t, len(channel.deferredMessages), 0)
assert.Equal(t, len(channel.deferredPQ), 0)
assert.Equal(t, channel.Depth(), int64(0))
}
2 changes: 1 addition & 1 deletion nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func emptyChannelHandler(w http.ResponseWriter, req *http.Request) {
return
}

err = EmptyQueue(channel)
err = channel.Empty()
if err != nil {
util.ApiResponse(w, 500, "INTERNAL_ERROR", nil)
return
Expand Down
61 changes: 2 additions & 59 deletions nsqd/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package main

import (
"../nsq"
"../util/pqueue"
"bytes"
"log"
)

// BackendQueue represents the behavior for the secondary message
Expand All @@ -17,13 +15,6 @@ type BackendQueue interface {
Empty() error
}

type Queue interface {
MemoryChan() chan *nsq.Message
BackendQueue() BackendQueue
InFlight() map[nsq.MessageID]*pqueue.Item
Deferred() map[nsq.MessageID]*pqueue.Item
}

type DummyBackendQueue struct {
readChan chan []byte
}
Expand Down Expand Up @@ -52,61 +43,13 @@ func (d *DummyBackendQueue) Empty() error {
return nil
}

func EmptyQueue(q Queue) error {
for {
select {
case <-q.MemoryChan():
default:
goto disk
}
}

disk:
return q.BackendQueue().Empty()
}

func FlushQueue(q Queue) error {
var msgBuf bytes.Buffer

for {
select {
case msg := <-q.MemoryChan():
err := WriteMessageToBackend(&msgBuf, msg, q)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
default:
goto finish
}
}

finish:
for _, item := range q.InFlight() {
msg := item.Value.(*inFlightMessage).msg
err := WriteMessageToBackend(&msgBuf, msg, q)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

for _, item := range q.Deferred() {
msg := item.Value.(*nsq.Message)
err := WriteMessageToBackend(&msgBuf, msg, q)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

return nil
}

func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, q Queue) error {
func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, bq BackendQueue) error {
buf.Reset()
err := msg.Write(buf)
if err != nil {
return err
}
err = q.BackendQueue().Put(buf.Bytes())
err = bq.Put(buf.Bytes())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit a054ba5

Please sign in to comment.