Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release v0.35.1 #2834

Merged
merged 8 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.6
github.com/pion/ice/v2 v2.3.24
github.com/pion/ice/v2 v2.3.25
github.com/pion/logging v0.2.2
github.com/pion/sctp v1.8.16
github.com/pion/stun v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs=
github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down
2 changes: 1 addition & 1 deletion p2p/http/libp2phttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type WellKnownHandler struct {

// streamHostListen returns a net.Listener that listens on libp2p streams for HTTP/1.1 messages.
func streamHostListen(streamHost host.Host) (net.Listener, error) {
return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect)
return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect, gostream.IgnoreEOF())
}

func (h *WellKnownHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down
36 changes: 36 additions & 0 deletions p2p/http/libp2phttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,39 @@ func TestServerLegacyWellKnownResource(t *testing.T) {
}

}

func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) {
h, err := libp2p.New()
require.NoError(t, err)
defer h.Close()
httpHost := libp2phttp.Host{StreamHost: h}
go httpHost.Serve()
defer httpHost.Close()

closeNotifyCh := make(chan bool, 1)
httpHost.SetHTTPHandlerAtPath("/test", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Legacy code uses this to check if the connection was closed
//lint:ignore SA1019 This is a test to assert we do the right thing since Go HTTP stdlib depends on this.
ch := w.(http.CloseNotifier).CloseNotify()
select {
case <-ch:
closeNotifyCh <- true
case <-time.After(100 * time.Millisecond):
closeNotifyCh <- false
}
w.WriteHeader(http.StatusOK)
}))

clientH, err := libp2p.New()
require.NoError(t, err)
defer clientH.Close()
clientHost := libp2phttp.Host{StreamHost: clientH}

rt, err := clientHost.NewConstrainedRoundTripper(peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
httpClient := &http.Client{Transport: rt}
_, err = httpClient.Get("/")
require.NoError(t, err)

require.False(t, <-closeNotifyCh)
}
16 changes: 13 additions & 3 deletions p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gostream

import (
"context"
"io"
"net"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -14,11 +15,20 @@ import (
// libp2p streams.
type conn struct {
network.Stream
ignoreEOF bool
}

func (c *conn) Read(b []byte) (int, error) {
n, err := c.Stream.Read(b)
if err != nil && c.ignoreEOF && err == io.EOF {
return n, nil
}
return n, err
}

// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
func newConn(s network.Stream, ignoreEOF bool) net.Conn {
return &conn{s, ignoreEOF}
}

// LocalAddr returns the local network address.
Expand All @@ -39,5 +49,5 @@ func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.C
if err != nil {
return nil, err
}
return newConn(s), nil
return newConn(s, false), nil
}
22 changes: 20 additions & 2 deletions p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type listener struct {
tag protocol.ID
cancel func()
streamCh chan network.Stream
// ignoreEOF is a flag that tells the listener to return conns that ignore EOF errors.
// Necessary because the default responsewriter will consider a connection closed if it reads EOF.
// But when on streams, it's fine for us to read EOF, but still be able to write.
ignoreEOF bool
}

// Accept returns the next a connection to this listener.
Expand All @@ -26,7 +30,7 @@ type listener struct {
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
return newConn(s, l.ignoreEOF), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
Expand All @@ -48,7 +52,7 @@ func (l *listener) Addr() net.Addr {
// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &listener{
Expand All @@ -58,6 +62,11 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
tag: tag,
streamCh: make(chan network.Stream),
}
for _, opt := range opts {
if err := opt(l); err != nil {
return nil, err
}
}

h.SetStreamHandler(tag, func(s network.Stream) {
select {
Expand All @@ -69,3 +78,12 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {

return l, nil
}

type ListenerOption func(*listener) error

func IgnoreEOF() ListenerOption {
return func(l *listener) error {
l.ignoreEOF = true
return nil
}
}
28 changes: 23 additions & 5 deletions p2p/protocol/circuitv2/client/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
}

if msg.GetType() != pbv2.HopMessage_STATUS {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType()),
err: err}
return nil, ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE, Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType())}
}

