From 5bcb654726ef692f9cb5b07b7f0adc8d2f4cafac Mon Sep 17 00:00:00 2001 From: lilinzhe Date: Tue, 2 May 2017 05:09:49 +0000 Subject: [PATCH] nsqd: SUB: fix ephemeral channel race condition with reconnecting client. It's possible for a client reconnecting quickly and subscribed to an ephemeral channel to race with nsqd's cleanup of said ephemeral channel, as documented in nsqio/go-nsq#206. Fixes #883 --- nsqd/protocol_v2.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 02ba0dac3..e08fa89db 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -615,9 +615,20 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, err } - topic := p.ctx.nsqd.GetTopic(topicName) - channel := topic.GetChannel(channelName) - channel.AddClient(client.ID, client) + for { + topic := p.ctx.nsqd.GetTopic(topicName) + if topic.Exiting() { + time.Sleep(1 * time.Microsecond) + continue + } + channel := topic.GetChannel(channelName) + if channel.Exiting() { + time.Sleep(1 * time.Microsecond) + continue + } + channel.AddClient(client.ID, client) + break + } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel