From 206cb6b22775d7c5eb93ce86fa545eef4624064a Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 3 Feb 2016 03:55:18 +0300 Subject: [PATCH 1/6] decoupling sockets from logic for tests --- cov.sh | 2 +- util/src/network/connection.rs | 73 +++++++++++++++++++--------------- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/cov.sh b/cov.sh index a78e36205c3..eeb4d21f65b 100755 --- a/cov.sh +++ b/cov.sh @@ -15,7 +15,7 @@ if ! type kcov > /dev/null; then exit 1 fi -cargo test -p ethcore --no-run || exit $? +cargo test --features ethcore/json-tests -p ethcore --no-run || exit $? mkdir -p target/coverage kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/deps/ethcore* xdg-open target/coverage/index.html diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 7ed8c3c181d..f553dbb54c3 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -6,7 +6,7 @@ use hash::*; use sha3::*; use bytes::*; use rlp::*; -use std::io::{self, Cursor, Read}; +use std::io::{self, Cursor, Read, Write}; use error::*; use io::{IoContext, StreamToken}; use network::error::NetworkError; @@ -22,12 +22,17 @@ use tiny_keccak::Keccak; const ENCRYPTED_HEADER_LEN: usize = 32; const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000; -/// Low level tcp connection -pub struct Connection { +pub trait GenericSocket : Sized + Read + Write { +} + +impl GenericSocket for TcpStream { +} + +pub struct GenericConnection { /// Connection id (token) pub token: StreamToken, /// Network socket - pub socket: TcpStream, + pub socket: Socket, /// Receive buffer rec_buf: Bytes, /// Expected size @@ -36,34 +41,11 @@ pub struct Connection { send_queue: VecDeque>, /// Event flags this connection expects interest: EventSet, - /// Shared network staistics + /// Shared network statistics stats: Arc, } -/// Connection write status. -#[derive(PartialEq, Eq)] -pub enum WriteStatus { - /// Some data is still pending for current packet - Ongoing, - /// All data sent. - Complete -} - -impl Connection { - /// Create a new connection with given id and socket. - pub fn new(token: StreamToken, socket: TcpStream, stats: Arc) -> Connection { - Connection { - token: token, - socket: socket, - send_queue: VecDeque::new(), - rec_buf: Bytes::new(), - rec_size: 0, - interest: EventSet::hup() | EventSet::readable(), - stats: stats, - } - } - - /// Put a connection into read mode. Receiving up `size` bytes of data. +impl GenericConnection { pub fn expect(&mut self, size: usize) { if self.rec_size != self.rec_buf.len() { warn!(target:"net", "Unexpected connection read start"); @@ -79,7 +61,7 @@ impl Connection { } let max = self.rec_size - self.rec_buf.len(); // resolve "multiple applicable items in scope [E0034]" error - let sock_ref = ::by_ref(&mut self.socket); + let sock_ref = ::by_ref(&mut self.socket); match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { Ok(Some(size)) if size != 0 => { self.stats.inc_recv(size); @@ -142,6 +124,24 @@ impl Connection { Ok(r) }) } +} + +/// Low level tcp connection +pub type Connection = GenericConnection; + +impl Connection { + /// Create a new connection with given id and socket. + pub fn new(token: StreamToken, socket: TcpStream, stats: Arc) -> Connection { + Connection { + token: token, + socket: socket, + send_queue: VecDeque::new(), + rec_buf: Bytes::new(), + rec_size: 0, + interest: EventSet::hup() | EventSet::readable(), + stats: stats, + } + } /// Register this connection with the IO event loop. pub fn register_socket(&self, reg: Token, event_loop: &mut EventLoop) -> io::Result<()> { @@ -169,6 +169,15 @@ impl Connection { } } +/// Connection write status. +#[derive(PartialEq, Eq)] +pub enum WriteStatus { + /// Some data is still pending for current packet + Ongoing, + /// All data sent. + Complete +} + /// RLPx packet pub struct Packet { pub protocol: u16, @@ -416,6 +425,4 @@ pub fn test_encryption() { encoder.encrypt(&mut RefReadBuffer::new(&before2), &mut RefWriteBuffer::new(&mut got), true).unwrap(); encoder.reset(); assert_eq!(got, after2); -} - - +} \ No newline at end of file From e86a680b23cca9921a489f097521523559aad6cd Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 3 Feb 2016 14:04:24 +0300 Subject: [PATCH 2/6] test sockets, connections setup --- util/src/network/connection.rs | 2 +- util/src/network/tests.rs | 66 ++++++++++++++++++++++++++++++++-- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index f553dbb54c3..fa6bb7f9a62 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -22,7 +22,7 @@ use tiny_keccak::Keccak; const ENCRYPTED_HEADER_LEN: usize = 32; const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000; -pub trait GenericSocket : Sized + Read + Write { +pub trait GenericSocket : Read + Write { } impl GenericSocket for TcpStream { diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index 06966abb5f3..b2594353134 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -20,6 +20,61 @@ impl Default for TestProtocol { } } +struct TestSocket { + read_buffer: Vec, + write_buffer: Vec, + cursor: usize, +} + +impl TestSocket { + fn new() -> TestSocket { + TestSocket { + read_buffer: vec![], + write_buffer: vec![] + } + } +} + +impl Read for TestSocket { + fn read(&mut self, buf: &mut [u8]) -> Result { + let end_position = cmp::min(self.read_buffer.len(), self.cursor+buf.len()); + let len = cmp::max(self.end_position - self.cursor, 0); + match len { + 0 => Ok(0), + _ => { + bytes::copy_memory(buf, &mut self.read_buffer[cursor..len]); + self.cursor = self.cursor + buf.len(); + Ok(len); + } + } + } +} + +impl Write for TestSocket { + fn write(&mut self, buf: &mut [u8]) -> Result { + self.write_buffer.extend(buf.iter().cloned()); + Ok(buf.len()); + } +} + +impl GenericSocket for TestSocket {} + +type TestConnection = GenericConnection; + +impl TestConnection { + pub fn new() -> Connection { + Connection { + token: 999998888usize, + socket: TestSocket::new(), + send_queue: VecDeque::new(), + rec_buf: Bytes::new(), + rec_size: 0, + interest: EventSet::hup() | EventSet::readable(), + stats: Arc::::new(), + } + } +} + #[derive(Clone)] pub struct TestProtocolMessage { payload: u32, @@ -68,13 +123,13 @@ impl NetworkProtocolHandler for TestProtocol { #[test] -fn test_net_service() { +fn net_service() { let mut service = NetworkService::::start(NetworkConfiguration::new()).expect("Error creating network service"); service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap(); } #[test] -fn test_net_connect() { +fn net_connect() { let key1 = KeyPair::create().unwrap(); let mut config1 = NetworkConfiguration::new_with_port(30344); config1.use_secret = Some(key1.secret().clone()); @@ -93,7 +148,7 @@ fn test_net_connect() { } #[test] -fn test_net_timeout() { +fn net_timeout() { let config = NetworkConfiguration::new_with_port(30346); let mut service = NetworkService::::start(config).unwrap(); let handler = TestProtocol::register(&mut service); @@ -101,3 +156,8 @@ fn test_net_timeout() { thread::sleep(Duration::from_millis(50)); } } + +#[test] +fn connection_expect() { + let connection = TestConnection::new(); +} \ No newline at end of file From df18d8104d43962babd73cf1db6e59dd18dd8dc5 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 3 Feb 2016 18:05:46 +0300 Subject: [PATCH 3/6] final network tests di setup --- util/src/network/connection.rs | 81 ++++++++++++++++++++++++++++++++++ util/src/network/stats.rs | 9 ++++ util/src/network/tests.rs | 60 ------------------------- 3 files changed, 90 insertions(+), 60 deletions(-) diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index fa6bb7f9a62..acb41bfcd74 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -425,4 +425,85 @@ pub fn test_encryption() { encoder.encrypt(&mut RefReadBuffer::new(&before2), &mut RefWriteBuffer::new(&mut got), true).unwrap(); encoder.reset(); assert_eq!(got, after2); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::*; + use super::super::stats::*; + use std::io::{Read, Write, Error}; + use std::cmp; + use mio::{EventSet}; + use std::collections::VecDeque; + use bytes::*; + + struct TestSocket { + read_buffer: Vec, + write_buffer: Vec, + cursor: usize, + } + + impl TestSocket { + fn new() -> TestSocket { + TestSocket { + read_buffer: vec![], + write_buffer: vec![], + cursor: 0, + } + } + } + + impl Read for TestSocket { + fn read(&mut self, buf: &mut [u8]) -> Result { + let end_position = cmp::min(self.read_buffer.len(), self.cursor+buf.len()); + let len = cmp::max(end_position - self.cursor, 0); + match len { + 0 => Ok(0), + _ => { + for i in self.cursor..end_position { + buf[i-self.cursor] = self.read_buffer[i]; + } + self.cursor = self.cursor + buf.len(); + Ok(len) + } + } + } + } + + impl Write for TestSocket { + fn write(&mut self, buf: &[u8]) -> Result { + self.write_buffer.extend(buf.iter().cloned()); + Ok(buf.len()) + } + + fn flush(&mut self) -> Result<(), Error> { + unimplemented!(); + } + } + + impl GenericSocket for TestSocket {} + + type TestConnection = GenericConnection; + + impl TestConnection { + pub fn new() -> TestConnection { + TestConnection { + token: 999998888usize, + socket: TestSocket::new(), + send_queue: VecDeque::new(), + rec_buf: Bytes::new(), + rec_size: 0, + interest: EventSet::hup() | EventSet::readable(), + stats: Arc::::new(NetworkStats::new()), + } + } + } + + #[test] + fn connection_expect() { + let mut connection = TestConnection::new(); + connection.expect(1024); + assert_eq!(connection.rec_size, 1024); + } } \ No newline at end of file diff --git a/util/src/network/stats.rs b/util/src/network/stats.rs index 02d9049853a..4f6c4be2a42 100644 --- a/util/src/network/stats.rs +++ b/util/src/network/stats.rs @@ -48,4 +48,13 @@ impl NetworkStats { pub fn sessions(&self) -> usize { self.sessions.load(Ordering::Relaxed) } + + #[cfg(test)] + pub fn new() -> NetworkStats { + NetworkStats { + recv: AtomicUsize::new(0), + send: AtomicUsize::new(0), + sessions: AtomicUsize::new(0), + } + } } diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index b2594353134..bccd4c27d21 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -20,61 +20,6 @@ impl Default for TestProtocol { } } -struct TestSocket { - read_buffer: Vec, - write_buffer: Vec, - cursor: usize, -} - -impl TestSocket { - fn new() -> TestSocket { - TestSocket { - read_buffer: vec![], - write_buffer: vec![] - } - } -} - -impl Read for TestSocket { - fn read(&mut self, buf: &mut [u8]) -> Result { - let end_position = cmp::min(self.read_buffer.len(), self.cursor+buf.len()); - let len = cmp::max(self.end_position - self.cursor, 0); - match len { - 0 => Ok(0), - _ => { - bytes::copy_memory(buf, &mut self.read_buffer[cursor..len]); - self.cursor = self.cursor + buf.len(); - Ok(len); - } - } - } -} - -impl Write for TestSocket { - fn write(&mut self, buf: &mut [u8]) -> Result { - self.write_buffer.extend(buf.iter().cloned()); - Ok(buf.len()); - } -} - -impl GenericSocket for TestSocket {} - -type TestConnection = GenericConnection; - -impl TestConnection { - pub fn new() -> Connection { - Connection { - token: 999998888usize, - socket: TestSocket::new(), - send_queue: VecDeque::new(), - rec_buf: Bytes::new(), - rec_size: 0, - interest: EventSet::hup() | EventSet::readable(), - stats: Arc::::new(), - } - } -} - #[derive(Clone)] pub struct TestProtocolMessage { payload: u32, @@ -156,8 +101,3 @@ fn net_timeout() { thread::sleep(Duration::from_millis(50)); } } - -#[test] -fn connection_expect() { - let connection = TestConnection::new(); -} \ No newline at end of file From 834f8a07eb97eb1a01b099dfc0bd79789738fb09 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 3 Feb 2016 18:40:54 +0300 Subject: [PATCH 4/6] socket write tests --- util/src/network/connection.rs | 59 +++++++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index acb41bfcd74..2ff09f64a95 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -432,7 +432,7 @@ mod tests { use super::*; use std::sync::*; use super::super::stats::*; - use std::io::{Read, Write, Error}; + use std::io::{Read, Write, Error, Cursor}; use std::cmp; use mio::{EventSet}; use std::collections::VecDeque; @@ -442,6 +442,7 @@ mod tests { read_buffer: Vec, write_buffer: Vec, cursor: usize, + buf_size: usize, } impl TestSocket { @@ -450,6 +451,16 @@ mod tests { read_buffer: vec![], write_buffer: vec![], cursor: 0, + buf_size: 0, + } + } + + fn new_buf(buf_size: usize) -> TestSocket { + TestSocket { + read_buffer: vec![], + write_buffer: vec![], + cursor: 0, + buf_size: buf_size, } } } @@ -473,8 +484,14 @@ mod tests { impl Write for TestSocket { fn write(&mut self, buf: &[u8]) -> Result { - self.write_buffer.extend(buf.iter().cloned()); - Ok(buf.len()) + if self.buf_size == 0 || buf.len() < self.buf_size { + self.write_buffer.extend(buf.iter().cloned()); + Ok(buf.len()) + } + else { + self.write_buffer.extend(buf.iter().take(self.buf_size).cloned()); + Ok(self.buf_size) + } } fn flush(&mut self) -> Result<(), Error> { @@ -504,6 +521,40 @@ mod tests { fn connection_expect() { let mut connection = TestConnection::new(); connection.expect(1024); - assert_eq!(connection.rec_size, 1024); + assert_eq!(1024, connection.rec_size); + } + + #[test] + fn connection_write_empty() { + let mut connection = TestConnection::new(); + let status = connection.writable(); + assert!(status.is_ok()); + assert!(WriteStatus::Complete == status.unwrap()); + } + + #[test] + fn connection_write() { + let mut connection = TestConnection::new(); + let data = Cursor::new(vec![0; 10240]); + connection.send_queue.push_back(data); + + let status = connection.writable(); + assert!(status.is_ok()); + assert!(WriteStatus::Complete == status.unwrap()); + assert_eq!(10240, connection.socket.write_buffer.len()); + } + + #[test] + fn connection_write_is_buffered() { + let mut connection = TestConnection::new(); + connection.socket = TestSocket::new_buf(1024); + let data = Cursor::new(vec![0; 10240]); + connection.send_queue.push_back(data); + + let status = connection.writable(); + + assert!(status.is_ok()); + assert!(WriteStatus::Ongoing == status.unwrap()); + assert_eq!(1024, connection.socket.write_buffer.len()); } } \ No newline at end of file From 84e1c77938bee0b29503310cbfb5ab3e91fac93c Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 3 Feb 2016 18:59:35 +0300 Subject: [PATCH 5/6] broken sockets --- util/src/network/connection.rs | 52 +++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/util/src/network/connection.rs b/util/src/network/connection.rs index 2ff09f64a95..0ca127eccb3 100644 --- a/util/src/network/connection.rs +++ b/util/src/network/connection.rs @@ -432,7 +432,7 @@ mod tests { use super::*; use std::sync::*; use super::super::stats::*; - use std::io::{Read, Write, Error, Cursor}; + use std::io::{Read, Write, Error, Cursor, ErrorKind}; use std::cmp; use mio::{EventSet}; use std::collections::VecDeque; @@ -501,6 +501,28 @@ mod tests { impl GenericSocket for TestSocket {} + struct TestBrokenSocket { + error: String + } + + impl Read for TestBrokenSocket { + fn read(&mut self, _: &mut [u8]) -> Result { + Err(Error::new(ErrorKind::Other, self.error.clone())) + } + } + + impl Write for TestBrokenSocket { + fn write(&mut self, _: &[u8]) -> Result { + Err(Error::new(ErrorKind::Other, self.error.clone())) + } + + fn flush(&mut self) -> Result<(), Error> { + unimplemented!(); + } + } + + impl GenericSocket for TestBrokenSocket {} + type TestConnection = GenericConnection; impl TestConnection { @@ -517,6 +539,22 @@ mod tests { } } + type TestBrokenConnection = GenericConnection; + + impl TestBrokenConnection { + pub fn new() -> TestBrokenConnection { + TestBrokenConnection { + token: 999998888usize, + socket: TestBrokenSocket { error: "test broken socket".to_owned() }, + send_queue: VecDeque::new(), + rec_buf: Bytes::new(), + rec_size: 0, + interest: EventSet::hup() | EventSet::readable(), + stats: Arc::::new(NetworkStats::new()), + } + } + } + #[test] fn connection_expect() { let mut connection = TestConnection::new(); @@ -557,4 +595,16 @@ mod tests { assert!(WriteStatus::Ongoing == status.unwrap()); assert_eq!(1024, connection.socket.write_buffer.len()); } + + #[test] + fn connection_write_to_broken_socket() { + let mut connection = TestBrokenConnection::new(); + let data = Cursor::new(vec![0; 10240]); + connection.send_queue.push_back(data); + + let status = connection.writable(); + + assert!(!status.is_ok()); + assert_eq!(1, connection.send_queue.len()); + } } \ No newline at end of file From e154e3e7fa04fc39f4adc9315d225a043458fb0a Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 3 Feb 2016 19:06:16 +0300 Subject: [PATCH 6/6] fixed indents --- util/src/network/stats.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/util/src/network/stats.rs b/util/src/network/stats.rs index 4f6c4be2a42..39ad4e079f8 100644 --- a/util/src/network/stats.rs +++ b/util/src/network/stats.rs @@ -52,9 +52,9 @@ impl NetworkStats { #[cfg(test)] pub fn new() -> NetworkStats { NetworkStats { - recv: AtomicUsize::new(0), - send: AtomicUsize::new(0), - sessions: AtomicUsize::new(0), + recv: AtomicUsize::new(0), + send: AtomicUsize::new(0), + sessions: AtomicUsize::new(0), } } }