Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: client configurable output buffering #196

Merged
merged 4 commits into from
Jun 1, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 87 additions & 4 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,24 @@ import (
"time"
)

// IdentifyDataV2 is the JSON body a V2 client sends with the IDENTIFY
// command. It is decoded with encoding/json and then applied to the
// connection by (*ClientV2).Identify.
type IdentifyDataV2 struct {
// ShortId is copied to ClientV2.ShortIdentifier.
ShortId string `json:"short_id"`
// LongId is copied to ClientV2.LongIdentifier.
LongId string `json:"long_id"`
// HeartbeatInterval is handed to SetHeartbeatInterval.
// NOTE(review): units and accepted range are defined there — confirm.
HeartbeatInterval int `json:"heartbeat_interval"`
// OutputBufferSize is the requested write-buffer size in bytes:
// -1 effectively disables buffering, 0 keeps the server default, and
// any other value must be at least 64 and no larger than the configured
// max-output-buffer-size.
OutputBufferSize int `json:"output_buffer_size"`
// OutputBufferTimeout is the requested flush interval in milliseconds:
// -1 disables the timed flush, 0 keeps the server default, and any other
// value must be at least 5 and no larger than the configured
// max-output-buffer-timeout.
OutputBufferTimeout int `json:"output_buffer_timeout"`
// FeatureNegotiation, when true, makes the IDENTIFY response a JSON
// document describing server limits instead of the plain OK response.
FeatureNegotiation bool `json:"feature_negotiation"`
}

