Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update go-libp2p to 0.12.0 #8015

Merged
merged 19 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,6 @@ load("@com_grail_bazel_toolchain//toolchain:deps.bzl", "bazel_toolchain_dependen

bazel_toolchain_dependencies()

load("@com_grail_bazel_toolchain//toolchain:rules.bzl", "llvm_toolchain")

llvm_toolchain(
name = "llvm_toolchain",
llvm_version = "10.0.0",
)

load("@llvm_toolchain//:toolchains.bzl", "llvm_register_toolchains")

llvm_register_toolchains()

Stebalien marked this conversation as resolved.
Show resolved Hide resolved
load("@prysm//tools/cross-toolchain:prysm_toolchains.bzl", "configure_prysm_toolchains")

configure_prysm_toolchains()
Expand Down
18 changes: 10 additions & 8 deletions beacon-chain/p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
//
// When done, the caller must Close or Reset on the stream.
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
Expand All @@ -31,18 +33,18 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic == RPCMetaDataTopic {
return stream, nil
}

if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
if baseTopic != RPCMetaDataTopic {
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
_ = stream.Reset()
return nil, err
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now close the stream for writing after "sending" a metadata request. I'm pretty sure the issue here was due to a bug that I fixed in go-libp2p 0.12.0. Please make sure to test this.

Background:

Protocol negotiation in libp2p is usually lazy. This lets us bundle the first write with the protocol headers. Unfortunately, there was a bug in go-libp2p where calling Close before reading or writing would close the stream for writing without flushing the protocol headers. This has since been fixed.

}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build is complaining about this.

beacon-chain/p2p/sender.go:45:18: stream.CloseWrite undefined (type network.Stream has no field or method CloseWrite)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to assume this is some kind of bazel thing. CloseWrite is implemented on the stream interface.

traceutil.AnnotateError(span, err)
_ = stream.Reset()
return nil, err
}

Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {

n, err := p.Encoding().EncodeWithMaxLength(s, msg)
if err != nil {
_ = s.Reset()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem appropriate. Why ignore stream.Reset errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. They're not useful. We only call Reset when we run into some other more relevant error. At that point, we don't really care about the stream (other than freeing resources).
  2. Reset is guaranteed to free resources, even if it returns an error.
  3. No stream multiplexer actually returns an error on reset. We return errors for completeness, but if
    I were to redesign these interfaces from scratch, I probably wouldn't.

p.t.Fatalf("Failed to encode message: %v", err)
}

Expand Down Expand Up @@ -287,12 +288,14 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p

if topic != "/eth2/beacon_chain/req/metadata/1" {
if _, err := p.Encoding().EncodeWithMaxLength(stream, msg); err != nil {
_ = stream.Reset()
return nil, err
}
}

// Close stream for writing.
if err := stream.Close(); err != nil {
if err := stream.CloseWrite(); err != nil {
_ = stream.Reset()
return nil, err
}
// Delay returning the stream for testing purposes
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ go_library(
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//helpers:go_default_library",
"@com_github_libp2p_go_libp2p_core//mux:go_default_library",
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
Expand Down
33 changes: 29 additions & 4 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand Down Expand Up @@ -56,6 +55,9 @@ func writeErrorResponseToStream(responseCode byte, reason string, stream libp2pc
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
} else {
// If sending the error message succeeded, close to send an EOF.
closeStream(stream, log)
}
}

Expand Down Expand Up @@ -91,11 +93,34 @@ func readStatusCodeNoDeadline(stream network.Stream, encoding encoder.NetworkEnc

// only returns true for errors that are valid (no resets or expectedEOF errors).
func isValidStreamError(err error) bool {
return err != nil && !errors.Is(err, mux.ErrReset) && !errors.Is(err, helpers.ErrExpectedEOF)
// check the error message itself as well as libp2p doesn't currently
// return the correct error type from Close{Read,Write,}.
return err != nil && !errors.Is(err, mux.ErrReset) && err.Error() != mux.ErrReset.Error()
}

func closeStream(stream network.Stream, log *logrus.Entry) {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debug("Could not reset stream")
if err := stream.Close(); isValidStreamError(err) {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}

func closeStreamAndWait(stream network.Stream, log *logrus.Entry) {
if err := stream.CloseWrite(); err != nil {
_ = stream.Reset()
if isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
return
}
// Wait for the remote side to respond.
//
// 1. On success, we expect to read an EOF (remote side received our
// response and closed the stream.
// 2. On failure (e.g., disconnect), we expect to receive an error.
// 3. If the remote side misbehaves, we may receive data.
//
// However, regardless of what happens, we just close the stream and
// walk away. We only read to wait for a response, we close regardless.
_, _ = stream.Read([]byte{0})
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
_ = stream.Close()
}
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/initial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func connectPeerHavingBlocks(

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy", func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
_ = stream.Close()
}()

req := &p2ppb.BeaconBlocksByRangeRequest{}
Expand All @@ -293,7 +293,7 @@ func connectPeerHavingBlocks(

p.SetStreamHandler("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy", func(stream network.Stream) {
defer func() {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, stream.Close())
_ = stream.Close()
}()

req := new(p2pTypes.BeaconBlockByRootsReq)
Expand Down
11 changes: 8 additions & 3 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,23 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
s.p2p.SetStreamHandler(topic, func(stream network.Stream) {
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout)
defer cancel()

// Resetting after closing is a no-op so defer a reset in case something goes wrong.
// It's up to the handler to Close the stream (send an EOF) if
// it successfully writes a response. We don't blindly call
// Close here because we may have only written a partial
// response.
defer func() {
closeStream(stream, log)
_ = stream.Reset()
}()

ctx, span := trace.StartSpan(ctx, "sync.rpc")
defer span.End()
span.AddAttributes(trace.StringAttribute("topic", topic))
span.AddAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()))
log := log.WithField("peer", stream.Conn().RemotePeer().Pretty())
// Check before hand that peer is valid.
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
closeStream(stream, log)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're walking away, we might as well just reset the stream.

if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil {
log.Debugf("Could not disconnect from peer: %v", err)
}
Expand Down Expand Up @@ -147,6 +153,5 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
traceutil.AnnotateError(span, err)
}
}

})
}
6 changes: 1 addition & 5 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.BeaconBlocksByRangeHandler")
defer span.End()
defer func() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm now explicitly closing on success. If we don't close the stream, the main RPC handler will reset it.

