Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update go-libp2p to 0.12.0 #8015

Merged
merged 19 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ go_library(
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
Expand Down
21 changes: 15 additions & 6 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,27 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {

// DecodeGossip decodes the bytes to the protobuf gossip message provided.
func (e SszNetworkEncoder) DecodeGossip(b []byte, to interface{}) error {
size, err := snappy.DecodedLen(b)
b, err := DecodeSnappy(b, MaxGossipSize)
if err != nil {
return err
}
if uint64(size) > MaxGossipSize {
return errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", size, MaxGossipSize)
return e.doDecode(b, to)
}

// DecodeSnappy decodes a snappy compressed message.
func DecodeSnappy(msg []byte, maxSize uint64) ([]byte, error) {
size, err := snappy.DecodedLen(msg)
if err != nil {
return nil, err
}
b, err = snappy.Decode(nil /*dst*/, b)
if uint64(size) > maxSize {
return nil, errors.Errorf("snappy message exceeds max size: %d bytes > %d bytes", size, maxSize)
}
msg, err = snappy.Decode(nil /*dst*/, msg)
if err != nil {
return err
return nil, err
}
return e.doDecode(b, to)
return msg, nil
}

// DecodeWithMaxLength the bytes from io.Reader to the protobuf message provided.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/encoder/ssz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSszNetworkEncoder_FailsSnappyLength(t *testing.T) {
data := make([]byte, 32)
binary.PutUvarint(data, encoder.MaxGossipSize+32)
err := e.DecodeGossip(data, att)
require.ErrorContains(t, "gossip message exceeds max gossip size", err)
require.ErrorContains(t, "snappy message exceeds max size", err)
}

func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"time"

"github.com/golang/snappy"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
// the concatenation of `MESSAGE_DOMAIN_INVALID_SNAPPY` with the raw message data,
// i.e. `SHA256(MESSAGE_DOMAIN_INVALID_SNAPPY + message.data)[:20]`.
func msgIDFunction(pmsg *pubsub_pb.Message) string {
decodedData, err := snappy.Decode(nil /*dst*/, pmsg.Data)
decodedData, err := encoder.DecodeSnappy(pmsg.Data, params.BeaconNetworkConfig().GossipMaxSize)
if err != nil {
combinedData := append(params.BeaconNetworkConfig().MessageDomainInvalidSnappy[:], pmsg.Data...)
h := hashutil.Hash(combinedData)
Expand Down
20 changes: 12 additions & 8 deletions beacon-chain/p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
//
// When done, the caller must Close or Reset on the stream.
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
Expand All @@ -31,18 +33,20 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic == RPCMetaDataTopic {
return stream, nil
}

if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
if baseTopic != RPCMetaDataTopic {
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
Comment on lines +39 to +40
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't great. Why does stream.Reset() return an error if we aren't going to handle it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns an error just in case there might be something useful to return (and as I said, it probably should never have returned an error in the first place). We can just log it, but that won't actually do anything in practice.

return nil, err
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now close the stream for writing after "sending" a metadata request. I'm pretty sure the issue here was due to a bug that I fixed in go-libp2p 0.12.0. Please make sure to test this.

Background:

Protocol negotiation in libp2p is usually lazy. This lets us bundle the first write with the protocol headers. Unfortunately, there was a bug in go-libp2p where calling Close before reading or writing would close the stream for writing without flushing the protocol headers. This has since been fixed.

}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build is complaining about this.

beacon-chain/p2p/sender.go:45:18: stream.CloseWrite undefined (type network.Stream has no field or method CloseWrite)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to assume this is some kind of bazel thing. CloseWrite is implemented on the stream interface.

traceutil.AnnotateError(span, err)
_err := stream.Reset()
_ = _err
return nil, err
}

Expand Down
8 changes: 7 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {

n, err := p.Encoding().EncodeWithMaxLength(s, msg)
if err != nil {
_err := s.Reset()
_ = _err
p.t.Fatalf("Failed to encode message: %v", err)
}

Expand Down Expand Up @@ -287,12 +289,16 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p

if topic != "/eth2/beacon_chain/req/metadata/1" {
if _, err := p.Encoding().EncodeWithMaxLength(stream, msg); err != nil {
_err := stream.Reset()
_ = _err
return nil, err
}
}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
_err := stream.Reset()
_ = _err
return nil, err
}
// Delay returning the stream for testing purposes
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ go_library(
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//helpers:go_default_library",
"@com_github_libp2p_go_libp2p_core//mux:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
Expand Down
36 changes: 32 additions & 4 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand Down Expand Up @@ -56,6 +55,9 @@ func writeErrorResponseToStream(responseCode byte, reason string, stream libp2pc
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
} else {
// If sending the error message succeeded, close to send an EOF.
closeStream(stream, log)
}
}

Expand Down Expand Up @@ -91,11 +93,37 @@ func readStatusCodeNoDeadline(stream network.Stream, encoding encoder.NetworkEnc

// only returns true for errors that are valid (no resets or expectedEOF errors).
func isValidStreamError(err error) bool {
return err != nil && !errors.Is(err, mux.ErrReset) && !errors.Is(err, helpers.ErrExpectedEOF)
// check the error message itself as well as libp2p doesn't currently
// return the correct error type from Close{Read,Write,}.
return err != nil && !errors.Is(err, mux.ErrReset) && err.Error() != mux.ErrReset.Error()
}

func closeStream(stream network.Stream, log *logrus.Entry) {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debug("Could not reset stream")
if err := stream.Close(); isValidStreamError(err) {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}

func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
if err := stream.CloseWrite(); err != nil {
_err := stream.Reset()
_ = _err
if isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
return
}
// Wait for the remote side to respond.
//
// 1. On success, we expect to read an EOF (remote side received our
// response and closed the stream.
// 2. On failure (e.g., disconnect), we expect to receive an error.
// 3. If the remote side misbehaves, we may receive data.
//
// However, regardless of what happens, we just close the stream and
// walk away. We only read to wait for a response, we close regardless.
_, _err := stream.Read([]byte{0})
_ = _err
_err = stream.Close()
_ = _err
}
6 changes: 4 additions & 2 deletions beacon-chain/sync/initial-sync/initial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ func connectPeerHavingBlocks(

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy", func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
_err := stream.Close()
_ = _err
}()

req := &p2ppb.BeaconBlocksByRangeRequest{}
Expand All @@ -289,7 +290,8 @@ func connectPeerHavingBlocks(

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy", func(stream network.Stream) {
defer func() {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, stream.Close())
_err := stream.Close()
_ = _err
}()

req := new(p2pTypes.BeaconBlockByRootsReq)
Expand Down
12 changes: 9 additions & 3 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,24 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
s.p2p.SetStreamHandler(topic, func(stream network.Stream) {
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout)
defer cancel()

// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
// Close here because we may have only written a partial
// response.
defer func() {
closeStream(stream, log)
_err := stream.Reset()
_ = _err
}()

ctx, span := trace.StartSpan(ctx, "sync.rpc")
defer span.End()
span.AddAttributes(trace.StringAttribute("topic", topic))
span.AddAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()))
log := log.WithField("peer", stream.Conn().RemotePeer().Pretty())
// Check before hand that peer is valid.
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
closeStream(stream, log)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're walking away, we might as well just reset the stream.

if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil {
log.Debugf("Could not disconnect from peer: %v", err)
}
Expand Down Expand Up @@ -147,6 +154,5 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
traceutil.AnnotateError(span, err)
}
}

})
}
6 changes: 1 addition & 5 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.BeaconBlocksByRangeHandler")
defer span.End()
defer func() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm now explicitly closing on success. If we don't close the stream, the main RPC handler will reset it.

