From f2865b626f2422073f41b0b1a876de0d8db72dbd Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Fri, 21 Dec 2012 07:52:55 -0500 Subject: [PATCH] nsqd: simplify optimized v2 message pump --- nsqd/protocol_v2.go | 96 +++++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 59 deletions(-) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 2c61e9adb..4c688bc7b 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -161,6 +161,8 @@ func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) { func (p *ProtocolV2) messagePump(client *ClientV2) { var err error var buf bytes.Buffer + var clientMsgChan chan *nsq.Message + var flusherChan <-chan time.Time // v2 opportunistically buffers data to clients to reduce write system calls // we force flush in two cases: @@ -178,78 +180,54 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { for { if !client.IsReadyForMessages() { // the client is not ready to receive messages... + clientMsgChan = nil + flusherChan = nil // force flush err = p.Flush(client) if err != nil { goto exit } flushed = true - - select { - case <-client.ReadyStateChan: - case <-heartbeat.C: - err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_")) - if err != nil { - log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error()) - } - case <-client.ExitChan: - goto exit - } } else if flushed { // last iteration we flushed... // do not select on the flusher ticker channel - select { - case <-client.ReadyStateChan: - case <-heartbeat.C: - err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_")) - if err != nil { - log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error()) - } - case msg, ok := <-client.Channel.clientMsgChan: - if !ok { - goto exit - } - - err = p.SendMessage(client, msg, &buf) - if err != nil { - goto exit - } - flushed = false - case <-client.ExitChan: - goto exit - } + clientMsgChan = client.Channel.clientMsgChan + flusherChan = nil } else { - // if there isn't any more data we should flush... + // we're buffered (if there isn't any more data we should flush)... // select on the flusher ticker channel, too - select { - case <-flusher.C: - // if this case wins, we're either starved - // or we won the race between other channels... - // in either case, force flush - err := p.Flush(client) - if err != nil { - goto exit - } - flushed = true - case <-client.ReadyStateChan: - case <-heartbeat.C: - err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_")) - if err != nil { - log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error()) - } - case msg, ok := <-client.Channel.clientMsgChan: - if !ok { - goto exit - } - - err = p.SendMessage(client, msg, &buf) - if err != nil { - goto exit - } - flushed = false - case <-client.ExitChan: + clientMsgChan = client.Channel.clientMsgChan + flusherChan = flusher.C + } + + select { + case <-flusherChan: + // if this case wins, we're either starved + // or we won the race between other channels... + // in either case, force flush + err := p.Flush(client) + if err != nil { + goto exit + } + flushed = true + case <-client.ReadyStateChan: + case <-heartbeat.C: + err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_")) + if err != nil { + log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error()) + } + case msg, ok := <-clientMsgChan: + if !ok { + goto exit + } + + err = p.SendMessage(client, msg, &buf) + if err != nil { goto exit } + flushed = false + case <-client.ExitChan: + goto exit } }