From cbb334c6c6a607ac3432da1eaa179074aa0c06fa Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 8 May 2013 12:27:20 -0400 Subject: [PATCH 1/4] nsqd: add max_msg_timeout and msg_timeout to IDENTIFY response --- nsqd/protocol_v2.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index b170f767b..39530cd18 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -319,11 +319,15 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) resp := okBytes if clientInfo.FeatureNegotiation { resp, err = json.Marshal(struct { - MaxRdyCount int64 `json:"max_rdy_count"` - Version string `json:"version"` + MaxRdyCount int64 `json:"max_rdy_count"` + Version string `json:"version"` + MaxMsgTimeout int64 `json:"max_msg_timeout"` + MsgTimeout int64 `json:"msg_timeout"` }{ - MaxRdyCount: nsqd.options.maxRdyCount, - Version: util.BINARY_VERSION, + MaxRdyCount: nsqd.options.maxRdyCount, + Version: util.BINARY_VERSION, + MaxMsgTimeout: int64(nsqd.options.maxMsgTimeout / time.Millisecond), + MsgTimeout: int64(nsqd.options.msgTimeout / time.Millisecond), }) if err != nil { panic("should never happen") From c2a2e4eab0b43c4d4082266f0e514ff66f93b471 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 5 May 2013 17:33:13 -0400 Subject: [PATCH 2/4] nsqd: add client configurable output buffering * add --max-output-buffer-size command line option * add --max-output-buffer-timeout command line option --- nsqd/client_v2.go | 84 ++++++++++++++++++++++++++++++++++++++++++--- nsqd/main.go | 5 +++ nsqd/nsqd.go | 6 ++++ nsqd/protocol_v2.go | 40 ++++++++++----------- 4 files changed, 110 insertions(+), 25 deletions(-) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 3abc91d85..d10af30f1 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -12,13 +12,24 @@ import ( "time" ) +type IdentifyDataV2 struct { + ShortId string `json:"short_id"` + LongId string `json:"long_id"` + HeartbeatInterval int `json:"heartbeat_interval"` + OutputBufferSize int `json:"output_buffer_size"` + OutputBufferTimeout int `json:"output_buffer_timeout"` + FeatureNegotiation bool `json:"feature_negotiation"` +} + type ClientV2 struct { net.Conn sync.Mutex // buffered IO - Reader *bufio.Reader - Writer *bufio.Writer + Reader *bufio.Reader + Writer *bufio.Writer + OutputBufferTimeout *time.Ticker + OutputBufferTimeoutUpdateChan chan time.Duration State int32 ReadyCount int64 @@ -53,6 +64,12 @@ func NewClientV2(conn net.Conn) *ClientV2 { c := &ClientV2{ Conn: conn, + + Reader: bufio.NewReaderSize(conn, 16*1024), + Writer: bufio.NewWriterSize(conn, 16*1024), + OutputBufferTimeout: time.NewTicker(5 * time.Millisecond), + OutputBufferTimeoutUpdateChan: make(chan time.Duration, 1), + // ReadyStateChan has a buffer of 1 to guarantee that in the event // there is a race the state update is not lost ReadyStateChan: make(chan int, 1), @@ -60,8 +77,6 @@ func NewClientV2(conn net.Conn) *ClientV2 { ConnectTime: time.Now(), ShortIdentifier: identifier, LongIdentifier: identifier, - Reader: bufio.NewReaderSize(conn, 16*1024), - Writer: bufio.NewWriterSize(conn, 16*1024), State: nsq.StateInit, SubEventChan: make(chan *Channel, 1), @@ -78,6 +93,20 @@ func (c *ClientV2) String() string { return c.RemoteAddr().String() } +func (c *ClientV2) Identify(data IdentifyDataV2) error { + c.ShortIdentifier = data.ShortId + c.LongIdentifier = data.LongId + err := c.SetHeartbeatInterval(data.HeartbeatInterval) + if err != nil { + return err + } + err = c.SetOutputBufferSize(data.OutputBufferSize) + if err != nil { + return err + } + return c.SetOutputBufferTimeout(data.OutputBufferTimeout) +} + func (c *ClientV2) Stats() ClientStats { return ClientStats{ Version: "V2", @@ -201,3 +230,50 @@ func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error { return nil } + +func (c *ClientV2) SetOutputBufferSize(size int) error { + c.Lock() + defer c.Unlock() + + err := c.Writer.Flush() + if err != nil { + return err + } + + if size < 0 || int64(size) > nsqd.options.maxOutputBufferSize { + return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", size)) + } + + if size == 0 { + // effectively no buffer (every write will go directly to the wrapped net.Conn) + size = 1 + } + c.Writer = bufio.NewWriterSize(c.Conn, size) + + return nil +} + +func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error { + var timeout time.Duration + + switch { + case desiredTimeout == -1: + timeout = -1 + case desiredTimeout == 0: + // do nothing (use default) + case desiredTimeout >= 1000 && + desiredTimeout <= int(nsqd.options.maxOutputBufferTimeout/time.Millisecond): + timeout = (time.Duration(desiredTimeout) * time.Millisecond) + default: + return errors.New(fmt.Sprintf("output buffer timeout (%d) is invalid", desiredTimeout)) + } + + if desiredTimeout != 0 { + select { + case c.OutputBufferTimeoutUpdateChan <- timeout: + default: + } + } + + return nil +} diff --git a/nsqd/main.go b/nsqd/main.go index 40d5ce18e..54876512e 100644 --- a/nsqd/main.go +++ b/nsqd/main.go @@ -39,6 +39,9 @@ var ( statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd") broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd, (default to the OS hostname)") lookupdTCPAddrs = util.StringArray{} + + maxOutputBufferSize = flag.Int64("max-output-buffer-size", 64*1024, "maximum client configurable size (in bytes) for a client output buffer") + maxOutputBufferTimeout = flag.Duration("max-output-buffer-timeout", 1*time.Second, "maximum duration of time between flushing to clients that a client can configure") ) func init() { @@ -126,6 +129,8 @@ func main() { options.maxMsgTimeout = *maxMsgTimeout options.broadcastAddress = *broadcastAddress options.maxHeartbeatInterval = *maxHeartbeatInterval + options.maxOutputBufferSize = *maxOutputBufferSize + options.maxOutputBufferTimeout = *maxOutputBufferTimeout nsqd = NewNSQd(*workerId, options) nsqd.tcpAddr = tcpAddr diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 9f3eea280..afbfb58ec 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -52,6 +52,9 @@ type nsqdOptions struct { clientTimeout time.Duration maxHeartbeatInterval time.Duration broadcastAddress string + + maxOutputBufferSize int64 + maxOutputBufferTimeout time.Duration } func NewNsqdOptions() *nsqdOptions { @@ -68,6 +71,9 @@ func NewNsqdOptions() *nsqdOptions { clientTimeout: nsq.DefaultClientTimeout, maxHeartbeatInterval: 60 * time.Second, broadcastAddress: "", + + maxOutputBufferSize: 64 * 1024, + maxOutputBufferTimeout: 1 * time.Second, } } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 39530cd18..fb0c112b8 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -181,6 +181,9 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { var buf bytes.Buffer var clientMsgChan chan *nsq.Message var subChannel *Channel + // NOTE: `flusherChan` is used to bound message latency for + // the pathological case of a channel on a low volume topic + // with >1 clients having >1 RDY counts var flusherChan <-chan time.Time // v2 opportunistically buffers data to clients to reduce write system calls @@ -189,13 +192,10 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { // 2. we're buffered and the channel has nothing left to send us // (ie. we would block in this loop anyway) // - // NOTE: `flusher` is used to bound message latency for - // the pathological case of a channel on a low volume topic - // with >1 clients having >1 RDY counts - flusher := time.NewTicker(5 * time.Millisecond) - flushed := true subEventChan := client.SubEventChan heartbeatUpdateChan := client.HeartbeatUpdateChan + outputBufferTimeoutUpdateChan := client.OutputBufferTimeoutUpdateChan + flushed := true for { if subChannel == nil || !client.IsReadyForMessages() { @@ -217,7 +217,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { // we're buffered (if there isn't any more data we should flush)... // select on the flusher ticker channel, too clientMsgChan = subChannel.clientMsgChan - flusherChan = flusher.C + flusherChan = client.OutputBufferTimeout.C } select { @@ -234,12 +234,18 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { // you can't subscribe anymore subEventChan = nil case <-client.ReadyStateChan: + case timeout := <-outputBufferTimeoutUpdateChan: + client.OutputBufferTimeout.Stop() + if timeout > 0 { + client.OutputBufferTimeout = time.NewTicker(timeout) + } + // you can't update output buffer timeout anymore + outputBufferTimeoutUpdateChan = nil case interval := <-heartbeatUpdateChan: client.Heartbeat.Stop() if interval > 0 { client.Heartbeat = time.NewTicker(interval) } - // you can't update heartbeat anymore heartbeatUpdateChan = nil case <-client.Heartbeat.C: @@ -251,7 +257,6 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { if !ok { goto exit } - err = p.SendMessage(client, msg, &buf) if err != nil { goto exit @@ -265,7 +270,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { exit: log.Printf("PROTOCOL(V2): [%s] exiting messagePump", client) client.Heartbeat.Stop() - flusher.Stop() + client.OutputBufferTimeout.Stop() if subChannel != nil { subChannel.RemoveClient(client) } @@ -298,31 +303,24 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) } // body is a json structure with producer information - clientInfo := struct { - ShortId string `json:"short_id"` - LongId string `json:"long_id"` - HeartbeatInterval int `json:"heartbeat_interval"` - FeatureNegotiation bool `json:"feature_negotiation"` - }{} - err = json.Unmarshal(body, &clientInfo) + var identifyData IdentifyDataV2 + err = json.Unmarshal(body, &identifyData) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } - client.ShortIdentifier = clientInfo.ShortId - client.LongIdentifier = clientInfo.LongId - err = client.SetHeartbeatInterval(clientInfo.HeartbeatInterval) + err = client.Identify(identifyData) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error()) } resp := okBytes - if clientInfo.FeatureNegotiation { + if identifyData.FeatureNegotiation { resp, err = json.Marshal(struct { MaxRdyCount int64 `json:"max_rdy_count"` Version string `json:"version"` MaxMsgTimeout int64 `json:"max_msg_timeout"` - MsgTimeout int64 `json:"msg_timeout"` + MsgTimeout int64 `json:"msg_timeout"` }{ MaxRdyCount: nsqd.options.maxRdyCount, Version: util.BINARY_VERSION, From 82be80d6ea23da25223445137a5608afeccbfc56 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Fri, 10 May 2013 16:38:47 -0400 Subject: [PATCH 3/4] nsqd: re-organize command line flags --- nsqd/main.go | 50 +++++++++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/nsqd/main.go b/nsqd/main.go index 54876512e..0e570a2e7 100644 --- a/nsqd/main.go +++ b/nsqd/main.go @@ -20,28 +20,36 @@ import ( ) var ( - showVersion = flag.Bool("version", false, "print version string") - httpAddress = flag.String("http-address", "0.0.0.0:4151", ": to listen on for HTTP clients") - tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", ": to listen on for TCP clients") - memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)") - maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling") - syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs") - msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message") - maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes") - maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body") - maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout") - maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum duration of time between heartbeats that a client can configure") - maxRdyCount = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a single client") - dataPath = flag.String("data-path", "", "path to store disk-backed messages") - workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)") - verbose = flag.Bool("verbose", false, "enable verbose logging") - statsdAddress = flag.String("statsd-address", "", "UDP : of a statsd daemon for writing stats") - statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd") - broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd, (default to the OS hostname)") - lookupdTCPAddrs = util.StringArray{} - + // basic options + showVersion = flag.Bool("version", false, "print version string") + verbose = flag.Bool("verbose", false, "enable verbose logging") + workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)") + httpAddress = flag.String("http-address", "0.0.0.0:4151", ": to listen on for HTTP clients") + tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", ": to listen on for TCP clients") + broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd (defaults to the OS hostname)") + lookupdTCPAddrs = util.StringArray{} + + // diskqueue options + dataPath = flag.String("data-path", "", "path to store disk-backed messages") + memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)") + maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling") + syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs") + + // msg and command options + msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message") + maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout") + maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes") + maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body") + + // client overridable configuration options + maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum client configurable duration of time between client heartbeats") + maxRdyCount = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a client") maxOutputBufferSize = flag.Int64("max-output-buffer-size", 64*1024, "maximum client configurable size (in bytes) for a client output buffer") - maxOutputBufferTimeout = flag.Duration("max-output-buffer-timeout", 1*time.Second, "maximum duration of time between flushing to clients that a client can configure") + maxOutputBufferTimeout = flag.Duration("max-output-buffer-timeout", 1*time.Second, "maximum client configurable duration of time between flushing to a client") + + // statsd integration options + statsdAddress = flag.String("statsd-address", "", "UDP : of a statsd daemon for writing stats") + statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd") ) func init() { From fb152807bd3a7b6b16280e11023dc31841a67d17 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Thu, 23 May 2013 16:03:39 -0400 Subject: [PATCH 4/4] add tests --- nsqd/client_v2.go | 31 ++++++++++------ nsqd/protocol_v2_test.go | 79 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 12 deletions(-) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index d10af30f1..38f7d0ddc 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -231,24 +231,31 @@ func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error { return nil } -func (c *ClientV2) SetOutputBufferSize(size int) error { +func (c *ClientV2) SetOutputBufferSize(desiredSize int) error { c.Lock() defer c.Unlock() - err := c.Writer.Flush() - if err != nil { - return err - } + var size int - if size < 0 || int64(size) > nsqd.options.maxOutputBufferSize { - return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", size)) - } - - if size == 0 { + switch { + case desiredSize == -1: // effectively no buffer (every write will go directly to the wrapped net.Conn) size = 1 + case desiredSize == 0: + // do nothing (use default) + case desiredSize >= 64 && desiredSize <= int(nsqd.options.maxOutputBufferSize): + size = desiredSize + default: + return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", desiredSize)) + } + + if size > 0 { + err := c.Writer.Flush() + if err != nil { + return err + } + c.Writer = bufio.NewWriterSize(c.Conn, size) } - c.Writer = bufio.NewWriterSize(c.Conn, size) return nil } @@ -261,7 +268,7 @@ func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error { timeout = -1 case desiredTimeout == 0: // do nothing (use default) - case desiredTimeout >= 1000 && + case desiredTimeout >= 5 && desiredTimeout <= int(nsqd.options.maxOutputBufferTimeout/time.Millisecond): timeout = (time.Duration(desiredTimeout) * time.Millisecond) default: diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index f4d3ea6e9..c944d6019 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -75,6 +75,18 @@ func identifyFeatureNegotiation(t *testing.T, conn net.Conn) []byte { return data } +func identifyOutputBuffering(t *testing.T, conn net.Conn, size int, timeout int, f int32, d string) { + ci := make(map[string]interface{}) + ci["short_id"] = "test" + ci["long_id"] = "test" + ci["output_buffer_size"] = size + ci["output_buffer_timeout"] = timeout + cmd, _ := nsq.Identify(ci) + err := cmd.Write(conn) + assert.Equal(t, err, nil) + readValidate(t, conn, f, d) +} + func sub(t *testing.T, conn net.Conn, topicName string, channelName string) { err := nsq.Subscribe(topicName, channelName).Write(conn) assert.Equal(t, err, nil) @@ -610,6 +622,73 @@ func TestFatalError(t *testing.T) { assert.NotEqual(t, err, nil) } +func TestOutputBuffering(t *testing.T) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stdout) + + *verbose = true + options := NewNsqdOptions() + options.maxOutputBufferSize = 512 * 1024 + options.maxOutputBufferTimeout = time.Second + tcpAddr, _ := mustStartNSQd(options) + defer nsqd.Exit() + + topicName := "test_output_buffering" + strconv.Itoa(int(time.Now().Unix())) + + conn, err := mustConnectNSQd(tcpAddr) + assert.Equal(t, err, nil) + + outputBufferSize := 256 * 1024 + outputBufferTimeout := 500 + + topic := nsqd.GetTopic(topicName) + msg := nsq.NewMessage(<-nsqd.idChan, make([]byte, outputBufferSize-1024)) + topic.PutMessage(msg) + + identifyOutputBuffering(t, conn, outputBufferSize, outputBufferTimeout, nsq.FrameTypeResponse, "OK") + sub(t, conn, topicName, "ch") + + err = nsq.Ready(10).Write(conn) + assert.Equal(t, err, nil) + start := time.Now() + + resp, err := nsq.ReadResponse(conn) + assert.Equal(t, err, nil) + end := time.Now() + + assert.Equal(t, int(end.Sub(start)/time.Millisecond) >= outputBufferTimeout, true) + + frameType, data, err := nsq.UnpackResponse(resp) + msgOut, _ := nsq.DecodeMessage(data) + assert.Equal(t, frameType, nsq.FrameTypeMessage) + assert.Equal(t, msgOut.Id, msg.Id) +} + +func TestOutputBufferingValidity(t *testing.T) { + log.SetOutput(ioutil.Discard) + defer log.SetOutput(os.Stdout) + + *verbose = true + options := NewNsqdOptions() + options.maxOutputBufferSize = 512 * 1024 + options.maxOutputBufferTimeout = time.Second + tcpAddr, _ := mustStartNSQd(options) + defer nsqd.Exit() + + conn, err := mustConnectNSQd(tcpAddr) + assert.Equal(t, err, nil) + + identifyOutputBuffering(t, conn, 512*1024, 1000, nsq.FrameTypeResponse, "OK") + identifyOutputBuffering(t, conn, -1, -1, nsq.FrameTypeResponse, "OK") + identifyOutputBuffering(t, conn, 0, 0, nsq.FrameTypeResponse, "OK") + identifyOutputBuffering(t, conn, 512*1024+1, 0, nsq.FrameTypeError, fmt.Sprintf("E_BAD_BODY IDENTIFY output buffer size (%d) is invalid", 512*1024+1)) + + conn, err = mustConnectNSQd(tcpAddr) + assert.Equal(t, err, nil) + + identifyOutputBuffering(t, conn, 0, 1001, nsq.FrameTypeError, "E_BAD_BODY IDENTIFY output buffer timeout (1001) is invalid") +} + func BenchmarkProtocolV2Exec(b *testing.B) { b.StopTimer() log.SetOutput(ioutil.Discard)