diff --git a/go.mod b/go.mod index 00808fdc58..bff7698536 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.6 - github.com/pion/ice/v2 v2.3.25 + github.com/pion/ice/v2 v2.3.24 github.com/pion/logging v0.2.2 github.com/pion/sctp v1.8.16 github.com/pion/stun v0.6.1 diff --git a/go.sum b/go.sum index a2a327b99b..5455f12e29 100644 --- a/go.sum +++ b/go.sum @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= -github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= +github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/p2p/http/libp2phttp.go b/p2p/http/libp2phttp.go index e5078f772e..5ff025e3ab 100644 --- a/p2p/http/libp2phttp.go +++ b/p2p/http/libp2phttp.go @@ -61,7 +61,7 @@ type WellKnownHandler struct { // streamHostListen returns a net.Listener that listens on libp2p streams for HTTP/1.1 messages. func streamHostListen(streamHost host.Host) (net.Listener, error) { - return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect, gostream.IgnoreEOF()) + return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect) } func (h *WellKnownHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { diff --git a/p2p/http/libp2phttp_test.go b/p2p/http/libp2phttp_test.go index e7e66bb4cd..a444c6e209 100644 --- a/p2p/http/libp2phttp_test.go +++ b/p2p/http/libp2phttp_test.go @@ -719,39 +719,3 @@ func TestServerLegacyWellKnownResource(t *testing.T) { } } - -func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) { - h, err := libp2p.New() - require.NoError(t, err) - defer h.Close() - httpHost := libp2phttp.Host{StreamHost: h} - go httpHost.Serve() - defer httpHost.Close() - - closeNotifyCh := make(chan bool, 1) - httpHost.SetHTTPHandlerAtPath("/test", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Legacy code uses this to check if the connection was closed - //lint:ignore SA1019 This is a test to assert we do the right thing since Go HTTP stdlib depends on this. - ch := w.(http.CloseNotifier).CloseNotify() - select { - case <-ch: - closeNotifyCh <- true - case <-time.After(100 * time.Millisecond): - closeNotifyCh <- false - } - w.WriteHeader(http.StatusOK) - })) - - clientH, err := libp2p.New() - require.NoError(t, err) - defer clientH.Close() - clientHost := libp2phttp.Host{StreamHost: clientH} - - rt, err := clientHost.NewConstrainedRoundTripper(peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) - require.NoError(t, err) - httpClient := &http.Client{Transport: rt} - _, err = httpClient.Get("/") - require.NoError(t, err) - - require.False(t, <-closeNotifyCh) -} diff --git a/p2p/net/gostream/conn.go b/p2p/net/gostream/conn.go index 6959b6cbe0..991dd2ff96 100644 --- a/p2p/net/gostream/conn.go +++ b/p2p/net/gostream/conn.go @@ -2,7 +2,6 @@ package gostream import ( "context" - "io" "net" "github.com/libp2p/go-libp2p/core/host" @@ -15,20 +14,11 @@ import ( // libp2p streams. type conn struct { network.Stream - ignoreEOF bool -} - -func (c *conn) Read(b []byte) (int, error) { - n, err := c.Stream.Read(b) - if err != nil && c.ignoreEOF && err == io.EOF { - return n, nil - } - return n, err } // newConn creates a conn given a libp2p stream -func newConn(s network.Stream, ignoreEOF bool) net.Conn { - return &conn{s, ignoreEOF} +func newConn(s network.Stream) net.Conn { + return &conn{s} } // LocalAddr returns the local network address. @@ -49,5 +39,5 @@ func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.C if err != nil { return nil, err } - return newConn(s, false), nil + return newConn(s), nil } diff --git a/p2p/net/gostream/listener.go b/p2p/net/gostream/listener.go index f1146b0617..250e688050 100644 --- a/p2p/net/gostream/listener.go +++ b/p2p/net/gostream/listener.go @@ -18,10 +18,6 @@ type listener struct { tag protocol.ID cancel func() streamCh chan network.Stream - // ignoreEOF is a flag that tells the listener to return conns that ignore EOF errors. - // Necessary because the default responsewriter will consider a connection closed if it reads EOF. - // But when on streams, it's fine for us to read EOF, but still be able to write. - ignoreEOF bool } // Accept returns the next a connection to this listener. @@ -30,7 +26,7 @@ type listener struct { func (l *listener) Accept() (net.Conn, error) { select { case s := <-l.streamCh: - return newConn(s, l.ignoreEOF), nil + return newConn(s), nil case <-l.ctx.Done(): return nil, l.ctx.Err() } @@ -52,7 +48,7 @@ func (l *listener) Addr() net.Addr { // Listen provides a standard net.Listener ready to accept "connections". // Under the hood, these connections are libp2p streams tagged with the // given protocol.ID. -func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, error) { +func Listen(h host.Host, tag protocol.ID) (net.Listener, error) { ctx, cancel := context.WithCancel(context.Background()) l := &listener{ @@ -62,11 +58,6 @@ func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, tag: tag, streamCh: make(chan network.Stream), } - for _, opt := range opts { - if err := opt(l); err != nil { - return nil, err - } - } h.SetStreamHandler(tag, func(s network.Stream) { select { @@ -78,12 +69,3 @@ func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, return l, nil } - -type ListenerOption func(*listener) error - -func IgnoreEOF() ListenerOption { - return func(l *listener) error { - l.ignoreEOF = true - return nil - } -} diff --git a/p2p/protocol/circuitv2/client/reservation.go b/p2p/protocol/circuitv2/client/reservation.go index 462c01d236..dbb9241937 100644 --- a/p2p/protocol/circuitv2/client/reservation.go +++ b/p2p/protocol/circuitv2/client/reservation.go @@ -93,7 +93,10 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, } if msg.GetType() != pbv2.HopMessage_STATUS { - return nil, ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE, Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType())} + return nil, ReservationError{ + Status: pbv2.Status_MALFORMED_MESSAGE, + Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType()), + err: err} } if status := msg.GetStatus(); status != pbv2.Status_OK { @@ -127,7 +130,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, voucherBytes := rsvp.GetVoucher() if voucherBytes != nil { - env, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain) + _, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain) if err != nil { return nil, ReservationError{ Status: pbv2.Status_MALFORMED_MESSAGE, @@ -143,27 +146,6 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation, Reason: fmt.Sprintf("unexpected voucher record type: %+T", rec), } } - signerPeerID, err := peer.IDFromPublicKey(env.PublicKey) - if err != nil { - return nil, ReservationError{ - Status: pbv2.Status_MALFORMED_MESSAGE, - Reason: fmt.Sprintf("invalid voucher signing public key: %s", err), - err: err, - } - } - if signerPeerID != voucher.Relay { - return nil, ReservationError{ - Status: pbv2.Status_MALFORMED_MESSAGE, - Reason: fmt.Sprintf("invalid voucher relay id: expected %s, got %s", signerPeerID, voucher.Relay), - } - } - if h.ID() != voucher.Peer { - return nil, ReservationError{ - Status: pbv2.Status_MALFORMED_MESSAGE, - Reason: fmt.Sprintf("invalid voucher peer id: expected %s, got %s", h.ID(), voucher.Peer), - } - - } result.Voucher = voucher } diff --git a/p2p/protocol/circuitv2/client/reservation_test.go b/p2p/protocol/circuitv2/client/reservation_test.go index d1ab6dc683..decb3e71de 100644 --- a/p2p/protocol/circuitv2/client/reservation_test.go +++ b/p2p/protocol/circuitv2/client/reservation_test.go @@ -8,11 +8,8 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/record" - "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" @@ -87,45 +84,6 @@ func TestReservationFailures(t *testing.T) { err: "error consuming voucher envelope: failed when unmarshalling the envelope", status: pbv2.Status_MALFORMED_MESSAGE, }, - { - name: "invalid voucher 2", - streamHandler: func(s network.Stream) { - status := pbv2.Status_OK - expire := uint64(time.Now().Add(time.Hour).UnixNano()) - priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) - if err != nil { - s.Reset() - return - } - relay, _ := test.RandPeerID() - peer, _ := test.RandPeerID() - voucher := &proto.ReservationVoucher{ - Relay: relay, - Peer: peer, - Expiration: time.Now().Add(time.Hour), - } - signedVoucher, err := record.Seal(voucher, priv) - if err != nil { - s.Reset() - return - } - env, err := signedVoucher.Marshal() - if err != nil { - s.Reset() - return - } - util.NewDelimitedWriter(s).WriteMsg(&pbv2.HopMessage{ - Type: pbv2.HopMessage_STATUS.Enum(), - Status: &status, - Reservation: &pbv2.Reservation{ - Expire: &expire, - Voucher: env, - }, - }) - }, - err: "invalid voucher relay id", - status: pbv2.Status_MALFORMED_MESSAGE, - }, } for _, tc := range testcases { diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index e91fdee76e..a91cc4f92e 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -831,7 +831,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0) ids.addrMu.Unlock() - log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), addrs) + log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs) // get protocol versions pv := mes.GetProtocolVersion() @@ -1064,23 +1064,18 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} -// filterAddrs filters the address slice based on the remote multiaddr: -// - if it's a localhost address, no filtering is applied -// - if it's a private network address, all localhost addresses are filtered out -// - if it's a public address, all non-public addresses are filtered out -// - if none of the above, (e.g. discard prefix), no filtering is applied. -// We can't do anything meaningful here so we do nothing. +// filterAddrs filters the address slice based on the remove multiaddr: +// * if it's a localhost address, no filtering is applied +// * if it's a local network address, all localhost addresses are filtered out +// * if it's a public address, all localhost and local network addresses are filtered out func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr { - switch { - case manet.IsIPLoopback(remote): + if manet.IsIPLoopback(remote) { return addrs - case manet.IsPrivateAddr(remote): + } + if manet.IsPrivateAddr(remote) { return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) }) - case manet.IsPublicAddr(remote): - return ma.FilterAddrs(addrs, manet.IsPublicAddr) - default: - return addrs } + return ma.FilterAddrs(addrs, manet.IsPublicAddr) } func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index fc1c100c8e..4437c4b011 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -452,12 +452,6 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) { o.mu.Lock() defer o.mu.Unlock() - observedTWAddr, ok := o.connObservedTWAddrs[conn] - if !ok { - return - } - delete(o.connObservedTWAddrs, conn) - // normalize before obtaining the thinWaist so that we are always dealing // with the normalized form of the address localTW, err := thinWaistForm(o.normalize(conn.LocalMultiaddr())) @@ -473,6 +467,11 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) { delete(o.localAddrs, string(localTW.Addr.Bytes())) } + observedTWAddr, ok := o.connObservedTWAddrs[conn] + if !ok { + return + } + delete(o.connObservedTWAddrs, conn) observer, err := getObserver(conn.RemoteMultiaddr()) if err != nil { return diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 94366f882e..9c2d8dee57 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -153,7 +153,7 @@ func TestObservedAddrManager(t *testing.T) { var ob1, ob2 [N]connMultiaddrs for i := 0; i < N; i++ { ob1[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) - ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/2/quic-v1", i))) + ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) } for i := 0; i < N-1; i++ { o.Record(ob1[i], observedQuic) @@ -186,7 +186,6 @@ func TestObservedAddrManager(t *testing.T) { return checkAllEntriesRemoved(o) }, 2*time.Second, 100*time.Millisecond) }) - t.Run("SameObserversDifferentAddrs", func(t *testing.T) { o := newObservedAddrMgr() defer o.Close() @@ -198,7 +197,7 @@ func TestObservedAddrManager(t *testing.T) { var ob1, ob2 [N]connMultiaddrs for i := 0; i < N; i++ { ob1[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) - ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/2/quic-v1", i))) + ob2[i] = newConn(quic4ListenAddr, ma.StringCast(fmt.Sprintf("/ip4/1.2.3.%d/udp/1/quic-v1", i))) } for i := 0; i < N-1; i++ { o.Record(ob1[i], observedQuic1) @@ -239,8 +238,6 @@ func TestObservedAddrManager(t *testing.T) { c2 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.2/udp/1/quic-v1")) c3 := newConn(webTransport4ListenAddr, ma.StringCast("/ip4/1.2.3.3/udp/1/quic-v1/webtransport")) c4 := newConn(webTransport4ListenAddr, ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/webtransport")) - c5 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.5/udp/1/quic-v1")) - c6 := newConn(quic4ListenAddr, ma.StringCast("/ip4/1.2.3.6/udp/1/quic-v1")) var observedQuic, observedWebTransport ma.Multiaddr for i := 0; i < 10; i++ { // Change the IP address in each observation @@ -250,7 +247,6 @@ func TestObservedAddrManager(t *testing.T) { o.Record(c2, observedQuic) o.Record(c3, observedWebTransport) o.Record(c4, observedWebTransport) - o.Record(c5, observedQuic) time.Sleep(20 * time.Millisecond) } @@ -262,23 +258,13 @@ func TestObservedAddrManager(t *testing.T) { require.NoError(t, err) require.Less(t, len(o.externalAddrs[string(tw.TW.Bytes())]), 2) - require.Equal(t, []ma.Multiaddr{observedWebTransport}, o.AddrsFor(webTransport4ListenAddr)) - require.Equal(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr)) - require.ElementsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransport}, o.Addrs()) - - for i := 0; i < 3; i++ { - // remove non-recorded connection - o.removeConn(c6) - } - require.Equal(t, []ma.Multiaddr{observedWebTransport}, o.AddrsFor(webTransport4ListenAddr)) - require.Equal(t, []ma.Multiaddr{observedQuic}, o.AddrsFor(quic4ListenAddr)) - require.ElementsMatch(t, []ma.Multiaddr{observedQuic, observedWebTransport}, o.Addrs()) + require.Equal(t, o.AddrsFor(webTransport4ListenAddr), []ma.Multiaddr{observedWebTransport}) + require.Equal(t, o.AddrsFor(quic4ListenAddr), []ma.Multiaddr{observedQuic}) o.removeConn(c1) o.removeConn(c2) o.removeConn(c3) o.removeConn(c4) - o.removeConn(c5) require.Eventually(t, func() bool { return checkAllEntriesRemoved(o) }, 1*time.Second, 100*time.Millisecond) @@ -425,7 +411,7 @@ func TestObservedAddrManager(t *testing.T) { return checkAllEntriesRemoved(o) }, 1*time.Second, 100*time.Millisecond) }) - t.Run("Nil Input", func(t *testing.T) { + t.Run("Nill Input", func(t *testing.T) { o := newObservedAddrMgr() defer o.Close() o.maybeRecordObservation(nil, nil) diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index 3f465b34fc..1834fc812b 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -330,7 +330,7 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error errC := make(chan error, 1) var once sync.Once pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { - switch pc.ConnectionState() { + switch state { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) case webrtc.PeerConnectionStateFailed: diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go index 68a2988c78..b04753ecab 100644 --- a/p2p/transport/webrtc/transport.go +++ b/p2p/transport/webrtc/transport.go @@ -415,17 +415,15 @@ func genUfrag() string { uFragAlphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" uFragPrefix = "libp2p+webrtc+v1/" uFragIdLength = 32 - uFragLength = len(uFragPrefix) + uFragIdLength + uFragIdOffset = len(uFragPrefix) + uFragLength = uFragIdOffset + uFragIdLength ) seed := [8]byte{} rand.Read(seed[:]) r := mrand.New(mrand.NewSource(binary.BigEndian.Uint64(seed[:]))) b := make([]byte, uFragLength) - for i := 0; i < len(uFragPrefix); i++ { - b[i] = uFragPrefix[i] - } - for i := len(uFragPrefix); i < uFragLength; i++ { + for i := uFragIdOffset; i < uFragLength; i++ { b[i] = uFragAlphabet[r.Intn(len(uFragAlphabet))] } return string(b) diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go index b618ec85df..a3054a82df 100644 --- a/p2p/transport/webrtc/transport_test.go +++ b/p2p/transport/webrtc/transport_test.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "encoding/hex" - "errors" "fmt" "io" "net" @@ -18,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - tpt "github.com/libp2p/go-libp2p/core/transport" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/multiformats/go-multibase" @@ -862,125 +860,3 @@ func TestMaxInFlightRequests(t *testing.T) { require.Equal(t, count, int(success.Load()), "expected exactly 3 dial successes") require.Equal(t, 1, int(fails.Load()), "expected exactly 1 dial failure") } - -func TestGenUfrag(t *testing.T) { - for i := 0; i < 10; i++ { - s := genUfrag() - require.True(t, strings.HasPrefix(s, "libp2p+webrtc+v1/")) - } -} - -func TestManyConnections(t *testing.T) { - var listeners []tpt.Listener - var listenerPeerIDs []peer.ID - - const numListeners = 5 - const dialersPerListener = 5 - const connsPerDialer = 10 - errCh := make(chan error, 10*numListeners*dialersPerListener*connsPerDialer) - successCh := make(chan struct{}, 10*numListeners*dialersPerListener*connsPerDialer) - - for i := 0; i < numListeners; i++ { - tr, lp := getTransport(t) - listenerPeerIDs = append(listenerPeerIDs, lp) - ln, err := tr.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/webrtc-direct")) - require.NoError(t, err) - defer ln.Close() - listeners = append(listeners, ln) - } - - runListenConn := func(conn tpt.CapableConn) { - defer conn.Close() - s, err := conn.AcceptStream() - if err != nil { - t.Errorf("accept stream failed for listener: %s", err) - errCh <- err - return - } - var b [4]byte - if _, err := s.Read(b[:]); err != nil { - t.Errorf("read stream failed for listener: %s", err) - errCh <- err - return - } - s.Write(b[:]) - _, err = s.Read(b[:]) // peer will close the connection after read - if !assert.Error(t, err) { - err = errors.New("invalid read: expected conn to close") - errCh <- err - return - } - successCh <- struct{}{} - } - - runDialConn := func(conn tpt.CapableConn) { - defer conn.Close() - - s, err := conn.OpenStream(context.Background()) - if err != nil { - t.Errorf("accept stream failed for listener: %s", err) - errCh <- err - return - } - var b [4]byte - if _, err := s.Write(b[:]); err != nil { - t.Errorf("write stream failed for dialer: %s", err) - errCh <- err - return - } - if _, err := s.Read(b[:]); err != nil { - t.Errorf("read stream failed for dialer: %s", err) - errCh <- err - return - } - s.Close() - } - - runListener := func(ln tpt.Listener) { - for i := 0; i < dialersPerListener*connsPerDialer; i++ { - conn, err := ln.Accept() - if err != nil { - t.Errorf("listener failed to accept conneciton: %s", err) - return - } - go runListenConn(conn) - } - } - - runDialer := func(ln tpt.Listener, lp peer.ID) { - tp, _ := getTransport(t) - for i := 0; i < connsPerDialer; i++ { - // We want to test for deadlocks, set a high timeout - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) - conn, err := tp.Dial(ctx, ln.Multiaddr(), lp) - if err != nil { - t.Errorf("dial failed: %s", err) - errCh <- err - cancel() - return - } - runDialConn(conn) - cancel() - } - } - - for i := 0; i < numListeners; i++ { - go runListener(listeners[i]) - } - for i := 0; i < numListeners; i++ { - for j := 0; j < dialersPerListener; j++ { - go runDialer(listeners[i], listenerPeerIDs[i]) - } - } - - for i := 0; i < numListeners*dialersPerListener*connsPerDialer; i++ { - select { - case <-successCh: - t.Log("completed conn: ", i) - case err := <-errCh: - t.Fatalf("failed: %s", err) - case <-time.After(300 * time.Second): - t.Fatalf("timed out") - } - } -} diff --git a/test-plans/go.mod b/test-plans/go.mod index 6e4ff51d3d..64361ed334 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -66,7 +66,7 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect - github.com/pion/ice/v2 v2.3.25 // indirect + github.com/pion/ice/v2 v2.3.24 // indirect github.com/pion/interceptor v0.1.29 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index fdd19bbb02..63eac7a98b 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -226,8 +226,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= -github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= +github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= diff --git a/version.json b/version.json index f765664902..fb0aea4f0c 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.35.1" + "version": "v0.35.0" }