Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Networking fixes #202

Merged
merged 5 commits into from
Jan 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ fi

cargo test --no-run || exit $?
mkdir -p target/coverage
kcov --exclude-pattern ~/.multirust --include-pattern src --verify target/coverage target/debug/ethcore*
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/ethcore*
xdg-open target/coverage/index.html
2 changes: 1 addition & 1 deletion res/ethereum/tests
Submodule tests updated 32 files
+390 −391 BasicTests/difficultyCustomHomestead.json
+7,440 −3,601 BasicTests/difficultyFrontier.json
+7,440 −3,600 BasicTests/difficultyHomestead.json
+7,440 −3,600 BasicTests/difficultyMorden.json
+7,440 −3,600 BasicTests/difficultyOlimpic.json
+6,077 −0 BlockchainTests/Homestead/bcInvalidRLPTest.json
+1,685 −1,685 BlockchainTests/Homestead/bcWalletTest.json
+281 −271 StateTests/Homestead/stCallCodes.json
+38 −38 StateTests/Homestead/stCallCreateCallCodeTest.json
+217 −209 StateTests/Homestead/stCallDelegateCodes.json
+198 −190 StateTests/Homestead/stCallDelegateCodesCallCode.json
+2,637 −0 StateTests/Homestead/stDelegatecallTest.json
+5 −5 StateTests/Homestead/stHomeSteadSpecific.json
+17 −17 StateTests/Homestead/stInitCodeTest.json
+46 −46 StateTests/Homestead/stLogTests.json
+6 −6 StateTests/Homestead/stMemoryStressTest.json
+65 −65 StateTests/Homestead/stMemoryTest.json
+89 −89 StateTests/Homestead/stPreCompiledContracts.json
+15 −15 StateTests/Homestead/stQuadraticComplexityTest.json
+2 −2 StateTests/Homestead/stRecursiveCreate.json
+17 −17 StateTests/Homestead/stRefundTest.json
+9 −9 StateTests/Homestead/stSpecialTest.json
+66 −74 StateTests/Homestead/stSystemOperationsTest.json
+40 −40 StateTests/Homestead/stTransactionTest.json
+84 −84 StateTests/Homestead/stWalletTest.json
+870 −293 StateTests/stCallCodes.json
+5 −13 StateTests/stCallCreateCallCodeTest.json
+8 −16 StateTests/stSystemOperationsTest.json
+11 −11 StateTests/stTransitionTest.json
+1 −1 TransactionTests/Homestead/tt10mbDataField.json
+57 −57 TransactionTests/Homestead/ttTransactionTest.json
+69 −69 TransactionTests/Homestead/ttWrongRLPTransaction.json
6 changes: 3 additions & 3 deletions src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ fn main() {
setup_log(&args.flag_logging);

let spec = ethereum::new_frontier();

let init_nodes = match &args.arg_enode {
&None => spec.nodes().clone(),
&Some(ref enodes) => enodes.clone(),
};

let mut service = ClientService::start(spec, &init_nodes).unwrap();
let mut net_settings = NetworkConfiguration::new();
net_settings.boot_nodes = init_nodes;
let mut service = ClientService::start(spec, net_settings).unwrap();
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() });
service.io().register_handler(io_handler).expect("Error registering IO handler");

