Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Network start/stop #1313

Merged
merged 5 commits into from
Jun 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,31 @@ pub enum SyncMessage {
NewChainHead,
/// A block is ready
BlockVerified,
/// Start network command.
StartNetwork,
/// Stop network command.
StopNetwork,
}

/// IO Message type used for Network service
pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably not be call NetSyncMessage, but something more generic...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do later, this requires refactoring anyway


/// Client service setup. Creates and registers client and network services with the IO subsystem.
pub struct ClientService {
net_service: NetworkService<SyncMessage>,
net_service: Arc<NetworkService<SyncMessage>>,
client: Arc<Client>,
panic_handler: Arc<PanicHandler>
}

impl ClientService {
/// Start the service in a separate thread.
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>) -> Result<ClientService, Error> {
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>, enable_network: bool) -> Result<ClientService, Error> {
let panic_handler = PanicHandler::new_in_arc();
let mut net_service = try!(NetworkService::start(net_config));
let net_service = try!(NetworkService::new(net_config));
panic_handler.forward_from(&net_service);
if enable_network {
try!(net_service.start());
}

info!("Starting {}", net_service.host_info());
info!("Configured for {} using {:?} engine", spec.name, spec.engine.name());
Expand All @@ -70,7 +77,7 @@ impl ClientService {
try!(net_service.io().register_handler(client_io));

Ok(ClientService {
net_service: net_service,
net_service: Arc::new(net_service),
client: client,
panic_handler: panic_handler,
})
Expand All @@ -82,8 +89,8 @@ impl ClientService {
}

/// Get general IO interface
pub fn io(&mut self) -> &mut IoService<NetSyncMessage> {
self.net_service.io()
pub fn register_io_handler(&self, handler: Arc<IoHandler<NetSyncMessage> + Send>) -> Result<(), IoError> {
self.net_service.io().register_handler(handler)
}

/// Get client interface
Expand All @@ -92,8 +99,8 @@ impl ClientService {
}

/// Get network service component
pub fn network(&mut self) -> &mut NetworkService<SyncMessage> {
&mut self.net_service
pub fn network(&mut self) -> Arc<NetworkService<SyncMessage>> {
self.net_service.clone()
}
}

Expand Down Expand Up @@ -149,7 +156,7 @@ mod tests {
fn it_can_be_started() {
let spec = get_test_spec();
let temp_path = RandomTempPath::new();
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()));
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()), false);
assert!(service.is_ok());
}
}
2 changes: 2 additions & 0 deletions parity/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Account Options:
--no-import-keys Do not import keys from legacy clients.

Networking Options:
--no-network Disable p2p networking.
--port PORT Override the port on which the node should listen
[default: 30303].
--peers NUM Try to maintain that many peers [default: 25].
Expand Down Expand Up @@ -266,6 +267,7 @@ pub struct Args {
pub flag_format: Option<String>,
pub flag_jitvm: bool,
pub flag_no_color: bool,
pub flag_no_network: bool,
// legacy...
pub flag_geth: bool,
pub flag_nodekey: Option<String>,
Expand Down
20 changes: 18 additions & 2 deletions parity/io_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::NetSyncMessage;
use ethcore::service::{NetSyncMessage, SyncMessage};
use ethsync::EthSync;
use util::keys::store::AccountService;
use util::{TimerToken, IoHandler, IoContext};
use util::{TimerToken, IoHandler, IoContext, NetworkService, NetworkIoMessage};

use informant::Informant;

Expand All @@ -33,6 +33,7 @@ pub struct ClientIoHandler {
pub sync: Arc<EthSync>,
pub accounts: Arc<AccountService>,
pub info: Informant,
pub network: Arc<NetworkService<SyncMessage>>,
}

impl IoHandler<NetSyncMessage> for ClientIoHandler {
Expand All @@ -48,6 +49,21 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
_ => {}
}
}

fn message(&self, _io: &IoContext<NetSyncMessage>, message: &NetSyncMessage) {
match *message {
NetworkIoMessage::User(SyncMessage::StartNetwork) => {
info!("Starting network");
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
EthSync::register(&*self.network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e));
},
NetworkIoMessage::User(SyncMessage::StopNetwork) => {
info!("Stopping network");
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
},
_ => {/* Ignore other messages */},
}
}
}


14 changes: 8 additions & 6 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ use std::thread::sleep;
use std::time::Duration;
use rustc_serialize::hex::FromHex;
use ctrlc::CtrlC;
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes};
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError};
use util::panics::{MayPanic, ForwardPanic, PanicHandler};
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path};
use ethcore::error::{Error, ImportError};
Expand Down Expand Up @@ -198,7 +198,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)

// Build client
let mut service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone()
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone(), !conf.args.flag_no_network
).unwrap_or_else(|e| die_with_error("Client", e));

panic_handler.forward_from(&service);
Expand All @@ -208,7 +208,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
let network_settings = Arc::new(conf.network_settings());

// Sync
let sync = EthSync::register(service.network(), sync_config, client.clone());
let sync = EthSync::new(sync_config, client.clone());
EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into()));

let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
signer_port: conf.signer_port(),
Expand Down Expand Up @@ -269,8 +270,9 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
info: Informant::new(conf.have_color()),
sync: sync.clone(),
accounts: account_service.clone(),
network: service.network(),
});
service.io().register_handler(io_handler).expect("Error registering IO handler");
service.register_io_handler(io_handler).expect("Error registering IO handler");

