Skip to content

Commit

Permalink
transport integration tests: add tests for resource manager (#2285)
Browse files Browse the repository at this point in the history
* Remove duplicate `SetProtocol` call

* Don't log if transport ErrListenerClosed

* Close the conn scope in webtransport dial

* Mock resource scope span

* Add transport rcmgr integration test

* PR nits

* Fix flakiness

* Threadsafe way of waiting for all streams

* Expand comment
  • Loading branch information
MarcoPolo authored Jun 1, 2023
1 parent 8d77135 commit 1f7b0d2
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 8 deletions.
102 changes: 102 additions & 0 deletions core/network/mocks/mock_resource_scope_span.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/network/mocks/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ package mocknetwork
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_stream_management_scope.go github.com/libp2p/go-libp2p/core/network StreamManagementScope"
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_peer_scope.go github.com/libp2p/go-libp2p/core/network PeerScope"
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_protocol_scope.go github.com/libp2p/go-libp2p/core/network ProtocolScope"
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_resource_scope_span.go github.com/libp2p/go-libp2p/core/network ResourceScopeSpan"
2 changes: 0 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,6 @@ func (h *BasicHost) EventBus() event.Bus {
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
h.Mux().AddHandler(pid, func(p protocol.ID, rwc io.ReadWriteCloser) error {
is := rwc.(network.Stream)
is.SetProtocol(p)
handler(is)
return nil
})
Expand All @@ -605,7 +604,6 @@ func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHand
func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(protocol.ID) bool, handler network.StreamHandler) {
h.Mux().AddHandlerWithFunc(pid, m, func(p protocol.ID, rwc io.ReadWriteCloser) error {
is := rwc.(network.Stream)
is.SetProtocol(p)
handler(is)
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *Swarm) close() {

for l := range listeners {
go func(l transport.Listener) {
if err := l.Close(); err != nil {
if err := l.Close(); err != nil && err != transport.ErrListenerClosed {
log.Errorf("error when shutting down listener: %s", err)
}
}(l)
Expand Down
141 changes: 141 additions & 0 deletions p2p/test/transport/rcmgr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package transport_integration

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

gomock "github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/stretchr/testify/require"
)

func TestResourceManagerIsUsed(t *testing.T) {
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
for _, testDialer := range []bool{true, false} {
t.Run(tc.Name+fmt.Sprintf(" test_dialer=%v", testDialer), func(t *testing.T) {

var reservedMemory, releasedMemory atomic.Int32
defer func() {
require.Equal(t, reservedMemory.Load(), releasedMemory.Load())
require.NotEqual(t, 0, reservedMemory.Load())
}()

ctrl := gomock.NewController(t)
defer ctrl.Finish()
rcmgr := mocknetwork.NewMockResourceManager(ctrl)
rcmgr.EXPECT().Close()

var listener, dialer host.Host
var expectedPeer peer.ID
var expectedDir network.Direction
var expectedAddr interface{}
if testDialer {
listener = tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true})
dialer = tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, ResourceManager: rcmgr})
expectedPeer = listener.ID()
expectedDir = network.DirOutbound
expectedAddr = listener.Addrs()[0]
} else {
listener = tc.HostGenerator(t, TransportTestCaseOpts{ResourceManager: rcmgr})
dialer = tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true})
expectedPeer = dialer.ID()
expectedDir = network.DirInbound
expectedAddr = gomock.Any()
}

expectFd := true
if strings.Contains(tc.Name, "QUIC") || strings.Contains(tc.Name, "WebTransport") {
expectFd = false
}

peerScope := mocknetwork.NewMockPeerScope(ctrl)
peerScope.EXPECT().ReserveMemory(gomock.Any(), gomock.Any()).AnyTimes().Do(func(amount int, pri uint8) {
reservedMemory.Add(int32(amount))
})
peerScope.EXPECT().ReleaseMemory(gomock.Any()).AnyTimes().Do(func(amount int) {
releasedMemory.Add(int32(amount))
})
peerScope.EXPECT().BeginSpan().AnyTimes().DoAndReturn(func() (network.ResourceScopeSpan, error) {
s := mocknetwork.NewMockResourceScopeSpan(ctrl)
s.EXPECT().BeginSpan().AnyTimes().Return(mocknetwork.NewMockResourceScopeSpan(ctrl), nil)
// No need to track these memory reservations since we assert that Done is called
s.EXPECT().ReserveMemory(gomock.Any(), gomock.Any())
s.EXPECT().Done()
return s, nil
})
var calledSetPeer atomic.Bool

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
connScope.EXPECT().SetPeer(expectedPeer).Do(func(peer.ID) {
calledSetPeer.Store(true)
})
connScope.EXPECT().PeerScope().AnyTimes().DoAndReturn(func() network.PeerScope {
if calledSetPeer.Load() {
return peerScope
}
return nil
})
connScope.EXPECT().Done()

var allStreamsDone sync.WaitGroup

rcmgr.EXPECT().OpenConnection(expectedDir, expectFd, expectedAddr).Return(connScope, nil)
rcmgr.EXPECT().OpenStream(expectedPeer, gomock.Any()).AnyTimes().DoAndReturn(func(id peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
allStreamsDone.Add(1)
streamScope := mocknetwork.NewMockStreamManagementScope(ctrl)
// No need to track these memory reservations since we assert that Done is called
streamScope.EXPECT().ReserveMemory(gomock.Any(), gomock.Any()).AnyTimes()
streamScope.EXPECT().ReleaseMemory(gomock.Any()).AnyTimes()
streamScope.EXPECT().BeginSpan().AnyTimes().DoAndReturn(func() (network.ResourceScopeSpan, error) {
s := mocknetwork.NewMockResourceScopeSpan(ctrl)
s.EXPECT().BeginSpan().AnyTimes().Return(mocknetwork.NewMockResourceScopeSpan(ctrl), nil)
s.EXPECT().Done()
return s, nil
})

streamScope.EXPECT().SetService(gomock.Any()).MaxTimes(1)
streamScope.EXPECT().SetProtocol(gomock.Any())

streamScope.EXPECT().Done().Do(func() {
allStreamsDone.Done()
})
return streamScope, nil
})

require.NoError(t, dialer.Connect(context.Background(), peer.AddrInfo{
ID: listener.ID(),
Addrs: listener.Addrs(),
}))
// Wait for any in progress identifies to finish.
// We shouldn't have to do this, but basic host currently
// always does an identify.
<-dialer.(interface{ IDService() identify.IDService }).IDService().IdentifyWait(dialer.Network().ConnsToPeer(listener.ID())[0])
<-listener.(interface{ IDService() identify.IDService }).IDService().IdentifyWait(listener.Network().ConnsToPeer(dialer.ID())[0])
<-ping.Ping(context.Background(), dialer, listener.ID())
err := dialer.Network().ClosePeer(listener.ID())
require.NoError(t, err)

// Wait a bit for any pending .Adds before we call .Wait to avoid a data race.
// This shouldn't be necessary since it should be impossible
// for an OpenStream to happen *after* a ClosePeer, however
// in practice it does and leads to test flakiness.
time.Sleep(10 * time.Millisecond)
allStreamsDone.Wait()
dialer.Close()
listener.Close()
})
}
})
}
}
11 changes: 8 additions & 3 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type TransportTestCase struct {
}

