Skip to content

Commit

Permalink
Fix request increment ref bug (#1121)
Browse files Browse the repository at this point in the history
The sendToWriteCh() method would increment the ref of a request by 2
(one for the write and another for the publisher). The ref value was
always decremented by the doWrite() function but the publisher would
decrement it only if there were any subscribers.

With this commit, we increment the request ref for the publisher only if
there are any actual subscribers.
  • Loading branch information
Ibrahim Jarif authored Nov 22, 2019
1 parent fb0cdb8 commit ad770ca
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
3 changes: 1 addition & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,10 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
// We can only service one request because we need each txn to be stored in a contigous section.
// Txns should not interleave among other txns or rewrites.
req := requestPool.Get().(*request)
req.reset()
req.Entries = entries
req.Wg = sync.WaitGroup{}
req.Wg.Add(1)
req.IncrRef() // for db write
req.IncrRef() // for publisher updates
db.writeCh <- req // Handled in doWrites.
y.NumPuts.Add(int64(len(entries)))

Expand Down
5 changes: 3 additions & 2 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (p *publisher) listenForUpdates(c *y.Closer) {
p.cleanSubscribers()
c.Done()
}()
slurp := func(batch []*request) {
slurp := func(batch requests) {
for {
select {
case reqs := <-p.pubCh:
Expand Down Expand Up @@ -150,8 +150,9 @@ func (p *publisher) deleteSubscriber(id uint64) {
delete(p.subscribers, id)
}

func (p *publisher) sendUpdates(reqs []*request) {
func (p *publisher) sendUpdates(reqs requests) {
if p.noOfSubscribers() != 0 {
reqs.IncrRef()
p.pubCh <- reqs
}
}
Expand Down
14 changes: 14 additions & 0 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,14 @@ type request struct {
ref int32
}

func (req *request) reset() {
req.Entries = req.Entries[:0]
req.Ptrs = req.Ptrs[:0]
req.Wg = sync.WaitGroup{}
req.Err = nil
req.ref = 0
}

func (req *request) IncrRef() {
atomic.AddInt32(&req.ref, 1)
}
Expand Down Expand Up @@ -1225,6 +1233,12 @@ func (reqs requests) DecrRef() {
}
}

func (reqs requests) IncrRef() {
for _, req := range reqs {
req.IncrRef()
}
}

// sync function syncs content of latest value log file to disk. Syncing of value log directory is
// not required here as it happens every time a value log file rotation happens(check createVlogFile
// function). During rotation, previous value log file also gets synced to disk. It only syncs file
Expand Down

0 comments on commit ad770ca

Please sign in to comment.