Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd micro-optimizations #176

Merged
merged 5 commits into from
Apr 29, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,17 @@ DISCLAIMER: Please keep in mind that NSQ is designed to be used in a distributed
node performance is important, but not the end-all-be-all of what we're looking to achieve. Also,
benchmarks are stupid, but here's a few anyway to ignite the flame:

On a 2012 MacBook Air i7 2ghz (`GOMAXPROCS=1`, `go tip 8bbc0bdf832e`) single publisher, single consumer:
On a 2012 MacBook Air i7 2ghz (`GOMAXPROCS=1`, `go 1.1 beta2 4a712e80e9b1`, NSQ v0.2.19-alpha)
single publisher, single consumer:

```
$ ./nsqd --mem-queue-size=1000000

$ ./bench_writer
2013/01/29 10:24:24 duration: 2.60766631s - 73.144mb/s - 383484.649ops/s - 2.608us/op
2013/04/09 23:25:54 duration: 2.46904784s - 77.250mb/s - 405014.429ops/s - 2.469us/op

$ ./bench_reader
2013/01/29 10:25:43 duration: 6.665561082s - 28.615mb/s - 150024.880ops/s - 6.666us/op
2013/04/09 23:27:53 duration: 5.996050461s - 31.810mb/s - 166776.448ops/s - 5.996us/op
```

### Getting Started
Expand Down
15 changes: 12 additions & 3 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
type ClientV2 struct {
net.Conn
sync.Mutex
Reader *bufio.Reader
Writer *bufio.Writer

// buffered IO
Reader *bufio.Reader
Writer *bufio.Writer

State int32
ReadyCount int64
LastReadyCount int64
Expand All @@ -32,6 +35,10 @@ type ClientV2 struct {
LongIdentifier string
SubEventChan chan *Channel

// re-usable buffer for reading the 4-byte lengths off the wire
lenBuf [4]byte
lenSlice []byte

// heartbeats are client configurable via IDENTIFY
Heartbeat *time.Ticker
HeartbeatInterval time.Duration
Expand All @@ -44,7 +51,7 @@ func NewClientV2(conn net.Conn) *ClientV2 {
identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
}

return &ClientV2{
c := &ClientV2{
Conn: conn,
// ReadyStateChan has a buffer of 1 to guarantee that in the event
// there is a race the state update is not lost
Expand All @@ -63,6 +70,8 @@ func NewClientV2(conn net.Conn) *ClientV2 {
HeartbeatInterval: nsqd.options.clientTimeout / 2,
HeartbeatUpdateChan: make(chan time.Duration, 1),
}
c.lenSlice = c.lenBuf[:]
return c
}

func (c *ClientV2) String() string {
Expand Down
24 changes: 24 additions & 0 deletions nsqd/guid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"github.com/bitly/nsq/nsq"
"testing"
"unsafe"
)

func BenchmarkGUIDCopy(b *testing.B) {
source := make([]byte, 16)
var dest nsq.MessageID
for i := 0; i < b.N; i++ {
copy(dest[:], source)
}
}

func BenchmarkGUIDUnsafe(b *testing.B) {
source := make([]byte, 16)
var dest nsq.MessageID
for i := 0; i < b.N; i++ {
dest = *(*nsq.MessageID)(unsafe.Pointer(&source[0]))
}
_ = dest
}
53 changes: 28 additions & 25 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import (
"net"
"sync/atomic"
"time"
"unsafe"
)

const maxTimeout = time.Hour

var separatorBytes = []byte(" ")
var heartbeatBytes = []byte("_heartbeat_")
var okBytes = []byte("OK")

type ProtocolV2 struct {
nsq.Protocol
}
Expand Down Expand Up @@ -51,7 +56,7 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error {
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, []byte(" "))
params := bytes.Split(line, separatorBytes)

if *verbose {
log.Printf("PROTOCOL(V2): [%s] %s", client, params)
Expand Down Expand Up @@ -238,7 +243,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// you can't update heartbeat anymore
heartbeatUpdateChan = nil
case <-client.Heartbeat.C:
err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_"))
err = p.Send(client, nsq.FrameTypeResponse, heartbeatBytes)
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
Expand Down Expand Up @@ -276,8 +281,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state")
}

var bodyLen int32
err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
bodyLen, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
}
Expand Down Expand Up @@ -312,7 +316,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error())
}