Expand Down
10 changes: 3 additions & 7 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,10 @@ impl Client {
let db = Arc::new(DB::open(&opts, state_path.to_str().unwrap()).unwrap());

let engine = Arc::new(try!(spec.to_engine()));
{
let mut state_db = JournalDB::new_with_arc(db.clone());
if engine.spec().ensure_db_good(&mut state_db) {
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
}
let mut state_db = JournalDB::new_with_arc(db.clone());
if engine.spec().ensure_db_good(&mut state_db) {
state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB");
}
let state_db = JournalDB::new_with_arc(db);

Ok(Arc::new(Client {
chain: chain,
engine: engine.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub struct ClientService {

impl ClientService {
/// Start the service in a separate thread.
pub fn start(spec: Spec, init_nodes: &[String]) -> Result<ClientService, Error> {
let mut net_service = try!(NetworkService::start(init_nodes));
pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result<ClientService, Error> {
let mut net_service = try!(NetworkService::start(net_config));
info!("Starting {}", net_service.host_info());
info!("Configured for {} using {} engine", spec.name, spec.engine_name);
let mut dir = env::home_dir().unwrap();
Expand Down
3 changes: 3 additions & 0 deletions src/sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,9 @@ impl ChainSync {
}
}
};
if max_height != x!(0) {
self.sync_peer(io, peer_id, true);
}
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + '
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// Re-register a stream with the event loop
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// Deregister a stream. Called whenstream is removed from event loop
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
}

/// TODO [arkpar] Please document me
Expand Down
18 changes: 18 additions & 0 deletions util/src/io/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
handler_id: HandlerId,
token: StreamToken,
},
DeregisterStream {
handler_id: HandlerId,
token: StreamToken,
},
UpdateStreamRegistration {
handler_id: HandlerId,
token: StreamToken,
Expand Down Expand Up @@ -83,6 +87,7 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
}));
Ok(())
}

/// Register a new IO stream.
pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::RegisterStream {
Expand All @@ -92,6 +97,15 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
Ok(())
}

/// Deregister an IO stream.
pub fn deregister_stream(&self, token: StreamToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::DeregisterStream {
token: token,
handler_id: self.handler,
}));
Ok(())
}

/// Reregister an IO stream.
pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> {
try!(self.channel.send_io(IoMessage::UpdateStreamRegistration {
Expand Down Expand Up @@ -214,6 +228,10 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
},
IoMessage::DeregisterStream { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.deregister_stream(token, event_loop);
},
IoMessage::UpdateStreamRegistration { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
Expand Down
47 changes: 35 additions & 12 deletions util/src/network/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::collections::VecDeque;
use mio::{Handler, Token, EventSet, EventLoop, PollOpt, TryRead, TryWrite};
use mio::tcp::*;
Expand All @@ -10,6 +11,7 @@ use error::*;
use io::{IoContext, StreamToken};
use network::error::NetworkError;
use network::handshake::Handshake;
use network::stats::NetworkStats;
use crypto;
use rcrypto::blockmodes::*;
use rcrypto::aessafe::*;
Expand All @@ -34,6 +36,8 @@ pub struct Connection {
send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet,
/// Shared network staistics
stats: Arc<NetworkStats>,
}

/// Connection write status.
Expand All @@ -47,14 +51,15 @@ pub enum WriteStatus {

impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: StreamToken, socket: TcpStream) -> Connection {
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: stats,
}
}

Expand All @@ -68,7 +73,6 @@ impl Connection {
}

/// Readable IO handler. Called when there is some data to be read.
//TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
warn!(target:"net", "Unexpected connection read");
Expand All @@ -77,9 +81,12 @@ impl Connection {
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(_)) if self.rec_buf.len() == self.rec_size => {
self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
Ok(Some(size)) if size != 0 => {
self.stats.inc_recv(size);
if self.rec_size != 0 && self.rec_buf.len() == self.rec_size {
self.rec_size = 0;
Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new())))
} else { Ok(None) }
},
Ok(_) => Ok(None),
Err(e) => Err(e),
Expand Down Expand Up @@ -109,14 +116,17 @@ impl Connection {
return Ok(WriteStatus::Complete)
}
match self.socket.try_write_buf(buf) {
Ok(_) if (buf.position() as usize) < send_size => {
Ok(Some(size)) if (buf.position() as usize) < send_size => {
self.interest.insert(EventSet::writable());
self.stats.inc_send(size);
Ok(WriteStatus::Ongoing)
},
Ok(_) if (buf.position() as usize) == send_size => {
Ok(Some(size)) if (buf.position() as usize) == send_size => {
self.stats.inc_send(size);
Ok(WriteStatus::Complete)
},
Ok(_) => { panic!("Wrote past buffer");},
Ok(Some(_)) => { panic!("Wrote past buffer");},
Ok(None) => Ok(WriteStatus::Ongoing),
Err(e) => Err(e)
}
}.and_then(|r| {
Expand All @@ -137,19 +147,26 @@ impl Connection {
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", reg);
event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to register {:?}, {:?}", reg, e);
Err(e)
debug!("Failed to register {:?}, {:?}", reg, e);
Ok(())
})
}

/// Update connection registration. Should be called at the end of the IO handler.
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", reg);
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
error!("Failed to reregister {:?}, {:?}", reg, e);
Err(e)
debug!("Failed to reregister {:?}, {:?}", reg, e);
Ok(())
})
}

