From 81ec639e55786584980b2f0e70aa7437d5b4d890 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Tue, 6 Mar 2018 13:55:49 -0500 Subject: [PATCH 1/2] nsqd: nsqlookupd connect callback simpler, more robust Remove syncTopicChan and do the initial REGISTER commands in the lookupPeer connectCallback(). This should be mostly equivalent, just with less indirection. Close() the lookupPeer in all error cases. It will attempt to re-connect with the next Ping command. --- nsqd/lookup.go | 61 ++++++++++++++++++++++----------------------- nsqd/lookup_peer.go | 9 ++++++- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/nsqd/lookup.go b/nsqd/lookup.go index 0e8484a01..37d9f1991 100644 --- a/nsqd/lookup.go +++ b/nsqd/lookup.go @@ -12,7 +12,7 @@ import ( "github.com/nsqio/nsq/internal/version" ) -func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) func(*lookupPeer) { +func connectCallback(n *NSQD, hostname string) func(*lookupPeer) { return func(lp *lookupPeer) { ci := make(map[string]interface{}) ci["version"] = version.Binary @@ -32,10 +32,14 @@ func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) f return } else if bytes.Equal(resp, []byte("E_INVALID")) { n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp) + lp.Close() + return } else { err = json.Unmarshal(resp, &lp.Info) if err != nil { n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp) + lp.Close() + return } else { n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info) if lp.Info.BroadcastAddress == "" { @@ -44,16 +48,36 @@ func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) f } } - go func() { - syncTopicChan <- lp - }() + // build all the commands first so we exit the lock(s) as fast as possible + var commands []*nsq.Command + n.RLock() + for _, topic := range n.topicMap { + topic.RLock() + if len(topic.channelMap) == 0 { + commands = append(commands, nsq.Register(topic.name, "")) + } else { + for _, channel := range topic.channelMap { + commands = append(commands, nsq.Register(channel.topicName, channel.name)) + } + } + topic.RUnlock() + } + n.RUnlock() + + for _, cmd := range commands { + n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, cmd) + _, err := lp.Command(cmd) + if err != nil { + n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err) + return + } + } } } func (n *NSQD) lookupLoop() { var lookupPeers []*lookupPeer var lookupAddrs []string - syncTopicChan := make(chan *lookupPeer) connect := true hostname, err := os.Hostname() @@ -72,7 +96,7 @@ func (n *NSQD) lookupLoop() { } n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host) lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf, - connectCallback(n, hostname, syncTopicChan)) + connectCallback(n, hostname)) lookupPeer.Command(nil) // start the connection lookupPeers = append(lookupPeers, lookupPeer) lookupAddrs = append(lookupAddrs, host) @@ -124,31 +148,6 @@ func (n *NSQD) lookupLoop() { n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err) } } - case lookupPeer := <-syncTopicChan: - var commands []*nsq.Command - // build all the commands first so we exit the lock(s) as fast as possible - n.RLock() - for _, topic := range n.topicMap { - topic.RLock() - if len(topic.channelMap) == 0 { - commands = append(commands, nsq.Register(topic.name, "")) - } else { - for _, channel := range topic.channelMap { - commands = append(commands, nsq.Register(channel.topicName, channel.name)) - } - } - topic.RUnlock() - } - n.RUnlock() - - for _, cmd := range commands { - n.logf(LOG_INFO, "LOOKUPD(%s): %s", lookupPeer, cmd) - _, err := lookupPeer.Command(cmd) - if err != nil { - n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err) - break - } - } case <-n.optsNotificationChan: var tmpPeers []*lookupPeer var tmpAddrs []string diff --git a/nsqd/lookup_peer.go b/nsqd/lookup_peer.go index d4aa26f07..867e7f652 100644 --- a/nsqd/lookup_peer.go +++ b/nsqd/lookup_peer.go @@ -98,10 +98,17 @@ func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) { return nil, err } lp.state = stateConnected - lp.Write(nsq.MagicV1) + _, err = lp.Write(nsq.MagicV1) + if err != nil { + lp.Close() + return nil, err + } if initialState == stateDisconnected { lp.connectCallback(lp) } + if lp.state != stateConnected { + return nil, fmt.Errorf("lookupPeer connectCallback() failed") + } } if cmd == nil { return nil, nil From 859e1d56a3ce24b17f78fbe94615daa20c49b2f4 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Fri, 13 Apr 2018 12:45:48 -0400 Subject: [PATCH 2/2] nsqd_test: longer sleeps for flaky TestReconfigure --- nsqd/nsqd_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 4e1774642..150a531b8 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -368,7 +368,7 @@ func TestReconfigure(t *testing.T) { defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() - time.Sleep(200 * time.Millisecond) + time.Sleep(250 * time.Millisecond) newOpts := *opts newOpts.NSQLookupdTCPAddresses = []string{lookupd1.RealTCPAddr().String()} @@ -376,7 +376,7 @@ func TestReconfigure(t *testing.T) { nsqd.triggerOptsNotification() test.Equal(t, 1, len(nsqd.getOpts().NSQLookupdTCPAddresses)) - time.Sleep(200 * time.Millisecond) + time.Sleep(350 * time.Millisecond) numLookupPeers := len(nsqd.lookupPeers.Load().([]*lookupPeer)) test.Equal(t, 1, numLookupPeers) @@ -387,7 +387,7 @@ func TestReconfigure(t *testing.T) { nsqd.triggerOptsNotification() test.Equal(t, 2, len(nsqd.getOpts().NSQLookupdTCPAddresses)) - time.Sleep(200 * time.Millisecond) + time.Sleep(350 * time.Millisecond) var lookupPeers []string for _, lp := range nsqd.lookupPeers.Load().([]*lookupPeer) {