From 1e1720f25e09cbcc3385dd5e0475f4c1b2649f9e Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Mon, 8 May 2017 14:13:04 -0400 Subject: [PATCH] nsqd: retry sub to ephemeral topic/channel which is Exiting quick hacky fix, instead of "proper" locking --- nsqd/protocol_v2.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 02ba0dac3..e44162f78 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -615,10 +615,22 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, err } - topic := p.ctx.nsqd.GetTopic(topicName) - channel := topic.GetChannel(channelName) - channel.AddClient(client.ID, client) + // This retry-loop is a work-around for a race condition, where the + // last client can leave the channel between GetChannel() and AddClient(). + // Avoid adding a client to an ephemeral channel / topic which has started exiting. + var channel *Channel + for { + topic := p.ctx.nsqd.GetTopic(topicName) + channel = topic.GetChannel(channelName) + channel.AddClient(client.ID, client) + if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { + channel.RemoveClient(client.ID) + time.Sleep(1 * time.Millisecond) + continue + } + break + } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel // update message pump