From c59cf257aa9f54226bceaa270f7baadba4d0d087 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 5 Jun 2024 09:32:39 -0700 Subject: [PATCH 1/8] identify: Don't filter addr if remote is neither public nor private (#2820) Updates the filterAddrs logic to no-op if the address is neither public nor private. This fixes an issue in mocknet that assigns each node an address in the IPv6 discard prefix space. That doesn't interact well with this logic in identify. The issue mocknet hits is that it filters out all received listen addresses and then doesn't remember any address for the peer. --- p2p/protocol/identify/id.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index a91cc4f92e..e91fdee76e 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -831,7 +831,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0) ids.addrMu.Unlock() - log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) + log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), addrs) // get protocol versions pv := mes.GetProtocolVersion() @@ -1064,18 +1064,23 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} -// filterAddrs filters the address slice based on the remove multiaddr: -// * if it's a localhost address, no filtering is applied -// * if it's a local network address, all localhost addresses are filtered out -// * if it's a public address, all localhost and local network addresses are filtered out +// filterAddrs filters the address slice based on the remote multiaddr: +// - if it's a localhost address, no filtering is applied +// - if it's a private network address, all localhost addresses are filtered out +// - if it's a public address, all non-public addresses are filtered out +// - if none of the above, (e.g. discard prefix), no filtering is applied. +// We can't do anything meaningful here so we do nothing. func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr { - if manet.IsIPLoopback(remote) { + switch { + case manet.IsIPLoopback(remote): return addrs - } - if manet.IsPrivateAddr(remote) { + case manet.IsPrivateAddr(remote): return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) }) + case manet.IsPublicAddr(remote): + return ma.FilterAddrs(addrs, manet.IsPublicAddr) + default: + return addrs } - return ma.FilterAddrs(addrs, manet.IsPublicAddr) } func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { From 6bb59af6572c8bd170505efc1a28e7f859f765a8 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Sat, 8 Jun 2024 10:35:50 +0400 Subject: [PATCH 2/8] identify: fix bug in observed address handling (#2825) --- p2p/protocol/identify/obsaddr.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 4437c4b011..fc1c100c8e 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -452,6 +452,12 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) { o.mu.Lock() defer o.mu.Unlock() + observedTWAddr, ok := o.connObservedTWAddrs[conn] + if !ok { + return + } + delete(o.connObservedTWAddrs, conn) + // normalize before obtaining the thinWaist so that we are always dealing // with the normalized form of the address localTW, err := thinWaistForm(o.normalize(conn.LocalMultiaddr())) @@ -467,11 +473,6 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) { delete(o.localAddrs, string(localTW.Addr.Bytes())) } - observedTWAddr, ok := o.connObservedTWAddrs[conn] - if !ok { - return - } - delete(o.connObservedTWAddrs, conn) observer, err := getObserver(conn.RemoteMultiaddr()) if err != nil { return From c142e5aa171a76a4093ea838b91fc6aa002f8d60 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Sun, 9 Jun 2024 16:30:33 +0400 Subject: [PATCH 3/8] identify: add test for observed address handling (#2828) This modifies TestObservedAddrManager to verify the fix in #2825 --- p2p/protocol/identify/obsaddr_test.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 9c2d8dee57..94366f882e 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -153,7 +153,7 @@ func TestObservedAddrManager(t *testing.T) { var ob1, ob2 [N]connMultiaddrs for i := 0; i < N; i++ { ob1[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) - ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) + ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/2/quic-v1", i))) } for i := 0; i < N-1; i++ { o.Record(ob1[i], observedQuic) @@ -186,6 +186,7 @@ func TestObservedAddrManager(t *testing.T) { return checkAllEntriesRemoved(o) }, 2*time.Second, 100*time.Millisecond) }) + t.Run("SameObserversDifferentAddrs", func(t *testing.T) { o := newObservedAddrMgr() defer o.Close() @@ -197,7 +198,7 @@ func TestObservedAddrManager(t *testing.T) { var ob1, ob2 [N]connMultiaddrs for i := 0; i < N; i++ { ob1[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) - ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) + ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/2/quic-v1", i))) } for i := 0; i < N-1; i++ { o.Record(ob1[i], observedQuic1) @@ -238,6 +239,8 @@ func TestObservedAddrManager(t *testing.T) { c2 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.2/udp/1/quic-v1")) c3 := newConn(webTransport4ListenAddr, ma.StringCast("/ip4/1.2.3.3/udp/1/quic-v1/webtransport")) c4 := newConn(webTransport4ListenAddr, ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/webtransport")) + c5 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.5/udp/1/quic-v1")) + c6 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.6/udp/1/quic-v1")) var observedQuic, observedWebTransport ma.Multiaddr for i := 0; i < 10; i++ { // Change the IP address in each observation @@ -247,6 +250,7 @@ func TestObservedAddrManager(t *testing.T) { o.Record(c2, observedQuic) o.Record(c3, observedWebTransport) o.Record(c4, observedWebTransport) + o.Record(c5, observedQuic) time.Sleep(20 * time.Millisecond) } @@ -258,13 +262,23 @@ func TestObservedAddrManager(t *testing.T) { require.NoError(t, err) require.Less(t, len(o.externalAddrs[string(tw.TW.Bytes())]), 2) - require.Equal(t, o.AddrsFor(webTransport4ListenAddr), []ma.Multiaddr{observedWebTransport}) - require.Equal(t, o.AddrsFor(quic4ListenAddr), []ma.Multiaddr{observedQuic}) + require.Equal(t, []ma.Multiaddr{observedWebTransport}, o.AddrsFor(webTransport4ListenAddr)) + require.Equal(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr)) + require.ElementsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransport}, o.Addrs()) + + for i := 0; i < 3; i++ { + // remove non-recorded connection + o.removeConn(c6) + } + require.Equal(t, []ma.Multiaddr{observedWebTransport}, o.AddrsFor(webTransport4ListenAddr)) + require.Equal(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr)) + require.ElementsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransport}, o.Addrs()) o.removeConn(c1) o.removeConn(c2) o.removeConn(c3) o.removeConn(c4) + o.removeConn(c5) require.Eventually(t, func() bool { return checkAllEntriesRemoved(o) }, 1*time.Second, 100*time.Millisecond) @@ -411,7 +425,7 @@ func TestObservedAddrManager(t *testing.T) { return checkAllEntriesRemoved(o) }, 1*time.Second, 100*time.Millisecond) }) - t.Run("Nill Input", func(t *testing.T) { + t.Run("Nil Input", func(t *testing.T) { o := newObservedAddrMgr() defer o.Close() o.maybeRecordObservation(nil, nil) From 4d511cd3a570092a260038abcbcb8dc0e41b0775 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 10 Jun 2024 15:37:04 -0700 Subject: [PATCH 4/8] libp2phttp: workaround for ResponseWriter's CloseNotifier (#2821) * libp2phttp: workaround for CloseNotifier * Add lintignore --- p2p/http/libp2phttp.go | 2 +- p2p/http/libp2phttp_test.go | 36 ++++++++++++++++++++++++++++++++++++ p2p/net/gostream/conn.go | 16 +++++++++++++--- p2p/net/gostream/listener.go | 22 ++++++++++++++++++++-- 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/p2p/http/libp2phttp.go b/p2p/http/libp2phttp.go index 5ff025e3ab..e5078f772e 100644 --- a/p2p/http/libp2phttp.go +++ b/p2p/http/libp2phttp.go @@ -61,7 +61,7 @@ type WellKnownHandler struct { // streamHostListen returns a net.Listener that listens on libp2p streams for HTTP/1.1 messages. func streamHostListen(streamHost host.Host) (net.Listener, error) { - return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect) + return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect, gostream.IgnoreEOF()) } func (h *WellKnownHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/p2p/http/libp2phttp_test.go b/p2p/http/libp2phttp_test.go index a444c6e209..e7e66bb4cd 100644 --- a/p2p/http/libp2phttp_test.go +++ b/p2p/http/libp2phttp_test.go @@ -719,3 +719,39 @@ func TestServerLegacyWellKnownResource(t *testing.T) { } } + +func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) { + h, err := libp2p.New() + require.NoError(t, err) + defer h.Close() + httpHost := libp2phttp.Host{StreamHost: h} + go httpHost.Serve() + defer httpHost.Close() + + closeNotifyCh := make(chan bool, 1) + httpHost.SetHTTPHandlerAtPath("/test", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Legacy code uses this to check if the connection was closed + //lint:ignore SA1019 This is a test to assert we do the right thing since Go HTTP stdlib depends on this. + ch := w.(http.CloseNotifier).CloseNotify() + select { + case <-ch: + closeNotifyCh <- true + case <-time.After(100 * time.Millisecond): + closeNotifyCh <- false + } + w.WriteHeader(http.StatusOK) + })) + + clientH, err := libp2p.New() + require.NoError(t, err) + defer clientH.Close() + clientHost := libp2phttp.Host{StreamHost: clientH} + + rt, err := clientHost.NewConstrainedRoundTripper(peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + require.NoError(t, err) + httpClient := &http.Client{Transport: rt} + _, err = httpClient.Get("/") + require.NoError(t, err) + + require.False(t, <-closeNotifyCh) +} diff --git a/p2p/net/gostream/conn.go b/p2p/net/gostream/conn.go index 991dd2ff96..6959b6cbe0 100644 --- a/p2p/net/gostream/conn.go +++ b/p2p/net/gostream/conn.go @@ -2,6 +2,7 @@ package gostream import ( "context" + "io" "net" "github.com/libp2p/go-libp2p/core/host" @@ -14,11 +15,20 @@ import ( // libp2p streams. type conn struct { network.Stream + ignoreEOF bool +} + +func (c *conn) Read(b []byte) (int, error) { + n, err := c.Stream.Read(b) + if err != nil && c.ignoreEOF && err == io.EOF { + return n, nil + } + return n, err } // newConn creates a conn given a libp2p stream -func newConn(s network.Stream) net.Conn { - return &conn{s} +func newConn(s network.Stream, ignoreEOF bool) net.Conn { + return &conn{s, ignoreEOF} } // LocalAddr returns the local network address. @@ -39,5 +49,5 @@ func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.C if err != nil { return nil, err } - return newConn(s), nil + return newConn(s, false), nil } diff --git a/p2p/net/gostream/listener.go b/p2p/net/gostream/listener.go index 250e688050..f1146b0617 100644 --- a/p2p/net/gostream/listener.go +++ b/p2p/net/gostream/listener.go @@ -18,6 +18,10 @@ type listener struct { tag protocol.ID cancel func() streamCh chan network.Stream + // ignoreEOF is a flag that tells the listener to return conns that ignore EOF errors. + // Necessary because the default responsewriter will consider a connection closed if it reads EOF. + // But when on streams, it's fine for us to read EOF, but still be able to write. + ignoreEOF bool } // Accept returns the next a connection to this listener. @@ -26,7 +30,7 @@ type listener struct { func (l *listener) Accept() (net.Conn, error) { select { case s := <-l.streamCh: - return newConn(s), nil + return newConn(s, l.ignoreEOF), nil case <-l.ctx.Done(): return nil, l.ctx.Err() } @@ -48,7 +52,7 @@ func (l *listener) Addr() net.Addr { // Listen provides a standard net.Listener ready to accept "connections". // Under the hood, these connections are libp2p streams tagged with the // given protocol.ID. -func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { +func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, error) { ctx, cancel := context.WithCancel(context.Background()) l := &listener{ @@ -58,6 +62,11 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { tag: tag, streamCh: make(chan network.Stream), } + for _, opt := range opts { + if err := opt(l); err != nil { + return nil, err + } + } h.SetStreamHandler(tag, func(s network.Stream) { select { @@ -69,3 +78,12 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { return l, nil } + +type ListenerOption func(*listener) error + +func IgnoreEOF() ListenerOption { + return func(l *listener) error { + l.ignoreEOF = true + return nil + } +} From 0d9abb56ed779652c15e1b80cedd9cb648d4303a Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 12 Jun 2024 16:25:33 +0530 Subject: [PATCH 5/8] circuitv2: improve voucher validation (#2826) --- p2p/protocol/circuitv2/client/reservation.go | 28 ++++++++++--- .../circuitv2/client/reservation_test.go | 42 +++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/p2p/protocol/circuitv2/client/reservation.go b/p2p/protocol/circuitv2/client/reservation.go index dbb9241937..462c01d236 100644 --- a/p2p/protocol/circuitv2/client/reservation.go +++ b/p2p/protocol/circuitv2/client/reservation.go @@ -93,10 +93,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, } if msg.GetType() != pbv2.HopMessage_STATUS { - return nil, ReservationError{ - Status: pbv2.Status_MALFORMED_MESSAGE, - Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType()), - err: err} + return nil, ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE, Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType())} } if status := msg.GetStatus(); status != pbv2.Status_OK { @@ -130,7 +127,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, voucherBytes := rsvp.GetVoucher() if voucherBytes != nil { - _, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain) + env, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain) if err != nil { return nil, ReservationError{ Status: pbv2.Status_MALFORMED_MESSAGE, @@ -146,6 +143,27 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, Reason: fmt.Sprintf("unexpected voucher record type: %+T", rec), } } + signerPeerID, err := peer.IDFromPublicKey(env.PublicKey) + if err != nil { + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("invalid voucher signing public key: %s", err), + err: err, + } + } + if signerPeerID != voucher.Relay { + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("invalid voucher relay id: expected %s, got %s", signerPeerID, voucher.Relay), + } + } + if h.ID() != voucher.Peer { + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("invalid voucher peer id: expected %s, got %s", h.ID(), voucher.Peer), + } + + } result.Voucher = voucher } diff --git a/p2p/protocol/circuitv2/client/reservation_test.go b/p2p/protocol/circuitv2/client/reservation_test.go index decb3e71de..d1ab6dc683 100644 --- a/p2p/protocol/circuitv2/client/reservation_test.go +++ b/p2p/protocol/circuitv2/client/reservation_test.go @@ -8,8 +8,11 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/record" + "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" @@ -84,6 +87,45 @@ func TestReservationFailures(t *testing.T) { err: "error consuming voucher envelope: failed when unmarshalling the envelope", status: pbv2.Status_MALFORMED_MESSAGE, }, + { + name: "invalid voucher 2", + streamHandler: func(s network.Stream) { + status := pbv2.Status_OK + expire := uint64(time.Now().Add(time.Hour).UnixNano()) + priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) + if err != nil { + s.Reset() + return + } + relay, _ := test.RandPeerID() + peer, _ := test.RandPeerID() + voucher := &proto.ReservationVoucher{ + Relay: relay, + Peer: peer, + Expiration: time.Now().Add(time.Hour), + } + signedVoucher, err := record.Seal(voucher, priv) + if err != nil { + s.Reset() + return + } + env, err := signedVoucher.Marshal() + if err != nil { + s.Reset() + return + } + util.NewDelimitedWriter(s).WriteMsg(&pbv2.HopMessage{ + Type: pbv2.HopMessage_STATUS.Enum(), + Status: &status, + Reservation: &pbv2.Reservation{ + Expire: &expire, + Voucher: env, + }, + }) + }, + err: "invalid voucher relay id", + status: pbv2.Status_MALFORMED_MESSAGE, + }, } for _, tc := range testcases { From c5e2b33ee1eb4949b946bb0b96242f5c9ab760ed Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 12 Jun 2024 16:25:57 +0530 Subject: [PATCH 6/8] webrtc: fix ufrag prefix for dialing (#2832) --- p2p/transport/webrtc/transport.go | 8 +++++--- p2p/transport/webrtc/transport_test.go | 7 +++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index b04753ecab..68a2988c78 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -415,15 +415,17 @@ func genUfrag() string { uFragAlphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" uFragPrefix = "libp2p+webrtc+v1/" uFragIdLength = 32 - uFragIdOffset = len(uFragPrefix) - uFragLength = uFragIdOffset + uFragIdLength + uFragLength = len(uFragPrefix) + uFragIdLength ) seed := [8]byte{} rand.Read(seed[:]) r := mrand.New(mrand.NewSource(binary.BigEndian.Uint64(seed[:]))) b := make([]byte, uFragLength) - for i := uFragIdOffset; i < uFragLength; i++ { + for i := 0; i < len(uFragPrefix); i++ { + b[i] = uFragPrefix[i] + } + for i := len(uFragPrefix); i < uFragLength; i++ { b[i] = uFragAlphabet[r.Intn(len(uFragAlphabet))] } return string(b) diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index a3054a82df..5831a257a7 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -860,3 +860,10 @@ func TestMaxInFlightRequests(t *testing.T) { require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes") require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure") } + +func TestGenUfrag(t *testing.T) { + for i := 0; i < 10; i++ { + s := genUfrag() + require.True(t, strings.HasPrefix(s, "libp2p+webrtc+v1/")) + } +} From 789263e45bc978b09d92156c7253f558039d4589 Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 12 Jun 2024 16:41:15 +0530 Subject: [PATCH 7/8] webrtc: add a test for establishing many connections (#2801) Update pion/ice to include the fix for out of order ConnectionState update callbacks --- go.mod | 2 +- go.sum | 4 +- p2p/transport/webrtc/listener.go | 2 +- p2p/transport/webrtc/transport_test.go | 117 +++++++++++++++++++++++++ test-plans/go.mod | 2 +- test-plans/go.sum | 4 +- 6 files changed, 124 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index bff7698536..00808fdc58 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.6 - github.com/pion/ice/v2 v2.3.24 + github.com/pion/ice/v2 v2.3.25 github.com/pion/logging v0.2.2 github.com/pion/sctp v1.8.16 github.com/pion/stun v0.6.1 diff --git a/go.sum b/go.sum index 5455f12e29..a2a327b99b 100644 --- a/go.sum +++ b/go.sum @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 1834fc812b..3f465b34fc 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -330,7 +330,7 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error errC := make(chan error, 1) var once sync.Once pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - switch state { + switch pc.ConnectionState() { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) case webrtc.PeerConnectionStateFailed: diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index 5831a257a7..b618ec85df 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "errors" "fmt" "io" "net" @@ -17,6 +18,7 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + tpt "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multibase" @@ -867,3 +869,118 @@ func TestGenUfrag(t *testing.T) { require.True(t, strings.HasPrefix(s, "libp2p+webrtc+v1/")) } } + +func TestManyConnections(t *testing.T) { + var listeners []tpt.Listener + var listenerPeerIDs []peer.ID + + const numListeners = 5 + const dialersPerListener = 5 + const connsPerDialer = 10 + errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer) + successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer) + + for i := 0; i < numListeners; i++ { + tr, lp := getTransport(t) + listenerPeerIDs = append(listenerPeerIDs, lp) + ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct")) + require.NoError(t, err) + defer ln.Close() + listeners = append(listeners, ln) + } + + runListenConn := func(conn tpt.CapableConn) { + defer conn.Close() + s, err := conn.AcceptStream() + if err != nil { + t.Errorf("accept stream failed for listener: %s", err) + errCh <- err + return + } + var b [4]byte + if _, err := s.Read(b[:]); err != nil { + t.Errorf("read stream failed for listener: %s", err) + errCh <- err + return + } + s.Write(b[:]) + _, err = s.Read(b[:]) // peer will close the connection after read + if !assert.Error(t, err) { + err = errors.New("invalid read: expected conn to close") + errCh <- err + return + } + successCh <- struct{}{} + } + + runDialConn := func(conn tpt.CapableConn) { + defer conn.Close() + + s, err := conn.OpenStream(context.Background()) + if err != nil { + t.Errorf("accept stream failed for listener: %s", err) + errCh <- err + return + } + var b [4]byte + if _, err := s.Write(b[:]); err != nil { + t.Errorf("write stream failed for dialer: %s", err) + errCh <- err + return + } + if _, err := s.Read(b[:]); err != nil { + t.Errorf("read stream failed for dialer: %s", err) + errCh <- err + return + } + s.Close() + } + + runListener := func(ln tpt.Listener) { + for i := 0; i < dialersPerListener*connsPerDialer; i++ { + conn, err := ln.Accept() + if err != nil { + t.Errorf("listener failed to accept conneciton: %s", err) + return + } + go runListenConn(conn) + } + } + + runDialer := func(ln tpt.Listener, lp peer.ID) { + tp, _ := getTransport(t) + for i := 0; i < connsPerDialer; i++ { + // We want to test for deadlocks, set a high timeout + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + conn, err := tp.Dial(ctx, ln.Multiaddr(), lp) + if err != nil { + t.Errorf("dial failed: %s", err) + errCh <- err + cancel() + return + } + runDialConn(conn) + cancel() + } + } + + for i := 0; i < numListeners; i++ { + go runListener(listeners[i]) + } + for i := 0; i < numListeners; i++ { + for j := 0; j < dialersPerListener; j++ { + go runDialer(listeners[i], listenerPeerIDs[i]) + } + } + + for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ { + select { + case <-successCh: + t.Log("completed conn: ", i) + case err := <-errCh: + t.Fatalf("failed: %s", err) + case <-time.After(300 * time.Second): + t.Fatalf("timed out") + } + } +} diff --git a/test-plans/go.mod b/test-plans/go.mod index 64361ed334..6e4ff51d3d 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -66,7 +66,7 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect - github.com/pion/ice/v2 v2.3.24 // indirect + github.com/pion/ice/v2 v2.3.25 // indirect github.com/pion/interceptor v0.1.29 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index 63eac7a98b..fdd19bbb02 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -226,8 +226,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= From 0952311c26b8507eeccf56f2d9eb94cecd27ecda Mon Sep 17 00:00:00 2001 From: sukun Date: Sat, 8 Jun 2024 12:07:55 +0530 Subject: [PATCH 8/8] release v0.35.1 --- version.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.json b/version.json index fb0aea4f0c..f765664902 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.35.0" + "version": "v0.35.1" }