if status := msg.GetStatus(); status != pbv2.Status_OK {
Expand Down Expand Up @@ -130,7 +127,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,

voucherBytes := rsvp.GetVoucher()
if voucherBytes != nil {
_, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
env, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
if err != nil {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Expand All @@ -146,6 +143,27 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
Reason: fmt.Sprintf("unexpected voucher record type: %+T", rec),
}
}
signerPeerID, err := peer.IDFromPublicKey(env.PublicKey)
if err != nil {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("invalid voucher signing public key: %s", err),
err: err,
}
}
if signerPeerID != voucher.Relay {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("invalid voucher relay id: expected %s, got %s", signerPeerID, voucher.Relay),
}
}
if h.ID() != voucher.Peer {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("invalid voucher peer id: expected %s, got %s", h.ID(), voucher.Peer),
}

}
result.Voucher = voucher
}

Expand Down
42 changes: 42 additions & 0 deletions p2p/protocol/circuitv2/client/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/core/test"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
Expand Down Expand Up @@ -84,6 +87,45 @@ func TestReservationFailures(t *testing.T) {
err: "error consuming voucher envelope: failed when unmarshalling the envelope",
status: pbv2.Status_MALFORMED_MESSAGE,
},
{
name: "invalid voucher 2",
streamHandler: func(s network.Stream) {
status := pbv2.Status_OK
expire := uint64(time.Now().Add(time.Hour).UnixNano())
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
if err != nil {
s.Reset()
return
}
relay, _ := test.RandPeerID()
peer, _ := test.RandPeerID()
voucher := &proto.ReservationVoucher{
Relay: relay,
Peer: peer,
Expiration: time.Now().Add(time.Hour),
}
signedVoucher, err := record.Seal(voucher, priv)
if err != nil {
s.Reset()
return
}
env, err := signedVoucher.Marshal()
if err != nil {
s.Reset()
return
}
util.NewDelimitedWriter(s).WriteMsg(&pbv2.HopMessage{
Type: pbv2.HopMessage_STATUS.Enum(),
Status: &status,
Reservation: &pbv2.Reservation{
Expire: &expire,
Voucher: env,
},
})
},
err: "invalid voucher relay id",
status: pbv2.Status_MALFORMED_MESSAGE,
},
}

for _, tc := range testcases {
Expand Down
23 changes: 14 additions & 9 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0)
ids.addrMu.Unlock()

log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)
log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), addrs)

// get protocol versions
pv := mes.GetProtocolVersion()
Expand Down Expand Up @@ -1064,18 +1064,23 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}

// filterAddrs filters the address slice based on the remove multiaddr:
// * if it's a localhost address, no filtering is applied
// * if it's a local network address, all localhost addresses are filtered out
// * if it's a public address, all localhost and local network addresses are filtered out
// filterAddrs filters the address slice based on the remote multiaddr:
// - if it's a localhost address, no filtering is applied
// - if it's a private network address, all localhost addresses are filtered out
// - if it's a public address, all non-public addresses are filtered out
// - if none of the above, (e.g. discard prefix), no filtering is applied.
// We can't do anything meaningful here so we do nothing.
func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr {
if manet.IsIPLoopback(remote) {
switch {
case manet.IsIPLoopback(remote):
return addrs
}
if manet.IsPrivateAddr(remote) {
case manet.IsPrivateAddr(remote):
return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) })
case manet.IsPublicAddr(remote):
return ma.FilterAddrs(addrs, manet.IsPublicAddr)
default:
return addrs
}
return ma.FilterAddrs(addrs, manet.IsPublicAddr)
}

func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
Expand Down
11 changes: 6 additions & 5 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) {
o.mu.Lock()
defer o.mu.Unlock()

observedTWAddr, ok := o.connObservedTWAddrs[conn]
if !ok {
return
}
delete(o.connObservedTWAddrs, conn)

// normalize before obtaining the thinWaist so that we are always dealing
// with the normalized form of the address
localTW, err := thinWaistForm(o.normalize(conn.LocalMultiaddr()))
Expand All @@ -467,11 +473,6 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) {
delete(o.localAddrs, string(localTW.Addr.Bytes()))
}

observedTWAddr, ok := o.connObservedTWAddrs[conn]
if !ok {
return
}
delete(o.connObservedTWAddrs, conn)
observer, err := getObserver(conn.RemoteMultiaddr())
if err != nil {
return
Expand Down
Loading
Loading