Skip to content

Commit

Permalink
nsqd: simplify optimized v2 message pump
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Dec 21, 2012
1 parent 08f3ce0 commit f2865b6
Showing 1 changed file with 37 additions and 59 deletions.
96 changes: 37 additions & 59 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) {
func (p *ProtocolV2) messagePump(client *ClientV2) {
var err error
var buf bytes.Buffer
var clientMsgChan chan *nsq.Message
var flusherChan <-chan time.Time

// v2 opportunistically buffers data to clients to reduce write system calls
// we force flush in two cases:
Expand All @@ -178,78 +180,54 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
for {
if !client.IsReadyForMessages() {
// the client is not ready to receive messages...
clientMsgChan = nil
flusherChan = nil
// force flush
err = p.Flush(client)
if err != nil {
goto exit
}
flushed = true

select {
case <-client.ReadyStateChan:
case <-heartbeat.C:
err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_"))
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
case <-client.ExitChan:
goto exit
}
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
select {
case <-client.ReadyStateChan:
case <-heartbeat.C:
err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_"))
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
case msg, ok := <-client.Channel.clientMsgChan:
if !ok {
goto exit
}

err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
clientMsgChan = client.Channel.clientMsgChan
flusherChan = nil
} else {
// if there isn't any more data we should flush...
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
select {
case <-flusher.C:
// if this case wins, we're either starved
// or we won the race between other channels...
// in either case, force flush
err := p.Flush(client)
if err != nil {
goto exit
}
flushed = true
case <-client.ReadyStateChan:
case <-heartbeat.C:
err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_"))
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
case msg, ok := <-client.Channel.clientMsgChan:
if !ok {
goto exit
}

err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
clientMsgChan = client.Channel.clientMsgChan
flusherChan = flusher.C
}

select {
case <-flusherChan:
// if this case wins, we're either starved
// or we won the race between other channels...
// in either case, force flush
err := p.Flush(client)
if err != nil {
goto exit
}
flushed = true
case <-client.ReadyStateChan:
case <-heartbeat.C:
err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_"))
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
case msg, ok := <-clientMsgChan:
if !ok {
goto exit
}

err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}

Expand Down

0 comments on commit f2865b6

Please sign in to comment.