Skip to content

Commit

Permalink
Update go-libp2p to 0.12.0 (#8015)
Browse files Browse the repository at this point in the history
* Update go-libp2p to 0.12.0

go-libp2p 0.12.0 made some significant changes to the stream interfaces around
stream closing:

* Close now closes in both directions and frees the stream. However, unlike
FullClose did, it doesn't _wait_ for the remote peer to respond with an EOF.
* To close for writing, call CloseWrite (like one would on a TCP connection, etc.).

This patch:

* Replaces calls to FullClose with Close where appropriate.
* Replaces calls to Close with CloseWrite where appropriate.
* Removes redundant Close calls.
* Calls Reset to where appropriate to indicate that the request/response was
  aborted. Unlike Close, this will not flush and will not cause the remote peer
  to read an EOF. Instead, the remote peer will read an ErrReset error.
* Ensures we always either close or reset streams. Send wasn't closing the
  stream on some error paths.
* Now that stream closing is async, we explicitly wait for a response when
  "hanging up" on a peer (so we don't hang up before they receive our
  response/goodbye message).

* update bazel

* Gazelle

* revert unintentional bazel workspace change

* appease an overzealous linter

* update to latest

* Refactor encoder

* gazelle

* Gazelle

Co-authored-by: Preston Van Loon <[email protected]>
Co-authored-by: Nishant Das <[email protected]>
Co-authored-by: Raul Jordan <[email protected]>
  • Loading branch information
4 people authored Dec 14, 2020
1 parent 4d1f01a commit 2428880
Show file tree
Hide file tree
Showing 20 changed files with 233 additions and 222 deletions.
1 change: 0 additions & 1 deletion beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ go_library(
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
Expand Down
21 changes: 15 additions & 6 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,27 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {

// DecodeGossip decodes the bytes to the protobuf gossip message provided.
func (e SszNetworkEncoder) DecodeGossip(b []byte, to interface{}) error {
size, err := snappy.DecodedLen(b)
b, err := DecodeSnappy(b, MaxGossipSize)
if err != nil {
return err
}
if uint64(size) > MaxGossipSize {
return errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", size, MaxGossipSize)
return e.doDecode(b, to)
}

// DecodeSnappy decodes a snappy compressed message.
func DecodeSnappy(msg []byte, maxSize uint64) ([]byte, error) {
size, err := snappy.DecodedLen(msg)
if err != nil {
return nil, err
}
b, err = snappy.Decode(nil /*dst*/, b)
if uint64(size) > maxSize {
return nil, errors.Errorf("snappy message exceeds max size: %d bytes > %d bytes", size, maxSize)
}
msg, err = snappy.Decode(nil /*dst*/, msg)
if err != nil {
return err
return nil, err
}
return e.doDecode(b, to)
return msg, nil
}

// DecodeWithMaxLength the bytes from io.Reader to the protobuf message provided.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/encoder/ssz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSszNetworkEncoder_FailsSnappyLength(t *testing.T) {
data := make([]byte, 32)
binary.PutUvarint(data, encoder.MaxGossipSize+32)
err := e.DecodeGossip(data, att)
require.ErrorContains(t, "gossip message exceeds max gossip size", err)
require.ErrorContains(t, "snappy message exceeds max size", err)
}

func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"time"

"github.com/golang/snappy"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
// the concatenation of `MESSAGE_DOMAIN_INVALID_SNAPPY` with the raw message data,
// i.e. `SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + message.data)[:20]`.
func msgIDFunction(pmsg *pubsub_pb.Message) string {
decodedData, err := snappy.Decode(nil /*dst*/, pmsg.Data)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconNetworkConfig().GossipMaxSize)
if err != nil {
combinedData := append(params.BeaconNetworkConfig().MessageDomainInvalidSnappy[:], pmsg.Data...)
h := hashutil.Hash(combinedData)
Expand Down
20 changes: 12 additions & 8 deletions beacon-chain/p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
//
// When done, the caller must Close or Reset on the stream.
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
Expand All @@ -31,18 +33,20 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic == RPCMetaDataTopic {
return stream, nil
}

if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
if baseTopic != RPCMetaDataTopic {
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, err
}
}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
traceutil.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, err
}

Expand Down
8 changes: 7 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {

n, err := p.Encoding().EncodeWithMaxLength(s, msg)
if err != nil {
_err := s.Reset()
_ = _err
p.t.Fatalf("Failed to encode message: %v", err)
}

Expand Down Expand Up @@ -287,12 +289,16 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p

if topic != "/eth2/beacon_chain/req/metadata/1" {
if _, err := p.Encoding().EncodeWithMaxLength(stream, msg); err != nil {
_err := stream.Reset()
_ = _err
return nil, err
}
}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
_err := stream.Reset()
_ = _err
return nil, err
}
// Delay returning the stream for testing purposes
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ go_library(
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//helpers:go_default_library",
"@com_github_libp2p_go_libp2p_core//mux:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
Expand Down
36 changes: 32 additions & 4 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand Down Expand Up @@ -56,6 +55,9 @@ func writeErrorResponseToStream(responseCode byte, reason string, stream libp2pc
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
} else {
// If sending the error message succeeded, close to send an EOF.
closeStream(stream, log)
}
}

Expand Down Expand Up @@ -91,11 +93,37 @@ func readStatusCodeNoDeadline(stream network.Stream, encoding encoder.NetworkEnc

// only returns true for errors that are valid (no resets or expectedEOF errors).
func isValidStreamError(err error) bool {
return err != nil && !errors.Is(err, mux.ErrReset) && !errors.Is(err, helpers.ErrExpectedEOF)
// check the error message itself as well as libp2p doesn't currently
// return the correct error type from Close{Read,Write,}.
return err != nil && !errors.Is(err, mux.ErrReset) && err.Error() != mux.ErrReset.Error()
}

func closeStream(stream network.Stream, log *logrus.Entry) {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debug("Could not reset stream")
if err := stream.Close(); isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}

func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
if err := stream.CloseWrite(); err != nil {
_err := stream.Reset()
_ = _err
if isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
return
}
// Wait for the remote side to respond.
//
// 1. On success, we expect to read an EOF (remote side received our
// response and closed the stream.
// 2. On failure (e.g., disconnect), we expect to receive an error.
// 3. If the remote side misbehaves, we may receive data.
//
// However, regardless of what happens, we just close the stream and
// walk away. We only read to wait for a response, we close regardless.
_, _err := stream.Read([]byte{0})
_ = _err
_err = stream.Close()
_ = _err
}
6 changes: 4 additions & 2 deletions beacon-chain/sync/initial-sync/initial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ func connectPeerHavingBlocks(

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy", func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
_err := stream.Close()
_ = _err
}()

req := &p2ppb.BeaconBlocksByRangeRequest{}
Expand All @@ -289,7 +290,8 @@ func connectPeerHavingBlocks(

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy", func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
_err := stream.Close()
_ = _err
}()

req := new(p2pTypes.BeaconBlockByRootsReq)
Expand Down
12 changes: 9 additions & 3 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,24 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
s.p2p.SetStreamHandler(topic, func(stream network.Stream) {
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout)
defer cancel()

// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
// Close here because we may have only written a partial
// response.
defer func() {
closeStream(stream, log)
_err := stream.Reset()
_ = _err
}()

ctx, span := trace.StartSpan(ctx, "sync.rpc")
defer span.End()
span.AddAttributes(trace.StringAttribute("topic", topic))
span.AddAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()))
log := log.WithField("peer", stream.Conn().RemotePeer().Pretty())
// Check before hand that peer is valid.
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
closeStream(stream, log)
if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil {
log.Debugf("Could not disconnect from peer: %v", err)
}
Expand Down Expand Up @@ -147,6 +154,5 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
traceutil.AnnotateError(span, err)
}
}

})
}
6 changes: 1 addition & 5 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.BeaconBlocksByRangeHandler")
defer span.End()
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand Down Expand Up @@ -117,6 +112,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
// wait for ticker before resuming streaming blocks to remote peer.
<-ticker.C
}
closeStream(stream, log)
return nil
}