if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand Down Expand Up @@ -117,6 +112,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
// wait for ticker before resuming streaming blocks to remote peer.
<-ticker.C
}
closeStream(stream, log)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have the RPC handler always close the stream instead of resetting, we can remove this. Up to you.

return nil
}

Expand Down
27 changes: 4 additions & 23 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots

// beaconBlocksRootRPCHandler looks up the request blocks from the database from the given block roots.
func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand All @@ -56,22 +51,12 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
// Add to rate limiter in the event no
// roots are requested.
s.rateLimiter.add(stream, 1)
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}

if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block limit")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}
s.rateLimiter.add(stream, int64(len(blockRoots)))
Expand All @@ -80,12 +65,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
blk, err := s.db.Block(ctx, root)
if err != nil {
log.WithError(err).Debug("Could not fetch block")
resp, err := s.generateErrorResponse(responseCodeServerError, types.ErrGeneric.Error())
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if blk == nil {
Expand All @@ -95,5 +75,6 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return err
}
}
closeStream(stream, log)
return nil
}
26 changes: 14 additions & 12 deletions beacon-chain/sync/rpc_goodbye.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
Expand All @@ -33,11 +31,6 @@ var backOffTime = map[types.SSZUint64]time.Duration{

// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
SetRPCStreamDeadlines(stream)

m, ok := msg.(*types.SSZUint64)
Expand Down Expand Up @@ -91,13 +84,22 @@ func (s *Service) sendGoodByeMessage(ctx context.Context, code types.RPCGoodbyeC
if err != nil {
return err
}
defer func() {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}()
defer closeStream(stream, log)

log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")

// Wait up to the response timeout for the peer to receive the goodbye
// and close the stream (or disconnect). We usually don't bother waiting
// around for an EOF, but we're going to close this connection
// immediately after we say goodbye.
//
// NOTE: we don't actually check the response as there's nothing we can
// do if something fails. We just need to wait for it.
SetStreamReadDeadline(stream, respTimeout)
_, _err := stream.Read([]byte{0})
_ = _err

return nil
}

Expand Down
Loading