Skip to content

Commit

Permalink
Fix stuck peer (#1752)
Browse files Browse the repository at this point in the history
* Removed OpenedStream/ClosedStream since they were removed from network.Notifee

* Drop connection to peer if the stream is closed
  • Loading branch information
alexsporn authored Sep 15, 2022
1 parent 5b35e2a commit 5405b5d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 42 deletions.
2 changes: 0 additions & 2 deletions pkg/p2p/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,5 +1099,3 @@ func (m *netNotifiee) Disconnected(net network.Network, conn network.Conn) {
}
m.disconnectedChan <- &disconnectmsg{net: net, conn: conn, reason: errors.New("connection closed by libp2p network event")}
}
func (m *netNotifiee) OpenedStream(_ network.Network, _ network.Stream) {}
func (m *netNotifiee) ClosedStream(_ network.Network, _ network.Stream) {}
45 changes: 5 additions & 40 deletions pkg/protocol/gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"

"github.com/iotaledger/hive.go/core/events"
Expand Down Expand Up @@ -176,7 +175,6 @@ type Service struct {
connectedChan chan *connectionmsg
closeStreamChan chan *closestreammsg
disconnectedChan chan *connectionmsg
streamClosedChan chan *streamclosedmsg
relationUpdatedChan chan *relationupdatedmsg
streamReqChan chan *streamreqmsg
forEachChan chan *foreachmsg
Expand Down Expand Up @@ -224,7 +222,6 @@ func NewService(
connectedChan: make(chan *connectionmsg, 10),
closeStreamChan: make(chan *closestreammsg, 10),
disconnectedChan: make(chan *connectionmsg, 10),
streamClosedChan: make(chan *streamclosedmsg, 10),
relationUpdatedChan: make(chan *relationupdatedmsg, 10),
streamReqChan: make(chan *streamreqmsg, 10),
forEachChan: make(chan *foreachmsg, 10),
Expand Down Expand Up @@ -304,17 +301,11 @@ func (s *Service) Start(ctx context.Context) {
s.inboundStreamChan <- stream
})

// manage libp2p network events
s.host.Network().Notify((*netNotifiee)(s))

s.eventLoop(ctx)

// libp2p stream handler
s.host.RemoveStreamHandler(s.protocol)

// de-register libp2p network events
s.host.Network().StopNotify((*netNotifiee)(s))

s.detachEvents()
s.peeringMngWP.Stop()
}
Expand All @@ -338,8 +329,6 @@ drainLoop:

case <-s.disconnectedChan:

case <-s.streamClosedChan:

case <-s.relationUpdatedChan:

case streamReqMsg := <-s.streamReqChan:
Expand Down Expand Up @@ -369,11 +358,6 @@ type streamreqmsg struct {
back chan *Protocol
}

type streamclosedmsg struct {
peerID peer.ID
stream network.Stream
}

type relationupdatedmsg struct {
peer *p2p.Peer
oldRelation p2p.PeerRelation
Expand Down Expand Up @@ -410,11 +394,6 @@ func (s *Service) eventLoop(ctx context.Context) {
s.Events.Error.Trigger(err)
}

case streamClosedMsg := <-s.streamClosedChan:
if err := s.deregisterProtocol(streamClosedMsg.peerID); err != nil && !errors.Is(err, ErrProtocolDoesNotExist) {
s.Events.Error.Trigger(err)
}

case relationUpdatedMsg := <-s.relationUpdatedChan:
s.handleRelationUpdated(ctx, relationUpdatedMsg.peer, relationUpdatedMsg.oldRelation)

Expand Down Expand Up @@ -583,6 +562,11 @@ func (s *Service) deregisterProtocol(peerID peer.ID) error {
return fmt.Errorf("unable to cleanly reset stream to %s: %w", peerID, err)
}

// Drop connection to peer since we no longer have a protocol stream to it
if conn := proto.Stream.Conn(); conn != nil {
return conn.Close()
}

return nil
}

Expand Down Expand Up @@ -729,22 +713,3 @@ func (s *Service) detachEvents() {
s.Events.InboundStreamCanceled.Detach(s.onGossipServiceInboundStreamCanceled)
s.Events.Error.Detach(s.onGossipServiceError)
}

// lets Service implement network.Notifiee in order to automatically
// clean up ongoing reset streams.
type netNotifiee Service

func (m *netNotifiee) Listen(net network.Network, multiaddr multiaddr.Multiaddr) {}
func (m *netNotifiee) ListenClose(net network.Network, multiaddr multiaddr.Multiaddr) {}
func (m *netNotifiee) Connected(net network.Network, conn network.Conn) {}
func (m *netNotifiee) Disconnected(net network.Network, conn network.Conn) {}
func (m *netNotifiee) OpenedStream(net network.Network, stream network.Stream) {}
func (m *netNotifiee) ClosedStream(net network.Network, stream network.Stream) {
if stream.Protocol() != m.protocol {
return
}
if m.stopped.IsSet() {
return
}
m.streamClosedChan <- &streamclosedmsg{peerID: stream.Conn().RemotePeer(), stream: stream}
}

0 comments on commit 5405b5d

Please sign in to comment.