/// Delete connection registration. Should be called at the end of the IO handler.
pub fn deregister_socket<Host: Handler>(&self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection deregister; token={:?}", self.token);
event_loop.deregister(&self.socket).ok(); // ignore errors here
Ok(())
}
}

/// RLPx packet
Expand Down Expand Up @@ -371,6 +388,12 @@ impl EncryptedConnection {
try!(self.connection.update_socket(reg, event_loop));
Ok(())
}

/// Delete connection registration. This should be called at the end of the event loop.
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.deregister_socket(event_loop));
Ok(())
}
}

#[test]
Expand Down
31 changes: 23 additions & 8 deletions util/src/network/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use mio::*;
use mio::tcp::*;
use hash::*;
Expand All @@ -10,6 +11,7 @@ use network::host::{HostInfo};
use network::node::NodeId;
use error::*;
use network::error::NetworkError;
use network::stats::NetworkStats;
use io::{IoContext, StreamToken};

#[derive(PartialEq, Eq, Debug)]
Expand Down Expand Up @@ -54,10 +56,10 @@ const HANDSHAKE_TIMEOUT: u64 = 30000;

impl Handshake {
/// Create a new handshake object
pub fn new(token: StreamToken, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> {
pub fn new(token: StreamToken, id: Option<&NodeId>, socket: TcpStream, nonce: &H256, stats: Arc<NetworkStats>) -> Result<Handshake, UtilError> {
Ok(Handshake {
id: id.clone(),
connection: Connection::new(token, socket),
id: if let Some(id) = id { id.clone()} else { NodeId::new() },
connection: Connection::new(token, socket, stats),
originated: false,
state: HandshakeState::New,
ecdhe: try!(KeyPair::create()),
Expand Down Expand Up @@ -134,32 +136,45 @@ impl Handshake {
Ok(())
}

/// Delete registration
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.deregister_socket(event_loop));
Ok(())
}

/// Parse, validate and confirm auth message
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == AUTH_PACKET_SIZE);
if data.len() != AUTH_PACKET_SIZE {
debug!(target:"net", "Wrong auth packet size");
return Err(From::from(NetworkError::BadProtocol));
}
self.auth_cipher = data.to_vec();
let auth = try!(ecies::decrypt(host.secret(), data));
let (sig, rest) = auth.split_at(65);
let (hepubk, rest) = rest.split_at(32);
let (pubk, rest) = rest.split_at(64);
let (nonce, _) = rest.split_at(32);
self.remote_public.clone_from_slice(pubk);
self.id.clone_from_slice(pubk);
self.remote_nonce.clone_from_slice(nonce);
let shared = try!(ecdh::agree(host.secret(), &self.remote_public));
let shared = try!(ecdh::agree(host.secret(), &self.id));
let signature = Signature::from_slice(sig);
let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce)));
self.remote_public = spub.clone();
if &spub.sha3()[..] != hepubk {
trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr());
return Err(From::from(NetworkError::Auth));
};
self.write_ack()
Ok(())
}

/// Parse and validate ack message
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == ACK_PACKET_SIZE);
if data.len() != ACK_PACKET_SIZE {
debug!(target:"net", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol));
}
self.ack_cipher = data.to_vec();
let ack = try!(ecies::decrypt(host.secret(), data));
self.remote_public.clone_from_slice(&ack[0..64]);
Expand Down
Loading