From b3b29b7eff414893bab2b48b590e1b9f847736a6 Mon Sep 17 00:00:00 2001 From: bitpeng Date: Wed, 5 Jun 2019 11:40:14 +0800 Subject: [PATCH] nsqd: set memoryMsgChan as nil when -mem-queue-size=0 do not use unbuffered chan for in-memory queue when size=0 because user intended all messages to go through backend queue (disk) --- nsqd/channel.go | 6 +++++- nsqd/topic.go | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index ad9c66ce9..606387267 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -77,11 +77,15 @@ func NewChannel(topicName string, channelName string, ctx *context, c := &Channel{ topicName: topicName, name: channelName, - memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), + memoryMsgChan: nil, clients: make(map[int64]Consumer), deleteCallback: deleteCallback, ctx: ctx, } + // create mem-queue only if size > 0 (do not use unbuffered chan) + if ctx.nsqd.getOpts().MemQueueSize > 0 { + c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) + } if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 { c.e2eProcessingLatencyStream = quantile.New( ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime, diff --git a/nsqd/topic.go b/nsqd/topic.go index c9884fe22..cd2b83065 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -47,7 +47,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), - memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize), + memoryMsgChan: nil, startChan: make(chan int, 1), exitChan: make(chan int), channelUpdateChan: make(chan int), @@ -57,7 +57,10 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi deleteCallback: deleteCallback, idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } - + // create mem-queue only if size > 0 (do not use unbuffered chan) + if ctx.nsqd.getOpts().MemQueueSize > 0 { + t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) + } if strings.HasSuffix(topicName, "#ephemeral") { t.ephemeral = true t.backend = newDummyBackendQueue()