Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Disconnect peers on a fork
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar committed Jul 27, 2016
1 parent 4cb4344 commit 005db35
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 16 deletions.
4 changes: 3 additions & 1 deletion ethcore/res/ethereum/classic.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
"accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x1"
"networkID" : "0x1",
"forkBlock": "0x1d4c00",
"forkCanonHash": "0x94365e3a8c0b35089c1d1195081fe7489b528a84b22199c916180db8b28ade7f"
},
"genesis": {
"seal": {
Expand Down
4 changes: 3 additions & 1 deletion ethcore/res/ethereum/frontier.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@
"accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x1"
"networkID" : "0x1",
"forkBlock": "0x1d4c00",
"forkCanonHash": "0x4985f5ca3d2afbec36529aa96f74de3cc10a2a4a6c44f2157a57d2c6059a11bb"
},
"genesis": {
"seal": {
Expand Down
6 changes: 6 additions & 0 deletions ethcore/src/spec/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct CommonParams {
pub network_id: U256,
/// Minimum gas limit.
pub min_gas_limit: U256,
/// Fork block to check.
pub fork_block: Option<(BlockNumber, H256)>,
}

impl From<ethjson::spec::Params> for CommonParams {
Expand All @@ -47,6 +49,7 @@ impl From<ethjson::spec::Params> for CommonParams {
maximum_extra_data_size: p.maximum_extra_data_size.into(),
network_id: p.network_id.into(),
min_gas_limit: p.min_gas_limit.into(),
fork_block: if let (Some(n), Some(h)) = (p.fork_block, p.fork_hash) { Some((n.into(), h.into())) } else { None },
}
}
}
Expand Down Expand Up @@ -151,6 +154,9 @@ impl Spec {
/// Get the configured Network ID.
pub fn network_id(&self) -> U256 { self.params.network_id }

/// Get the configured network fork block.
pub fn fork_block(&self) -> Option<(BlockNumber, H256)> { self.params.fork_block }

/// Get the header of the genesis block.
pub fn genesis_header(&self) -> Header {
Header {
Expand Down
7 changes: 7 additions & 0 deletions json/src/spec/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Spec params deserialization.
use uint::Uint;
use hash::H256;

/// Spec params.
#[derive(Debug, PartialEq, Deserialize)]
Expand All @@ -33,6 +34,12 @@ pub struct Params {
/// Minimum gas limit.
#[serde(rename="minGasLimit")]
pub min_gas_limit: Uint,
/// Option fork block number to check.
#[serde(rename="forkBlock")]
pub fork_block: Option<Uint>,
/// Expected fork block hash.
#[serde(rename="forkCanonHash")]
pub fork_hash: Option<H256>,
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions json/src/spec/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ mod tests {
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x2"
"forkBlock": "0xffffffffffffffff",
"forkCanonHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
},
"genesis": {
"seal": {
Expand Down
1 change: 1 addition & 0 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub fn execute(cmd: RunCmd) -> Result<(), String> {
Some(id) => id,
None => spec.network_id(),
};
sync_config.fork_block = spec.fork_block().clone();

// prepare account provider
let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf)));
Expand Down
4 changes: 4 additions & 0 deletions sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode};
use util::{TimerToken, U256, H256, UtilError, Secret, Populatable};
use ethcore::client::{BlockChainClient, ChainNotify};
use ethcore::header::BlockNumber;
use io::NetSyncIo;
use chain::{ChainSync, SyncStatus};
use std::net::{SocketAddr, AddrParseError};
Expand All @@ -38,13 +39,16 @@ pub struct SyncConfig {
pub max_download_ahead_blocks: usize,
/// Network ID
pub network_id: U256,
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
}

impl Default for SyncConfig {
fn default() -> SyncConfig {
SyncConfig {
max_download_ahead_blocks: 20000,
network_id: U256::from(1),
fork_block: None,
}
}
}
Expand Down
80 changes: 68 additions & 12 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ const RECEIPTS_PACKET: u8 = 0x10;

const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 5f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;

#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
Expand Down Expand Up @@ -191,6 +192,7 @@ impl SyncStatus {
/// Peer data type requested
enum PeerAsking {
Nothing,
ForkHeader,
BlockHeaders,
BlockBodies,
Heads,
Expand Down Expand Up @@ -221,6 +223,14 @@ struct PeerInfo {
ask_time: f64,
/// Pending request is expird and result should be ignored
expired: bool,
/// Peer fork confirmed
confirmed: bool,
}

impl PeerInfo {
fn is_available(&self) -> bool {
self.confirmed && !self.expired
}
}

/// Blockchain sync handler.
Expand Down Expand Up @@ -254,6 +264,8 @@ pub struct ChainSync {
round_parents: VecDeque<(H256, H256)>,
/// Network ID
network_id: U256,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -277,6 +289,7 @@ impl ChainSync {
round_parents: VecDeque::new(),
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
fork_block: config.fork_block,
};
sync.reset();
sync
Expand All @@ -293,8 +306,8 @@ impl ChainSync {
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
num_peers: self.peers.values().filter(|p| p.confirmed).count(),
num_active_peers: self.peers.values().filter(|p| p.confirmed && p.asking != PeerAsking::Nothing).count(),
mem_used:
self.blocks.heap_size()
+ self.peers.heap_size_of_children()
Expand All @@ -316,7 +329,7 @@ impl ChainSync {
p.asking_blocks.clear();
p.asking_hash = None;
// mark any pending requests as expired
if p.asking != PeerAsking::Nothing {
if p.asking != PeerAsking::Nothing && p.confirmed {
p.expired = true;
}
}
Expand Down Expand Up @@ -370,6 +383,7 @@ impl ChainSync {
asking_hash: None,
ask_time: 0f64,
expired: false,
confirmed: self.fork_block.is_none(),
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
Expand Down Expand Up @@ -397,16 +411,41 @@ impl ChainSync {
self.peers.insert(peer_id.clone(), peer);
self.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
self.sync_peer(io, peer_id, false);
if let Some((fork_block, _)) = self.fork_block {
self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader);
} else {
self.sync_peer(io, peer_id, false);
}
Ok(())
}

#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
/// Called by peer once it has new block headers during sync
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let confirmed = match self.peers.get_mut(&peer_id) {
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
let item_count = r.item_count();
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
trace!(target: "sync", "{}: Confirmed peer", peer_id);
peer.asking = PeerAsking::Nothing;
peer.confirmed = true;
true
} else {
trace!(target: "sync", "{}: Fork mismatch", peer_id);
io.disconnect_peer(peer_id);
false
}
},
_ => false,
};
if confirmed {
self.sync_peer(io, peer_id, false);
return Ok(());
}

self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
self.continue_sync(io);
Expand Down Expand Up @@ -474,14 +513,14 @@ impl ChainSync {

// Disable the peer for this syncing round if it gives invalid chain
if !valid_response {
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
self.deactivate_peer(io, peer_id);
trace!(target: "sync", "{} Disabled for invalid headers response", peer_id);
io.disable_peer(peer_id);
}

if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id);
trace!(target: "sync", "{} Disabled for no data", peer_id);
io.disable_peer(peer_id);
}
match self.state {
SyncState::ChainHead => {
Expand Down Expand Up @@ -692,15 +731,16 @@ impl ChainSync {

/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
if p.is_available() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
thread_rng().shuffle(&mut peers); //TODO: sort by rating
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers {
if self.active_peers.contains(&p) {
self.sync_peer(io, p, false);
}
}
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && !p.expired) {
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.is_available()) {
self.complete_sync();
}
}
Expand All @@ -726,7 +766,7 @@ impl ChainSync {
}
let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {
if peer.asking != PeerAsking::Nothing || !peer.is_available() {
return;
}
if self.state == SyncState::Waiting {
Expand Down Expand Up @@ -924,6 +964,17 @@ impl ChainSync {
.asking_hash = Some(h.clone());
}

/// Request headers from a peer by block number
#[cfg_attr(feature="dev", allow(too_many_arguments))]
fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool, asking: PeerAsking) {
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n);
let mut rlp = RlpStream::new_list(4);
rlp.append(&n);
rlp.append(&count);
rlp.append(&skip);
rlp.append(&if reverse {1u32} else {0u32});
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
}
/// Request block bodies from a peer
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) {
let mut rlp = RlpStream::new_list(hashes.len());
Expand Down Expand Up @@ -977,6 +1028,9 @@ impl ChainSync {
if !io.is_chain_queue_empty() {
return Ok(());
}
if self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
}

let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
Expand Down Expand Up @@ -1212,6 +1266,7 @@ impl ChainSync {
PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
Expand Down Expand Up @@ -1629,6 +1684,7 @@ mod tests {
asking_hash: None,
ask_time: 0f64,
expired: false,
confirmed: false,
});
sync
}
Expand Down
21 changes: 20 additions & 1 deletion sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use util::*;
use ethcore::client::{BlockChainClient, BlockID, EachBlockWith};
use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockID, EachBlockWith};
use chain::{SyncState};
use super::helpers::*;

Expand Down Expand Up @@ -95,6 +95,25 @@ fn forked() {
assert_eq!(net.peer(2).chain.numbers.read().deref(), &peer1_chain);
}

#[test]
fn net_hard_fork() {
::env_logger::init().ok();
let ref_client = TestBlockChainClient::new();
ref_client.add_blocks(50, EachBlockWith::Uncle);
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 100);
}
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 0);
}
}

#[test]
fn restart() {
let mut net = TestNet::new(3);
Expand Down
9 changes: 8 additions & 1 deletion sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient};
use ethcore::header::BlockNumber;
use io::SyncIo;
use chain::ChainSync;
use ::SyncConfig;
Expand Down Expand Up @@ -89,13 +90,19 @@ pub struct TestNet {

impl TestNet {
pub fn new(n: usize) -> TestNet {
Self::new_with_fork(n, None)
}

pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet {
let mut net = TestNet {
peers: Vec::new(),
started: false,
};
for _ in 0..n {
let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), &chain);
let mut config = SyncConfig::default();
config.fork_block = fork;
let sync = ChainSync::new(config, &chain);
net.peers.push(TestPeer {
sync: RwLock::new(sync),
chain: chain,
Expand Down

0 comments on commit 005db35

Please sign in to comment.