Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Warp-only sync with warp-barrier [blocknumber] flag. (#8228)
Browse files Browse the repository at this point in the history
* Warp-only sync with warp-after [blocknumber] flag.

* Fix tests.

* Fix configuration tests.

* Rename to warp barrier.
  • Loading branch information
tomusdrw committed Apr 10, 2018
1 parent 68320e8 commit 71ffb44
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 24 deletions.
7 changes: 7 additions & 0 deletions parity/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ usage! {
"--no-serve-light",
"Disable serving of light peers.",

ARG arg_warp_barrier: (Option<u64>) = None, or |c: &Config| c.network.as_ref()?.warp_barrier.clone(),
"--warp-barrier=[NUM]",
"When warp enabled never attempt regular sync before warping to block NUM.",

ARG arg_port: (u16) = 30303u16, or |c: &Config| c.network.as_ref()?.port.clone(),
"--port=[PORT]",
"Override the port on which the node should listen.",
Expand Down Expand Up @@ -1034,6 +1038,7 @@ struct Ui {
#[serde(deny_unknown_fields)]
struct Network {
warp: Option<bool>,
warp_barrier: Option<u64>,
port: Option<u16>,
min_peers: Option<u16>,
max_peers: Option<u16>,
Expand Down Expand Up @@ -1613,6 +1618,7 @@ mod tests {
flag_geth: false,
flag_testnet: false,
flag_import_geth_keys: false,
arg_warp_barrier: None,
arg_datadir: None,
arg_networkid: None,
arg_peers: None,
Expand Down Expand Up @@ -1717,6 +1723,7 @@ mod tests {
}),
network: Some(Network {
warp: Some(false),
warp_barrier: None,
port: None,
min_peers: Some(10),
max_peers: Some(20),
Expand Down
2 changes: 2 additions & 0 deletions parity/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ impl Configuration {
wal: wal,
vm_type: vm_type,
warp_sync: warp_sync,
warp_barrier: self.args.arg_warp_barrier,
public_node: public_node,
geth_compatibility: geth_compatibility,
net_settings: self.network_settings()?,
Expand Down Expand Up @@ -1388,6 +1389,7 @@ mod tests {
network_id: None,
public_node: false,
warp_sync: true,
warp_barrier: None,
acc_conf: Default::default(),
gas_pricer_conf: Default::default(),
miner_extras: Default::default(),
Expand Down
9 changes: 7 additions & 2 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub struct RunCmd {
pub net_conf: ethsync::NetworkConfiguration,
pub network_id: Option<u64>,
pub warp_sync: bool,
pub warp_barrier: Option<u64>,
pub public_node: bool,
pub acc_conf: AccountsConfig,
pub gas_pricer_conf: GasPricerConfig,
Expand Down Expand Up @@ -497,7 +498,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
}

sync_config.fork_block = spec.fork_block();
let mut warp_sync = cmd.warp_sync;
let mut warp_sync = spec.engine.supports_warp() && cmd.warp_sync;
if warp_sync {
// Logging is not initialized yet, so we print directly to stderr
if fat_db {
Expand All @@ -511,7 +512,11 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>)
warp_sync = false;
}
}
sync_config.warp_sync = spec.engine.supports_warp() && warp_sync;
sync_config.warp_sync = match (warp_sync, cmd.warp_barrier) {
(true, Some(block)) => ethsync::WarpSync::OnlyAndAfter(block),
(true, _) => ethsync::WarpSync::Enabled,
_ => ethsync::WarpSync::Disabled,
};
sync_config.download_old_blocks = cmd.download_old_blocks;
sync_config.serve_light = cmd.serve_light;

Expand Down
39 changes: 37 additions & 2 deletions sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,41 @@ pub const ETH_PROTOCOL: ProtocolId = *b"eth";
/// Ethereum light protocol
pub const LIGHT_PROTOCOL: ProtocolId = *b"pip";

/// Determine warp sync status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WarpSync {
/// Warp sync is enabled.
Enabled,
/// Warp sync is disabled.
Disabled,
/// Only warp sync is allowed (no regular sync) and only after given block number.
OnlyAndAfter(BlockNumber),
}

impl WarpSync {
/// Returns true if warp sync is enabled.
pub fn is_enabled(&self) -> bool {
match *self {
WarpSync::Enabled => true,
WarpSync::OnlyAndAfter(_) => true,
WarpSync::Disabled => false,
}
}

/// Returns `true` if we are in warp-only mode.
///
/// i.e. we will never fall back to regular sync
/// until given block number is reached by
/// successfuly finding and restoring from a snapshot.
pub fn is_warp_only(&self) -> bool {
if let WarpSync::OnlyAndAfter(_) = *self {
true
} else {
false
}
}
}

/// Sync configuration
#[derive(Debug, Clone, Copy)]
pub struct SyncConfig {
Expand All @@ -60,7 +95,7 @@ pub struct SyncConfig {
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
/// Enable snapshot sync
pub warp_sync: bool,
pub warp_sync: WarpSync,
/// Enable light client server.
pub serve_light: bool,
}
Expand All @@ -74,7 +109,7 @@ impl Default for SyncConfig {
subprotocol_name: ETH_PROTOCOL,
light_subprotocol_name: LIGHT_PROTOCOL,
fork_block: None,
warp_sync: false,
warp_sync: WarpSync::Disabled,
serve_light: false,
}
}
Expand Down
53 changes: 39 additions & 14 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ use ethcore::snapshot::{ManifestData, RestorationStatus};
use transaction::PendingTransaction;
use sync_io::SyncIo;
use time;
use super::SyncConfig;
use super::{WarpSync, SyncConfig};
use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot, ChunkType};
Expand Down Expand Up @@ -385,7 +385,7 @@ pub struct ChainSync {
/// Enable ancient block downloading
download_old_blocks: bool,
/// Enable warp sync.
enable_warp_sync: bool,
warp_sync: WarpSync,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -394,9 +394,16 @@ impl ChainSync {
/// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
let chain_info = chain.chain_info();
let best_block = chain.chain_info().best_block_number;
let state = match config.warp_sync {
WarpSync::Enabled => SyncState::WaitingPeers,
WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
_ => SyncState::Idle,
};

let mut sync = ChainSync {
state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
starting_block: chain.chain_info().best_block_number,
state,
starting_block: best_block,
highest_block: None,
peers: HashMap::new(),
handshaking_peers: HashMap::new(),
Expand All @@ -410,7 +417,7 @@ impl ChainSync {
snapshot: Snapshot::new(),
sync_start_time: None,
transactions_stats: TransactionsStats::default(),
enable_warp_sync: config.warp_sync,
warp_sync: config.warp_sync,
};
sync.update_targets(chain);
sync
Expand Down Expand Up @@ -508,10 +515,12 @@ impl ChainSync {
}

fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
if !self.enable_warp_sync || io.snapshot_service().supported_versions().is_none() {
if !self.warp_sync.is_enabled() || io.snapshot_service().supported_versions().is_none() {
trace!(target: "sync", "Skipping warp sync. Disabled or not supported.");
return;
}
if self.state != SyncState::WaitingPeers && self.state != SyncState::Blocks && self.state != SyncState::Waiting {
trace!(target: "sync", "Skipping warp sync. State: {:?}", self.state);
return;
}
// Make sure the snapshot block is not too far away from best block and network best block and
Expand All @@ -520,11 +529,16 @@ impl ChainSync {
let fork_block = self.fork_block.as_ref().map(|&(n, _)| n).unwrap_or(0);

let (best_hash, max_peers, snapshot_peers) = {
let expected_warp_block = match self.warp_sync {
WarpSync::OnlyAndAfter(block) => block,
_ => 0,
};
//collect snapshot infos from peers
let snapshots = self.peers.iter()
.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn|
our_best_block < sn && (sn - our_best_block) > SNAPSHOT_RESTORE_THRESHOLD &&
sn > fork_block &&
sn > expected_warp_block &&
self.highest_block.map_or(true, |highest| highest >= sn && (highest - sn) <= SNAPSHOT_RESTORE_THRESHOLD)
))
.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())))
Expand Down Expand Up @@ -554,7 +568,7 @@ impl ChainSync {
trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
self.start_snapshot_sync(io, peers);
}
} else if timeout {
} else if timeout && !self.warp_sync.is_warp_only() {
trace!(target: "sync", "No snapshots found, starting full sync");
self.state = SyncState::Idle;
self.continue_sync(io);
Expand Down Expand Up @@ -626,10 +640,6 @@ impl ChainSync {
block_set: None,
};

if self.sync_start_time.is_none() {
self.sync_start_time = Some(time::precise_time_ns());
}

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
if io.is_expired() {
Expand Down Expand Up @@ -658,6 +668,10 @@ impl ChainSync {
return Ok(());
}

if self.sync_start_time.is_none() {
self.sync_start_time = Some(time::precise_time_ns());
}

self.peers.insert(peer_id.clone(), peer);
// Don't activate peer immediatelly when searching for common block.
// Let the current sync round complete first.
Expand Down Expand Up @@ -1167,9 +1181,14 @@ impl ChainSync {
self.sync_peer(io, p, false);
}
}
if (self.state != SyncState::WaitingPeers && self.state != SyncState::SnapshotWaiting && self.state != SyncState::Waiting && self.state != SyncState::Idle)
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {

if
self.state != SyncState::WaitingPeers &&
self.state != SyncState::SnapshotWaiting &&
self.state != SyncState::Waiting &&
self.state != SyncState::Idle &&
!self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync())
{
self.complete_sync(io);
}
}
Expand Down Expand Up @@ -1220,7 +1239,13 @@ impl ChainSync {
if force || higher_difficulty || self.old_blocks.is_some() {
match self.state {
SyncState::WaitingPeers => {
trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
trace!(
target: "sync",
"Checking snapshot sync: {} vs {} (peer: {})",
peer_snapshot_number,
chain_info.best_block_number,
peer_id
);
self.maybe_start_snapshot_sync(io);
},
SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => {
Expand Down
4 changes: 2 additions & 2 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockId, EachBlockWith};
use chain::{SyncState};
use super::helpers::*;
use SyncConfig;
use {SyncConfig, WarpSync};

#[test]
fn two_peers() {
Expand Down Expand Up @@ -161,7 +161,7 @@ fn status_empty() {
let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle);
let mut config = SyncConfig::default();
config.warp_sync = true;
config.warp_sync = WarpSync::Enabled;
let net = TestNet::new_with_config(2, config);
assert_eq!(net.peer(0).sync.read().status().state, SyncState::WaitingPeers);
}
Expand Down
4 changes: 2 additions & 2 deletions sync/src/tests/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus};
use ethcore::header::BlockNumber;
use ethcore::client::{EachBlockWith};
use super::helpers::*;
use SyncConfig;
use {SyncConfig, WarpSync};

pub struct TestSnapshotService {
manifest: Option<ManifestData>,
Expand Down Expand Up @@ -127,7 +127,7 @@ impl SnapshotService for TestSnapshotService {
fn snapshot_sync() {
::env_logger::init().ok();
let mut config = SyncConfig::default();
config.warp_sync = true;
config.warp_sync = WarpSync::Enabled;
let mut net = TestNet::new_with_config(5, config);
let snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
for i in 0..4 {
Expand Down
5 changes: 3 additions & 2 deletions util/network/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,11 +732,11 @@ impl Host {
};
match TcpStream::connect(&address) {
Ok(socket) => {
trace!(target: "network", "Connecting to {:?}", address);
trace!(target: "network", "{}: Connecting to {:?}", id, address);
socket
},
Err(e) => {
debug!(target: "network", "Can't connect to address {:?}: {:?}", address, e);
debug!(target: "network", "{}: Can't connect to address {:?}: {:?}", id, address, e);
return;
}
}
Expand All @@ -752,6 +752,7 @@ impl Host {
let mut sessions = self.sessions.write();

let token = sessions.insert_with_opt(|token| {
trace!(target: "network", "{}: Initiating session {:?}", token, id);
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
Expand Down

0 comments on commit 71ffb44

Please sign in to comment.