Skip to content

Commit

Permalink
Merge pull request #176 from mreiferson/optimizations_176
Browse files Browse the repository at this point in the history
nsqd micro-optimizations
  • Loading branch information
jehiah committed Apr 29, 2013
2 parents 1f833d2 + 0849076 commit 64fd8b6
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 56 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,17 @@ DISCLAIMER: Please keep in mind that NSQ is designed to be used in a distributed
node performance is important, but not the end-all-be-all of what we're looking to achieve. Also,
benchmarks are stupid, but here's a few anyway to ignite the flame:

On a 2012 MacBook Air i7 2ghz (`GOMAXPROCS=1`, `go tip 8bbc0bdf832e`) single publisher, single consumer:
On a 2012 MacBook Air i7 2ghz (`GOMAXPROCS=1`, `go 1.1 beta2 4a712e80e9b1`, NSQ v0.2.19-alpha)
single publisher, single consumer:

```
$ ./nsqd --mem-queue-size=1000000
$ ./bench_writer
2013/01/29 10:24:24 duration: 2.60766631s - 73.144mb/s - 383484.649ops/s - 2.608us/op
2013/04/09 23:25:54 duration: 2.46904784s - 77.250mb/s - 405014.429ops/s - 2.469us/op
$ ./bench_reader
2013/01/29 10:25:43 duration: 6.665561082s - 28.615mb/s - 150024.880ops/s - 6.666us/op
2013/04/09 23:27:53 duration: 5.996050461s - 31.810mb/s - 166776.448ops/s - 5.996us/op
```

### Getting Started
Expand Down
15 changes: 12 additions & 3 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import (
type ClientV2 struct {
net.Conn
sync.Mutex
Reader *bufio.Reader
Writer *bufio.Writer

// buffered IO
Reader *bufio.Reader
Writer *bufio.Writer

State int32
ReadyCount int64
LastReadyCount int64
Expand All @@ -32,6 +35,10 @@ type ClientV2 struct {
LongIdentifier string
SubEventChan chan *Channel

// re-usable buffer for reading the 4-byte lengths off the wire
lenBuf [4]byte
lenSlice []byte

// heartbeats are client configurable via IDENTIFY
Heartbeat *time.Ticker
HeartbeatInterval time.Duration
Expand All @@ -44,7 +51,7 @@ func NewClientV2(conn net.Conn) *ClientV2 {
identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
}

return &ClientV2{
c := &ClientV2{
Conn: conn,
// ReadyStateChan has a buffer of 1 to guarantee that in the event
// there is a race the state update is not lost
Expand All @@ -63,6 +70,8 @@ func NewClientV2(conn net.Conn) *ClientV2 {
HeartbeatInterval: nsqd.options.clientTimeout / 2,
HeartbeatUpdateChan: make(chan time.Duration, 1),
}
c.lenSlice = c.lenBuf[:]
return c
}

func (c *ClientV2) String() string {
Expand Down
24 changes: 24 additions & 0 deletions nsqd/guid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"github.com/bitly/nsq/nsq"
"testing"
"unsafe"
)

func BenchmarkGUIDCopy(b *testing.B) {
source := make([]byte, 16)
var dest nsq.MessageID
for i := 0; i < b.N; i++ {
copy(dest[:], source)
}
}

func BenchmarkGUIDUnsafe(b *testing.B) {
source := make([]byte, 16)
var dest nsq.MessageID
for i := 0; i < b.N; i++ {
dest = *(*nsq.MessageID)(unsafe.Pointer(&source[0]))
}
_ = dest
}
53 changes: 28 additions & 25 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ import (
"net"
"sync/atomic"
"time"
"unsafe"
)

const maxTimeout = time.Hour

var separatorBytes = []byte(" ")
var heartbeatBytes = []byte("_heartbeat_")
var okBytes = []byte("OK")

type ProtocolV2 struct {
nsq.Protocol
}
Expand Down Expand Up @@ -51,7 +56,7 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error {
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, []byte(" "))
params := bytes.Split(line, separatorBytes)

if *verbose {
log.Printf("PROTOCOL(V2): [%s] %s", client, params)
Expand Down Expand Up @@ -238,7 +243,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) {
// you can't update heartbeat anymore
heartbeatUpdateChan = nil
case <-client.Heartbeat.C:
err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_"))
err = p.Send(client, nsq.FrameTypeResponse, heartbeatBytes)
if err != nil {
log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error())
}
Expand Down Expand Up @@ -276,8 +281,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state")
}

var bodyLen int32
err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
bodyLen, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size")
}
Expand Down Expand Up @@ -312,7 +316,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error)
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error())
}

