Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock on notifications #242

Merged
merged 2 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions notifications/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ type cmd struct {

// publisher is a publisher of events for
type publisher struct {
lk sync.RWMutex
closed chan struct{}
cmdChan chan cmd
lk sync.RWMutex
closed chan struct{}
cmds []cmd
cmdsLk *sync.Cond
}

// NewPublisher returns a new message event publisher
func NewPublisher() Publisher {
ps := &publisher{
cmdChan: make(chan cmd),
closed: make(chan struct{}),
cmdsLk: sync.NewCond(&sync.Mutex{}),
closed: make(chan struct{}),
}
return ps
}
Expand All @@ -49,7 +50,7 @@ func (ps *publisher) Publish(topic Topic, event Event) {
default:
}

ps.cmdChan <- cmd{op: pub, topics: []Topic{topic}, msg: event}
ps.queue(cmd{op: pub, topics: []Topic{topic}, msg: event})
}

// Shutdown shuts down all events and subscriptions
Expand All @@ -62,7 +63,7 @@ func (ps *publisher) Shutdown() {
default:
}
close(ps.closed)
ps.cmdChan <- cmd{op: shutdown}
ps.queue(cmd{op: shutdown})
}

func (ps *publisher) Close(id Topic) {
Expand All @@ -73,7 +74,7 @@ func (ps *publisher) Close(id Topic) {
return
default:
}
ps.cmdChan <- cmd{op: closeTopic, topics: []Topic{id}}
ps.queue(cmd{op: closeTopic, topics: []Topic{id}})
}

func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool {
Expand All @@ -86,7 +87,7 @@ func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool {
default:
}

ps.cmdChan <- cmd{op: subscribe, topics: []Topic{topic}, sub: sub}
ps.queue(cmd{op: subscribe, topics: []Topic{topic}, sub: sub})
return true
}

Expand All @@ -100,7 +101,7 @@ func (ps *publisher) Unsubscribe(sub Subscriber) bool {
default:
}

ps.cmdChan <- cmd{op: unsubAll, sub: sub}
ps.queue(cmd{op: unsubAll, sub: sub})
return true
}

Expand All @@ -111,7 +112,8 @@ func (ps *publisher) start() {
}

loop:
for cmd := range ps.cmdChan {
for {
cmd := ps.dequeue()
if cmd.topics == nil {
switch cmd.op {
case unsubAll:
Expand Down Expand Up @@ -202,3 +204,22 @@ func (reg *subscriberRegistry) remove(topic Topic, sub Subscriber) {

sub.OnClose(topic)
}

func (ps *publisher) queue(cmd cmd) {
ps.cmdsLk.L.Lock()
ps.cmds = append(ps.cmds, cmd)
ps.cmdsLk.L.Unlock()
ps.cmdsLk.Signal()
}

func (ps *publisher) dequeue() cmd {
ps.cmdsLk.L.Lock()
for len(ps.cmds) == 0 {
ps.cmdsLk.Wait()
}

cmd := ps.cmds[0]
ps.cmds = ps.cmds[1:]
ps.cmdsLk.L.Unlock()
return cmd
}
5 changes: 5 additions & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
rm.send(&finishTaskRequest{task, err}, nil)
}

// CloseWithNetworkError closes a request due to a network error
func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) {
rm.send(&errorRequestMessage{p, requestID, errNetworkError, make(chan error, 1)}, nil)
}

func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
select {
case <-rm.ctx.Done():
Expand Down
3 changes: 1 addition & 2 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
sub := notifications.NewTopicDataSubscriber(&subscriber{
p: key.p,
request: request,
ctx: rm.ctx,
messages: rm.messages,
requestCloser: rm,
blockSentListeners: rm.blockSentListeners,
completedListeners: rm.completedListeners,
networkErrorListeners: rm.networkErrorListeners,
Expand Down
18 changes: 7 additions & 11 deletions responsemanager/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package responsemanager

import (
"context"
"errors"

"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,11 +14,14 @@ import (

var errNetworkError = errors.New("network error")

type RequestCloser interface {
CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID)
}

type subscriber struct {
p peer.ID
request gsmsg.GraphSyncRequest
ctx context.Context
messages chan responseManagerMessage
requestCloser RequestCloser
blockSentListeners BlockSentListeners
networkErrorListeners NetworkErrorListeners
completedListeners CompletedListeners
Expand All @@ -36,10 +38,7 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event
switch responseEvent.Name {
case messagequeue.Error:
s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err)
select {
case s.messages <- &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}:
case <-s.ctx.Done():
}
s.requestCloser.CloseWithNetworkError(s.p, s.request.ID())
case messagequeue.Sent:
s.blockSentListeners.NotifyBlockSentListeners(s.p, s.request, blockData)
}
Expand All @@ -51,10 +50,7 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event
switch responseEvent.Name {
case messagequeue.Error:
s.networkErrorListeners.NotifyNetworkErrorListeners(s.p, s.request, responseEvent.Err)
select {
case s.messages <- &errorRequestMessage{s.p, s.request.ID(), errNetworkError, make(chan error, 1)}:
case <-s.ctx.Done():
}
s.requestCloser.CloseWithNetworkError(s.p, s.request.ID())
case messagequeue.Sent:
s.completedListeners.NotifyCompletedListeners(s.p, s.request, status)
}
Expand Down