Skip to content

Commit

Permalink
Merge pull request #196 from mreiferson/flush_196
Browse files Browse the repository at this point in the history
nsqd: client configurable output buffering
  • Loading branch information
jehiah committed Jun 1, 2013
2 parents af39bd2 + fb15280 commit da7057d
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 47 deletions.
91 changes: 87 additions & 4 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,24 @@ import (
"time"
)

type IdentifyDataV2 struct {
ShortId string `json:"short_id"`
LongId string `json:"long_id"`
HeartbeatInterval int `json:"heartbeat_interval"`
OutputBufferSize int `json:"output_buffer_size"`
OutputBufferTimeout int `json:"output_buffer_timeout"`
FeatureNegotiation bool `json:"feature_negotiation"`
}

type ClientV2 struct {
net.Conn
sync.Mutex

// buffered IO
Reader *bufio.Reader
Writer *bufio.Writer
Reader *bufio.Reader
Writer *bufio.Writer
OutputBufferTimeout *time.Ticker
OutputBufferTimeoutUpdateChan chan time.Duration

State int32
ReadyCount int64
Expand Down Expand Up @@ -53,15 +64,19 @@ func NewClientV2(conn net.Conn) *ClientV2 {

c := &ClientV2{
Conn: conn,

Reader: bufio.NewReaderSize(conn, 16*1024),
Writer: bufio.NewWriterSize(conn, 16*1024),
OutputBufferTimeout: time.NewTicker(5 * time.Millisecond),
OutputBufferTimeoutUpdateChan: make(chan time.Duration, 1),

// ReadyStateChan has a buffer of 1 to guarantee that in the event
// there is a race the state update is not lost
ReadyStateChan: make(chan int, 1),
ExitChan: make(chan int),
ConnectTime: time.Now(),
ShortIdentifier: identifier,
LongIdentifier: identifier,
Reader: bufio.NewReaderSize(conn, 16*1024),
Writer: bufio.NewWriterSize(conn, 16*1024),
State: nsq.StateInit,
SubEventChan: make(chan *Channel, 1),

Expand All @@ -78,6 +93,20 @@ func (c *ClientV2) String() string {
return c.RemoteAddr().String()
}

func (c *ClientV2) Identify(data IdentifyDataV2) error {
c.ShortIdentifier = data.ShortId
c.LongIdentifier = data.LongId
err := c.SetHeartbeatInterval(data.HeartbeatInterval)
if err != nil {
return err
}
err = c.SetOutputBufferSize(data.OutputBufferSize)
if err != nil {
return err
}
return c.SetOutputBufferTimeout(data.OutputBufferTimeout)
}

func (c *ClientV2) Stats() ClientStats {
return ClientStats{
Version: "V2",
Expand Down Expand Up @@ -201,3 +230,57 @@ func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error {

return nil
}

func (c *ClientV2) SetOutputBufferSize(desiredSize int) error {
c.Lock()
defer c.Unlock()

var size int

switch {
case desiredSize == -1:
// effectively no buffer (every write will go directly to the wrapped net.Conn)
size = 1
case desiredSize == 0:
// do nothing (use default)
case desiredSize >= 64 && desiredSize <= int(nsqd.options.maxOutputBufferSize):
size = desiredSize
default:
return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", desiredSize))
}

if size > 0 {
err := c.Writer.Flush()
if err != nil {
return err
}
c.Writer = bufio.NewWriterSize(c.Conn, size)
}

return nil
}

func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error {
var timeout time.Duration

switch {
case desiredTimeout == -1:
timeout = -1
case desiredTimeout == 0:
// do nothing (use default)
case desiredTimeout >= 5 &&
desiredTimeout <= int(nsqd.options.maxOutputBufferTimeout/time.Millisecond):
timeout = (time.Duration(desiredTimeout) * time.Millisecond)
default:
return errors.New(fmt.Sprintf("output buffer timeout (%d) is invalid", desiredTimeout))
}

if desiredTimeout != 0 {
select {
case c.OutputBufferTimeoutUpdateChan <- timeout:
default:
}
}

return nil
}
51 changes: 32 additions & 19 deletions nsqd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,36 @@ import (
)

var (
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")
msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")
maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum duration of time between heartbeats that a client can configure")
maxRdyCount = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a single client")
dataPath = flag.String("data-path", "", "path to store disk-backed messages")
workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
verbose = flag.Bool("verbose", false, "enable verbose logging")
statsdAddress = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd, (default to the OS hostname)")
lookupdTCPAddrs = util.StringArray{}
// basic options
showVersion = flag.Bool("version", false, "print version string")
verbose = flag.Bool("verbose", false, "enable verbose logging")
workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
httpAddress = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd (defaults to the OS hostname)")
lookupdTCPAddrs = util.StringArray{}

// diskqueue options
dataPath = flag.String("data-path", "", "path to store disk-backed messages")
memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")

// msg and command options
msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")

// client overridable configuration options
maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum client configurable duration of time between client heartbeats")
maxRdyCount = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a client")
maxOutputBufferSize = flag.Int64("max-output-buffer-size", 64*1024, "maximum client configurable size (in bytes) for a client output buffer")
maxOutputBufferTimeout = flag.Duration("max-output-buffer-timeout", 1*time.Second, "maximum client configurable duration of time between flushing to a client")

// statsd integration options
statsdAddress = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
)

func init() {
Expand Down Expand Up @@ -126,6 +137,8 @@ func main() {
options.maxMsgTimeout = *maxMsgTimeout
options.broadcastAddress = *broadcastAddress
options.maxHeartbeatInterval = *maxHeartbeatInterval
options.maxOutputBufferSize = *maxOutputBufferSize
options.maxOutputBufferTimeout = *maxOutputBufferTimeout

nsqd = NewNSQd(*workerId, options)
nsqd.tcpAddr = tcpAddr
Expand Down
6 changes: 6 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type nsqdOptions struct {
clientTimeout time.Duration
maxHeartbeatInterval time.Duration
broadcastAddress string

maxOutputBufferSize int64
maxOutputBufferTimeout time.Duration
}

func NewNsqdOptions() *nsqdOptions {
Expand All @@ -68,6 +71,9 @@ func NewNsqdOptions() *nsqdOptions {
clientTimeout: nsq.DefaultClientTimeout,
maxHeartbeatInterval: 60 * time.Second,
broadcastAddress: "",

maxOutputBufferSize: 64 * 1024,
maxOutputBufferTimeout: 1 * time.Second,
}
}

Expand Down
50 changes: 26 additions & 24 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
var buf bytes.Buffer
var clientMsgChan chan *nsq.Message
var subChannel *Channel
// NOTE: `flusherChan` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
var flusherChan <-chan time.Time

// v2 opportunistically buffers data to clients to reduce write system calls
Expand All @@ -189,13 +192,10 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// 2. we're buffered and the channel has nothing left to send us
// (ie. we would block in this loop anyway)
//
// NOTE: `flusher` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
flusher := time.NewTicker(5 * time.Millisecond)
flushed := true
subEventChan := client.SubEventChan
heartbeatUpdateChan := client.HeartbeatUpdateChan
outputBufferTimeoutUpdateChan := client.OutputBufferTimeoutUpdateChan
flushed := true

for {
if subChannel == nil || !client.IsReadyForMessages() {
Expand All @@ -217,7 +217,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
clientMsgChan = subChannel.clientMsgChan
flusherChan = flusher.C
flusherChan = client.OutputBufferTimeout.C
}

select {
Expand All @@ -234,12 +234,18 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// you can't subscribe anymore
subEventChan = nil
case <-client.ReadyStateChan:
case timeout := <-outputBufferTimeoutUpdateChan:
client.OutputBufferTimeout.Stop()
if timeout > 0 {
client.OutputBufferTimeout = time.NewTicker(timeout)
}
// you can't update output buffer timeout anymore
outputBufferTimeoutUpdateChan = nil
case interval := <-heartbeatUpdateChan:
client.Heartbeat.Stop()
if interval > 0 {
client.Heartbeat = time.NewTicker(interval)
}

// you can't update heartbeat anymore
heartbeatUpdateChan = nil
case <-client.Heartbeat.C:
Expand All @@ -251,7 +257,6 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
if !ok {
goto exit
}

err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
Expand All @@ -265,7 +270,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
exit:
log.Printf("PROTOCOL(V2): [%s] exiting messagePump", client)
client.Heartbeat.Stop()
flusher.Stop()
client.OutputBufferTimeout.Stop()
if subChannel != nil {
subChannel.RemoveClient(client)
}
Expand Down Expand Up @@ -298,32 +303,29 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
}

// body is a json structure with producer information
clientInfo := struct {
ShortId string `json:"short_id"`
LongId string `json:"long_id"`
HeartbeatInterval int `json:"heartbeat_interval"`
FeatureNegotiation bool `json:"feature_negotiation"`
}{}
err = json.Unmarshal(body, &clientInfo)
var identifyData IdentifyDataV2
err = json.Unmarshal(body, &identifyData)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
}

client.ShortIdentifier = clientInfo.ShortId
client.LongIdentifier = clientInfo.LongId
err = client.SetHeartbeatInterval(clientInfo.HeartbeatInterval)
err = client.Identify(identifyData)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error())
}

resp := okBytes
if clientInfo.FeatureNegotiation {
if identifyData.FeatureNegotiation {
resp, err = json.Marshal(struct {
MaxRdyCount int64 `json:"max_rdy_count"`
Version string `json:"version"`
MaxRdyCount int64 `json:"max_rdy_count"`
Version string `json:"version"`
MaxMsgTimeout int64 `json:"max_msg_timeout"`
MsgTimeout int64 `json:"msg_timeout"`
}{
MaxRdyCount: nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
MaxRdyCount: nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
MaxMsgTimeout: int64(nsqd.options.maxMsgTimeout / time.Millisecond),
MsgTimeout: int64(nsqd.options.msgTimeout / time.Millisecond),
})
if err != nil {
panic("should never happen")
Expand Down
Loading

0 comments on commit da7057d

Please sign in to comment.