Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Merge pull request #329 from ethcore/nvolf
Browse files Browse the repository at this point in the history
Network mod tests first part
  • Loading branch information
debris committed Feb 3, 2016
2 parents d467ac7 + e154e3e commit 9cd29b7
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ if ! type kcov > /dev/null; then
exit 1
fi

cargo test -p ethcore --no-run || exit $?
cargo test --features ethcore/json-tests -p ethcore --no-run || exit $?
mkdir -p target/coverage
kcov --exclude-pattern ~/.multirust,rocksdb,secp256k1 --include-pattern src --verify target/coverage target/debug/deps/ethcore*
xdg-open target/coverage/index.html
249 changes: 219 additions & 30 deletions util/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use hash::*;
use sha3::*;
use bytes::*;
use rlp::*;
use std::io::{self, Cursor, Read};
use std::io::{self, Cursor, Read, Write};
use error::*;
use io::{IoContext, StreamToken};
use network::error::NetworkError;
Expand All @@ -22,12 +22,17 @@ use tiny_keccak::Keccak;
const ENCRYPTED_HEADER_LEN: usize = 32;
const RECIEVE_PAYLOAD_TIMEOUT: u64 = 30000;

/// Low level tcp connection
pub struct Connection {
pub trait GenericSocket : Read + Write {
}

impl GenericSocket for TcpStream {
}

pub struct GenericConnection<Socket: GenericSocket> {
/// Connection id (token)
pub token: StreamToken,
/// Network socket
pub socket: TcpStream,
pub socket: Socket,
/// Receive buffer
rec_buf: Bytes,
/// Expected size
Expand All @@ -36,34 +41,11 @@ pub struct Connection {
send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet,
/// Shared network staistics
/// Shared network statistics
stats: Arc<NetworkStats>,
}

/// Connection write status.
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing,
/// All data sent.
Complete
}

impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: stats,
}
}

/// Put a connection into read mode. Receiving up `size` bytes of data.
impl<Socket: GenericSocket> GenericConnection<Socket> {
pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start");
Expand All @@ -79,7 +61,7 @@ impl Connection {
}
let max = self.rec_size - self.rec_buf.len();
// resolve "multiple applicable items in scope [E0034]" error
let sock_ref = <TcpStream as Read>::by_ref(&mut self.socket);
let sock_ref = <Socket as Read>::by_ref(&mut self.socket);
match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) {
Ok(Some(size)) if size != 0 => {
self.stats.inc_recv(size);
Expand Down Expand Up @@ -142,6 +124,24 @@ impl Connection {
Ok(r)
})
}
}

/// Low level tcp connection
pub type Connection = GenericConnection<TcpStream>;

impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: StreamToken, socket: TcpStream, stats: Arc<NetworkStats>) -> Connection {
Connection {
token: token,
socket: socket,
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: stats,
}
}

/// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
Expand Down Expand Up @@ -169,6 +169,15 @@ impl Connection {
}
}

/// Connection write status.
#[derive(PartialEq, Eq)]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing,
/// All data sent.
Complete
}

/// RLPx packet
pub struct Packet {
pub protocol: u16,
Expand Down Expand Up @@ -424,4 +433,184 @@ pub fn test_encryption() {
assert_eq!(got, after2);
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::*;
use super::super::stats::*;
use std::io::{Read, Write, Error, Cursor, ErrorKind};
use std::cmp;
use mio::{EventSet};
use std::collections::VecDeque;
use bytes::*;

struct TestSocket {
read_buffer: Vec<u8>,
write_buffer: Vec<u8>,
cursor: usize,
buf_size: usize,
}

impl TestSocket {
fn new() -> TestSocket {
TestSocket {
read_buffer: vec![],
write_buffer: vec![],
cursor: 0,
buf_size: 0,
}
}

fn new_buf(buf_size: usize) -> TestSocket {
TestSocket {
read_buffer: vec![],
write_buffer: vec![],
cursor: 0,
buf_size: buf_size,
}
}
}

impl Read for TestSocket {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
let end_position = cmp::min(self.read_buffer.len(), self.cursor+buf.len());
let len = cmp::max(end_position - self.cursor, 0);
match len {
0 => Ok(0),
_ => {
for i in self.cursor..end_position {
buf[i-self.cursor] = self.read_buffer[i];
}
self.cursor = self.cursor + buf.len();
Ok(len)
}
}
}
}

impl Write for TestSocket {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
if self.buf_size == 0 || buf.len() < self.buf_size {
self.write_buffer.extend(buf.iter().cloned());
Ok(buf.len())
}
else {
self.write_buffer.extend(buf.iter().take(self.buf_size).cloned());
Ok(self.buf_size)
}
}

fn flush(&mut self) -> Result<(), Error> {
unimplemented!();
}
}

