diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 2cc0e5e1e4a0..c160e9b2e6e7 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -477,7 +477,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.headerThroughput } - return ps.idlePeers(62, 101, idle, throughput) + return ps.idlePeers(62, 200, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -491,7 +491,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.blockThroughput } - return ps.idlePeers(62, 101, idle, throughput) + return ps.idlePeers(62, 200, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -505,7 +505,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.receiptThroughput } - return ps.idlePeers(63, 101, idle, throughput) + return ps.idlePeers(63, 200, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -519,7 +519,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.stateThroughput } - return ps.idlePeers(63, 101, idle, throughput) + return ps.idlePeers(63, 200, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the diff --git a/eth/handler.go b/eth/handler.go index d1c9e64df91d..c55aed34c5cb 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -699,7 +699,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } - case p.version >= eth63 && msg.Code == GetNodeDataMsg: + case supportsEth63(p.version) && msg.Code == GetNodeDataMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) if _, err := msgStream.List(); err != nil { @@ -726,7 +726,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendNodeData(data) - case p.version >= eth63 && msg.Code == NodeDataMsg: + case supportsEth63(p.version) && msg.Code == NodeDataMsg: // A batch of node state data arrived to one of our previous requests var data [][]byte if err := msg.Decode(&data); err != nil { @@ -737,7 +737,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { log.Debug("Failed to deliver node state data", "err", err) } - case p.version >= eth63 && msg.Code == GetReceiptsMsg: + case supportsEth63(p.version) && msg.Code == GetReceiptsMsg: // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) if _, err := msgStream.List(); err != nil { @@ -773,7 +773,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendReceiptsRLP(receipts) - case p.version >= eth63 && msg.Code == ReceiptsMsg: + case supportsEth63(p.version) && msg.Code == ReceiptsMsg: // A batch of receipts arrived to one of our previous requests var receipts [][]*types.Receipt if err := msg.Decode(&receipts); err != nil { @@ -847,7 +847,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } - case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65: + case msg.Code == NewPooledTransactionHashesMsg && supportsEth65(p.version): // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them if atomic.LoadUint32(&pm.acceptTxs) == 0 { @@ -863,7 +863,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txFetcher.Notify(p.id, hashes) - case msg.Code == GetPooledTransactionsMsg && p.version >= eth65: + case msg.Code == GetPooledTransactionsMsg && supportsEth65(p.version): // Decode the retrieval message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) if _, err := msgStream.List(); err != nil { @@ -899,7 +899,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } return p.SendPooledTransactionsRLP(hashes, txs) - case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65): + case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && supportsEth65(p.version)): // Transactions arrived, make sure we have a valid and fresh chain to handle them if atomic.LoadUint32(&pm.acceptTxs) == 0 { break diff --git a/eth/peer.go b/eth/peer.go index f17bc08d36ac..8fae020c1764 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -783,23 +783,16 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis ) go func() { switch { - case p.version == xdpos2: - errc <- p2p.Send(p.rw, StatusMsg, &statusData63{ - ProtocolVersion: uint32(p.version), - NetworkId: network, - TD: td, - CurrentBlock: head, - GenesisBlock: genesis, - }) - case p.version == eth63: - errc <- p2p.Send(p.rw, StatusMsg, &statusData63{ + case supportsEth65(p.version): + errc <- p2p.Send(p.rw, StatusMsg, &statusData{ ProtocolVersion: uint32(p.version), - NetworkId: network, + NetworkID: network, TD: td, - CurrentBlock: head, - GenesisBlock: genesis, + Head: head, + Genesis: genesis, + ForkID: forkID, }) - case p.version >= eth64 || p.version >= xdpos22: + case supportsEth64(p.version): errc <- p2p.Send(p.rw, StatusMsg, &statusData{ ProtocolVersion: uint32(p.version), NetworkID: network, @@ -808,18 +801,24 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis Genesis: genesis, ForkID: forkID, }) + case supportsEth63(p.version): + errc <- p2p.Send(p.rw, StatusMsg, &statusData63{ + ProtocolVersion: uint32(p.version), + NetworkId: network, + TD: td, + CurrentBlock: head, + GenesisBlock: genesis, + }) default: panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version)) } }() go func() { switch { - case p.version == xdpos2: - errc <- p.readStatusLegacy(network, &status63, genesis) - case p.version == eth63: - errc <- p.readStatusLegacy(network, &status63, genesis) - case p.version >= eth64 || p.version >= xdpos22: //include xdpos22 condition for completeness + case supportsEth64(p.version): errc <- p.readStatus(network, &status, genesis, forkFilter) + case supportsEth63(p.version): + errc <- p.readStatusLegacy(network, &status63, genesis) default: panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version)) } @@ -837,12 +836,10 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis } } switch { - case p.version == xdpos2: - p.td, p.head = status63.TD, status63.CurrentBlock - case p.version == eth63: - p.td, p.head = status63.TD, status63.CurrentBlock - case p.version >= eth64 || p.version >= xdpos22: //include xdpos22 for completeness + case supportsEth64(p.version): p.td, p.head = status.TD, status.Head + case supportsEth63(p.version): + p.td, p.head = status63.TD, status63.CurrentBlock default: panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version)) } diff --git a/eth/protocol.go b/eth/protocol.go index 0e5aea743e94..2011e752b07c 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -42,7 +42,7 @@ func supportsEth63(version int) bool { switch { case version < 63: return false - case version > 63: + case version >= 63: return true default: return false diff --git a/eth/sync.go b/eth/sync.go index 76ad1c2c495a..1ecd45f235e5 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -155,8 +155,10 @@ func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms pm.blockFetcher.Start() pm.txFetcher.Start() + pm.bft.Start() defer pm.blockFetcher.Stop() defer pm.txFetcher.Stop() + defer pm.bft.Stop() defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations