Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockmanager: handle notfound messages and limits #2253

Merged
merged 4 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 58 additions & 12 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ type headersMsg struct {
peer *peerpkg.Peer
}

// notFoundMsg packages a Decred notfound message and the peer it came from
// together so the block handler has access to that information.
type notFoundMsg struct {
notFound *wire.MsgNotFound
peer *peerpkg.Peer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peerpkg.Peer
Expand Down Expand Up @@ -730,8 +737,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
if err != nil {
// Do not request this transaction again until a new block
// has been processed.
b.rejectedTxns[*txHash] = struct{}{}
b.limitMap(b.rejectedTxns, maxRejectedTxns)
limitAdd(b.rejectedTxns, *txHash, maxRejectedTxns)

// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
Expand Down Expand Up @@ -1341,6 +1347,32 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
}
}

// handleNotFoundMsg handles notfound messages from all peers.
func (b *blockManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
peer := nfmsg.peer
state, exists := b.peerStates[peer]
if !exists {
bmgrLog.Warnf("Received notfound message from unknown peer %s", peer)
return
}
for _, inv := range nfmsg.notFound.InvList {
// verify the hash was actually announced by the peer
// before deleting from the global requested maps.
switch inv.Type {
case wire.InvTypeBlock:
if _, exists := state.requestedBlocks[inv.Hash]; exists {
delete(state.requestedBlocks, inv.Hash)
delete(b.requestedBlocks, inv.Hash)
}
case wire.InvTypeTx:
if _, exists := state.requestedTxns[inv.Hash]; exists {
delete(state.requestedTxns, inv.Hash)
delete(b.requestedTxns, inv.Hash)
}
}
}
}

// haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part
Expand Down Expand Up @@ -1525,9 +1557,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// Request the block if there is not already a pending
// request.
if _, exists := b.requestedBlocks[iv.Hash]; !exists {
b.requestedBlocks[iv.Hash] = struct{}{}
b.limitMap(b.requestedBlocks, maxRequestedBlocks)
state.requestedBlocks[iv.Hash] = struct{}{}
limitAdd(b.requestedBlocks, iv.Hash, maxRequestedBlocks)
limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks)
gdmsg.AddInvVect(iv)
numRequested++
}
Expand All @@ -1536,9 +1567,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// Request the transaction if there is not already a
// pending request.
if _, exists := b.requestedTxns[iv.Hash]; !exists {
b.requestedTxns[iv.Hash] = struct{}{}
b.limitMap(b.requestedTxns, maxRequestedTxns)
state.requestedTxns[iv.Hash] = struct{}{}
limitAdd(b.requestedTxns, iv.Hash, maxRequestedTxns)
limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns)
gdmsg.AddInvVect(iv)
numRequested++
}
Expand All @@ -1561,10 +1591,10 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}
}

// limitMap is a helper function for maps that require a maximum limit by
// evicting a random transaction if adding a new value would cause it to
// limitAdd is a helper function for maps that require a maximum limit by
// evicting a random value if adding the new value would cause it to
// overflow the maximum allowed.
func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
func limitAdd(m map[chainhash.Hash]struct{}, hash chainhash.Hash, limit int) {
if len(m)+1 > limit {
// Remove a random entry from the map. For most compilers, Go's
// range statement iterates starting at a random item although
Expand All @@ -1574,9 +1604,10 @@ func (b *blockManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
// order to target eviction of specific entries anyways.
for txHash := range m {
delete(m, txHash)
return
break
}
}
m[hash] = struct{}{}
}

// blockHandler is the main handler for the block manager. It must be run
Expand Down Expand Up @@ -1608,6 +1639,9 @@ out:
case *headersMsg:
b.handleHeadersMsg(msg)

case *notFoundMsg:
b.handleNotFoundMsg(msg)

case *donePeerMsg:
b.handleDonePeerMsg(msg.peer)

Expand Down Expand Up @@ -2183,6 +2217,18 @@ func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer
b.msgChan <- &headersMsg{headers: headers, peer: peer}
}

