Commit

Merge pull request libp2p#102 from mxinden/bench-constrained-connection
benches: Benchmark with constrained connections
mxinden authored Feb 3, 2021
2 parents: 2ba14dd + 9e4297c · commit dada223
Showing 2 changed files with 70 additions and 141 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -25,8 +25,8 @@ futures = "0.3.4"
 quickcheck = "0.9"
 tokio = { version = "0.2", features = ["tcp", "rt-threaded", "macros"] }
 tokio-util = { version = "0.3", features = ["compat"] }
+constrained-connection = "0.1"
 
 [[bench]]
 name = "concurrent"
 harness = false
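For orientation before the benchmark diff below: the new dev-dependency supplies in-memory connection endpoints with simulated bandwidth and latency, and the rewritten benchmark layers yamux on top of them. A minimal sketch of that wiring, using the constrained-connection 0.1 and yamux calls exactly as they appear in the diff that follows; the function name `single_stream_demo`, the choice of the `mobile_hsdpa` sample, and the tiny `b"ping"` payload are illustrative only:

```rust
use constrained_connection::samples;
use futures::{future, prelude::*};
use yamux::{Config, Connection, Mode};

// Illustrative only: open a single yamux stream over a simulated HSDPA link.
// Must be polled from within a tokio runtime, as in the benchmark below.
async fn single_stream_demo() -> Result<(), yamux::ConnectionError> {
    // The third tuple element of a sample is the (server, client) endpoint pair.
    let (server, client) = samples::mobile_hsdpa().2;

    // Endpoints yield byte chunks; adapt them to AsyncRead + AsyncWrite for yamux.
    let server = server.into_async_read();
    let client = client.into_async_read();

    let mut server_conn = Connection::new(server, Config::default(), Mode::Server);
    let client_conn = Connection::new(client, Config::default(), Mode::Client);
    let mut ctrl = client_conn.control();

    // Drive both connections in the background.
    tokio::task::spawn(async move {
        while let Ok(Some(_stream)) = server_conn.next_stream().await {}
    });
    tokio::task::spawn(yamux::into_stream(client_conn).for_each(|_| future::ready(())));

    // Open one outbound stream, send a little data, then shut everything down.
    let mut stream = ctrl.open_stream().await?;
    stream.write_all(b"ping").await?;
    stream.close().await?;
    ctrl.close().await
}
```

The benchmark below does the same wiring, but opens up to 100 streams per connection and reports bytes per second for each simulated network.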

209 changes: 69 additions & 140 deletions benches/concurrent.rs
@@ -8,24 +8,16 @@
 // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
 // at https://opensource.org/licenses/MIT.
 
-use criterion::{criterion_group, criterion_main, Criterion};
-use futures::{channel::mpsc, future, prelude::*, ready};
-use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll}};
+use constrained_connection::{Endpoint, new_unconstrained_connection, samples};
+use criterion::{BenchmarkId, criterion_group, criterion_main, Criterion, Throughput};
+use futures::{channel::mpsc, future, prelude::*, io::AsyncReadExt};
+use std::sync::Arc;
 use tokio::{runtime::Runtime, task};
 use yamux::{Config, Connection, Mode};
 
 criterion_group!(benches, concurrent);
 criterion_main!(benches);
 
-#[derive(Copy, Clone)]
-struct Params { streams: usize, messages: usize }
-
-impl fmt::Debug for Params {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "((streams {}) (messages {}))", self.streams, self.messages)
-    }
-}
-
 #[derive(Debug, Clone)]
 struct Bytes(Arc<Vec<u8>>);

@@ -36,157 +28,94 @@ impl AsRef<[u8]> for Bytes {
 }
 
 fn concurrent(c: &mut Criterion) {
-    let params = &[
-        Params { streams: 1, messages: 1 },
-        Params { streams: 10, messages: 1 },
-        Params { streams: 1, messages: 10 },
-        Params { streams: 100, messages: 1 },
-        Params { streams: 1, messages: 100 },
-        Params { streams: 10, messages: 100 },
-        Params { streams: 100, messages: 10 }
+    let data = Bytes(Arc::new(vec![0x42; 4096]));
+    let networks = vec![
+        ("mobile", (|| samples::mobile_hsdpa().2) as fn() -> (_, _)),
+        ("adsl2+", (|| samples::residential_adsl2().2) as fn() -> (_, _)),
+        ("gbit-lan", (|| samples::gbit_lan().2) as fn() -> (_, _)),
+        ("unconstrained", new_unconstrained_connection as fn() -> (_, _)),
     ];
 
-    let data0 = Bytes(Arc::new(vec![0x42; 4096]));
-    let data1 = data0.clone();
-    let data2 = data0.clone();
-
-    c.bench_function_over_inputs("one by one", move |b, &&params| {
-        let data = data1.clone();
-        let mut rt = Runtime::new().unwrap();
-        b.iter(move || {
-            rt.block_on(roundtrip(params.streams, params.messages, data.clone(), false))
-        })
-    },
-    params);
-
-    c.bench_function_over_inputs("all at once", move |b, &&params| {
-        let data = data2.clone();
-        let mut rt = Runtime::new().unwrap();
-        b.iter(move || {
-            rt.block_on(roundtrip(params.streams, params.messages, data.clone(), true))
-        })
-    },
-    params);
+    let mut group = c.benchmark_group("concurrent");
+    group.sample_size(10);
+
+    for (network_name, new_connection) in networks.into_iter() {
+        for nstreams in [1, 10, 100].iter() {
+            for nmessages in [1, 10, 100].iter() {
+                let data = data.clone();
+                let mut rt = Runtime::new().unwrap();
+
+                group.throughput(Throughput::Bytes((nstreams * nmessages * data.0.len()) as u64));
+                group.bench_function(
+                    BenchmarkId::from_parameter(
+                        format!("{}/#streams{}/#messages{}", network_name, nstreams, nmessages),
+                    ),
+                    |b| b.iter(|| {
+                        let (server, client) = new_connection();
+                        rt.block_on(oneway(*nstreams, *nmessages, data.clone(), server, client))
+                    }),
+                );
+            }
+        }
+    }
+
+    group.finish();
 }
 
-async fn roundtrip(nstreams: usize, nmessages: usize, data: Bytes, send_all: bool) {
+async fn oneway(
+    nstreams: usize,
+    nmessages: usize,
+    data: Bytes,
+    server: Endpoint,
+    client: Endpoint,
+) {
     let msg_len = data.0.len();
-    let (server, client) = Endpoint::new();
     let server = server.into_async_read();
     let client = client.into_async_read();
+    let (tx, rx) = mpsc::unbounded();
 
     let server = async move {
-        yamux::into_stream(Connection::new(server, Config::default(), Mode::Server))
-            .try_for_each_concurrent(None, |mut stream| async move {
-                {
-                    let (mut r, mut w) = futures::io::AsyncReadExt::split(&mut stream);
-                    futures::io::copy(&mut r, &mut w).await?;
+        let mut connection = Connection::new(server, Config::default(), Mode::Server);
+
+        while let Some(mut stream) = connection.next_stream().await.unwrap() {
+            let tx = tx.clone();
+
+            task::spawn(async move {
+                let mut n = 0;
+                let mut b = vec![0; msg_len];
+
+                // Receive `nmessages` messages.
+                for _ in 0 .. nmessages {
+                    stream.read_exact(&mut b[..]).await.unwrap();
+                    n += b.len()
                 }
-                stream.close().await?;
-                Ok(())
-            })
-            .await
-            .expect("server works")
-    };
+
+                tx.unbounded_send(n).expect("unbounded_send");
+                stream.close().await.unwrap();
+            });
+        }
+    };
     task::spawn(server);
 
-    let (tx, rx) = mpsc::unbounded();
     let conn = Connection::new(client, Config::default(), Mode::Client);
    let mut ctrl = conn.control();
-    task::spawn(yamux::into_stream(conn).for_each(|_| future::ready(())));
+    task::spawn(yamux::into_stream(conn).for_each(|r| {r.unwrap(); future::ready(())} ));
 
     for _ in 0 .. nstreams {
         let data = data.clone();
-        let tx = tx.clone();
         let mut ctrl = ctrl.clone();
         task::spawn(async move {
-            let mut stream = ctrl.open_stream().await?;
-            if send_all {
-                // Send `nmessages` messages and receive `nmessages` messages.
-                for _ in 0 .. nmessages {
-                    stream.write_all(data.as_ref()).await?
-                }
-                stream.close().await?;
-                let mut n = 0;
-                let mut b = vec![0; data.0.len()];
-                loop {
-                    let k = stream.read(&mut b).await?;
-                    if k == 0 { break }
-                    n += k
-                }
-                tx.unbounded_send(n).expect("unbounded_send")
-            } else {
-                // Send and receive `nmessages` messages.
-                let mut n = 0;
-                let mut b = vec![0; data.0.len()];
-                for _ in 0 .. nmessages {
-                    stream.write_all(data.as_ref()).await?;
-                    stream.read_exact(&mut b[..]).await?;
-                    n += b.len()
-                }
-                stream.close().await?;
-                tx.unbounded_send(n).expect("unbounded_send");
+            let mut stream = ctrl.open_stream().await.unwrap();
+
+            // Send `nmessages` messages.
+            for _ in 0 .. nmessages {
+                stream.write_all(data.as_ref()).await.unwrap();
             }
+
+            stream.close().await.unwrap();
             Ok::<(), yamux::ConnectionError>(())
         });
     }
 
     let n = rx.take(nstreams).fold(0, |acc, n| future::ready(acc + n)).await;
     assert_eq!(n, nstreams * nmessages * msg_len);
-    ctrl.close().await.expect("close")
-}
-
-#[derive(Debug)]
-struct Endpoint {
-    incoming: mpsc::UnboundedReceiver<Vec<u8>>,
-    outgoing: mpsc::UnboundedSender<Vec<u8>>
-}
-
-impl Endpoint {
-    fn new() -> (Self, Self) {
-        let (tx_a, rx_a) = mpsc::unbounded();
-        let (tx_b, rx_b) = mpsc::unbounded();
-
-        let a = Endpoint { incoming: rx_a, outgoing: tx_b };
-        let b = Endpoint { incoming: rx_b, outgoing: tx_a };
-
-        (a, b)
-    }
-}
-
-impl Stream for Endpoint {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        if let Some(b) = ready!(Pin::new(&mut self.incoming).poll_next(cx)) {
-            return Poll::Ready(Some(Ok(b)))
-        }
-        Poll::Pending
-    }
-}
-
-impl AsyncWrite for Endpoint {
-    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        if ready!(Pin::new(&mut self.outgoing).poll_ready(cx)).is_err() {
-            return Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into()))
-        }
-        let n = buf.len();
-        if Pin::new(&mut self.outgoing).start_send(Vec::from(buf)).is_err() {
-            return Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into()))
-        }
-        Poll::Ready(Ok(n))
-    }
-
-    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        Pin::new(&mut self.outgoing)
-            .poll_flush(cx)
-            .map_err(|_| io::ErrorKind::ConnectionAborted.into())
-    }
-
-    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        Pin::new(&mut self.outgoing)
-            .poll_close(cx)
-            .map_err(|_| io::ErrorKind::ConnectionAborted.into())
-    }
+    ctrl.close().await.expect("close");
 }
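A note on interpreting the numbers: criterion divides the value passed to `Throughput::Bytes` by the measured iteration time, and here that value is the total payload pushed across all streams of one `oneway` run. A quick sanity check of the extremes, with a hypothetical `payload_bytes` helper that mirrors the expression used above:

```rust
// Payload per (streams, messages) combination, mirroring the benchmark's
// Throughput::Bytes((nstreams * nmessages * data.0.len()) as u64) call.
fn payload_bytes(nstreams: usize, nmessages: usize, msg_len: usize) -> u64 {
    (nstreams * nmessages * msg_len) as u64
}

fn main() {
    // Smallest case: 1 stream * 1 message * 4096 B = 4 KiB per iteration.
    assert_eq!(payload_bytes(1, 1, 4096), 4_096);
    // Largest case: 100 streams * 100 messages * 4096 B = 40_960_000 B, roughly 39 MiB.
    assert_eq!(payload_bytes(100, 100, 4096), 40_960_000);
}
```

Lowering the sample count to 10 via `group.sample_size(10)` presumably keeps total run time manageable, since the larger configurations move roughly 39 MiB per iteration over deliberately slow simulated links.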
