Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed May 23, 2013
1 parent 82be80d commit fb15280
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 12 deletions.
31 changes: 19 additions & 12 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,24 +231,31 @@ func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error {
return nil
}

func (c *ClientV2) SetOutputBufferSize(size int) error {
func (c *ClientV2) SetOutputBufferSize(desiredSize int) error {
c.Lock()
defer c.Unlock()

err := c.Writer.Flush()
if err != nil {
return err
}
var size int

if size < 0 || int64(size) > nsqd.options.maxOutputBufferSize {
return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", size))
}

if size == 0 {
switch {
case desiredSize == -1:
// effectively no buffer (every write will go directly to the wrapped net.Conn)
size = 1
case desiredSize == 0:
// do nothing (use default)
case desiredSize >= 64 && desiredSize <= int(nsqd.options.maxOutputBufferSize):
size = desiredSize
default:
return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", desiredSize))
}

if size > 0 {
err := c.Writer.Flush()
if err != nil {
return err
}
c.Writer = bufio.NewWriterSize(c.Conn, size)
}
c.Writer = bufio.NewWriterSize(c.Conn, size)

return nil
}
Expand All @@ -261,7 +268,7 @@ func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error {
timeout = -1
case desiredTimeout == 0:
// do nothing (use default)
case desiredTimeout >= 1000 &&
case desiredTimeout >= 5 &&
desiredTimeout <= int(nsqd.options.maxOutputBufferTimeout/time.Millisecond):
timeout = (time.Duration(desiredTimeout) * time.Millisecond)
default:
Expand Down
79 changes: 79 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func identifyFeatureNegotiation(t *testing.T, conn net.Conn) []byte {
return data
}

func identifyOutputBuffering(t *testing.T, conn net.Conn, size int, timeout int, f int32, d string) {
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
ci["output_buffer_size"] = size
ci["output_buffer_timeout"] = timeout
cmd, _ := nsq.Identify(ci)
err := cmd.Write(conn)
assert.Equal(t, err, nil)
readValidate(t, conn, f, d)
}

func sub(t *testing.T, conn net.Conn, topicName string, channelName string) {
err := nsq.Subscribe(topicName, channelName).Write(conn)
assert.Equal(t, err, nil)
Expand Down Expand Up @@ -610,6 +622,73 @@ func TestFatalError(t *testing.T) {
assert.NotEqual(t, err, nil)
}

func TestOutputBuffering(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

*verbose = true
options := NewNsqdOptions()
options.maxOutputBufferSize = 512 * 1024
options.maxOutputBufferTimeout = time.Second
tcpAddr, _ := mustStartNSQd(options)
defer nsqd.Exit()

topicName := "test_output_buffering" + strconv.Itoa(int(time.Now().Unix()))

conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)

outputBufferSize := 256 * 1024
outputBufferTimeout := 500

topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, make([]byte, outputBufferSize-1024))
topic.PutMessage(msg)

identifyOutputBuffering(t, conn, outputBufferSize, outputBufferTimeout, nsq.FrameTypeResponse, "OK")
sub(t, conn, topicName, "ch")

err = nsq.Ready(10).Write(conn)
assert.Equal(t, err, nil)
start := time.Now()

resp, err := nsq.ReadResponse(conn)
assert.Equal(t, err, nil)
end := time.Now()

assert.Equal(t, int(end.Sub(start)/time.Millisecond) >= outputBufferTimeout, true)

frameType, data, err := nsq.UnpackResponse(resp)
msgOut, _ := nsq.DecodeMessage(data)
assert.Equal(t, frameType, nsq.FrameTypeMessage)
assert.Equal(t, msgOut.Id, msg.Id)
}

func TestOutputBufferingValidity(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)

*verbose = true
options := NewNsqdOptions()
options.maxOutputBufferSize = 512 * 1024
options.maxOutputBufferTimeout = time.Second
tcpAddr, _ := mustStartNSQd(options)
defer nsqd.Exit()

conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)

identifyOutputBuffering(t, conn, 512*1024, 1000, nsq.FrameTypeResponse, "OK")
identifyOutputBuffering(t, conn, -1, -1, nsq.FrameTypeResponse, "OK")
identifyOutputBuffering(t, conn, 0, 0, nsq.FrameTypeResponse, "OK")
identifyOutputBuffering(t, conn, 512*1024+1, 0, nsq.FrameTypeError, fmt.Sprintf("E_BAD_BODY IDENTIFY output buffer size (%d) is invalid", 512*1024+1))

conn, err = mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)

identifyOutputBuffering(t, conn, 0, 1001, nsq.FrameTypeError, "E_BAD_BODY IDENTIFY output buffer timeout (1001) is invalid")
}

func BenchmarkProtocolV2Exec(b *testing.B) {
b.StopTimer()
log.SetOutput(ioutil.Discard)
Expand Down

0 comments on commit fb15280

Please sign in to comment.