Skip to content

Commit

Permalink
topic: allow unbuffered memoryMsgChan (partial revert of b3b29b7)
Browse files Browse the repository at this point in the history
In b3b29b7 / nsqio#1159 unbuffered
memoryMsgChan were replaced with nil chan for both Topic and Channel
(this only happens if mem-queue-size=0). This inadvertently made
deferred publish never work (with mem-queue-size=0), because some
metadata is lost when messages go through the "backend" disk queue,
but previously messages could go immediately through the topic's
unbuffered chan with their metadata intact, to then be added to each
channel's deferred pqueue. This partial revert brings this part back.

Granted, mem-queue-size=0 never worked very well, in many respects.
For this functionality, at low message publish rates, it seems there
was rarely a problem. But at high message publish rates it was always
likely that some messages would hit the topic disk queue and lose
the deferred time metadata. This potential problem remains.

In fact, the motivation for fully disabling the memoryMsgQueue for
mem-queue-size=0 was to avoid excessively shuffling message order,
which would happen if some messages jump instantly through the
memory-queue while others take the longer way through the disk-queue.
That will be likely again. But the users who noticed this probably
did not use publish-deferred functionality!

Additional fixes would be appropriate, but this is a quick-fix to
restore previous useful behavior for mem-queue-size=0.
  • Loading branch information
ploxiln committed Sep 15, 2021
1 parent 74731b6 commit b3cb741
Showing 1 changed file with 1 addition and 5 deletions.
6 changes: 1 addition & 5 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: nil,
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
Expand All @@ -56,10 +56,6 @@ func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
if nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
}
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
Expand Down

0 comments on commit b3cb741

Please sign in to comment.