type ClientV2 struct {
net.Conn
sync.Mutex

// buffered IO
Reader *bufio.Reader
Writer *bufio.Writer
Reader *bufio.Reader
Writer *bufio.Writer
OutputBufferTimeout *time.Ticker
OutputBufferTimeoutUpdateChan chan time.Duration

State int32
ReadyCount int64
Expand Down Expand Up @@ -53,15 +64,19 @@ func NewClientV2(conn net.Conn) *ClientV2 {

c := &ClientV2{
Conn: conn,

Reader: bufio.NewReaderSize(conn, 16*1024),
Writer: bufio.NewWriterSize(conn, 16*1024),
OutputBufferTimeout: time.NewTicker(5 * time.Millisecond),
OutputBufferTimeoutUpdateChan: make(chan time.Duration, 1),

// ReadyStateChan has a buffer of 1 to guarantee that in the event
// there is a race the state update is not lost
ReadyStateChan: make(chan int, 1),
ExitChan: make(chan int),
ConnectTime: time.Now(),
ShortIdentifier: identifier,
LongIdentifier: identifier,
Reader: bufio.NewReaderSize(conn, 16*1024),
Writer: bufio.NewWriterSize(conn, 16*1024),
State: nsq.StateInit,
SubEventChan: make(chan *Channel, 1),

Expand All @@ -78,6 +93,20 @@ func (c *ClientV2) String() string {
return c.RemoteAddr().String()
}

// Identify applies the settings a client supplied in its IDENTIFY body.
//
// The identifiers are recorded first; the heartbeat interval, output buffer
// size and output buffer timeout are then applied in that order. The first
// setter that rejects its value aborts the sequence and its error is
// returned unchanged (settings applied before the failure are not rolled
// back).
func (c *ClientV2) Identify(data IdentifyDataV2) error {
	c.ShortIdentifier, c.LongIdentifier = data.ShortId, data.LongId

	if err := c.SetHeartbeatInterval(data.HeartbeatInterval); err != nil {
		return err
	}
	if err := c.SetOutputBufferSize(data.OutputBufferSize); err != nil {
		return err
	}
	return c.SetOutputBufferTimeout(data.OutputBufferTimeout)
}

func (c *ClientV2) Stats() ClientStats {
return ClientStats{
Version: "V2",
Expand Down Expand Up @@ -201,3 +230,57 @@ func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error {

return nil
}

// SetOutputBufferSize replaces the client's buffered writer with one of the
// requested size. The client mutex is held for the duration of the call.
//
// desiredSize is interpreted as follows:
//
//	-1        effectively no buffering (a 1 byte buffer, so writes go
//	          more or less straight to the wrapped net.Conn)
//	 0        keep the current (default) writer; nothing is changed
//	 64..max  use a buffer of exactly desiredSize bytes, where max is
//	          nsqd.options.maxOutputBufferSize
//
// Any other value yields an error and leaves the writer untouched. When the
// writer is replaced, the existing one is flushed first so no pending data is
// lost; a flush failure is returned and the writer is left in place.
func (c *ClientV2) SetOutputBufferSize(desiredSize int) error {
	c.Lock()
	defer c.Unlock()

	var size int

	switch {
	case desiredSize == -1:
		// effectively no buffer (every write will go directly to the wrapped net.Conn)
		size = 1
	case desiredSize == 0:
		// use default: keep the writer that is already installed
		return nil
	// Compare in int64: the configured maximum is an int64 option, and
	// narrowing it to int could truncate (or go negative) on 32-bit
	// platforms, wrongly rejecting valid sizes.
	case desiredSize >= 64 && int64(desiredSize) <= nsqd.options.maxOutputBufferSize:
		size = desiredSize
	default:
		return errors.New(fmt.Sprintf("output buffer size (%d) is invalid", desiredSize))
	}

	// Push out anything still sitting in the old buffer before swapping it.
	if err := c.Writer.Flush(); err != nil {
		return err
	}
	c.Writer = bufio.NewWriterSize(c.Conn, size)

	return nil
}

// SetOutputBufferTimeout validates a client-requested flush interval (in
// milliseconds) and hands it to the message pump through
// OutputBufferTimeoutUpdateChan.
//
// desiredTimeout is interpreted as follows:
//
//	-1       disable the timed flush (a duration of -1 is sent)
//	 0       keep the default; nothing is sent
//	 5..max  use desiredTimeout milliseconds, where max is
//	         nsqd.options.maxOutputBufferTimeout expressed in milliseconds
//
// Any other value yields an error. The hand-off is a non-blocking send: if
// the channel already holds a pending update the new value is dropped and
// nil is still returned.
func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error {
	if desiredTimeout == 0 {
		// do nothing (use default)
		return nil
	}

	interval := time.Duration(-1)
	if desiredTimeout != -1 {
		upperBound := int(nsqd.options.maxOutputBufferTimeout / time.Millisecond)
		if desiredTimeout < 5 || desiredTimeout > upperBound {
			return errors.New(fmt.Sprintf("output buffer timeout (%d) is invalid", desiredTimeout))
		}
		interval = time.Duration(desiredTimeout) * time.Millisecond
	}

	// Never block the caller on the update channel.
	select {
	case c.OutputBufferTimeoutUpdateChan <- interval:
	default:
	}

	return nil
}
51 changes: 32 additions & 19 deletions nsqd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,36 @@ import (
)

var (
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")
msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")
maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum duration of time between heartbeats that a client can configure")
maxRdyCount = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a single client")
dataPath = flag.String("data-path", "", "path to store disk-backed messages")
workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
verbose = flag.Bool("verbose", false, "enable verbose logging")
statsdAddress = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd, (default to the OS hostname)")
lookupdTCPAddrs = util.StringArray{}
// basic options
showVersion = flag.Bool("version", false, "print version string")
verbose = flag.Bool("verbose", false, "enable verbose logging")
workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
httpAddress = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd (defaults to the OS hostname)")
lookupdTCPAddrs = util.StringArray{}

// diskqueue options
dataPath = flag.String("data-path", "", "path to store disk-backed messages")
memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")

// msg and command options
msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")

// client overridable configuration options
maxHeartbeatInterval = flag.Duration("max-heartbeat-interval", 60*time.Second, "maximum client configurable duration of time between client heartbeats")
maxRdyCount = flag.Int64("max-rdy-count", 2500, "maximum RDY count for a client")
maxOutputBufferSize = flag.Int64("max-output-buffer-size", 64*1024, "maximum client configurable size (in bytes) for a client output buffer")
maxOutputBufferTimeout = flag.Duration("max-output-buffer-timeout", 1*time.Second, "maximum client configurable duration of time between flushing to a client")

// statsd integration options
statsdAddress = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
)

func init() {
Expand Down Expand Up @@ -126,6 +137,8 @@ func main() {
options.maxMsgTimeout = *maxMsgTimeout
options.broadcastAddress = *broadcastAddress
options.maxHeartbeatInterval = *maxHeartbeatInterval
options.maxOutputBufferSize = *maxOutputBufferSize
options.maxOutputBufferTimeout = *maxOutputBufferTimeout

nsqd = NewNSQd(*workerId, options)
nsqd.tcpAddr = tcpAddr
Expand Down
6 changes: 6 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type nsqdOptions struct {
clientTimeout time.Duration
maxHeartbeatInterval time.Duration
broadcastAddress string

maxOutputBufferSize int64
maxOutputBufferTimeout time.Duration
}

func NewNsqdOptions() *nsqdOptions {
Expand All @@ -68,6 +71,9 @@ func NewNsqdOptions() *nsqdOptions {
clientTimeout: nsq.DefaultClientTimeout,
maxHeartbeatInterval: 60 * time.Second,
broadcastAddress: "",

maxOutputBufferSize: 64 * 1024,
maxOutputBufferTimeout: 1 * time.Second,
}
}

Expand Down
50 changes: 26 additions & 24 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
var buf bytes.Buffer
var clientMsgChan chan *nsq.Message
var subChannel *Channel
// NOTE: `flusherChan` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
var flusherChan <-chan time.Time

// v2 opportunistically buffers data to clients to reduce write system calls
Expand All @@ -189,13 +192,10 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// 2. we're buffered and the channel has nothing left to send us
// (ie. we would block in this loop anyway)
//
// NOTE: `flusher` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
flusher := time.NewTicker(5 * time.Millisecond)
flushed := true
subEventChan := client.SubEventChan
heartbeatUpdateChan := client.HeartbeatUpdateChan
outputBufferTimeoutUpdateChan := client.OutputBufferTimeoutUpdateChan
flushed := true

for {
if subChannel == nil || !client.IsReadyForMessages() {
Expand All @@ -217,7 +217,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
clientMsgChan = subChannel.clientMsgChan
flusherChan = flusher.C
flusherChan = client.OutputBufferTimeout.C
}

select {
Expand All @@ -234,12 +234,18 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// you can't subscribe anymore
subEventChan = nil
case <-client.ReadyStateChan:
case timeout := <-outputBufferTimeoutUpdateChan:
client.OutputBufferTimeout.Stop()
if timeout > 0 {
client.OutputBufferTimeout = time.NewTicker(timeout)
}
// you can't update output buffer timeout anymore
outputBufferTimeoutUpdateChan = nil
case interval := <-heartbeatUpdateChan:
client.Heartbeat.Stop()
if interval > 0 {
client.Heartbeat = time.NewTicker(interval)
}

// you can't update heartbeat anymore
heartbeatUpdateChan = nil
case <-client.Heartbeat.C:
Expand All @@ -251,7 +257,6 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
if !ok {
goto exit
}

err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
Expand All @@ -265,7 +270,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
exit:
log.Printf("PROTOCOL(V2): [%s] exiting messagePump", client)
client.Heartbeat.Stop()
flusher.Stop()
client.OutputBufferTimeout.Stop()
if subChannel != nil {
subChannel.RemoveClient(client)
}
Expand Down Expand Up @@ -298,32 +303,29 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
}

// body is a json structure with producer information
clientInfo := struct {
ShortId string `json:"short_id"`
LongId string `json:"long_id"`
HeartbeatInterval int `json:"heartbeat_interval"`
FeatureNegotiation bool `json:"feature_negotiation"`
}{}
err = json.Unmarshal(body, &clientInfo)
var identifyData IdentifyDataV2
err = json.Unmarshal(body, &identifyData)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body")
}

client.ShortIdentifier = clientInfo.ShortId
client.LongIdentifier = clientInfo.LongId
err = client.SetHeartbeatInterval(clientInfo.HeartbeatInterval)
err = client.Identify(identifyData)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error())
}

resp := okBytes
if clientInfo.FeatureNegotiation {
if identifyData.FeatureNegotiation {
resp, err = json.Marshal(struct {
MaxRdyCount int64 `json:"max_rdy_count"`
Version string `json:"version"`
MaxRdyCount int64 `json:"max_rdy_count"`
Version string `json:"version"`
MaxMsgTimeout int64 `json:"max_msg_timeout"`
MsgTimeout int64 `json:"msg_timeout"`
}{
MaxRdyCount: nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
MaxRdyCount: nsqd.options.maxRdyCount,
Version: util.BINARY_VERSION,
MaxMsgTimeout: int64(nsqd.options.maxMsgTimeout / time.Millisecond),
MsgTimeout: int64(nsqd.options.msgTimeout / time.Millisecond),
})
if err != nil {
panic("should never happen")
Expand Down
Loading