if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand Down Expand Up @@ -117,6 +112,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
// wait for ticker before resuming streaming blocks to remote peer.
<-ticker.C
}
closeStream(stream, log)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have the RPC handler always close the stream instead of resetting, we can remove this. Up to you.

return nil
}

Expand Down
27 changes: 4 additions & 23 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots

// beaconBlocksRootRPCHandler looks up the request blocks from the database from the given block roots.
func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, ttfbTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
Expand All @@ -56,22 +51,12 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
// Add to rate limiter in the event no
// roots are requested.
s.rateLimiter.add(stream, 1)
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "no block roots provided in request")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}

if uint64(len(blockRoots)) > params.BeaconNetworkConfig().MaxRequestBlocks {
resp, err := s.generateErrorResponse(responseCodeInvalidRequest, "requested more than the max block limit")
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
}
s.rateLimiter.add(stream, int64(len(blockRoots)))
Expand All @@ -80,12 +65,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
blk, err := s.db.Block(ctx, root)
if err != nil {
log.WithError(err).Debug("Could not fetch block")
resp, err := s.generateErrorResponse(responseCodeServerError, types.ErrGeneric.Error())
if err != nil {
log.WithError(err).Debug("Could not generate a response error")
} else if _, err := stream.Write(resp); err != nil {
log.WithError(err).Debugf("Could not write to stream")
}
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if blk == nil {
Expand All @@ -95,5 +75,6 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return err
}
}
closeStream(stream, log)
return nil
}
25 changes: 13 additions & 12 deletions beacon-chain/sync/rpc_goodbye.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
Expand All @@ -33,11 +31,6 @@ var backOffTime = map[types.SSZUint64]time.Duration{

// goodbyeRPCHandler reads the incoming goodbye rpc message from the peer.
func (s *Service) goodbyeRPCHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
SetRPCStreamDeadlines(stream)

m, ok := msg.(*types.SSZUint64)
Expand Down Expand Up @@ -91,13 +84,21 @@ func (s *Service) sendGoodByeMessage(ctx context.Context, code types.RPCGoodbyeC
if err != nil {
return err
}
defer func() {
if err := helpers.FullClose(stream); err != nil && err.Error() != mux.ErrReset.Error() {
log.WithError(err).Debugf("Could not reset stream with protocol %s", stream.Protocol())
}
}()
defer closeStream(stream, log)

log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")

// Wait up to the response timeout for the peer to receive the goodbye
// and close the stream (or disconnect). We usually don't bother waiting
// around for an EOF, but we're going to close this connection
// immediately after we say goodbye.
//
// NOTE: we don't actually check the response as there's nothing we can
// do if something fails. We just need to wait for it.
SetStreamReadDeadline(stream, respTimeout)
_, _ = stream.Read([]byte{0})

return nil
}

Expand Down
21 changes: 6 additions & 15 deletions beacon-chain/sync/rpc_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
Expand All @@ -13,11 +12,6 @@ import (

// metaDataHandler reads the incoming metadata rpc request from the peer.
func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2pcore.Stream) error {
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Debug("Could not close stream")
}
}()
SetRPCStreamDeadlines(stream)

if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
Expand All @@ -29,7 +23,11 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
return err
}
_, err := s.p2p.Encoding().EncodeWithMaxLength(stream, s.p2p.Metadata())
return err
if err != nil {
return err
}
closeStream(stream, log)
return nil
}

func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.MetaData, error) {
Expand All @@ -40,14 +38,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
if err != nil {
return nil, err
}
// we close the stream outside of `send` because
// metadata requests send no payload, so closing the
// stream early leads it to a reset.
defer func() {
if err := helpers.FullClose(stream); isValidStreamError(err) {
log.WithError(err).Debugf("Could not reset stream for protocol %s", stream.Protocol())
}
}()
defer closeStream(stream, log)
code, errMsg, err := ReadStatusCode(stream, s.p2p.Encoding())
if err != nil {
return nil, err
Expand Down
Loading