Fix random failure in TestWebsocketNetworkPrioLimit
tsachiherman committed Jul 9, 2021
1 parent abc4058 · commit a4a0ce7
Showing 2 changed files with 21 additions and 2 deletions.
network/netprio.go: 2 additions & 0 deletions
@@ -18,6 +18,7 @@ package network
 
 import (
     "container/heap"
+    "sync/atomic"
 
     "github.com/algorand/go-algorand/data/basics"
     "github.com/algorand/go-algorand/protocol"
@@ -125,6 +126,7 @@ func (pt *prioTracker) setPriority(peer *wsPeer, addr basics.Address, weight uin
     peer.prioAddress = addr
     peer.prioWeight = weight
     heap.Fix(peersHeap{wn}, peer.peerIndex)
+    atomic.AddInt32(&wn.peersChangeCounter, 1)
 }
 
 func (pt *prioTracker) removePeer(peer *wsPeer) {
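
Why the one-line netprio.go change matters: setPriority reorders the network's peer heap via heap.Fix, but previously left wn.peersChangeCounter untouched, so any reader that caches a snapshot of the peer list keyed on that counter could keep serving a stale ordering. Below is a minimal, self-contained sketch of that counter-invalidated-cache pattern; the peerSet type and its methods are illustrative stand-ins, not go-algorand's actual code.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// peerSet caches a snapshot of its peer list and uses an atomic change
// counter to decide when the cached snapshot is stale.
type peerSet struct {
    mu            sync.Mutex
    peers         []string
    changeCounter int32 // incremented on every mutation

    snapshot        []string
    snapshotCounter int32
}

// add mutates the peer list and bumps the counter, mirroring the commit's
// atomic.AddInt32(&wn.peersChangeCounter, 1) after heap.Fix.
func (ps *peerSet) add(p string) {
    ps.mu.Lock()
    ps.peers = append(ps.peers, p)
    atomic.AddInt32(&ps.changeCounter, 1)
    ps.mu.Unlock()
}

// getPeers returns the cached snapshot unless the counter says it is stale.
func (ps *peerSet) getPeers() []string {
    cur := atomic.LoadInt32(&ps.changeCounter)
    ps.mu.Lock()
    defer ps.mu.Unlock()
    if ps.snapshot != nil && cur == ps.snapshotCounter {
        return ps.snapshot // still fresh
    }
    ps.snapshot = append([]string(nil), ps.peers...)
    ps.snapshotCounter = cur
    return ps.snapshot
}

func main() {
    ps := &peerSet{}
    ps.add("peerB")
    fmt.Println(ps.getPeers())
    ps.add("peerC") // counter bump invalidates the cached snapshot
    fmt.Println(ps.getPeers())
}

Without the counter bump, a getPeers-style reader could keep returning the pre-heap.Fix snapshot, which is exactly the kind of stale read that makes a priority-ordering test fail intermittently.
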
network/wsNetwork_test.go: 19 additions & 2 deletions
@@ -1128,6 +1128,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
     netB := makeTestWebsocketNode(t)
     netB.SetPrioScheme(&prioB)
     netB.config.GossipFanout = 1
+    netB.config.NetAddress = ""
     netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
     netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterB}})
     netB.Start()
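
A hedged reading of the netB.config.NetAddress = "" line (and its netC twin in the next hunk): in go-algorand's config, an empty NetAddress means the node does not listen for inbound connections, so B and C become dial-only clients and netA's peer set contains exactly the connections whose priorities the test registered. A minimal sketch of that listen-only-when-configured semantics, with hypothetical types:

package main

import (
    "fmt"
    "net"
)

// nodeConfig is a stand-in for the relevant go-algorand config fields.
type nodeConfig struct {
    NetAddress   string // listen address; empty means outbound-only
    GossipFanout int    // number of outbound gossip connections to dial
}

// startListener opens an inbound listener only when NetAddress is set.
func startListener(cfg nodeConfig) (net.Listener, error) {
    if cfg.NetAddress == "" {
        return nil, nil // dial-only node: never accepts inbound peers
    }
    return net.Listen("tcp", cfg.NetAddress)
}

func main() {
    relay := nodeConfig{NetAddress: "127.0.0.1:0", GossipFanout: 1}
    client := nodeConfig{NetAddress: "", GossipFanout: 1}

    if ln, err := startListener(relay); err == nil && ln != nil {
        fmt.Println("relay-style node listening on", ln.Addr())
        ln.Close()
    }
    if ln, _ := startListener(client); ln == nil {
        fmt.Println("client-style node: no listener, outbound connections only")
    }
}
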
@@ -1141,37 +1142,53 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
     netC := makeTestWebsocketNode(t)
     netC.SetPrioScheme(&prioC)
     netC.config.GossipFanout = 1
+    netC.config.NetAddress = ""
     netC.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
     netC.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterC}})
     netC.Start()
     defer func() { t.Log("stopping C"); netC.Stop(); t.Log("C done") }()
 
     // Wait for response messages to propagate from B+C to A
     select {
-    case <-netA.prioResponseChan:
+    case peer := <-netA.prioResponseChan:
+        netA.peersLock.RLock()
+        require.Subset(t, []uint64{prioB.prio, prioC.prio}, []uint64{peer.prioWeight})
+        netA.peersLock.RUnlock()
     case <-time.After(time.Second):
         t.Errorf("timeout on netA.prioResponseChan 1")
     }
     select {
-    case <-netA.prioResponseChan:
+    case peer := <-netA.prioResponseChan:
+        netA.peersLock.RLock()
+        require.Subset(t, []uint64{prioB.prio, prioC.prio}, []uint64{peer.prioWeight})
+        netA.peersLock.RUnlock()
     case <-time.After(time.Second):
         t.Errorf("timeout on netA.prioResponseChan 2")
     }
     waitReady(t, netA, time.After(time.Second))
 
+    firstPeer := netA.peers[0]
     netA.Broadcast(context.Background(), protocol.TxnTag, nil, true, nil)
 
+    failed := false
     select {
     case <-counterBdone:
     case <-time.After(time.Second):
         t.Errorf("timeout, B did not receive message")
+        failed = true
     }
 
     select {
     case <-counterCdone:
         t.Errorf("C received message")
+        failed = true
     case <-time.After(time.Second):
     }
+
+    if failed {
+        t.Errorf("NetA had the following two peers priorities : [0]:%s=%d [1]:%s=%d", netA.peers[0].rootURL, netA.peers[0].prioWeight, netA.peers[1].rootURL, netA.peers[1].prioWeight)
+        t.Errorf("first peer before broadcasting was %s", firstPeer.rootURL)
+    }
 }
 
 // Create many idle connections, to see if we have excessive CPU utilization.
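
The test-side fix replaces a bare channel receive with one that captures the responding peer and checks its weight against the set of acceptable priorities, since B and C can answer in either order; it also downgrades hard failures to a failed flag so both selects run and the peer table can be dumped for diagnosis. A minimal standalone sketch of that receive-and-assert-membership pattern (the channel and its values are illustrative, not the go-algorand test fixtures):

package example

import (
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestPrioResponsePattern(t *testing.T) {
    // Two responders deliver their priority weights in nondeterministic order.
    prioResponseChan := make(chan uint64, 2)
    prioResponseChan <- 100 // stand-in for prioB.prio
    prioResponseChan <- 10  // stand-in for prioC.prio

    acceptable := []uint64{100, 10}
    for i := 1; i <= 2; i++ {
        select {
        case got := <-prioResponseChan:
            // Subset asserts got is one of the acceptable values without
            // pinning down which responder arrives first.
            require.Subset(t, acceptable, []uint64{got})
        case <-time.After(time.Second):
            t.Errorf("timeout on prioResponseChan %d", i)
        }
    }
}

Using t.Errorf rather than a fatal assertion is what lets the original test reach its final if failed block and print both peers' rootURL and prioWeight before the test ends.
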