// Handle exit
wait_for_exit(panic_handler, rpc_server, dapps_server, signer_server);
Expand Down Expand Up @@ -309,7 +311,7 @@ fn execute_export(conf: Configuration) {

// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false
).unwrap_or_else(|e| die_with_error("Client", e));

panic_handler.forward_from(&service);
Expand Down Expand Up @@ -380,7 +382,7 @@ fn execute_import(conf: Configuration) {

// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false
).unwrap_or_else(|e| die_with_error("Client", e));

panic_handler.forward_from(&service);
Expand Down
10 changes: 10 additions & 0 deletions rpc/src/v1/impls/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,14 @@ impl<S> Net for NetClient<S> where S: SyncProvider + 'static {
// right now (11 march 2016), we are always listening for incoming connections
Ok(Value::Bool(true))
}

fn start_network(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.sync).start_network();
Ok(Value::Bool(true))
}

fn stop_network(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.sync).stop_network();
Ok(Value::Bool(true))
}
}
6 changes: 6 additions & 0 deletions rpc/src/v1/tests/helpers/sync_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,11 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus {
self.status.read().unwrap().clone()
}

fn start_network(&self) {
}

fn stop_network(&self) {
}
}

6 changes: 6 additions & 0 deletions rpc/src/v1/traits/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub trait Net: Sized + Send + Sync + 'static {
/// Otherwise false.
fn is_listening(&self, _: Params) -> Result<Value, Error>;

/// Start the network.
fn start_network(&self, _: Params) -> Result<Value, Error>;

/// Stop the network.
fn stop_network(&self, _: Params) -> Result<Value, Error>;

/// Should be used to convert object to io delegate.
fn to_delegate(self) -> IoDelegate<Self> {
let mut delegate = IoDelegate::new(Arc::new(self));
Expand Down
4 changes: 4 additions & 0 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ impl ChainSync {
};

trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if io.is_expired() {
trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
}

if self.peers.contains_key(&peer_id) {
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
Expand Down
6 changes: 6 additions & 0 deletions sync/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub trait SyncIo {
fn is_chain_queue_empty(&self) -> bool {
self.chain().queue_info().is_empty()
}
/// Check if the session is expired
fn is_expired(&self) -> bool;
}

/// Wraps `NetworkContext` and the blockchain client
Expand Down Expand Up @@ -83,6 +85,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn peer_info(&self, peer_id: PeerId) -> String {
self.network.peer_info(peer_id)
}

fn is_expired(&self) -> bool {
self.network.is_expired()
}
}


36 changes: 30 additions & 6 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
//! use ethcore::miner::Miner;
//!
//! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap();
//! service.start().unwrap();
//! let dir = env::temp_dir();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap();
//! let miner = Miner::new(false, ethereum::new_frontier());
//! EthSync::register(&mut service, SyncConfig::default(), client);
//! let sync = EthSync::new(SyncConfig::default(), client);
//! EthSync::register(&mut service, sync);
//! }
//! ```

Expand All @@ -66,8 +68,10 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer
use util::TimerToken;
use util::{U256, ONE_U256};
use ethcore::client::Client;
use ethcore::service::SyncMessage;
use ethcore::service::{SyncMessage, NetSyncMessage};
use io::NetSyncIo;
use util::io::IoChannel;
use util::{NetworkIoMessage, NetworkError};
use chain::ChainSync;

mod chain;
Expand Down Expand Up @@ -98,30 +102,41 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync {
/// Get sync status
fn status(&self) -> SyncStatus;
/// Start the network
fn start_network(&self);
/// Stop the network
fn stop_network(&self);
}

/// Ethereum network protocol handler
pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
chain: Arc<Client>,
/// Sync strategy
sync: RwLock<ChainSync>
sync: RwLock<ChainSync>,
/// IO communication chnnel.
io_channel: RwLock<IoChannel<NetSyncMessage>>,
}

pub use self::chain::{SyncStatus, SyncState};

impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
pub fn new(config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
let sync = ChainSync::new(config, chain.deref());
let sync = Arc::new(EthSync {
chain: chain,
sync: RwLock::new(sync),
io_channel: RwLock::new(IoChannel::disconnected()),
});
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync
}

/// Register protocol with the network service
pub fn register(service: &NetworkService<SyncMessage>, sync: Arc<EthSync>) -> Result<(), NetworkError> {
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8])
}

/// Stop sync
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
Expand All @@ -138,11 +153,20 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status()
}

fn start_network(&self) {
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification");
}

fn stop_network(&self) {
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification");
}
}

impl NetworkProtocolHandler<SyncMessage> for EthSync {
fn initialize(&self, io: &NetworkContext<SyncMessage>) {
io.register_timer(0, 1000).expect("Error registering sync timer");
*self.io_channel.write().unwrap() = io.io_channel();
}

fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
Expand Down
10 changes: 10 additions & 0 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,13 @@ fn restart_on_broken_chain() {

assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}

#[test]
fn high_td_attach() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block_parent(6);
net.sync_steps(20);

assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}
4 changes: 4 additions & 0 deletions sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl<'p> SyncIo for TestIo<'p> {
fn disconnect_peer(&mut self, _peer_id: PeerId) {
}

fn is_expired(&self) -> bool {
false
}

fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
self.queue.push_back(TestPacket {
data: data,
Expand Down
2 changes: 1 addition & 1 deletion util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ mod tests {

#[test]
fn test_service_register_handler () {
let mut service = IoService::<MyMessage>::start().expect("Error creating network service");
let service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(Arc::new(MyHandler)).unwrap();
}

Expand Down
Loading