Expand Down
27 changes: 4 additions & 23 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots

// beaconBlocksRootRPCHandler looks up the request blocks from the database from the given block roots.
func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand All @@ -56,22 +51,12 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
// Add to rate limiter in the event no
// roots are requested.
s.rateLimiter.add(stream, 1)
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}

if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block limit")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}
s.rateLimiter.add(stream, int64(len(blockRoots)))
Expand All @@ -80,12 +65,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
blk, err := s.db.Block(ctx, root)
if err != nil {
log.WithError(err).Debug("Could not fetch block")
resp, err := s.generateErrorResponse(responseCodeServerError, types.ErrGeneric.Error())
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if blk == nil {
Expand All @@ -95,5 +75,6 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return err
}
}
closeStream(stream, log)
return nil
}
26 changes: 14 additions & 12 deletions beacon-chain/sync/rpc_goodbye.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
Expand All @@ -33,11 +31,6 @@ var backOffTime = map[types.SSZUint64]time.Duration{

// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
SetRPCStreamDeadlines(stream)

m, ok := msg.(*types.SSZUint64)
Expand Down Expand Up @@ -91,13 +84,22 @@ func (s *Service) sendGoodByeMessage(ctx context.Context, code types.RPCGoodbyeC
if err != nil {
return err
}
defer func() {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}()
defer closeStream(stream, log)

log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")

// Wait up to the response timeout for the peer to receive the goodbye
// and close the stream (or disconnect). We usually don't bother waiting
// around for an EOF, but we're going to close this connection
// immediately after we say goodbye.
//
// NOTE: we don't actually check the response as there's nothing we can
// do if something fails. We just need to wait for it.
SetStreamReadDeadline(stream, respTimeout)
_, _err := stream.Read([]byte{0})
_ = _err

return nil
}

Expand Down
Loading

0 comments on commit 2428880

Please sign in to comment.