// QueueNotFound adds the passed notfound message and peer to the block handling
// queue.
func (b *blockManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
// No channel handling here because peers do not need to block on
// reject messages.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}

b.msgChan <- &notFoundMsg{notFound: notFound, peer: peer}
}

// DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(peer *peerpkg.Peer) {
// Ignore if we are shutting down.
Expand Down
56 changes: 50 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,14 +578,14 @@ func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
// threshold, a warning is logged including the reason provided. Further, if
// the score is above the ban threshold, the peer will be banned and
// disconnected.
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
// No warning is logged and no score is calculated if banning is disabled.
if cfg.DisableBanning {
return
return false
}
if sp.isWhitelisted {
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
return
return false
}

warnThreshold := cfg.BanThreshold >> 1
Expand All @@ -597,7 +597,7 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
"it was not increased this time", sp, reason, score)
}
return
return false
}
score := sp.banScore.Increase(persistent, transient)
if score > warnThreshold {
Expand All @@ -608,8 +608,10 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
sp)
sp.server.BanPeer(sp)
sp.Disconnect()
return true
}
}
return false
}

// hasServices returns whether or not the provided advertised service flags have
Expand Down Expand Up @@ -713,7 +715,9 @@ func (sp *serverPeer) OnMemPool(p *peer.Peer, msg *wire.MsgMemPool) {
// The ban score accumulates and passes the ban threshold if a burst of
// mempool messages comes from a peer. The score decays each minute to
// half of its value.
sp.addBanScore(0, 33, "mempool")
if sp.addBanScore(0, 33, "mempool") {
return
}

// Generate inventory message with the available transactions in the
// transaction memory pool. Limit it to the max allowed inventory
Expand Down Expand Up @@ -981,7 +985,9 @@ func (sp *serverPeer) OnGetData(p *peer.Peer, msg *wire.MsgGetData) {
// bursts of small requests are not penalized as that would potentially ban
// peers performing IBD.
// This incremental score decays each minute to half of its value.
sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
return
}

// We wait on this wait channel periodically to prevent queuing
// far more data than we can send in a reasonable time, wasting memory.
Expand Down Expand Up @@ -1434,6 +1440,43 @@ func (sp *serverPeer) OnWrite(p *peer.Peer, bytesWritten int, msg wire.Message,
sp.server.AddBytesSent(uint64(bytesWritten))
}

// OnNotFound is invoked when a peer sends a notfound message.
func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) {
if !sp.Connected() {
return
}

var numBlocks, numTxns uint32
for _, inv := range msg.InvList {
switch inv.Type {
case wire.InvTypeBlock:
numBlocks++
case wire.InvTypeTx:
numTxns++
default:
peerLog.Debugf("Invalid inv type '%d' in notfound message from %s",
inv.Type, sp)
sp.Disconnect()
return
}
}
if numBlocks > 0 {
blockStr := pickNoun(uint64(numBlocks), "block", "blocks")
reason := fmt.Sprintf("%d %v not found", numBlocks, blockStr)
if sp.addBanScore(20*numBlocks, 0, reason) {
return
}
}
if numTxns > 0 {
txStr := pickNoun(uint64(numTxns), "transaction", "transactions")
reason := fmt.Sprintf("%d %v not found", numBlocks, txStr)
if sp.addBanScore(0, 10*numTxns, reason) {
return
}
}
sp.server.blockManager.QueueNotFound(msg, p)
}

// randomUint16Number returns a random uint16 in a specified input range. Note
// that the range is in zeroth ordering; if you pass it 1800, you will get
// values from 0 to 1800.
Expand Down Expand Up @@ -2108,6 +2151,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
OnAddr: sp.OnAddr,
OnRead: sp.OnRead,
OnWrite: sp.OnWrite,
OnNotFound: sp.OnNotFound,
},
NewestBlock: sp.newestBlock,
HostToNetAddress: sp.server.addrManager.HostToNetAddress,
Expand Down