diff --git a/go.mod b/go.mod index 9966beb932..c6f3f2b324 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 - github.com/multiformats/go-multistream v0.5.0 + github.com/multiformats/go-multistream v0.6.0 github.com/multiformats/go-varint v0.0.7 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pion/datachannel v1.5.9 diff --git a/go.sum b/go.sum index 5c288ad7ab..4650eef1f0 100644 --- a/go.sum +++ b/go.sum @@ -246,8 +246,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1 github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= -github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE= -github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA= +github.com/multiformats/go-multistream v0.6.0 h1:ZaHKbsL404720283o4c/IHQXiS6gb8qAN5EIJ4PN5EA= +github.com/multiformats/go-multistream v0.6.0/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index f7d3c5275a..820411bd27 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -122,9 +122,10 @@ type HostOpts struct { // MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted. MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID] - // NegotiationTimeout determines the read and write timeouts on streams. - // If 0 or omitted, it will use DefaultNegotiationTimeout. - // If below 0, timeouts on streams will be deactivated. + // NegotiationTimeout determines the read and write timeouts when negotiating + // protocols for streams. If 0 or omitted, it will use + // DefaultNegotiationTimeout. If below 0, timeouts on streams will be + // deactivated. NegotiationTimeout time.Duration // AddrsFactory holds a function which can be used to override or filter the result of Addrs. @@ -689,6 +690,14 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) { // to create one. If ProtocolID is "", writes no header. // (Thread-safe) func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (str network.Stream, strErr error) { + if _, ok := ctx.Deadline(); !ok { + if h.negtimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, h.negtimeout) + defer cancel() + } + } + // If the caller wants to prevent the host from dialing, it should use the NoDial option. if nodial, _ := network.GetNoDial(ctx); !nodial { err := h.Connect(ctx, peer.AddrInfo{ID: p}) diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index 1ab98aae9d..2a7a772976 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -2,6 +2,7 @@ package basichost import ( "context" + "encoding/binary" "fmt" "io" "reflect" @@ -941,3 +942,56 @@ func TestTrimHostAddrList(t *testing.T) { }) } } + +func TestHostTimeoutNewStream(t *testing.T) { + h1, err := NewHost(swarmt.GenSwarm(t), nil) + require.NoError(t, err) + h1.Start() + defer h1.Close() + + const proto = "/testing" + h2 := swarmt.GenSwarm(t) + + h2.SetStreamHandler(func(s network.Stream) { + // First message is multistream header. Just echo it + msHeader := []byte("\x19/multistream/1.0.0\n") + _, err := s.Read(msHeader) + assert.NoError(t, err) + _, err = s.Write(msHeader) + assert.NoError(t, err) + + buf := make([]byte, 1024) + n, err := s.Read(buf) + assert.NoError(t, err) + + msgLen, varintN := binary.Uvarint(buf[:n]) + buf = buf[varintN:] + proto := buf[:int(msgLen)] + if string(proto) == "/ipfs/id/1.0.0\n" { + // Signal we don't support identify + na := []byte("na\n") + n := binary.PutUvarint(buf, uint64(len(na))) + copy(buf[n:], na) + + _, err = s.Write(buf[:int(n)+len(na)]) + assert.NoError(t, err) + } else { + // Stall + time.Sleep(5 * time.Second) + } + t.Log("Resetting") + s.Reset() + }) + + err = h1.Connect(context.Background(), peer.AddrInfo{ + ID: h2.LocalPeer(), + Addrs: h2.ListenAddresses(), + }) + require.NoError(t, err) + + // No context passed in, fallback to negtimeout + h1.negtimeout = time.Second + _, err = h1.NewStream(context.Background(), h2.LocalPeer(), proto) + require.Error(t, err) + require.ErrorContains(t, err, "context deadline exceeded") +} diff --git a/p2p/http/example_test.go b/p2p/http/example_test.go index 7073b7f0e3..8e94f6e7e0 100644 --- a/p2p/http/example_test.go +++ b/p2p/http/example_test.go @@ -6,6 +6,7 @@ import ( "log" "net" "net/http" + "regexp" "strings" "github.com/libp2p/go-libp2p" @@ -125,18 +126,24 @@ func ExampleHost_overLibp2pStreams() { // Output: Hello HTTP } +var tcpPortRE = regexp.MustCompile(`/tcp/(\d+)`) + func ExampleHost_Serve() { server := libp2phttp.Host{ InsecureAllowHTTP: true, // For our example, we'll allow insecure HTTP - ListenAddrs: []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/50221/http")}, + ListenAddrs: []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/0/http")}, } go server.Serve() defer server.Close() - fmt.Println(server.Addrs()) + for _, a := range server.Addrs() { + s := a.String() + addrWithoutSpecificPort := tcpPortRE.ReplaceAllString(s, "/tcp/") + fmt.Println(addrWithoutSpecificPort) + } - // Output: [/ip4/127.0.0.1/tcp/50221/http] + // Output: /ip4/127.0.0.1/tcp//http } func ExampleHost_SetHTTPHandler() { diff --git a/p2p/protocol/circuitv2/relay/relay.go b/p2p/protocol/circuitv2/relay/relay.go index 2ee237d97b..326d17781b 100644 --- a/p2p/protocol/circuitv2/relay/relay.go +++ b/p2p/protocol/circuitv2/relay/relay.go @@ -118,7 +118,7 @@ func (r *Relay) Close() error { r.host.RemoveStreamHandler(proto.ProtoIDv2Hop) r.host.Network().StopNotify(r.notifiee) - r.scope.Done() + defer r.scope.Done() r.cancel() r.gc() if r.metricsTracer != nil { @@ -315,7 +315,7 @@ func (r *Relay) handleConnect(s network.Stream, msg *pbv2.HopMessage) pbv2.Statu connStTime := time.Now() cleanup := func() { - span.Done() + defer span.Done() r.mx.Lock() r.rmConn(src) r.rmConn(dest.ID) diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index ffe60345e1..06e54bf5fd 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -335,6 +335,11 @@ func (o *ObservedAddrManager) worker() { } } +func isRelayedAddress(a ma.Multiaddr) bool { + _, err := a.ValueForProtocol(ma.P_CIRCUIT) + return err == nil +} + func (o *ObservedAddrManager) shouldRecordObservation(conn connMultiaddrs, observed ma.Multiaddr) (shouldRecord bool, localTW thinWaist, observedTW thinWaist) { if conn == nil || observed == nil { return false, thinWaist{}, thinWaist{} @@ -350,6 +355,12 @@ func (o *ObservedAddrManager) shouldRecordObservation(conn connMultiaddrs, obser return false, thinWaist{}, thinWaist{} } + // Ignore p2p-circuit addresses. These are the observed address of the relay. + // Not useful for us. + if isRelayedAddress(observed) { + return false, thinWaist{}, thinWaist{} + } + // we should only use ObservedAddr when our connection's LocalAddr is one // of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that // address's external mapping is not very useful because the port will not be @@ -410,7 +421,7 @@ func (o *ObservedAddrManager) maybeRecordObservation(conn connMultiaddrs, observ if !shouldRecord { return } - log.Debugw("added own observed listen addr", "observed", observed) + log.Debugw("added own observed listen addr", "conn", conn, "observed", observed) o.mu.Lock() defer o.mu.Unlock() diff --git a/p2p/protocol/identify/obsaddr_glass_test.go b/p2p/protocol/identify/obsaddr_glass_test.go index 31fd4f5726..3211aa5f54 100644 --- a/p2p/protocol/identify/obsaddr_glass_test.go +++ b/p2p/protocol/identify/obsaddr_glass_test.go @@ -53,6 +53,24 @@ func TestShouldRecordObservationWithWebTransport(t *testing.T) { require.True(t, shouldRecord) } +func TestShouldNotRecordObservationWithRelayedAddr(t *testing.T) { + listenAddr := ma.StringCast("/ip4/1.2.3.4/udp/8888/quic-v1/p2p-circuit") + ifaceAddr := ma.StringCast("/ip4/10.0.0.2/udp/9999/quic-v1") + listenAddrs := func() []ma.Multiaddr { return []ma.Multiaddr{listenAddr} } + ifaceListenAddrs := func() ([]ma.Multiaddr, error) { return []ma.Multiaddr{ifaceAddr}, nil } + addrs := func() []ma.Multiaddr { return []ma.Multiaddr{listenAddr} } + + c := &mockConn{ + local: listenAddr, + remote: ma.StringCast("/ip4/1.2.3.6/udp/1236/quic-v1/p2p-circuit"), + } + observedAddr := ma.StringCast("/ip4/1.2.3.4/udp/1231/quic-v1/p2p-circuit") + o, err := NewObservedAddrManager(listenAddrs, addrs, ifaceListenAddrs, normalize) + require.NoError(t, err) + shouldRecord, _, _ := o.shouldRecordObservation(c, observedAddr) + require.False(t, shouldRecord) +} + func TestShouldRecordObservationWithNAT64Addr(t *testing.T) { listenAddr1 := ma.StringCast("/ip4/0.0.0.0/tcp/1234") ifaceAddr1 := ma.StringCast("/ip4/10.0.0.2/tcp/4321") diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go index d4ba3c0550..c3e2f29799 100644 --- a/p2p/transport/webrtc/listener.go +++ b/p2p/transport/webrtc/listener.go @@ -33,8 +33,12 @@ func (c *connMultiaddrs) LocalMultiaddr() ma.Multiaddr { return c.local } func (c *connMultiaddrs) RemoteMultiaddr() ma.Multiaddr { return c.remote } const ( - candidateSetupTimeout = 20 * time.Second - DefaultMaxInFlightConnections = 10 + candidateSetupTimeout = 10 * time.Second + // This is higher than other transports(64) as there's no way to detect a peer that has gone away after + // sending the initial connection request message(STUN Binding request). Such peers take up a goroutine + // till connection timeout. As the number of handshakes in parallel is still guarded by the resource + // manager, this higher number is okay. + DefaultMaxInFlightConnections = 128 ) type listener struct { @@ -325,8 +329,7 @@ func (l *listener) Multiaddr() ma.Multiaddr { // addOnConnectionStateChangeCallback adds the OnConnectionStateChange to the PeerConnection. // The channel returned here: // * is closed when the state changes to Connection -// * receives an error when the state changes to Failed -// * doesn't receive anything (nor is closed) when the state changes to Disconnected +// * receives an error when the state changes to Failed or Closed or Disconnected func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error { errC := make(chan error, 1) var once sync.Once @@ -334,17 +337,15 @@ func addOnConnectionStateChangeCallback(pc *webrtc.PeerConnection) <-chan error switch pc.ConnectionState() { case webrtc.PeerConnectionStateConnected: once.Do(func() { close(errC) }) - case webrtc.PeerConnectionStateFailed: + // PeerConnectionStateFailed happens when we fail to negotiate the connection. + // PeerConnectionStateDisconnected happens when we disconnect immediately after connecting. + // PeerConnectionStateClosed happens when we close the peer connection locally, not when remote closes. We don't need + // to error in this case, but it's a no-op, so it doesn't hurt. + case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed, webrtc.PeerConnectionStateDisconnected: once.Do(func() { errC <- errors.New("peerconnection failed") close(errC) }) - case webrtc.PeerConnectionStateDisconnected: - // the connection can move to a disconnected state and back to a connected state without ICE renegotiation. - // This could happen when underlying UDP packets are lost, and therefore the connection moves to the disconnected state. - // If the connection then receives packets on the connection, it can move back to the connected state. - // If no packets are received until the failed timeout is triggered, the connection moves to the failed state. - log.Warn("peerconnection disconnected") } }) return errC diff --git a/p2p/transport/webtransport/conn.go b/p2p/transport/webtransport/conn.go index 0525124711..d914398e0e 100644 --- a/p2p/transport/webtransport/conn.go +++ b/p2p/transport/webtransport/conn.go @@ -71,7 +71,7 @@ func (c *conn) allowWindowIncrease(size uint64) bool { // It must be called even if the peer closed the connection in order for // garbage collection to properly work in this package. func (c *conn) Close() error { - c.scope.Done() + defer c.scope.Done() c.transport.removeConn(c.session) err := c.session.CloseWithError(0, "") _ = c.qconn.CloseWithError(1, "") diff --git a/test-plans/go.mod b/test-plans/go.mod index 8e670a5129..b2eee27810 100644 --- a/test-plans/go.mod +++ b/test-plans/go.mod @@ -61,7 +61,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect - github.com/multiformats/go-multistream v0.5.0 // indirect + github.com/multiformats/go-multistream v0.6.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo/v2 v2.20.2 // indirect diff --git a/test-plans/go.sum b/test-plans/go.sum index 2b532868ba..cbb839c369 100644 --- a/test-plans/go.sum +++ b/test-plans/go.sum @@ -196,8 +196,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1 github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= -github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE= -github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA= +github.com/multiformats/go-multistream v0.6.0 h1:ZaHKbsL404720283o4c/IHQXiS6gb8qAN5EIJ4PN5EA= +github.com/multiformats/go-multistream v0.6.0/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=