diff --git a/cov.sh b/cov.sh index 9f2a87a4752..371746a3939 100755 --- a/cov.sh +++ b/cov.sh @@ -17,5 +17,5 @@ fi cargo test --no-run || exit $? mkdir -p target/coverage -kcov --exclude-pattern ~/.multirust --include-pattern src --verify target/coverage target/debug/ethcore* +kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/ethcore* xdg-open target/coverage/index.html diff --git a/res/ethereum/tests b/res/ethereum/tests index dc86e635967..e838fd90998 160000 --- a/res/ethereum/tests +++ b/res/ethereum/tests @@ -1 +1 @@ -Subproject commit dc86e6359675440aea59ddb48648a01c799925d8 +Subproject commit e838fd90998fc5502d0b7c9427a4c231f9a6953d diff --git a/src/bin/client/main.rs b/src/bin/client/main.rs index 0194a0a91f5..31bb79d8e90 100644 --- a/src/bin/client/main.rs +++ b/src/bin/client/main.rs @@ -61,13 +61,13 @@ fn main() { setup_log(&args.flag_logging); let spec = ethereum::new_frontier(); - let init_nodes = match &args.arg_enode { &None => spec.nodes().clone(), &Some(ref enodes) => enodes.clone(), }; - - let mut service = ClientService::start(spec, &init_nodes).unwrap(); + let mut net_settings = NetworkConfiguration::new(); + net_settings.boot_nodes = init_nodes; + let mut service = ClientService::start(spec, net_settings).unwrap(); let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() }); service.io().register_handler(io_handler).expect("Error registering IO handler"); diff --git a/src/client.rs b/src/client.rs index 6f47d060155..4461f3d7b0b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -162,14 +162,10 @@ impl Client { let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap()); let engine = Arc::new(try!(spec.to_engine())); - { - let mut state_db = JournalDB::new_with_arc(db.clone()); - if engine.spec().ensure_db_good(&mut state_db) { - state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); - } + let mut state_db = JournalDB::new_with_arc(db.clone()); + if engine.spec().ensure_db_good(&mut state_db) { + state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } - let state_db = JournalDB::new_with_arc(db); - Ok(Arc::new(Client { chain: chain, engine: engine.clone(), diff --git a/src/service.rs b/src/service.rs index 7e9c3ae416b..8c900d20a6e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -26,8 +26,8 @@ pub struct ClientService { impl ClientService { /// Start the service in a separate thread. - pub fn start(spec: Spec, init_nodes: &[String]) -> Result { - let mut net_service = try!(NetworkService::start(init_nodes)); + pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let mut net_service = try!(NetworkService::start(net_config)); info!("Starting {}", net_service.host_info()); info!("Configured for {} using {} engine", spec.name, spec.engine_name); let mut dir = env::home_dir().unwrap(); diff --git a/src/sync/chain.rs b/src/sync/chain.rs index c55cb0a6ee2..aaba701c265 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -475,6 +475,9 @@ impl ChainSync { } } }; + if max_height != x!(0) { + self.sync_peer(io, peer_id, true); + } Ok(()) } diff --git a/util/src/io/mod.rs b/util/src/io/mod.rs index 48c02f6ee8d..1906e74380b 100644 --- a/util/src/io/mod.rs +++ b/util/src/io/mod.rs @@ -74,6 +74,8 @@ pub trait IoHandler: Send + Sync where Message: Send + Sync + Clone + ' fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop>) {} /// Re-register a stream with the event loop fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop>) {} + /// Deregister a stream. Called whenstream is removed from event loop + fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop>) {} } /// TODO [arkpar] Please document me diff --git a/util/src/io/service.rs b/util/src/io/service.rs index 8a165305627..9a3187f8eec 100644 --- a/util/src/io/service.rs +++ b/util/src/io/service.rs @@ -42,6 +42,10 @@ pub enum IoMessage where Message: Send + Clone + Sized { handler_id: HandlerId, token: StreamToken, }, + DeregisterStream { + handler_id: HandlerId, + token: StreamToken, + }, UpdateStreamRegistration { handler_id: HandlerId, token: StreamToken, @@ -83,6 +87,7 @@ impl IoContext where Message: Send + Clone + 'static { })); Ok(()) } + /// Register a new IO stream. pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> { try!(self.channel.send_io(IoMessage::RegisterStream { @@ -92,6 +97,15 @@ impl IoContext where Message: Send + Clone + 'static { Ok(()) } + /// Deregister an IO stream. + pub fn deregister_stream(&self, token: StreamToken) -> Result<(), UtilError> { + try!(self.channel.send_io(IoMessage::DeregisterStream { + token: token, + handler_id: self.handler, + })); + Ok(()) + } + /// Reregister an IO stream. pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> { try!(self.channel.send_io(IoMessage::UpdateStreamRegistration { @@ -214,6 +228,10 @@ impl Handler for IoManager where Message: Send + Clone + Sync let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); }, + IoMessage::DeregisterStream { handler_id, token } => { + let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); + handler.deregister_stream(token, event_loop); + }, IoMessage::UpdateStreamRegistration { handler_id, token } => { let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone(); handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop); diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 33cafd70848..7ed8c3c181d 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::collections::VecDeque; use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite}; use mio::tcp::*; @@ -10,6 +11,7 @@ use error::*; use io::{IoContext, StreamToken}; use network::error::NetworkError; use network::handshake::Handshake; +use network::stats::NetworkStats; use crypto; use rcrypto::blockmodes::*; use rcrypto::aessafe::*; @@ -34,6 +36,8 @@ pub struct Connection { send_queue: VecDeque>, /// Event flags this connection expects interest: EventSet, + /// Shared network staistics + stats: Arc, } /// Connection write status. @@ -47,7 +51,7 @@ pub enum WriteStatus { impl Connection { /// Create a new connection with given id and socket. - pub fn new(token: StreamToken, socket: TcpStream) -> Connection { + pub fn new(token: StreamToken, socket: TcpStream, stats: Arc) -> Connection { Connection { token: token, socket: socket, @@ -55,6 +59,7 @@ impl Connection { rec_buf: Bytes::new(), rec_size: 0, interest: EventSet::hup() | EventSet::readable(), + stats: stats, } } @@ -68,7 +73,6 @@ impl Connection { } /// Readable IO handler. Called when there is some data to be read. - //TODO: return a slice pub fn readable(&mut self) -> io::Result> { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { warn!(target:"net", "Unexpected connection read"); @@ -77,9 +81,12 @@ impl Connection { // resolve "multiple applicable items in scope [E0034]" error let sock_ref = ::by_ref(&mut self.socket); match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { - Ok(Some(_)) if self.rec_buf.len() == self.rec_size => { - self.rec_size = 0; - Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + Ok(Some(size)) if size != 0 => { + self.stats.inc_recv(size); + if self.rec_size != 0 && self.rec_buf.len() == self.rec_size { + self.rec_size = 0; + Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + } else { Ok(None) } }, Ok(_) => Ok(None), Err(e) => Err(e), @@ -109,14 +116,17 @@ impl Connection { return Ok(WriteStatus::Complete) } match self.socket.try_write_buf(buf) { - Ok(_) if (buf.position() as usize) < send_size => { + Ok(Some(size)) if (buf.position() as usize) < send_size => { self.interest.insert(EventSet::writable()); + self.stats.inc_send(size); Ok(WriteStatus::Ongoing) }, - Ok(_) if (buf.position() as usize) == send_size => { + Ok(Some(size)) if (buf.position() as usize) == send_size => { + self.stats.inc_send(size); Ok(WriteStatus::Complete) }, - Ok(_) => { panic!("Wrote past buffer");}, + Ok(Some(_)) => { panic!("Wrote past buffer");}, + Ok(None) => Ok(WriteStatus::Ongoing), Err(e) => Err(e) } }.and_then(|r| { @@ -137,8 +147,8 @@ impl Connection { pub fn register_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", reg); event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { - error!("Failed to register {:?}, {:?}", reg, e); - Err(e) + debug!("Failed to register {:?}, {:?}", reg, e); + Ok(()) }) } @@ -146,10 +156,17 @@ impl Connection { pub fn update_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection reregister; token={:?}", reg); event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { - error!("Failed to reregister {:?}, {:?}", reg, e); - Err(e) + debug!("Failed to reregister {:?}, {:?}", reg, e); + Ok(()) }) } + + /// Delete connection registration. Should be called at the end of the IO handler. + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> io::Result<()> { + trace!(target: "net", "connection deregister; token={:?}", self.token); + event_loop.deregister(&self.socket).ok(); // ignore errors here + Ok(()) + } } /// RLPx packet @@ -371,6 +388,12 @@ impl EncryptedConnection { try!(self.connection.update_socket(reg, event_loop)); Ok(()) } + + /// Delete connection registration. This should be called at the end of the event loop. + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + try!(self.connection.deregister_socket(event_loop)); + Ok(()) + } } #[test] diff --git a/util/src/network/handshake.rs b/util/src/network/handshake.rs index ddeab17b71e..9b835d5bd7d 100644 --- a/util/src/network/handshake.rs +++ b/util/src/network/handshake.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use mio::*; use mio::tcp::*; use hash::*; @@ -10,6 +11,7 @@ use network::host::{HostInfo}; use network::node::NodeId; use error::*; use network::error::NetworkError; +use network::stats::NetworkStats; use io::{IoContext, StreamToken}; #[derive(PartialEq, Eq, Debug)] @@ -54,10 +56,10 @@ const HANDSHAKE_TIMEOUT: u64 = 30000; impl Handshake { /// Create a new handshake object - pub fn new(token: StreamToken, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result { + pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc) -> Result { Ok(Handshake { - id: id.clone(), - connection: Connection::new(token, socket), + id: if let Some(id) = id { id.clone()} else { NodeId::new() }, + connection: Connection::new(token, socket, stats), originated: false, state: HandshakeState::New, ecdhe: try!(KeyPair::create()), @@ -134,32 +136,45 @@ impl Handshake { Ok(()) } + /// Delete registration + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + try!(self.connection.deregister_socket(event_loop)); + Ok(()) + } + /// Parse, validate and confirm auth message fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); - assert!(data.len() == AUTH_PACKET_SIZE); + if data.len() != AUTH_PACKET_SIZE { + debug!(target:"net", "Wrong auth packet size"); + return Err(From::from(NetworkError::BadProtocol)); + } self.auth_cipher = data.to_vec(); let auth = try!(ecies::decrypt(host.secret(), data)); let (sig, rest) = auth.split_at(65); let (hepubk, rest) = rest.split_at(32); let (pubk, rest) = rest.split_at(64); let (nonce, _) = rest.split_at(32); - self.remote_public.clone_from_slice(pubk); + self.id.clone_from_slice(pubk); self.remote_nonce.clone_from_slice(nonce); - let shared = try!(ecdh::agree(host.secret(), &self.remote_public)); + let shared = try!(ecdh::agree(host.secret(), &self.id)); let signature = Signature::from_slice(sig); let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce))); + self.remote_public = spub.clone(); if &spub.sha3()[..] != hepubk { trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr()); return Err(From::from(NetworkError::Auth)); }; - self.write_ack() + Ok(()) } /// Parse and validate ack message fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); - assert!(data.len() == ACK_PACKET_SIZE); + if data.len() != ACK_PACKET_SIZE { + debug!(target:"net", "Wrong ack packet size"); + return Err(From::from(NetworkError::BadProtocol)); + } self.ack_cipher = data.to_vec(); let ack = try!(ecies::decrypt(host.secret(), data)); self.remote_public.clone_from_slice(&ack[0..64]); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 06e7f5a4ebe..6f306ecb073 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -17,6 +17,7 @@ use error::*; use io::*; use network::NetworkProtocolHandler; use network::node::*; +use network::stats::NetworkStats; type Slab = ::slab::Slab; @@ -28,24 +29,45 @@ const IDEAL_PEERS: u32 = 10; const MAINTENANCE_TIMEOUT: u64 = 1000; #[derive(Debug)] -struct NetworkConfiguration { - listen_address: SocketAddr, - public_address: SocketAddr, - nat_enabled: bool, - discovery_enabled: bool, - pin: bool, +/// Network service configuration +pub struct NetworkConfiguration { + /// IP address to listen for incoming connections + pub listen_address: SocketAddr, + /// IP address to advertise + pub public_address: SocketAddr, + /// Enable NAT configuration + pub nat_enabled: bool, + /// Enable discovery + pub discovery_enabled: bool, + /// Pin to boot nodes only + pub pin: bool, + /// List of initial node addresses + pub boot_nodes: Vec, + /// Use provided node key instead of default + pub use_secret: Option, } impl NetworkConfiguration { - fn new() -> NetworkConfiguration { + /// Create a new instance of default settings. + pub fn new() -> NetworkConfiguration { NetworkConfiguration { listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), nat_enabled: true, discovery_enabled: true, pin: false, + boot_nodes: Vec::new(), + use_secret: None, } } + + /// Create new default configuration with sepcified listen port. + pub fn new_with_port(port: u16) -> NetworkConfiguration { + let mut config = NetworkConfiguration::new(); + config.listen_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap(); + config.public_address = SocketAddr::from_str(&format!("0.0.0.0:{}", port)).unwrap(); + config + } } // Tokens @@ -243,18 +265,19 @@ pub struct Host where Message: Send + Sync + Clone { handlers: RwLock>>>, timers: RwLock>, timer_counter: RwLock, + stats: Arc, } impl Host where Message: Send + Sync + Clone { - pub fn new() -> Host { - let config = NetworkConfiguration::new(); + /// Create a new instance + pub fn new(config: NetworkConfiguration) -> Host { let addr = config.listen_address; // Setup the server socket let tcp_listener = TcpListener::bind(&addr).unwrap(); let udp_socket = UdpSocket::bound(&addr).unwrap(); - let host = Host:: { + let mut host = Host:: { info: RwLock::new(HostInfo { - keys: KeyPair::create().unwrap(), + keys: if let Some(ref secret) = config.use_secret { KeyPair::from_secret(secret.clone()).unwrap() } else { KeyPair::create().unwrap() }, config: config, nonce: H256::random(), protocol_version: 4, @@ -269,6 +292,7 @@ impl Host where Message: Send + Sync + Clone { handlers: RwLock::new(HashMap::new()), timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(LAST_CONNECTION + 1), + stats: Arc::new(NetworkStats::default()), }; let port = host.info.read().unwrap().config.listen_address.port(); host.info.write().unwrap().deref_mut().listen_port = port; @@ -278,9 +302,18 @@ impl Host where Message: Send + Sync + Clone { Some(iface) => config.public_address = iface.addr.unwrap(), None => warn!("No public network interface"), */ + + let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone(); + for n in boot_nodes { + host.add_node(&n); + } host } + pub fn stats(&self) -> Arc { + self.stats.clone() + } + pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { warn!("Could not add node: {:?}", e); }, @@ -358,7 +391,6 @@ impl Host where Message: Send + Sync + Clone { } #[allow(single_match)] - #[allow(block_in_if_condition_stmt)] fn connect_peer(&self, id: &NodeId, io: &IoContext>) { if self.have_session(id) { @@ -385,11 +417,16 @@ impl Host where Message: Send + Sync + Clone { } } }; + self.create_connection(socket, Some(id), io); + } + #[allow(block_in_if_condition_stmt)] + fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext>) { let nonce = self.info.write().unwrap().next_nonce(); - if self.connections.write().unwrap().insert_with(|token| { - let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"); - handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { + let mut connections = self.connections.write().unwrap(); + if connections.insert_with(|token| { + let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake"); + handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| { debug!(target: "net", "Handshake create error: {:?}", e); }); Arc::new(Mutex::new(ConnectionEntry::Handshake(handshake))) @@ -398,8 +435,20 @@ impl Host where Message: Send + Sync + Clone { } } - fn accept(&self, _io: &IoContext>) { + fn accept(&self, io: &IoContext>) { trace!(target: "net", "accept"); + loop { + let socket = match self.tcp_listener.lock().unwrap().accept() { + Ok(None) => break, + Ok(Some((sock, _addr))) => sock, + Err(e) => { + warn!("Error accepting connection: {:?}", e); + break + }, + }; + self.create_connection(socket, None, io); + } + io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener"); } #[allow(single_match)] @@ -508,11 +557,13 @@ impl Host where Message: Send + Sync + Clone { } fn start_session(&self, token: StreamToken, io: &IoContext>) { - self.connections.write().unwrap().replace_with(token, |c| { + let mut connections = self.connections.write().unwrap(); + connections.replace_with(token, |c| { match Arc::try_unwrap(c).ok().unwrap().into_inner().unwrap() { ConnectionEntry::Handshake(h) => { let session = Session::new(h, io, &self.info.read().unwrap()).expect("Session creation error"); io.update_registration(token).expect("Error updating session registration"); + self.stats.inc_sessions(); Some(Arc::new(Mutex::new(ConnectionEntry::Session(session)))) }, _ => { None } // handshake expired @@ -544,6 +595,7 @@ impl Host where Message: Send + Sync + Clone { _ => {}, } } + io.deregister_stream(token).expect("Error deregistering stream"); } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); @@ -656,6 +708,24 @@ impl IoHandler> for Host where Messa } } + fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop>>) { + match stream { + FIRST_CONNECTION ... LAST_CONNECTION => { + let mut connections = self.connections.write().unwrap(); + if let Some(connection) = connections.get(stream).cloned() { + match *connection.lock().unwrap().deref() { + ConnectionEntry::Handshake(ref h) => h.deregister_socket(event_loop).expect("Error deregistering socket"), + ConnectionEntry::Session(ref s) => s.deregister_socket(event_loop).expect("Error deregistering session socket"), + } + connections.remove(stream); + } + }, + NODETABLE_RECEIVE => event_loop.deregister(self.udp_socket.lock().unwrap().deref()).unwrap(), + TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(), + _ => warn!("Unexpected stream deregistration") + } + } + fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop>>) { match stream { FIRST_CONNECTION ... LAST_CONNECTION => { diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index 0c734442d1d..668cdc8b1ef 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -1,5 +1,4 @@ -/// Network and general IO module. -/// +/// Network and general IO module. /// Example usage for craeting a network service and adding an IO handler: /// /// ```rust @@ -40,7 +39,7 @@ /// } /// /// fn main () { -/// let mut service = NetworkService::::start().expect("Error creating network service"); +/// let mut service = NetworkService::::start(NetworkConfiguration::new()).expect("Error creating network service"); /// service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]); /// /// // Wait for quit condition @@ -56,21 +55,20 @@ mod discovery; mod service; mod error; mod node; +mod stats; + +#[cfg(test)] +mod tests; -/// TODO [arkpar] Please document me pub use network::host::PeerId; -/// TODO [arkpar] Please document me pub use network::host::PacketId; -/// TODO [arkpar] Please document me pub use network::host::NetworkContext; -/// TODO [arkpar] Please document me pub use network::service::NetworkService; -/// TODO [arkpar] Please document me pub use network::host::NetworkIoMessage; -/// TODO [arkpar] Please document me pub use network::host::NetworkIoMessage::User as UserMessage; -/// TODO [arkpar] Please document me pub use network::error::NetworkError; +pub use network::host::NetworkConfiguration; +pub use network::stats::NetworkStats; use io::TimerToken; @@ -92,44 +90,3 @@ pub trait NetworkProtocolHandler: Sync + Send where Message: Send + Syn fn message(&self, _io: &NetworkContext, _message: &Message) {} } - -#[test] -fn test_net_service() { - - use std::sync::Arc; - struct MyHandler; - - #[derive(Clone)] - struct MyMessage { - data: u32 - } - - impl NetworkProtocolHandler for MyHandler { - fn initialize(&self, io: &NetworkContext) { - io.register_timer(0, 1000).unwrap(); - } - - fn read(&self, _io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - println!("Received {} ({} bytes) from {}", packet_id, data.len(), peer); - } - - fn connected(&self, _io: &NetworkContext, peer: &PeerId) { - println!("Connected {}", peer); - } - - fn disconnected(&self, _io: &NetworkContext, peer: &PeerId) { - println!("Disconnected {}", peer); - } - - fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { - println!("Timeout {}", timer); - } - - fn message(&self, _io: &NetworkContext, message: &MyMessage) { - println!("Message {}", message.data); - } - } - - let mut service = NetworkService::::start().expect("Error creating network service"); - service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]).unwrap(); -} diff --git a/util/src/network/service.rs b/util/src/network/service.rs index 811730d4b58..cbf40087203 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -1,8 +1,9 @@ use std::sync::*; use error::*; -use network::{NetworkProtocolHandler}; +use network::{NetworkProtocolHandler, NetworkConfiguration}; use network::error::{NetworkError}; use network::host::{Host, NetworkIoMessage, ProtocolId}; +use network::stats::{NetworkStats}; use io::*; /// IO Service with networking @@ -10,21 +11,22 @@ use io::*; pub struct NetworkService where Message: Send + Sync + Clone + 'static { io_service: IoService>, host_info: String, + stats: Arc } impl NetworkService where Message: Send + Sync + Clone + 'static { /// Starts IO event loop - pub fn start(init_nodes: &[String]) -> Result, UtilError> { + pub fn start(config: NetworkConfiguration) -> Result, UtilError> { let mut io_service = try!(IoService::>::start()); - let mut host = Host::new(); - for n in init_nodes { host.add_node(&n); } - let host = Arc::new(host); + let host = Arc::new(Host::new(config)); + let stats = host.stats().clone(); let host_info = host.client_version(); info!("NetworkService::start(): id={:?}", host.client_id()); try!(io_service.register_handler(host)); Ok(NetworkService { io_service: io_service, host_info: host_info, + stats: stats, }) } @@ -47,5 +49,10 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat pub fn io(&mut self) -> &mut IoService> { &mut self.io_service } + + /// Returns underlying io service. + pub fn stats(&self) -> &NetworkStats { + &self.stats + } } diff --git a/util/src/network/session.rs b/util/src/network/session.rs index 8f580f47619..2817f008d06 100644 --- a/util/src/network/session.rs +++ b/util/src/network/session.rs @@ -131,6 +131,11 @@ impl Session { self.connection.update_socket(reg, event_loop) } + /// Delete registration + pub fn deregister_socket(&self, event_loop: &mut EventLoop) -> Result<(), UtilError> { + self.connection.deregister_socket(event_loop) + } + /// Send a protocol packet to peer. pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { let mut i = 0usize; diff --git a/util/src/network/stats.rs b/util/src/network/stats.rs new file mode 100644 index 00000000000..02d9049853a --- /dev/null +++ b/util/src/network/stats.rs @@ -0,0 +1,51 @@ +//! Network Statistics +use std::sync::atomic::*; + +/// Network statistics structure +#[derive(Default, Debug)] +pub struct NetworkStats { + /// Bytes received + recv: AtomicUsize, + /// Bytes sent + send: AtomicUsize, + /// Total number of sessions created + sessions: AtomicUsize, +} + +impl NetworkStats { + /// Increase bytes received. + #[inline] + pub fn inc_recv(&self, size: usize) { + self.recv.fetch_add(size, Ordering::Relaxed); + } + + /// Increase bytes sent. + #[inline] + pub fn inc_send(&self, size: usize) { + self.send.fetch_add(size, Ordering::Relaxed); + } + + /// Increase number of sessions. + #[inline] + pub fn inc_sessions(&self) { + self.sessions.fetch_add(1, Ordering::Relaxed); + } + + /// Get bytes sent. + #[inline] + pub fn send(&self) -> usize { + self.send.load(Ordering::Relaxed) + } + + /// Get bytes received. + #[inline] + pub fn recv(&self) -> usize { + self.recv.load(Ordering::Relaxed) + } + + /// Get total number of sessions created. + #[inline] + pub fn sessions(&self) -> usize { + self.sessions.load(Ordering::Relaxed) + } +} diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs new file mode 100644 index 00000000000..7b0870532e6 --- /dev/null +++ b/util/src/network/tests.rs @@ -0,0 +1,103 @@ +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; +use std::thread; +use std::time::*; +use common::*; +use network::*; +use io::TimerToken; +use crypto::KeyPair; + +pub struct TestProtocol { + pub packet: Mutex, + pub got_timeout: AtomicBool, +} + +impl Default for TestProtocol { + fn default() -> Self { + TestProtocol { + packet: Mutex::new(Vec::new()), + got_timeout: AtomicBool::new(false), + } + } +} + +#[derive(Clone)] +pub struct TestProtocolMessage { + payload: u32, +} + +impl TestProtocol { + /// Creates and register protocol with the network service + pub fn register(service: &mut NetworkService) -> Arc { + let handler = Arc::new(TestProtocol::default()); + service.register_protocol(handler.clone(), "test", &[42u8, 43u8]).expect("Error registering test protocol handler"); + handler + } + + pub fn got_packet(&self) -> bool { + self.packet.lock().unwrap().deref()[..] == b"hello"[..] + } + + pub fn got_timeout(&self) -> bool { + self.got_timeout.load(AtomicOrdering::Relaxed) + } +} + +impl NetworkProtocolHandler for TestProtocol { + fn initialize(&self, io: &NetworkContext) { + io.register_timer(0, 10).unwrap(); + } + + fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { + assert_eq!(packet_id, 33); + self.packet.lock().unwrap().extend(data); + } + + fn connected(&self, io: &NetworkContext, _peer: &PeerId) { + io.respond(33, "hello".to_owned().into_bytes()).unwrap(); + } + + fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { + } + + /// Timer function called after a timeout created with `NetworkContext::timeout`. + fn timeout(&self, _io: &NetworkContext, timer: TimerToken) { + assert_eq!(timer, 0); + self.got_timeout.store(true, AtomicOrdering::Relaxed); + } +} + + +#[test] +fn test_net_service() { + let mut service = NetworkService::::start(NetworkConfiguration::new()).expect("Error creating network service"); + service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap(); +} + +#[test] +fn test_net_connect() { + let key1 = KeyPair::create().unwrap(); + let mut config1 = NetworkConfiguration::new_with_port(30344); + config1.use_secret = Some(key1.secret().clone()); + config1.boot_nodes = Some(vec![ ]); + let mut config2 = NetworkConfiguration::new_with_port(30345); + config2.boot_nodes = Some(vec![ format!("enode://{}@127.0.0.1:30344", key1.public().hex()) ]); + let mut service1 = NetworkService::::start(config1).unwrap(); + let mut service2 = NetworkService::::start(config2).unwrap(); + let handler1 = TestProtocol::register(&mut service1); + let handler2 = TestProtocol::register(&mut service2); + while !handler1.got_packet() && !handler2.got_packet() { + thread::sleep(Duration::from_millis(50)); + } + assert!(service1.stats().sessions() >= 1); + assert!(service2.stats().sessions() >= 1); +} + +#[test] +fn test_net_timeout() { + let config = NetworkConfiguration::new_with_port(30346); + let mut service = NetworkService::::start(config).unwrap(); + let handler = TestProtocol::register(&mut service); + while !handler.got_timeout() { + thread::sleep(Duration::from_millis(50)); + } +}