impl GenericSocket for TestSocket {}

struct TestBrokenSocket {
error: String
}

impl Read for TestBrokenSocket {
fn read(&mut self, _: &mut [u8]) -> Result<usize, Error> {
Err(Error::new(ErrorKind::Other, self.error.clone()))
}
}

impl Write for TestBrokenSocket {
fn write(&mut self, _: &[u8]) -> Result<usize, Error> {
Err(Error::new(ErrorKind::Other, self.error.clone()))
}

fn flush(&mut self) -> Result<(), Error> {
unimplemented!();
}
}

impl GenericSocket for TestBrokenSocket {}

type TestConnection = GenericConnection<TestSocket>;

impl TestConnection {
pub fn new() -> TestConnection {
TestConnection {
token: 999998888usize,
socket: TestSocket::new(),
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
}
}
}

type TestBrokenConnection = GenericConnection<TestBrokenSocket>;

impl TestBrokenConnection {
pub fn new() -> TestBrokenConnection {
TestBrokenConnection {
token: 999998888usize,
socket: TestBrokenSocket { error: "test broken socket".to_owned() },
send_queue: VecDeque::new(),
rec_buf: Bytes::new(),
rec_size: 0,
interest: EventSet::hup() | EventSet::readable(),
stats: Arc::<NetworkStats>::new(NetworkStats::new()),
}
}
}

#[test]
fn connection_expect() {
let mut connection = TestConnection::new();
connection.expect(1024);
assert_eq!(1024, connection.rec_size);
}

#[test]
fn connection_write_empty() {
let mut connection = TestConnection::new();
let status = connection.writable();
assert!(status.is_ok());
assert!(WriteStatus::Complete == status.unwrap());
}

#[test]
fn connection_write() {
let mut connection = TestConnection::new();
let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data);

let status = connection.writable();
assert!(status.is_ok());
assert!(WriteStatus::Complete == status.unwrap());
assert_eq!(10240, connection.socket.write_buffer.len());
}

#[test]
fn connection_write_is_buffered() {
let mut connection = TestConnection::new();
connection.socket = TestSocket::new_buf(1024);
let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data);

let status = connection.writable();

assert!(status.is_ok());
assert!(WriteStatus::Ongoing == status.unwrap());
assert_eq!(1024, connection.socket.write_buffer.len());
}

#[test]
fn connection_write_to_broken_socket() {
let mut connection = TestBrokenConnection::new();
let data = Cursor::new(vec![0; 10240]);
connection.send_queue.push_back(data);

let status = connection.writable();

assert!(!status.is_ok());
assert_eq!(1, connection.send_queue.len());
}
}
9 changes: 9 additions & 0 deletions util/src/network/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,13 @@ impl NetworkStats {
pub fn sessions(&self) -> usize {
self.sessions.load(Ordering::Relaxed)
}

#[cfg(test)]
pub fn new() -> NetworkStats {
NetworkStats {
recv: AtomicUsize::new(0),
send: AtomicUsize::new(0),
sessions: AtomicUsize::new(0),
}
}
}
6 changes: 3 additions & 3 deletions util/src/network/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {


#[test]
fn test_net_service() {
fn net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new()).expect("Error creating network service");
service.register_protocol(Arc::new(TestProtocol::default()), "myproto", &[1u8]).unwrap();
}

#[test]
fn test_net_connect() {
fn net_connect() {
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_with_port(30344);
config1.use_secret = Some(key1.secret().clone());
Expand All @@ -93,7 +93,7 @@ fn test_net_connect() {
}

#[test]
fn test_net_timeout() {
fn net_timeout() {
let config = NetworkConfiguration::new_with_port(30346);
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let handler = TestProtocol::register(&mut service);
Expand Down

0 comments on commit 9cd29b7

Please sign in to comment.