Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Network mod tests first part #329

Merged
merged 7 commits into from
Feb 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ if ! type kcov > /dev/null; then
exit 1
fi

cargo test -p ethcore --no-run || exit $?
cargo test --features ethcore/json-tests -p ethcore --no-run || exit $?
mkdir -p target/coverage
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/deps/ethcore*
xdg-open target/coverage/index.html
249 changes: 219 additions & 30 deletions util/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use hash::*;
use sha3::*;
use bytes::*;
use rlp::*;
use std::io::{self, Cursor, Read};
use std::io::{self, Cursor, Read, Write};
use error::*;
use io::{IoContext, StreamToken};
use network::error::NetworkError;
Expand All @@ -22,12 +22,17 @@ use tiny_keccak::Keccak;
const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;

/// Low level tcp connection
pub struct Connection {
pub trait GenericSocket : Read + Write {
}

impl GenericSocket for TcpStream {
}

pub struct GenericConnection<Socket: GenericSocket> {
/// Connection id (token)
pub token: StreamToken,
/// Network socket
pub socket: TcpStream,
pub socket: Socket,
/// Receive buffer
rec_buf: Bytes,
/// Expected size
Expand All @@ -36,34 +41,11 @@ pub struct Connection {
send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet,
/// Shared network staistics
/// Shared network statistics
stats: Arc<NetworkStats>,
}

/// Connection write status.
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing,
/// All data sent.
Complete
}

impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: stats,
}
}

/// Put a connection into read mode. Receiving up `size` bytes of data.
impl<Socket: GenericSocket> GenericConnection<Socket> {
pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start");
Expand All @@ -79,7 +61,7 @@ impl Connection {
}
let max = self.rec_size - self.rec_buf.len();
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
let sock_ref = <Socket as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(size)) if size != 0 => {
self.stats.inc_recv(size);
Expand Down Expand Up @@ -142,6 +124,24 @@ impl Connection {
Ok(r)
})
}
}

/// Low level tcp connection
pub type Connection = GenericConnection<TcpStream>;

impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: stats,
}
}

/// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
Expand Down Expand Up @@ -169,6 +169,15 @@ impl Connection {
}
}

/// Connection write status.
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing,
/// All data sent.
Complete
}

/// RLPx packet
pub struct Packet {
pub protocol: u16,
Expand Down Expand Up @@ -424,4 +433,184 @@ pub fn test_encryption() {
assert_eq!(got, after2);
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::*;
use super::super::stats::*;
use std::io::{Read, Write, Error, Cursor, ErrorKind};
use std::cmp;
use mio::{EventSet};
use std::collections::VecDeque;
use bytes::*;

struct TestSocket {
read_buffer: Vec<u8>,
write_buffer: Vec<u8>,
cursor: usize,
buf_size: usize,
}

impl TestSocket {
fn new() -> TestSocket {
TestSocket {
read_buffer: vec![],
write_buffer: vec![],
cursor: 0,
buf_size: 0,
}
}

fn new_buf(buf_size: usize) -> TestSocket {
TestSocket {
read_buffer: vec![],
write_buffer: vec![],
cursor: 0,
buf_size: buf_size,
}
}
}

impl Read for TestSocket {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let end_position = cmp::min(self.read_buffer.len(), self.cursor+buf.len());
let len = cmp::max(end_position - self.cursor, 0);
match len {
0 => Ok(0),
_ => {
for i in self.cursor..end_position {
buf[i-self.cursor] = self.read_buffer[i];
}
self.cursor = self.cursor + buf.len();
Ok(len)
}
}
}
}

impl Write for TestSocket {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
if self.buf_size == 0 || buf.len() < self.buf_size {
self.write_buffer.extend(buf.iter().cloned());
Ok(buf.len())
}
else {
self.write_buffer.extend(buf.iter().take(self.buf_size).cloned());
Ok(self.buf_size)
}
}

fn flush(&mut self) -> Result<(), Error> {
unimplemented!();
}
}

impl GenericSocket for TestSocket {}

struct TestBrokenSocket {
error: String
}

impl Read for TestBrokenSocket {
fn read(&mut self, _: &mut [u8]) -> Result<usize, Error> {
Err(Error::new(ErrorKind::Other, self.error.clone()))
}
}

impl Write for TestBrokenSocket {
fn write(&mut self, _: &[u8]) -> Result<usize, Error> {
Err(Error::new(ErrorKind::Other, self.error.clone()))
}

fn flush(&mut self) -> Result<(), Error> {
unimplemented!();
}
}

impl GenericSocket for TestBrokenSocket {}

type TestConnection = GenericConnection<TestSocket>;

impl TestConnection {
pub fn new() -> TestConnection {
TestConnection {
token: 999998888usize,
socket: TestSocket::new(),
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
}
}
}

type TestBrokenConnection = GenericConnection<TestBrokenSocket>;

impl TestBrokenConnection {
pub fn new() -> TestBrokenConnection {
TestBrokenConnection {
token: 999998888usize,
socket: TestBrokenSocket { error: "test broken socket".to_owned() },
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
}
}
}

#[test]
fn connection_expect() {
let mut connection = TestConnection::new();
connection.expect(1024);
assert_eq!(1024, connection.rec_size);
}

#[test]
fn connection_write_empty() {
let mut connection = TestConnection::new();
let status = connection.writable();
assert!(status.is_ok());
assert!(WriteStatus::Complete == status.unwrap());
}

#[test]
fn connection_write() {
let mut connection = TestConnection::new();
let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data);

let status = connection.writable();
assert!(status.is_ok());
assert!(WriteStatus::Complete == status.unwrap());
assert_eq!(10240, connection.socket.write_buffer.len());
}

#[test]
fn connection_write_is_buffered() {
let mut connection = TestConnection::new();
connection.socket = TestSocket::new_buf(1024);
let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data);

let status = connection.writable();

assert!(status.is_ok());
assert!(WriteStatus::Ongoing == status.unwrap());
assert_eq!(1024, connection.socket.write_buffer.len());
}

#[test]
fn connection_write_to_broken_socket() {
let mut connection = TestBrokenConnection::new();
let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data);

let status = connection.writable();

assert!(!status.is_ok());
assert_eq!(1, connection.send_queue.len());
}
}
9 changes: 9 additions & 0 deletions util/src/network/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,13 @@ impl NetworkStats {
pub fn sessions(&self) -> usize {
self.sessions.load(Ordering::Relaxed)
}

#[cfg(test)]
pub fn new() -> NetworkStats {
NetworkStats {
recv: AtomicUsize::new(0),
send: AtomicUsize::new(0),
sessions: AtomicUsize::new(0),
}
}
}
6 changes: 3 additions & 3 deletions util/src/network/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {


#[test]
fn test_net_service() {
fn net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new()).expect("Error creating network service");
service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap();
}

#[test]
fn test_net_connect() {
fn net_connect() {
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30344);
config1.use_secret = Some(key1.secret().clone());
Expand All @@ -93,7 +93,7 @@ fn test_net_connect() {
}

#[test]
fn test_net_timeout() {
fn net_timeout() {
let config = NetworkConfiguration::new_with_port(30346);
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let handler = TestProtocol::register(&mut service);
Expand Down