From 151c8ab53a073d5138f387b5e11c22cf6f4c1518 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Mon, 11 Mar 2013 16:31:05 -0400 Subject: [PATCH 1/5] nsqd: remove locks on topic messagePump() --- nsqd/topic.go | 59 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/nsqd/topic.go b/nsqd/topic.go index 646579600..4c7faafca 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -19,6 +19,7 @@ type Topic struct { memoryMsgChan chan *nsq.Message messagePumpStarter *sync.Once exitChan chan int + channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 messageCount uint64 @@ -37,6 +38,7 @@ func NewTopic(topicName string, options *nsqdOptions, notifier Notifier) *Topic notifier: notifier, options: options, exitChan: make(chan int), + channelUpdateChan: make(chan int), messagePumpStarter: new(sync.Once), } @@ -57,12 +59,19 @@ func (t *Topic) Exiting() bool { // for the given Topic func (t *Topic) GetChannel(channelName string) *Channel { t.Lock() - defer t.Unlock() - return t.getOrCreateChannel(channelName) + channel, isNew := t.getOrCreateChannel(channelName) + t.Unlock() + if isNew { + select { + case t.channelUpdateChan <- 1: + case <-t.exitChan: + } + } + return channel } // this expects the caller to handle locking -func (t *Topic) getOrCreateChannel(channelName string) *Channel { +func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) { channel, ok := t.channelMap[channelName] if !ok { deleteCallback := func(c *Channel) { @@ -73,8 +82,9 @@ func (t *Topic) getOrCreateChannel(channelName string) *Channel { log.Printf("TOPIC(%s): new channel(%s)", t.name, channel.name) // start the topic message pump lazily using a `once` on the first channel creation t.messagePumpStarter.Do(func() { t.waitGroup.Wrap(func() { t.messagePump() }) }) + return channel, true } - return channel + return channel, false } func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) { @@ 
-105,6 +115,11 @@ func (t *Topic) DeleteExistingChannel(channelName string) error { // (so that we dont leave any messages around) channel.Delete() + select { + case t.channelUpdateChan <- 1: + case <-t.exitChan: + } + return nil } @@ -143,15 +158,15 @@ func (t *Topic) messagePump() { var msg *nsq.Message var buf []byte var err error + var chans []*Channel - for { - // do an extra check for exit before we select on all the memory/backend/exitChan - // this solves the case where we are closed and something else is writing into - // backend. we don't want to reverse that - if atomic.LoadInt32(&t.exitFlag) == 1 { - goto exit - } + t.RLock() + for _, c := range t.channelMap { + chans = append(chans, c) + } + t.RUnlock() + for { select { case msg = <-t.memoryMsgChan: case buf = <-t.backend.ReadChan(): @@ -160,27 +175,28 @@ func (t *Topic) messagePump() { log.Printf("ERROR: failed to decode message - %s", err.Error()) continue } + case <-t.channelUpdateChan: + chans = chans[:0] + t.RLock() + for _, c := range t.channelMap { + chans = append(chans, c) + } + t.RUnlock() + continue case <-t.exitChan: goto exit } - t.RLock() // check if all the channels have been deleted - if len(t.channelMap) == 0 { + if len(chans) == 0 { // put this message back on the queue - // we need to background because we currently hold the lock - go func() { - t.PutMessage(msg) - }() - + t.PutMessage(msg) // reset the sync.Once t.messagePumpStarter = new(sync.Once) - - t.RUnlock() goto exit } - for _, channel := range t.channelMap { + for _, channel := range chans { // copy the message because each channel // needs a unique instance chanMsg := nsq.NewMessage(msg.Id, msg.Body) @@ -190,7 +206,6 @@ func (t *Topic) messagePump() { log.Printf("TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.Id, channel.name, err.Error()) } } - t.RUnlock() } exit: From 2de7909340dac8fe0e81b5d18da37cb747d31d75 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Thu, 14 Mar 2013 09:10:22 -0400 
Subject: [PATCH 2/5] nsqd: dont dupe msg for the first channel --- nsqd/topic.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nsqd/topic.go b/nsqd/topic.go index 4c7faafca..79bb2203c 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -196,11 +196,16 @@ func (t *Topic) messagePump() { goto exit } - for _, channel := range chans { + for i, channel := range chans { + chanMsg := msg // copy the message because each channel - // needs a unique instance - chanMsg := nsq.NewMessage(msg.Id, msg.Body) - chanMsg.Timestamp = msg.Timestamp + // needs a unique instance but... + // fastpath to avoid copy if its the first channel + // (the topic already created the first copy) + if i > 0 { + chanMsg = nsq.NewMessage(msg.Id, msg.Body) + chanMsg.Timestamp = msg.Timestamp + } err := channel.PutMessage(chanMsg) if err != nil { log.Printf("TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.Id, channel.name, err.Error()) From 9dde14fee611d6467166424aafe4f3235e38a9cf Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sat, 30 Mar 2013 13:40:00 -0400 Subject: [PATCH 3/5] nsqd: reduce garbage shown in escape analysis --- nsqd/client_v2.go | 15 ++++++++++++--- nsqd/protocol_v2.go | 40 ++++++++++++++++++++++++---------------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index ce2f14d44..3abc91d85 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -15,8 +15,11 @@ import ( type ClientV2 struct { net.Conn sync.Mutex - Reader *bufio.Reader - Writer *bufio.Writer + + // buffered IO + Reader *bufio.Reader + Writer *bufio.Writer + State int32 ReadyCount int64 LastReadyCount int64 @@ -32,6 +35,10 @@ type ClientV2 struct { LongIdentifier string SubEventChan chan *Channel + // re-usable buffer for reading the 4-byte lengths off the wire + lenBuf [4]byte + lenSlice []byte + // heartbeats are client configurable via IDENTIFY Heartbeat *time.Ticker HeartbeatInterval time.Duration @@ 
-44,7 +51,7 @@ func NewClientV2(conn net.Conn) *ClientV2 { identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String()) } - return &ClientV2{ + c := &ClientV2{ Conn: conn, // ReadyStateChan has a buffer of 1 to guarantee that in the event // there is a race the state update is not lost @@ -63,6 +70,8 @@ func NewClientV2(conn net.Conn) *ClientV2 { HeartbeatInterval: nsqd.options.clientTimeout / 2, HeartbeatUpdateChan: make(chan time.Duration, 1), } + c.lenSlice = c.lenBuf[:] + return c } func (c *ClientV2) String() string { diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c767ee5a5..ef03205bf 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -16,6 +16,10 @@ import ( const maxTimeout = time.Hour +var separatorBytes = []byte(" ") +var heartbeatBytes = []byte("_heartbeat_") +var okBytes = []byte("OK") + type ProtocolV2 struct { nsq.Protocol } @@ -51,7 +55,7 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error { if len(line) > 0 && line[len(line)-1] == '\r' { line = line[:len(line)-1] } - params := bytes.Split(line, []byte(" ")) + params := bytes.Split(line, separatorBytes) if *verbose { log.Printf("PROTOCOL(V2): [%s] %s", client, params) @@ -238,7 +242,7 @@ func (p *ProtocolV2) messagePump(client *ClientV2) { // you can't update heartbeat anymore heartbeatUpdateChan = nil case <-client.Heartbeat.C: - err = p.Send(client, nsq.FrameTypeResponse, []byte("_heartbeat_")) + err = p.Send(client, nsq.FrameTypeResponse, heartbeatBytes) if err != nil { log.Printf("PROTOCOL(V2): error sending heartbeat - %s", err.Error()) } @@ -276,8 +280,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot IDENTIFY in current state") } - var bodyLen int32 - err = binary.Read(client.Reader, binary.BigEndian, &bodyLen) + bodyLen, err := p.readLen(client) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } @@ -312,7 +315,7 @@ 
func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY "+err.Error()) } - resp := []byte("OK") + resp := okBytes if clientInfo.FeatureNegotiation { resp, err = json.Marshal(struct { MaxRdyCount int64 `json:"max_rdy_count"` @@ -363,7 +366,7 @@ func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) { // update message pump client.SubEventChan <- channel - return []byte("OK"), nil + return okBytes, nil } func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { @@ -477,7 +480,6 @@ func (p *ProtocolV2) NOP(client *ClientV2, params [][]byte) ([]byte, error) { func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) { var err error - var bodyLen int32 if len(params) < 2 { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters") @@ -489,7 +491,7 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("PUB topic name '%s' is not valid", topicName)) } - err = binary.Read(client.Reader, binary.BigEndian, &bodyLen) + bodyLen, err := p.readLen(client) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") } @@ -512,14 +514,11 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } - return []byte("OK"), nil + return okBytes, nil } func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { var err error - var bodyLen int32 - var numMessages int32 - var messageSize int32 if len(params) < 2 { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "MPUB insufficient number of parameters") @@ -531,7 +530,7 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("E_BAD_TOPIC MPUB topic name '%s' is not valid", topicName)) } - err = 
binary.Read(client.Reader, binary.BigEndian, &bodyLen) + bodyLen, err := p.readLen(client) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read body size") } @@ -541,14 +540,14 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { fmt.Sprintf("MPUB body too big %d > %d", bodyLen, nsqd.options.maxBodySize)) } - err = binary.Read(client.Reader, binary.BigEndian, &numMessages) + numMessages, err := p.readLen(client) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_BODY", "MPUB failed to read message count") } messages := make([]*nsq.Message, 0, numMessages) for i := int32(0); i < numMessages; i++ { - err = binary.Read(client.Reader, binary.BigEndian, &messageSize) + messageSize, err := p.readLen(client) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", fmt.Sprintf("MPUB failed to read message(%d) body size", i)) @@ -578,7 +577,7 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error()) } - return []byte("OK"), nil + return okBytes, nil } func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) { @@ -602,3 +601,12 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nil } + +func (p *ProtocolV2) readLen(client *ClientV2) (int32, error) { + client.lenSlice = client.lenSlice[0:] + _, err := io.ReadFull(client.Reader, client.lenSlice) + if err != nil { + return 0, err + } + return int32(binary.BigEndian.Uint32(client.lenSlice)), nil +} From e5f448cf453490111fe6afae8aaffaaaeaa08ac4 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 31 Mar 2013 11:42:56 -0400 Subject: [PATCH 4/5] nsqd: no copy message ids --- nsqd/protocol_v2.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index ef03205bf..ccf0f7223 100644 --- 
a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -12,6 +12,7 @@ import ( "net" "sync/atomic" "time" + "unsafe" ) const maxTimeout = time.Hour @@ -405,8 +406,6 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { } func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { - var id nsq.MessageID - state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state") @@ -416,7 +415,7 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "FIN insufficient number of params") } - copy(id[:], params[1]) + id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0])) err := client.Channel.FinishMessage(client, id) if err != nil { return nil, nsq.NewClientErr(err, "E_FIN_FAILED", @@ -429,8 +428,6 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { } func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { - var id nsq.MessageID - state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state") @@ -440,7 +437,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "REQ insufficient number of params") } - copy(id[:], params[1]) + id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0])) timeoutMs, err := util.ByteToBase10(params[2]) if err != nil { return nil, nsq.NewFatalClientErr(err, "E_INVALID", @@ -581,8 +578,6 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { } func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) { - var id nsq.MessageID - state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { 
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "cannot TOUCH in current state") @@ -592,7 +587,7 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "TOUCH insufficient number of params") } - copy(id[:], params[1]) + id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0])) err := client.Channel.TouchMessage(client, id) if err != nil { return nil, nsq.NewClientErr(err, "E_TOUCH_FAILED", From 084907654f14b024eae712880be987c2d6fbcb80 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 10 Apr 2013 11:27:00 -0400 Subject: [PATCH 5/5] update README benchmarks; add some GUID benchmarks --- README.md | 7 ++++--- nsqd/guid_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 nsqd/guid_test.go diff --git a/README.md b/README.md index 3f731853f..3dbd4446e 100644 --- a/README.md +++ b/README.md @@ -80,16 +80,17 @@ DISCLAIMER: Please keep in mind that NSQ is designed to be used in a distributed node performance is important, but not the end-all-be-all of what we're looking to achieve. 
Also, benchmarks are stupid, but here's a few anyway to ignite the flame: -On a 2012 MacBook Air i7 2ghz (`GOMAXPROCS=1`, `go tip 8bbc0bdf832e`) single publisher, single consumer: +On a 2012 MacBook Air i7 2ghz (`GOMAXPROCS=1`, `go 1.1 beta2 4a712e80e9b1`, NSQ v0.2.19-alpha) +single publisher, single consumer: ``` $ ./nsqd --mem-queue-size=1000000 $ ./bench_writer -2013/01/29 10:24:24 duration: 2.60766631s - 73.144mb/s - 383484.649ops/s - 2.608us/op +2013/04/09 23:25:54 duration: 2.46904784s - 77.250mb/s - 405014.429ops/s - 2.469us/op $ ./bench_reader -2013/01/29 10:25:43 duration: 6.665561082s - 28.615mb/s - 150024.880ops/s - 6.666us/op +2013/04/09 23:27:53 duration: 5.996050461s - 31.810mb/s - 166776.448ops/s - 5.996us/op ``` ### Getting Started diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go new file mode 100644 index 000000000..6b4196013 --- /dev/null +++ b/nsqd/guid_test.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/bitly/nsq/nsq" + "testing" + "unsafe" +) + +func BenchmarkGUIDCopy(b *testing.B) { + source := make([]byte, 16) + var dest nsq.MessageID + for i := 0; i < b.N; i++ { + copy(dest[:], source) + } +} + +func BenchmarkGUIDUnsafe(b *testing.B) { + source := make([]byte, 16) + var dest nsq.MessageID + for i := 0; i < b.N; i++ { + dest = *(*nsq.MessageID)(unsafe.Pointer(&source[0])) + } + _ = dest +}