resp := []byte("OK")
resp := okBytes
if clientInfo.FeatureNegotiation {
resp, err = json.Marshal(struct {
MaxRdyCount int64 `json:"max_rdy_count"`
Expand Down Expand Up @@ -363,7 +367,7 @@ func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) {
// update message pump
client.SubEventChan <- channel

return []byte("OK"), nil
return okBytes, nil
}

func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) {
Expand Down Expand Up @@ -402,8 +406,6 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) {
}

func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
var id nsq.MessageID

state := atomic.LoadInt32(&client.State)
if state != nsq.StateSubscribed && state != nsq.StateClosing {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state")
Expand All @@ -413,7 +415,7 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params")
}

copy(id[:], params[1])
id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
err := client.Channel.FinishMessage(client, id)
if err != nil {
return nil, nsq.NewClientErr(err, "E_FIN_FAILED",
Expand All @@ -426,8 +428,6 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
}

func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) {
var id nsq.MessageID

state := atomic.LoadInt32(&client.State)
if state != nsq.StateSubscribed && state != nsq.StateClosing {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state")
Expand All @@ -437,7 +437,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params")
}

copy(id[:], params[1])
id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
timeoutMs, err := util.ByteToBase10(params[2])
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_INVALID",
Expand Down Expand Up @@ -477,7 +477,6 @@ func (p *ProtocolV2) NOP(client *ClientV2, params [][]byte) ([]byte, error) {

func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
var err error
var bodyLen int32

if len(params) < 2 {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
Expand All @@ -489,7 +488,7 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("PUB topic name '%s' is not valid", topicName))
}

err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
bodyLen, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
}
Expand All @@ -512,14 +511,11 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

return []byte("OK"), nil
return okBytes, nil
}

func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
var err error
var bodyLen int32
var numMessages int32
var messageSize int32

if len(params) < 2 {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "MPUB insufficient number of parameters")
Expand All @@ -531,7 +527,7 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("E_BAD_TOPIC MPUB topic name '%s' is not valid", topicName))
}

err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
bodyLen, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size")
}
Expand All @@ -541,14 +537,14 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("MPUB body too big %d > %d", bodyLen, nsqd.options.maxBodySize))
}

err = binary.Read(client.Reader, binary.BigEndian, &numMessages)
numMessages, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count")
}

messages := make([]*nsq.Message, 0, numMessages)
for i := int32(0); i < numMessages; i++ {
err = binary.Read(client.Reader, binary.BigEndian, &messageSize)
messageSize, err := p.readLen(client)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE",
fmt.Sprintf("MPUB failed to read message(%d) body size", i))
Expand Down Expand Up @@ -578,12 +574,10 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
}

return []byte("OK"), nil
return okBytes, nil
}

func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {
var id nsq.MessageID

state := atomic.LoadInt32(&client.State)
if state != nsq.StateSubscribed && state != nsq.StateClosing {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot TOUCH in current state")
Expand All @@ -593,7 +587,7 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "TOUCH insufficient number of params")
}

copy(id[:], params[1])
id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
err := client.Channel.TouchMessage(client, id)
if err != nil {
return nil, nsq.NewClientErr(err, "E_TOUCH_FAILED",
Expand All @@ -602,3 +596,12 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {

return nil, nil
}

func (p *ProtocolV2) readLen(client *ClientV2) (int32, error) {
client.lenSlice = client.lenSlice[0:]
_, err := io.ReadFull(client.Reader, client.lenSlice)
if err != nil {
return 0, err
}
return int32(binary.BigEndian.Uint32(client.lenSlice)), nil
}
Loading

0 comments on commit 64fd8b6

Please sign in to comment.