diff --git a/netsync/manager.go b/netsync/manager.go index 51c2cbe4a8..603fca6ec8 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -79,6 +79,13 @@ type headersMsg struct { peer *peerpkg.Peer } +// notFoundMsg packages a bitcoin notfound message and the peer it came from +// together so the block handler has access to that information. +type notFoundMsg struct { + notFound *wire.MsgNotFound + peer *peerpkg.Peer +} + // donePeerMsg signifies a newly disconnected peer to the block handler. type donePeerMsg struct { peer *peerpkg.Peer @@ -1012,6 +1019,32 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { } } +// handleNotFoundMsg handles notfound messages from all peers. +func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) { + peer := nfmsg.peer + state, exists := sm.peerStates[peer] + if !exists { + log.Warnf("Received notfound message from unknown peer %s", peer) + return + } + for _, inv := range nfmsg.notFound.InvList { + // verify the hash was actually announced by the peer + // before deleting from the global requested maps. + switch inv.Type { + case wire.InvTypeBlock: + if _, exists := state.requestedBlocks[inv.Hash]; exists { + delete(state.requestedBlocks, inv.Hash) + delete(sm.requestedBlocks, inv.Hash) + } + case wire.InvTypeTx: + if _, exists := state.requestedTxns[inv.Hash]; exists { + delete(state.requestedTxns, inv.Hash) + delete(sm.requestedTxns, inv.Hash) + } + } + } +} + // haveInventory returns whether or not the inventory represented by the passed // inventory vector is known. This includes checking all of the various places // inventory can be when it is in different states such as blocks that are part @@ -1293,6 +1326,9 @@ out: case *headersMsg: sm.handleHeadersMsg(msg) + case *notFoundMsg: + sm.handleNotFoundMsg(msg) + case *donePeerMsg: sm.handleDonePeerMsg(msg.peer) @@ -1490,6 +1526,18 @@ func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer sm.msgChan <- &headersMsg{headers: headers, peer: peer} } +// QueueNotFound adds the passed notfound message and peer to the block handling +// queue. +func (sm *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) { + // No channel handling here because peers do not need to block on + // reject messages. + if atomic.LoadInt32(&sm.shutdown) != 0 { + return + } + + sm.msgChan <- ¬FoundMsg{notFound: notFound, peer: peer} +} + // DonePeer informs the blockmanager that a peer has disconnected. func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) { // Ignore if we are shutting down. diff --git a/server.go b/server.go index 9a76be8d4c..c9f23fa638 100644 --- a/server.go +++ b/server.go @@ -364,14 +364,14 @@ func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) { // threshold, a warning is logged including the reason provided. Further, if // the score is above the ban threshold, the peer will be banned and // disconnected. -func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) { +func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool { // No warning is logged and no score is calculated if banning is disabled. if cfg.DisableBanning { - return + return false } if sp.isWhitelisted { peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason) - return + return false } warnThreshold := cfg.BanThreshold >> 1 @@ -383,7 +383,7 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) { peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+ "it was not increased this time", sp, reason, score) } - return + return false } score := sp.banScore.Increase(persistent, transient) if score > warnThreshold { @@ -394,8 +394,10 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) { sp) sp.server.BanPeer(sp) sp.Disconnect() + return true } } + return false } // hasServices returns whether or not the provided advertised service flags have @@ -498,7 +500,9 @@ func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) { // The ban score accumulates and passes the ban threshold if a burst of // mempool messages comes from a peer. The score decays each minute to // half of its value. - sp.addBanScore(0, 33, "mempool") + if sp.addBanScore(0, 33, "mempool") { + return + } // Generate inventory message with the available transactions in the // transaction memory pool. Limit it to the max allowed inventory @@ -638,7 +642,9 @@ func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { // bursts of small requests are not penalized as that would potentially ban // peers performing IBD. // This incremental score decays each minute to half of its value. - sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") + if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") { + return + } // We wait on this wait channel periodically to prevent queuing // far more data than we can send in a reasonable time, wasting memory. @@ -1304,6 +1310,44 @@ func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, sp.server.AddBytesSent(uint64(bytesWritten)) } +// OnNotFound is invoked when a peer sends a notfound message. +func (sp *serverPeer) OnNotFound(p *peer.Peer, msg *wire.MsgNotFound) { + if !sp.Connected() { + return + } + + var numBlocks, numTxns uint32 + for _, inv := range msg.InvList { + switch inv.Type { + case wire.InvTypeBlock: + numBlocks++ + case wire.InvTypeTx: + numTxns++ + default: + peerLog.Debugf("Invalid inv type '%d' in notfound message from %s", + inv.Type, sp) + sp.Disconnect() + return + } + } + if numBlocks > 0 { + blockStr := pickNoun(uint64(numBlocks), "block", "blocks") + reason := fmt.Sprintf("%d %v not found", numBlocks, blockStr) + if sp.addBanScore(20*numBlocks, 0, reason) { + return + } + } + if numTxns > 0 { + txStr := pickNoun(uint64(numTxns), "transaction", "transactions") + reason := fmt.Sprintf("%d %v not found", numBlocks, txStr) + if sp.addBanScore(0, 10*numTxns, reason) { + return + } + } + + sp.server.syncManager.QueueNotFound(msg, p) +} + // randomUint16Number returns a random uint16 in a specified input range. Note // that the range is in zeroth ordering; if you pass it 1800, you will get // values from 0 to 1800. @@ -1998,6 +2042,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnAddr: sp.OnAddr, OnRead: sp.OnRead, OnWrite: sp.OnWrite, + OnNotFound: sp.OnNotFound, // Note: The reference client currently bans peers that send alerts // not signed with its key. We could verify against their key, but