type TransportTestCaseOpts struct {
NoListen bool
NoRcmgr bool
ConnGater connmgr.ConnectionGater
NoListen bool
NoRcmgr bool
ConnGater connmgr.ConnectionGater
ResourceManager network.ResourceManager
}

func transformOpts(opts TransportTestCaseOpts) []config.Option {
Expand All @@ -45,6 +46,10 @@ func transformOpts(opts TransportTestCaseOpts) []config.Option {
if opts.ConnGater != nil {
libp2pOpts = append(libp2pOpts, libp2p.ConnectionGater(opts.ConnGater))
}

if opts.ResourceManager != nil {
libp2pOpts = append(libp2pOpts, libp2p.ResourceManager(opts.ResourceManager))
}
return libp2pOpts
}

Expand Down
4 changes: 4 additions & 0 deletions p2p/transport/websocket/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/http"
"strings"

"github.com/libp2p/go-libp2p/core/transport"

Expand Down Expand Up @@ -136,6 +137,9 @@ func (l *listener) Close() error {
l.server.Close()
err := l.nl.Close()
<-l.closed
if strings.Contains(err.Error(), "use of closed network connection") {
return transport.ErrListenerClosed
}
return err
}

Expand Down
5 changes: 3 additions & 2 deletions p2p/transport/webtransport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type conn struct {
transport *transport
session *webtransport.Session

scope network.ConnScope
scope network.ConnManagementScope
}

var _ tpt.CapableConn = &conn{}

func newConn(tr *transport, sess *webtransport.Session, sconn *connSecurityMultiaddrs, scope network.ConnScope) *conn {
func newConn(tr *transport, sess *webtransport.Session, sconn *connSecurityMultiaddrs, scope network.ConnManagementScope) *conn {
return &conn{
connSecurityMultiaddrs: sconn,
transport: tr,
Expand Down Expand Up @@ -68,6 +68,7 @@ func (c *conn) allowWindowIncrease(size uint64) bool {
// It must be called even if the peer closed the connection in order for
// garbage collection to properly work in this package.
func (c *conn) Close() error {
c.scope.Done()
c.transport.removeConn(c.session)
return c.session.CloseWithError(0, "")
}
Expand Down

0 comments on commit 1f7b0d2

Please sign in to comment.