resp := []byte("OK")
resp := okBytes
if clientInfo.FeatureNegotiation {
resp, err = json.Marshal(struct {
MaxRdyCount int64 `json:"max_rdy_count"`
Expand Down Expand Up @@ -363,7 +367,7 @@ func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) {
// update message pump
client.SubEventChan <- channel

return []byte("OK"), nil
return okBytes, nil
}

func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) {
Expand Down Expand Up @@ -402,8 +406,6 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) {
}

func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
var id nsq.MessageID

state := atomic.LoadInt32(&client.State)
if state != nsq.StateSubscribed && state != nsq.StateClosing {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state")
Expand All @@ -413,7 +415,7 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params")
}

copy(id[:], params[1])
id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
err := client.Channel.FinishMessage(client, id)
if err != nil {
return nil, nsq.NewClientErr(err, "E_FIN_FAILED",
Expand All @@ -426,8 +428,6 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
}

func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) {
var id nsq.MessageID

state := atomic.LoadInt32(&client.State)
if state != nsq.StateSubscribed && state != nsq.StateClosing {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state")
Expand All @@ -437,7 +437,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params")
}

copy(id[:], params[1])
id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
timeoutMs, err := util.ByteToBase10(params[2])
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_INVALID",
Expand Down Expand Up @@ -477,7 +477,6 @@ func (p *ProtocolV2) NOP(client *ClientV2, params [][]byte) ([]byte, error) {

func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
var err error
var bodyLen int32

if len(params) < 2 {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
Expand All @@ -489,7 +488,7 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("PUB topic name '%s' is not valid", topicName))
}

err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
bodyLen, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
}
Expand All @@ -512,14 +511,11 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

return []byte("OK"), nil
return okBytes, nil
}

func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
var err error
var bodyLen int32
var numMessages int32
var messageSize int32

if len(params) < 2 {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "MPUB insufficient number of parameters")
Expand All @@ -531,7 +527,7 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("E_BAD_TOPIC MPUB topic name '%s' is not valid", topicName))
}

err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
bodyLen, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size")
}
Expand All @@ -541,14 +537,14 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("MPUB body too big %d > %d", bodyLen, nsqd.options.maxBodySize))
}

err = binary.Read(client.Reader, binary.BigEndian, &numMessages)
numMessages, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count")
}

messages := make([]*nsq.Message, 0, numMessages)
for i := int32(0); i < numMessages; i++ {
err = binary.Read(client.Reader, binary.BigEndian, &messageSize)
messageSize, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE",
fmt.Sprintf("MPUB failed to read message(%d) body size", i))
Expand Down Expand Up @@ -578,12 +574,10 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
}

return []byte("OK"), nil
return okBytes, nil
}

func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {
var id nsq.MessageID

state := atomic.LoadInt32(&client.State)
if state != nsq.StateSubscribed && state != nsq.StateClosing {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot TOUCH in current state")
Expand All @@ -593,7 +587,7 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "TOUCH insufficient number of params")
}

copy(id[:], params[1])
id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
err := client.Channel.TouchMessage(client, id)
if err != nil {
return nil, nsq.NewClientErr(err, "E_TOUCH_FAILED",
Expand All @@ -602,3 +596,12 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {

return nil, nil
}

func (p *ProtocolV2) readLen(client *ClientV2) (int32, error) {
client.lenSlice = client.lenSlice[0:]
_, err := io.ReadFull(client.Reader, client.lenSlice)
if err != nil {
return 0, err
}
return int32(binary.BigEndian.Uint32(client.lenSlice)), nil
}
Loading