Skip to content

Commit

Permalink
Merge pull request libp2p#101 from mxinden/unit-tests
Browse files Browse the repository at this point in the history
src/tests: Write and read concurrently
  • Loading branch information
mxinden authored Feb 1, 2021
2 parents df476ef + 168d4f5 commit 2ba14dd
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use crate::{Config, Connection, ConnectionError, Mode, Control, connection::State};
use crate::WindowUpdateMode;
use futures::{future, prelude::*};
use futures::io::AsyncReadExt;
use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult};
use rand::Rng;
use std::{fmt::Debug, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}};
Expand Down Expand Up @@ -46,7 +47,7 @@ fn prop_config_send_recv_single() {
TestResult::from_bool(result.len() == num_requests && result.into_iter().eq(iter))
})
}
QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _)
QuickCheck::new().tests(10).quickcheck(prop as fn(_, _, _) -> _)
}

#[test]
Expand Down Expand Up @@ -78,7 +79,7 @@ fn prop_config_send_recv_multi() {
TestResult::from_bool(result.len() == num_requests && result.into_iter().eq(iter))
})
}
QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _)
QuickCheck::new().tests(10).quickcheck(prop as fn(_, _, _) -> _)
}

#[test]
Expand Down Expand Up @@ -256,43 +257,58 @@ where
I: Iterator<Item = Vec<u8>>
{
let mut result = Vec::new();

for msg in iter {
let mut stream = control.open_stream().await?;
let stream = control.open_stream().await?;
log::debug!("C: new stream: {}", stream);
let id = stream.id();
let len = msg.len();
stream.write_all(&msg).await?;
log::debug!("C: {}: sent {} bytes", id, len);
stream.close().await?;
let (mut reader, mut writer) = AsyncReadExt::split(stream);
let write_fut = async {
writer.write_all(&msg).await.unwrap();
log::debug!("C: {}: sent {} bytes", id, len);
writer.close().await.unwrap();
};
let mut data = Vec::new();
stream.read_to_end(&mut data).await?;
log::debug!("C: {}: received {} bytes", id, data.len());
result.push(data)
let read_fut = async {
reader.read_to_end(&mut data).await.unwrap();
log::debug!("C: {}: received {} bytes", id, data.len());
};
futures::future::join(write_fut, read_fut).await;
result.push(data);
}

log::debug!("C: closing connection");
control.close().await?;
Ok(result)
}

/// Open a stream, send all messages and collect the responses.
/// Open a stream, send all messages and collect the responses. The
/// sequence of responses will be returned.
async fn send_recv_single<I>(mut control: Control, iter: I) -> Result<Vec<Vec<u8>>, ConnectionError>
where
I: Iterator<Item = Vec<u8>>
{
let mut stream = control.open_stream().await?;
let stream = control.open_stream().await?;
log::debug!("C: new stream: {}", stream);
let id = stream.id();
let (mut reader, mut writer) = AsyncReadExt::split(stream);
let mut result = Vec::new();
for msg in iter {
let id = stream.id();
let len = msg.len();
stream.write_all(&msg).await?;
log::debug!("C: {}: sent {} bytes", id, len);
let write_fut = async {
writer.write_all(&msg).await.unwrap();
log::debug!("C: {}: sent {} bytes", id, len);
};
let mut data = vec![0; msg.len()];
stream.read_exact(&mut data).await?;
log::debug!("C: {}: received {} bytes", id, data.len());
let read_fut = async {
reader.read_exact(&mut data).await.unwrap();
log::debug!("C: {}: received {} bytes", id, data.len());
};
futures::future::join(write_fut, read_fut).await;
result.push(data)
}
stream.close().await?;
writer.close().await?;
log::debug!("C: closing connection");
control.close().await?;
Ok(result)
Expand Down

0 comments on commit 2ba14dd

Please sign in to comment.