diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 02ba0dac3..e44162f78 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -615,10 +615,22 @@ func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, err } - topic := p.ctx.nsqd.GetTopic(topicName) - channel := topic.GetChannel(channelName) - channel.AddClient(client.ID, client) + // This retry-loop is a work-around for a race condition, where the + // last client can leave the channel between GetChannel() and AddClient(). + // Avoid adding a client to an ephemeral channel / topic which has started exiting. + var channel *Channel + for { + topic := p.ctx.nsqd.GetTopic(topicName) + channel = topic.GetChannel(channelName) + channel.AddClient(client.ID, client) + if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) { + channel.RemoveClient(client.ID) + time.Sleep(1 * time.Millisecond) + continue + } + break + } atomic.StoreInt32(&client.State, stateSubscribed) client.Channel = channel // update message pump