network: non-participating nodes request TX gossip only if ForceFetchTransactions: true #3918

Merged on May 25, 2022 (24 commits)
Changes from 10 commits
Commits
98a4650
non-relay non-participating nodes opt out of TX gossip using message-…
brianolson Apr 21, 2022
643c519
fix isParticipating logic
brianolson Apr 24, 2022
b7442ce
bug hunting logging
brianolson Apr 25, 2022
e9f6d3f
also send MOI on outgoing connections
brianolson Apr 26, 2022
bb32d57
use ForceFetchTransactions config
brianolson Apr 27, 2022
64a250e
more efficient HasLiveKeys
brianolson Apr 28, 2022
a486325
testing
brianolson May 3, 2022
4a54c54
fix TestSlowPeerDisconnection
brianolson May 4, 2022
78fdf13
lint. test tweak
brianolson May 4, 2022
69ca188
cleanup development logging
brianolson May 5, 2022
a02b044
spelling
brianolson May 9, 2022
4f33b10
review tweak
brianolson May 14, 2022
d09cfab
Merge remote-tracking branch 'origin/master' into npn-no-TX
brianolson May 16, 2022
be6b13e
use a lock, fix a test race
brianolson May 16, 2022
8284573
use a lock, fix a test race
brianolson May 16, 2022
2050737
simplify
brianolson May 16, 2022
0fe106f
more test conditions. cleanup.
brianolson May 16, 2022
cbdb6e9
wn.node -> wn.nodeInfo
brianolson May 17, 2022
97b1d5c
review tweaks
brianolson May 18, 2022
1ea7803
check wn.nodeInfo != nil in OnNetworkAdvance
cce May 20, 2022
a74255a
don't set nodeNopeInfo by default
cce May 20, 2022
c2806d7
Revert "don't set nodeNopeInfo by default"
cce May 20, 2022
4a96326
use atomic to access peer.messagesOfInterestGeneration
cce May 23, 2022
df15614
TestWebsocketNetworkMessageOfInterest had a rare race/deadlock proble…
brianolson May 24, 2022
5 changes: 5 additions & 0 deletions components/mocks/mockParticipationRegistry.go
@@ -69,6 +69,11 @@ func (m *MockParticipationRegistry) GetStateProofForRound(id account.Participati
return account.StateProofRecordForRound{}, nil
}

// HasLiveKeys quickly tests to see if there is a valid participation key over some range of rounds
func (m *MockParticipationRegistry) HasLiveKeys(from, to basics.Round) bool {
return false
}

// Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account
// then it is possible for multiple records to be updated.
func (m *MockParticipationRegistry) Register(id account.ParticipationID, on basics.Round) error {
15 changes: 15 additions & 0 deletions data/account/participationRegistry.go
@@ -250,6 +250,9 @@ type ParticipationRegistry interface {
// GetStateProofForRound fetches a record with stateproof secrets for a particular round.
GetStateProofForRound(id ParticipationID, round basics.Round) (StateProofRecordForRound, error)

// HasLiveKeys quickly tests to see if there is a valid participation key over some range of rounds
HasLiveKeys(from, to basics.Round) bool

// Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account
// then it is possible for multiple records to be updated.
Register(id ParticipationID, on basics.Round) error
@@ -726,6 +729,18 @@ func (db *participationDB) GetAll() []ParticipationRecord {
return results
}

func (db *participationDB) HasLiveKeys(from, to basics.Round) bool {
db.mutex.RLock()
defer db.mutex.RUnlock()

for _, record := range db.cache {
if record.OverlapsInterval(from, to) {
return true
}
}
return false
}

// GetStateProofForRound returns the state proof data required to sign the compact certificate for this round
func (db *participationDB) GetStateProofForRound(id ParticipationID, round basics.Round) (StateProofRecordForRound, error) {
partRecord, err := db.GetForRound(id, round)
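The HasLiveKeys method added to participationDB walks the cache under a read lock and returns on the first overlapping record, rather than first collecting every record through GetAll(). A minimal standalone sketch of that idea follows. The names Round, record, registry, and the inclusive-bounds rule in overlapsInterval are assumptions made for illustration; they are not the go-algorand types.

```go
package main

import (
	"fmt"
	"sync"
)

// Round is a stand-in for basics.Round (assumption for this sketch).
type Round uint64

// record is a hypothetical participation record valid over [first, last].
type record struct {
	first, last Round
}

// overlapsInterval reports whether [r.first, r.last] intersects [from, to].
// Inclusive bounds are an assumption of this sketch.
func (r record) overlapsInterval(from, to Round) bool {
	return r.first <= to && from <= r.last
}

// registry is a hypothetical cache of records guarded by a read/write mutex.
type registry struct {
	mu    sync.RWMutex
	cache map[string]record
}

// hasLiveKeys returns true as soon as one cached record overlaps the range.
func (reg *registry) hasLiveKeys(from, to Round) bool {
	reg.mu.RLock()
	defer reg.mu.RUnlock()
	for _, rec := range reg.cache {
		if rec.overlapsInterval(from, to) {
			return true
		}
	}
	return false
}

func main() {
	reg := &registry{cache: map[string]record{
		"a": {first: 100, last: 200},
		"b": {first: 500, last: 600},
	}}
	fmt.Println(reg.hasLiveKeys(150, 160)) // true
	fmt.Println(reg.hasLiveKeys(201, 499)) // false
	fmt.Println(reg.hasLiveKeys(600, 700)) // true
}
```

Because the loop only reads the cache, a read lock is enough here and concurrent callers do not block each other.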
7 changes: 1 addition & 6 deletions data/accountManager.go
@@ -92,12 +92,7 @@ func (manager *AccountManager) HasLiveKeys(from, to basics.Round) bool {
manager.mu.Lock()
defer manager.mu.Unlock()

-for _, part := range manager.registry.GetAll() {
-if part.OverlapsInterval(from, to) {
-return true
-}
-}
-return false
+return manager.registry.HasLiveKeys(from, to)
}

// AddParticipation adds a new account.Participation to be managed.
137 changes: 122 additions & 15 deletions network/wsNetwork.go
@@ -301,6 +301,19 @@ func Propagate(msg IncomingMessage) OutgoingMessage {
// Contains {genesisID} param to be handled by gorilla/mux
const GossipNetworkPath = "/v1/{genesisID}/gossip"

// NodeInfo helps the network get information about the node it is running on
type NodeInfo interface {
// IsParticipating returns true if this node has stake and may vote on blocks or propose blocks.
IsParticipating() bool
}

type nopeNodeInfo struct {
}

func (nnni *nopeNodeInfo) IsParticipating() bool {
return false
}

// WebsocketNetwork implements GossipNode
type WebsocketNetwork struct {
listener net.Listener
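The NodeInfo interface and its nopeNodeInfo default follow a null-object pattern: callers that have no node to report on can pass nil, and the network substitutes an implementation that always answers "not participating". A minimal sketch of that pattern is below. The names nodeInfo, keyedNode, network, and newNetwork are hypothetical stand-ins chosen for this example, not identifiers from the PR.

```go
package main

import "fmt"

// nodeInfo mirrors the shape of the NodeInfo interface in the diff.
type nodeInfo interface {
	IsParticipating() bool
}

// nopeNodeInfo always answers "not participating".
type nopeNodeInfo struct{}

func (*nopeNodeInfo) IsParticipating() bool { return false }

// keyedNode is a hypothetical implementation backed by a count of live keys.
type keyedNode struct{ liveKeys int }

func (k *keyedNode) IsParticipating() bool { return k.liveKeys > 0 }

// network is a hypothetical stand-in for WebsocketNetwork.
type network struct{ node nodeInfo }

// newNetwork substitutes the default when the caller passes nil, so later
// calls on n.node never need a nil check.
func newNetwork(node nodeInfo) *network {
	if node == nil {
		node = &nopeNodeInfo{}
	}
	return &network{node: node}
}

func main() {
	fmt.Println(newNetwork(nil).node.IsParticipating())                     // false
	fmt.Println(newNetwork(&keyedNode{liveKeys: 1}).node.IsParticipating()) // true
}
```

One caveat of this approach in Go: the nil check only catches an untyped nil. A nil pointer of a concrete type wrapped in the interface would pass the check and then be called as-is.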
@@ -397,21 +410,34 @@ type WebsocketNetwork struct {
// to be sent to new peers. This is filled in at network start,
// at which point messagesOfInterestEncoded is set to prevent
// further changes.
-messagesOfInterestEnc []byte
-messagesOfInterestEncoded bool
+messagesOfInterestEnc []byte
+messagesOfInterestEncoded bool
+messagesOfInterestGeneration uint32

// messagesOfInterestMu protects messagesOfInterest and ensures
// that messagesOfInterestEnc does not change once it is set during
// network start.
-messagesOfInterestMu deadlock.Mutex
+messagesOfInterestMu deadlock.Mutex
+messagesOfInterestCond *sync.Cond

// peersConnectivityCheckTicker is the timer for testing that all the connected peers
// are still transmitting or receiving information. The channel produced by this ticker
// is consumed by any of the messageHandlerThread(s). The ticker itself is created during
// Start(), and being shut down when Stop() is called.
peersConnectivityCheckTicker *time.Ticker

node NodeInfo

// atomic {0:unknown, 1:yes, 2:no}
wantTXGossip uint32
}

const (
wantTXGossipUnk = 0
wantTXGossipYes = 1
wantTXGossipNo = 2
)

type broadcastRequest struct {
tags []Tag
data [][]byte
@@ -661,6 +687,9 @@ func (wn *WebsocketNetwork) setup() {
if wn.config.DNSSecurityRelayAddrEnforced() {
preferredResolver = dnssec.MakeDefaultDnssecResolver(wn.config.FallbackDNSResolverAddress, wn.log)
}
if wn.node == nil {
wn.node = &nopeNodeInfo{}
}
maxIdleConnsPerHost := int(wn.config.ConnectionsRateLimitingCount)
wn.dialer = makeRateLimitingDialer(wn.phonebook, preferredResolver)
wn.transport = makeRateLimitingTransport(wn.phonebook, 10*time.Second, &wn.dialer, maxIdleConnsPerHost)
@@ -684,6 +713,9 @@ func (wn *WebsocketNetwork) setup() {
wn.server.MaxHeaderBytes = httpServerMaxHeaderBytes
wn.ctx, wn.ctxCancel = context.WithCancel(context.Background())
wn.relayMessages = wn.config.NetAddress != "" || wn.config.ForceRelayMessages
if wn.relayMessages || wn.config.ForceFetchTransactions {
wn.wantTXGossip = wantTXGossipYes
}
// roughly estimate the number of messages that could be seen at any given moment.
// For the late/redo/down committee, which happen in parallel, we need to allocate
// extra space there.
@@ -732,6 +764,8 @@ func (wn *WebsocketNetwork) setup() {
SupportedProtocolVersions = []string{wn.config.NetworkProtocolVersion}
}

wn.messagesOfInterestCond = sync.NewCond(&wn.messagesOfInterestMu)
wn.messagesOfInterestGeneration = 1 // something nonzero so that any new wsPeer needs updating
if wn.relayMessages {
wn.RegisterMessageInterest(protocol.CompactCertSigTag)
}
@@ -798,6 +832,9 @@ func (wn *WebsocketNetwork) Start() {
wn.wg.Add(1)
go wn.prioWeightRefresh()
}

go wn.postMessagesOfInterestThraed()

wn.log.Infof("serving genesisID=%s on %#v with RandomID=%s", wn.GenesisID, wn.PublicAddress(), wn.RandomID)
}

@@ -1123,18 +1160,28 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
InstanceName: trackedRequest.otherInstanceName,
})

-// We are careful to encode this prior to starting the server to avoid needing 'messagesOfInterestMu' here.
-if wn.messagesOfInterestEnc != nil {
-err = peer.Unicast(wn.ctx, wn.messagesOfInterestEnc, protocol.MsgOfInterestTag)
-if err != nil {
-wn.log.Infof("ws send msgOfInterest: %v", err)
-}
-}
+wn.maybeSendMessagesOfInterest(peer, nil)
Review comment (Contributor):
Why not pass in wn.messagesOfInterestEnc here?

Reply (Contributor Author):
Because wn.messagesOfInterestEnc should only be read while holding wn.messagesOfInterestMu.Lock().
Having it as an argument is really just an optimization for postMessagesOfInterestThread(), which already holds the lock while doing a Cond.Wait().
I could put a three-line lock/get/unlock in front of the two other calls to maybeSendMessagesOfInterest(), but it's slightly fewer lines of code to do it inside that function? (And still a reasonable separation of concerns?)


peers.Set(float64(wn.NumPeers()), nil)
incomingPeers.Set(float64(wn.numIncomingPeers()), nil)
}

func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOfInterestEnc []byte) {
messagesOfInterestGeneration := atomic.LoadUint32(&wn.messagesOfInterestGeneration)
if peer.messagesOfInterestGeneration != messagesOfInterestGeneration {
if messagesOfInterestEnc == nil {
wn.messagesOfInterestMu.Lock()
messagesOfInterestEnc = wn.messagesOfInterestEnc
wn.messagesOfInterestMu.Unlock()
}
if messagesOfInterestEnc != nil {
peer.sendMessagesOfInterest(messagesOfInterestGeneration, messagesOfInterestEnc)
} else {
wn.log.Infof("msgOfInterest Enc=nil")
}
}
}

func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan time.Time) {
defer wn.wg.Done()

@@ -1677,6 +1724,22 @@ func (wn *WebsocketNetwork) OnNetworkAdvance() {
wn.lastNetworkAdvanceMu.Lock()
defer wn.lastNetworkAdvanceMu.Unlock()
wn.lastNetworkAdvance = time.Now().UTC()
if !wn.relayMessages && !wn.config.ForceFetchTransactions {
// if we're not a relay, and not participating, we don't need txn pool
wantTXGossipPrev := atomic.LoadUint32(&wn.wantTXGossip)
wantTXGossip := wn.node.IsParticipating()
if wantTXGossip && (wantTXGossipPrev != wantTXGossipYes) {
didChange := atomic.CompareAndSwapUint32(&wn.wantTXGossip, wantTXGossipPrev, wantTXGossipYes)
if didChange {
wn.RegisterMessageInterest(protocol.TxnTag)
}
} else if !wantTXGossip && (wantTXGossipPrev != wantTXGossipNo) {
didChange := atomic.CompareAndSwapUint32(&wn.wantTXGossip, wantTXGossipPrev, wantTXGossipNo)
if didChange {
wn.DeregisterMessageInterest(protocol.TxnTag)
}
}
}
}

// sendPeerConnectionsTelemetryStatus sends a snapshot of the currently connected peers
@@ -2046,6 +2109,8 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) {
Endpoint: peer.GetAddress(),
})

wn.maybeSendMessagesOfInterest(peer, nil)

peers.Set(float64(wn.NumPeers()), nil)
outgoingPeers.Set(float64(wn.numOutgoingPeers()), nil)

@@ -2085,7 +2150,7 @@ func (wn *WebsocketNetwork) SetPeerData(peer Peer, key string, value interface{}
}

// NewWebsocketNetwork constructor for websockets based gossip network
-func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (wn *WebsocketNetwork, err error) {
+func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID, node NodeInfo) (wn *WebsocketNetwork, err error) {
phonebook := MakePhonebook(config.ConnectionsRateLimitingCount,
time.Duration(config.ConnectionsRateLimitingWindowSeconds)*time.Second)
phonebook.ReplacePeerList(phonebookAddresses, config.DNSBootstrapID, PhoneBookEntryRelayRole)
@@ -2095,6 +2160,7 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre
phonebook: phonebook,
GenesisID: genesisID,
NetworkID: networkID,
node: node,
}

wn.setup()
@@ -2103,7 +2169,7 @@ func NewWebsocketNetwork(log logging.Logger, config config.Local, phonebookAddre

// NewWebsocketGossipNode constructs a websocket network node and returns it as a GossipNode interface implementation
func NewWebsocketGossipNode(log logging.Logger, config config.Local, phonebookAddresses []string, genesisID string, networkID protocol.NetworkID) (gn GossipNode, err error) {
-return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID)
+return NewWebsocketNetwork(log, config, phonebookAddresses, genesisID, networkID, nil)
}

// SetPrioScheme specifies the network priority scheme for a network node
@@ -2254,21 +2320,62 @@ func (wn *WebsocketNetwork) RegisterMessageInterest(t protocol.Tag) error {
wn.messagesOfInterestMu.Lock()
defer wn.messagesOfInterestMu.Unlock()

-if wn.messagesOfInterestEncoded {
-return fmt.Errorf("network already started")
if wn.messagesOfInterest == nil {
wn.messagesOfInterest = make(map[protocol.Tag]bool)
for tag, flag := range defaultSendMessageTags {
wn.messagesOfInterest[tag] = flag
}
}

wn.messagesOfInterest[t] = true
wn.updateMessagesOfInterestEnc()
return nil
}

// DeregisterMessageInterest will tell peers to no longer send us traffic with a protocol Tag
func (wn *WebsocketNetwork) DeregisterMessageInterest(t protocol.Tag) error {
wn.messagesOfInterestMu.Lock()
defer wn.messagesOfInterestMu.Unlock()

if wn.messagesOfInterest == nil {
wn.messagesOfInterest = make(map[protocol.Tag]bool)
for tag, flag := range defaultSendMessageTags {
wn.messagesOfInterest[tag] = flag
}
}

-wn.messagesOfInterest[t] = true
+delete(wn.messagesOfInterest, t)
wn.updateMessagesOfInterestEnc()
return nil
}
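With the "network already started" guard removed, RegisterMessageInterest and DeregisterMessageInterest can run at any time: each seeds the map from the defaults on first use, applies its change, and then re-encodes so that a new generation is published. The sketch below shows that sequence in isolation. The interests type, the tag strings, and the sorted comma-joined encoding are assumptions made for this example; the PR itself encodes with MarshallMessageOfInterestMap, whose format is not shown here.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// defaultTags is a hypothetical default interest set.
var defaultTags = map[string]bool{"AV": true, "PP": true}

// interests is a hypothetical holder of the interest map and its encoding.
type interests struct {
	mu         sync.Mutex
	tags       map[string]bool
	enc        string
	generation uint32
}

// ensure seeds the map from the defaults on first use. Caller holds mu.
func (s *interests) ensure() {
	if s.tags == nil {
		s.tags = make(map[string]bool, len(defaultTags))
		for tag, flag := range defaultTags {
			s.tags[tag] = flag
		}
	}
}

// reencode rebuilds the encoded form and bumps the generation. Caller holds
// mu. The sorted, comma-joined format is an assumption of this sketch.
func (s *interests) reencode() {
	keys := make([]string, 0, len(s.tags))
	for tag, on := range s.tags {
		if on {
			keys = append(keys, tag)
		}
	}
	sort.Strings(keys)
	s.enc = strings.Join(keys, ",")
	s.generation++
}

// register adds a tag and publishes a new encoding.
func (s *interests) register(tag string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ensure()
	s.tags[tag] = true
	s.reencode()
}

// deregister removes a tag and publishes a new encoding.
func (s *interests) deregister(tag string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ensure()
	delete(s.tags, tag)
	s.reencode()
}

func main() {
	s := &interests{}
	s.register("TX")
	fmt.Println(s.enc, s.generation) // AV,PP,TX 1
	s.deregister("TX")
	fmt.Println(s.enc, s.generation) // AV,PP 2
}
```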

func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() {
// must run inside wn.messagesOfInterestMu.Lock
wantTXGossip := atomic.LoadUint32(&wn.wantTXGossip)
if wantTXGossip != wantTXGossipNo {
wn.messagesOfInterest[protocol.TxnTag] = true
} else {
delete(wn.messagesOfInterest, protocol.TxnTag)
}
Review comment (Contributor):
This logic looks like it belongs in the caller, where wn.messagesOfInterest is updated (added to or deleted from).

Reply (Contributor Author):
Good catch. I wound up being able to simplify away a chunk of logic that became redundant during development.

wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest)
wn.messagesOfInterestEncoded = true
atomic.AddUint32(&wn.messagesOfInterestGeneration, 1)
wn.messagesOfInterestCond.Broadcast()
}

func (wn *WebsocketNetwork) postMessagesOfInterestThraed() {
var peers []*wsPeer
wn.messagesOfInterestMu.Lock()
defer wn.messagesOfInterestMu.Unlock()
for {
wn.messagesOfInterestCond.Wait()
peers, _ = wn.peerSnapshot(peers)
for _, peer := range peers {
wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc)
}
}
}

// SubstituteGenesisID substitutes the "{genesisID}" with their network-specific genesisID.
func (wn *WebsocketNetwork) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", wn.GenesisID, -1)
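The background thread added in this hunk sleeps on a sync.Cond tied to messagesOfInterestMu and, each time updateMessagesOfInterestEnc broadcasts, pushes the new encoding to every peer. A reduced sketch of that wake-and-deliver loop is below. It is not the PR's code: notifier, bump, stop, and run are hypothetical names, delivery is recorded in a slice rather than sent to peers, and the sketch adds a predicate loop around Wait plus a stop flag so that the example terminates and cannot miss a wakeup.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for the generation/Cond pair.
type notifier struct {
	mu         sync.Mutex
	cond       *sync.Cond
	generation uint32
	stopped    bool
	delivered  []uint32 // generations the worker has pushed out
}

func newNotifier() *notifier {
	n := &notifier{}
	n.cond = sync.NewCond(&n.mu)
	return n
}

// bump records a change and wakes the worker.
func (n *notifier) bump() {
	n.mu.Lock()
	n.generation++
	n.mu.Unlock()
	n.cond.Broadcast()
}

// stop asks the worker to exit after delivering anything still pending.
func (n *notifier) stop() {
	n.mu.Lock()
	n.stopped = true
	n.mu.Unlock()
	n.cond.Broadcast()
}

// run waits for changes and delivers each newly observed generation once.
func (n *notifier) run(done chan<- struct{}) {
	defer close(done)
	n.mu.Lock()
	defer n.mu.Unlock()
	var seen uint32
	for {
		for n.generation == seen && !n.stopped {
			n.cond.Wait() // releases mu while blocked, reacquires on wake
		}
		if n.generation != seen {
			seen = n.generation
			n.delivered = append(n.delivered, seen)
		}
		if n.stopped {
			return
		}
	}
}

func main() {
	n := newNotifier()
	done := make(chan struct{})
	go n.run(done)
	n.bump()
	n.bump()
	n.stop()
	<-done
	fmt.Println(n.delivered[len(n.delivered)-1]) // 2
}
```

Two quick bumps may be coalesced into a single delivery of the latest generation, which matches the intent of a generation counter: peers only need the most recent interest set.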