-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update go-libp2p to 0.12.0 #8015
Changes from all commits
c50700d
c8368a8
c89e777
139c6d7
a1891ba
62cfb57
6f48d2d
271477d
fc43215
bac1b35
d31334a
0d3efc4
f538ea5
85539d1
4e2bd5e
f581f7a
d81c28c
2670100
b1f21ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,8 @@ import ( | |
|
||
// Send a message to a specific peer. The returned stream may be used for reading, but has been | ||
// closed for writing. | ||
// | ||
// When done, the caller must Close or Reset on the stream. | ||
func (s *Service) Send(ctx context.Context, message interface{}, baseTopic string, pid peer.ID) (network.Stream, error) { | ||
ctx, span := trace.StartSpan(ctx, "p2p.Send") | ||
defer span.End() | ||
|
@@ -31,18 +33,20 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin | |
return nil, err | ||
} | ||
// do not encode anything if we are sending a metadata request | ||
if baseTopic == RPCMetaDataTopic { | ||
return stream, nil | ||
} | ||
|
||
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil { | ||
traceutil.AnnotateError(span, err) | ||
return nil, err | ||
if baseTopic != RPCMetaDataTopic { | ||
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil { | ||
traceutil.AnnotateError(span, err) | ||
_err := stream.Reset() | ||
_ = _err | ||
return nil, err | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We now close the stream for writing after "sending" a metadata request. I'm pretty sure the issue here was due to a bug that I fixed in go-libp2p 0.12.0. Please make sure to test this. Background: Protocol negotiation in libp2p is usually lazy. This lets us bundle the first write with the protocol headers. Unfortunately, there was a bug in go-libp2p where calling Close before reading or writing would close the stream for writing without flushing the protocol headers. This has since been fixed. |
||
} | ||
|
||
// Close stream for writing. | ||
if err := stream.Close(); err != nil { | ||
if err := stream.CloseWrite(); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Build is complaining about this.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have to assume this is some kind of bazel thing. CloseWrite is implemented on the stream interface. |
||
traceutil.AnnotateError(span, err) | ||
_err := stream.Reset() | ||
_ = _err | ||
return nil, err | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,17 +63,24 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { | |
s.p2p.SetStreamHandler(topic, func(stream network.Stream) { | ||
ctx, cancel := context.WithTimeout(s.ctx, ttfbTimeout) | ||
defer cancel() | ||
|
||
// Resetting after closing is a no-op so defer a reset in case something goes wrong. | ||
// It's up to the handler to Close the stream (send an EOF) if | ||
// it successfully writes a response. We don't blindly call | ||
// Close here because we may have only written a partial | ||
// response. | ||
defer func() { | ||
closeStream(stream, log) | ||
_err := stream.Reset() | ||
_ = _err | ||
}() | ||
|
||
ctx, span := trace.StartSpan(ctx, "sync.rpc") | ||
defer span.End() | ||
span.AddAttributes(trace.StringAttribute("topic", topic)) | ||
span.AddAttributes(trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty())) | ||
log := log.WithField("peer", stream.Conn().RemotePeer().Pretty()) | ||
// Check before hand that peer is valid. | ||
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) { | ||
closeStream(stream, log) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're walking away, we might as well just reset the stream. |
||
if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil { | ||
log.Debugf("Could not disconnect from peer: %v", err) | ||
} | ||
|
@@ -147,6 +154,5 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { | |
traceutil.AnnotateError(span, err) | ||
} | ||
} | ||
|
||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,6 @@ import ( | |
func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { | ||
ctx, span := trace.StartSpan(ctx, "sync.BeaconBlocksByRangeHandler") | ||
defer span.End() | ||
defer func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm now explicitly closing on success. If we don't close the stream, the main RPC handler will reset it. |
||
if err := stream.Close(); err != nil { | ||
log.WithError(err).Debug("Could not close stream") | ||
} | ||
}() | ||
ctx, cancel := context.WithTimeout(ctx, respTimeout) | ||
defer cancel() | ||
SetRPCStreamDeadlines(stream) | ||
|
@@ -117,6 +112,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa | |
// wait for ticker before resuming streaming blocks to remote peer. | ||
<-ticker.C | ||
} | ||
closeStream(stream, log) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we have the RPC handler always close the stream instead of resetting, we can remove this. Up to you. |
||
return nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't great. Why does stream.Reset() return an error if we aren't going to handle it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It returns an error just in case there might be something useful to return (and as I said, it probably should never have returned an error in the first place). We can just log it, but that won't actually do anything in practice.