Skip to content

Commit

Permalink
Update go-libp2p to 0.12.0
Browse files Browse the repository at this point in the history
go-libp2p 0.12.0 made some significant changes to the stream interfaces around
stream closing:

* Close now closes in both directions and frees the stream. However, unlike
FullClose did, it doesn't _wait_ for the remote peer to respond with an EOF.
* To close for writing, call CloseWrite (like one would on a TCP connection, etc.).

This patch:

* Replaces calls to FullClose with Close where appropriate.
* Replaces calls to Close with CloseWrite where appropriate.
* Removes redundant Close calls.
* Calls Reset to where appropriate to indicate that the request/response was
  aborted. Unlike Close, this will not flush and will not cause the remote peer
  to read an EOF. Instead, the remote peer will read an ErrReset error.
* Ensures we always either close or reset streams. Send wasn't closing the
  stream on some error paths.
* Now that stream closing is async, we explicitly wait for a response when
  "hanging up" on a peer (so we don't hang up before they receive our
  response/goodbye message).
  • Loading branch information
Stebalien committed Dec 2, 2020
1 parent 3bd5e58 commit 9b29b8a
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 168 deletions.
18 changes: 10 additions & 8 deletions beacon-chain/p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
//
// When done, the caller must Close or Reset on the stream.
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
Expand All @@ -31,18 +33,18 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic == RPCMetaDataTopic {
return stream, nil
}

if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
if baseTopic != RPCMetaDataTopic {
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
stream.Reset()
return nil, err
}
}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
traceutil.AnnotateError(span, err)
stream.Reset()
return nil, err
}

Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {

n, err := p.Encoding().EncodeWithMaxLength(s, msg)
if err != nil {
s.Reset()
p.t.Fatalf("Failed to encode message: %v", err)
}

Expand Down Expand Up @@ -287,12 +288,14 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p

if topic != "/eth2/beacon_chain/req/metadata/1" {
if _, err := p.Encoding().EncodeWithMaxLength(stream, msg); err != nil {
stream.Reset()
return nil, err
}
}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
stream.Reset()
return nil, err
}
// Delay returning the stream for testing purposes
Expand Down
22 changes: 18 additions & 4 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand Down Expand Up @@ -56,6 +55,9 @@ func writeErrorResponseToStream(responseCode byte, reason string, stream libp2pc
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
} else {
// If sending the error message succeeded, close to send an EOF.
stream.Close()
}
}

Expand Down Expand Up @@ -91,11 +93,23 @@ func readStatusCodeNoDeadline(stream network.Stream, encoding encoder.NetworkEnc

// only returns true for errors that are valid (no resets or expectedEOF errors).
func isValidStreamError(err error) bool {
return err != nil && !errors.Is(err, mux.ErrReset) && !errors.Is(err, helpers.ErrExpectedEOF)
return err != nil && !errors.Is(err, mux.ErrReset)
}

func closeStream(stream network.Stream, log *logrus.Entry) {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debug("Could not reset stream")
if err := stream.Close(); isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}

func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
if err := stream.CloseWrite(); err != nil {
stream.Reset()
if isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
return
}
_, _ = stream.Read([]byte{0})
stream.Close()
}
8 changes: 2 additions & 6 deletions beacon-chain/sync/initial-sync/initial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,7 @@ func connectPeerHavingBlocks(
p := p2pt.NewTestP2P(t)

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy", func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
}()
defer stream.Close()

req := &p2ppb.BeaconBlocksByRangeRequest{}
assert.NoError(t, p.Encoding().DecodeWithMaxLength(stream, req))
Expand All @@ -292,9 +290,7 @@ func connectPeerHavingBlocks(
})

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy", func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
}()
defer stream.Close()

req := new(p2pTypes.BeaconBlockByRootsReq)
assert.NoError(t, p.Encoding().DecodeWithMaxLength(stream, req))
Expand Down
13 changes: 8 additions & 5 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,21 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
s.p2p.SetStreamHandler(topic, func(stream network.Stream) {
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout)
defer cancel()
defer func() {
closeStream(stream, log)
}()

// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
// Close here because we may have only written a partial
// response.
defer stream.Reset()

ctx, span := trace.StartSpan(ctx, "sync.rpc")
defer span.End()
span.AddAttributes(trace.StringAttribute("topic", topic))
span.AddAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()))
log := log.WithField("peer", stream.Conn().RemotePeer().Pretty())
// Check before hand that peer is valid.
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
closeStream(stream, log)
if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil {
log.Debugf("Could not disconnect from peer: %v", err)
}
Expand Down Expand Up @@ -147,6 +151,5 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
traceutil.AnnotateError(span, err)
}
}

})
}
6 changes: 1 addition & 5 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.BeaconBlocksByRangeHandler")
defer span.End()
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand Down Expand Up @@ -117,6 +112,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
// wait for ticker before resuming streaming blocks to remote peer.
<-ticker.C
}
closeStream(stream, log)
return nil
}

Expand Down
27 changes: 4 additions & 23 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots

// beaconBlocksRootRPCHandler looks up the request blocks from the database from the given block roots.
func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand All @@ -56,22 +51,12 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
// Add to rate limiter in the event no
// roots are requested.
s.rateLimiter.add(stream, 1)
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}

if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block limit")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}
s.rateLimiter.add(stream, int64(len(blockRoots)))
Expand All @@ -80,12 +65,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
blk, err := s.db.Block(ctx, root)
if err != nil {
log.WithError(err).Debug("Could not fetch block")
resp, err := s.generateErrorResponse(responseCodeServerError, types.ErrGeneric.Error())
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if blk == nil {
Expand All @@ -95,5 +75,6 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return err
}
}
closeStream(stream, log)
return nil
}
22 changes: 10 additions & 12 deletions beacon-chain/sync/rpc_goodbye.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
Expand All @@ -33,11 +31,6 @@ var backOffTime = map[types.SSZUint64]time.Duration{

// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
SetRPCStreamDeadlines(stream)

m, ok := msg.(*types.SSZUint64)
Expand Down Expand Up @@ -91,13 +84,18 @@ func (s *Service) sendGoodByeMessage(ctx context.Context, code types.RPCGoodbyeC
if err != nil {
return err
}
defer func() {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}()
defer stream.Close()

log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")

// Wait up to the response timeout for the peer to receive the goodbye
// and close the stream (or disconnect). We usually don't bother waiting
// around for an EOF, but we're going to close this connection
// immediately after we say goodbye.
SetStreamReadDeadline(stream, respTimeout)
_, _ = stream.Read([]byte{0})

return nil
}

Expand Down
21 changes: 6 additions & 15 deletions beacon-chain/sync/rpc_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand All @@ -13,11 +12,6 @@ import (

// metaDataHandler reads the incoming metadata rpc request from the peer.
func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
SetRPCStreamDeadlines(stream)

if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
Expand All @@ -29,7 +23,11 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
return err
}
_, err := s.p2p.Encoding().EncodeWithMaxLength(stream, s.p2p.Metadata())
return err
if err != nil {
return err
}
closeStream(stream, log)
return nil
}

func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.MetaData, error) {
Expand All @@ -40,14 +38,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
if err != nil {
return nil, err
}
// we close the stream outside of `send` because
// metadata requests send no payload, so closing the
// stream early leads it to a reset.
defer func() {
if err := helpers.FullClose(stream); isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream for protocol %s", stream.Protocol())
}
}()
defer closeStream(stream, log)
code, errMsg, err := ReadStatusCode(stream, s.p2p.Encoding())
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 9b29b8a

Please sign in to comment.