Skip to content

Commit

Permalink
swarm: fix flaky TestBasicDialSync
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed May 18, 2022
1 parent c51c1b6 commit 9c49349
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 62 deletions.
42 changes: 30 additions & 12 deletions p2p/net/swarm/dial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,21 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
}

func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()
dsync := newDialSync(df)
p := peer.ID("testpeer")
var counter int32
requests := make(chan dialRequest, 2)
done := make(chan struct{})
dsync := newDialSync(func(id peer.ID, reqs <-chan dialRequest) {
require.Equal(t, id, p)
atomic.AddInt32(&counter, 1)
for req := range reqs {
requests <- req
go func(req dialRequest) {
<-done
req.resch <- dialResponse{conn: new(Conn)}
}(req)
}
})

finished := make(chan struct{}, 2)
go func() {
Expand All @@ -52,16 +64,22 @@ func TestBasicDialSync(t *testing.T) {
finished <- struct{}{}
}()

// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()

<-finished
<-finished

if len(callsch) > 1 {
t.Fatal("should only have called dial func once!")
}
// wait until both requests are registered
<<<<<<< Updated upstream
require.Eventually(t, func() bool { return len(requests) == 2 }, 100*time.Millisecond, 5*time.Millisecond, "expected both requests to be processed")
// make the dials return
close(done)
// make sure the Dial functions return
require.Eventually(t, func() bool { return len(finished) == 2 }, 100*time.Millisecond, 5*time.Millisecond, "dial functions should have returned")
=======
require.Eventually(t, func() bool { return len(requests) == 2 }, 100*time.Millisecond, 10*time.Millisecond, "expected both requests to be processed")
// make the dials return
close(done)
// make sure the Dial functions return
require.Eventually(t, func() bool { return len(finished) == 2 }, 100*time.Millisecond, 10*time.Millisecond, "dial functions should have returned")
>>>>>>> Stashed changes

require.Equal(t, 1, int(atomic.LoadInt32(&counter)), "should only have called dial func once!")
}

func TestDialSyncCancel(t *testing.T) {
Expand Down
69 changes: 21 additions & 48 deletions p2p/protocol/holepunch/holepunch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ func TestEndToEndSimConnect(t *testing.T) {
defer h2.Close()
defer relay.Close()

// wait till a direct connection is complete
ensureDirectConn(t, h1, h2)
// ensure no hole-punching streams are open on either side
ensureNoHolePunchingStream(t, h1, h2)
var h2Events []*holepunch.Event
require.Eventually(t,
func() bool {
Expand All @@ -139,6 +135,27 @@ func TestEndToEndSimConnect(t *testing.T) {
require.Len(t, h1Events, 2)
require.Equal(t, holepunch.StartHolePunchEvtT, h1Events[0].Type)
require.Equal(t, holepunch.HolePunchAttemptEvtT, h1Events[1].Type)

// check that we have a direct connection
for _, c := range h1.Network().ConnsToPeer(h2.ID()) {
if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil {
t.Fatal("didn't expect any relayed connection to be open at this point")
}
}
for _, c := range h2.Network().ConnsToPeer(h1.ID()) {
if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil {
t.Fatal("didn't expect any relayed connection to be open at this point")
}
}

// check that no hole-punching streams are open
for _, c := range h1.Network().ConnsToPeer(h2.ID()) {
for _, s := range c.GetStreams() {
if s.ID() == string(holepunch.Protocol) {
t.Fatal("didn't expect any hole punching streams to be open at this point")
}
}
}
}

func TestFailuresOnInitiator(t *testing.T) {
Expand Down Expand Up @@ -294,50 +311,6 @@ func TestFailuresOnResponder(t *testing.T) {
}
}

func ensureNoHolePunchingStream(t *testing.T, h1, h2 host.Host) {
require.Eventually(t, func() bool {
for _, c := range h1.Network().ConnsToPeer(h2.ID()) {
for _, s := range c.GetStreams() {
if s.ID() == string(holepunch.Protocol) {
return false
}
}
}
return true
}, 5*time.Second, 50*time.Millisecond)

require.Eventually(t, func() bool {
for _, c := range h2.Network().ConnsToPeer(h1.ID()) {
for _, s := range c.GetStreams() {
if s.ID() == string(holepunch.Protocol) {
return false
}
}
}
return true
}, 5*time.Second, 50*time.Millisecond)
}

func ensureDirectConn(t *testing.T, h1, h2 host.Host) {
require.Eventually(t, func() bool {
for _, c := range h1.Network().ConnsToPeer(h2.ID()) {
if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil {
return true
}
}
return false
}, 5*time.Second, 50*time.Millisecond)

require.Eventually(t, func() bool {
for _, c := range h2.Network().ConnsToPeer(h1.ID()) {
if _, err := c.RemoteMultiaddr().ValueForProtocol(ma.P_CIRCUIT); err != nil {
return true
}
}
return false
}, 5*time.Second, 50*time.Millisecond)
}

func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
if race.WithRace() {
t.Skip("modifying manet.Private4 is racy")
Expand Down
4 changes: 2 additions & 2 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ func TestIdentifyDeltaOnProtocolChange(t *testing.T) {
defer sub.Close()

// add two new protocols in h2 and wait for identify to send deltas.
h2.SetStreamHandler(protocol.ID("foo"), func(_ network.Stream) {})
h2.SetStreamHandler(protocol.ID("bar"), func(_ network.Stream) {})
h2.SetStreamHandler("foo", func(_ network.Stream) {})
h2.SetStreamHandler("bar", func(_ network.Stream) {})

// check that h1 now knows about h2's new protocols.
require.Eventually(t, func() bool {
Expand Down

0 comments on commit 9c49349

Please sign in to comment.