Skip to content

Commit

Permalink
feat: connection pooling (#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arqu authored Oct 24, 2022
1 parent 4f86388 commit b499751
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 49 deletions.
2 changes: 1 addition & 1 deletion iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl Api for Iroh {

fn p2p(&self) -> Result<ClientP2p> {
let p2p_client = self.client.try_p2p()?;
Ok(ClientP2p::new(p2p_client.clone()))
Ok(ClientP2p::new(p2p_client))
}

fn get_stream(
Expand Down
1 change: 1 addition & 0 deletions iroh-gateway/src/bad_bits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod tests {
gateway_addr: None,
p2p_addr: None,
store_addr: None,
channels: Some(1),
},
);
config.set_default_headers();
Expand Down
2 changes: 2 additions & 0 deletions iroh-gateway/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ mod tests {
gateway_addr: None,
p2p_addr: None,
store_addr: None,
channels: Some(1),
},
);
config.set_default_headers();
Expand Down Expand Up @@ -199,6 +200,7 @@ mod tests {
gateway_addr: None,
p2p_addr: None,
store_addr: Some(store_client_addr),
channels: Some(1),
},
);
config.set_default_headers();
Expand Down
1 change: 1 addition & 0 deletions iroh-one/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl Config {
gateway_addr: None,
p2p_addr: None,
store_addr: None,
channels: Some(1),
}
}

Expand Down
4 changes: 2 additions & 2 deletions iroh-one/src/content_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ impl ContentLoader for RacingLoader {

let len = cloned.len();
let links_len = links.len();
if let Some(store_rpc) = rpc.store.as_ref() {
match store_rpc.put(cid, cloned, links).await {
if let Ok(store_rpc) = rpc.try_store() {
match store_rpc.clone().put(cid, cloned, links).await {
Ok(_) => debug!("stored {} ({}bytes, {}links)", cid, len, links_len),
Err(err) => {
warn!("failed to store {}: {:?}", cid, err);
Expand Down
3 changes: 2 additions & 1 deletion iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,7 @@ mod tests {

let cfg = iroh_rpc_client::Config {
p2p_addr: Some(rpc_client_addr),
channels: Some(1),
..Default::default()
};
let p2p_task = tokio::task::spawn(async move {
Expand All @@ -1048,7 +1049,7 @@ mod tests {
.unwrap();

let mut providers = Vec::new();
let mut chan = client.p2p.unwrap().fetch_providers_dht(&c).await?;
let mut chan = client.try_p2p().unwrap().fetch_providers_dht(&c).await?;
while let Some(new_providers) = chan.next().await {
let new_providers = new_providers.unwrap();
println!("providers found: {}", new_providers.len());
Expand Down
12 changes: 6 additions & 6 deletions iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use libipld::{Ipld, IpldCodec};
use tokio::io::{AsyncRead, AsyncSeek};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::{debug, error, trace, warn};
use tracing::{debug, trace, warn};

use iroh_metrics::{
core::{MObserver, MRecorder},
Expand Down Expand Up @@ -797,8 +797,8 @@ impl ContentLoader for Client {

// trigger storage in the background
let clone = bytes.clone();
let store = self.store.as_ref().cloned();
let p2p = self.try_p2p()?.clone();
let store = self.try_store();
let p2p = self.try_p2p()?;

tokio::spawn(async move {
let clone2 = clone.clone();
Expand All @@ -809,7 +809,7 @@ impl ContentLoader for Client {

let len = clone.len();
let links_len = links.len();
if let Some(store_rpc) = store.as_ref() {
if let Ok(store_rpc) = store {
match store_rpc.put(cid, clone.clone(), links).await {
Ok(_) => {
debug!("stored {} ({}bytes, {}links)", cid, len, links_len);
Expand Down Expand Up @@ -851,11 +851,11 @@ impl<T: ContentLoader> Resolver<T> {
let loader = loader_thread.clone();

tokio::task::spawn(async move {
error!("stopping session {}", session);
debug!("stopping session {}", session);
if let Err(err) = loader.stop_session(session).await {
warn!("failed to stop session {}: {:?}", session, err);
}
error!("stopping session {} done", session);
debug!("stopping session {} done", session);
});
}
});
Expand Down
121 changes: 89 additions & 32 deletions iroh-rpc-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use anyhow::{anyhow, Context, Result};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use anyhow::{Context, Result};
#[cfg(feature = "grpc")]
use futures::{Stream, StreamExt};

Expand All @@ -10,8 +13,62 @@ use crate::store::StoreClient;
#[derive(Debug, Clone)]
pub struct Client {
pub gateway: Option<GatewayClient>,
pub p2p: Option<P2pClient>,
pub store: Option<StoreClient>,
p2p: P2pLBClient,
store: StoreLBClient,
}

/// Provides a load balanced client for the store service
/// The client will round robin between all available StoreClients
#[derive(Debug, Clone)]
pub struct StoreLBClient {
clients: Vec<StoreClient>,
pos: Arc<AtomicUsize>,
}

impl StoreLBClient {
/// round robin load balancing
pub fn get(&self) -> Option<StoreClient> {
if self.clients.is_empty() {
return None;
}
let pos = self.pos.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let c = self.clients.get(pos % self.clients.len()).unwrap();
Some(c.clone())
}

pub fn new() -> Self {
Self {
clients: vec![],
pos: Arc::new(AtomicUsize::new(0)),
}
}
}

/// Provides a load balanced client for the p2p service
/// The client will round robin between all available P2pClients
#[derive(Debug, Clone)]
pub struct P2pLBClient {
clients: Vec<P2pClient>,
pos: Arc<AtomicUsize>,
}

impl P2pLBClient {
/// round robin load balancing
pub fn get(&self) -> Option<P2pClient> {
if self.clients.is_empty() {
return None;
}
let pos = self.pos.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let c = self.clients.get(pos % self.clients.len()).unwrap();
Some(c.clone())
}

pub fn new() -> Self {
Self {
clients: vec![],
pos: Arc::new(AtomicUsize::new(0)),
}
}
}

impl Client {
Expand All @@ -20,6 +77,7 @@ impl Client {
gateway_addr,
p2p_addr,
store_addr,
channels,
} = cfg;

let gateway = if let Some(addr) = gateway_addr {
Expand All @@ -32,24 +90,27 @@ impl Client {
None
};

let p2p = if let Some(addr) = p2p_addr {
Some(
P2pClient::new(addr)
let n_channels = channels.unwrap_or(1);

let mut p2p = P2pLBClient::new();
if let Some(addr) = p2p_addr {
for _i in 0..n_channels {
let sc = P2pClient::new(addr.clone())
.await
.context("Could not create p2p rpc client")?,
)
} else {
None
};
let store = if let Some(addr) = store_addr {
Some(
StoreClient::new(addr)
.context("Could not create store rpc client")?;
p2p.clients.push(sc);
}
}

let mut store = StoreLBClient::new();
if let Some(addr) = store_addr {
for _i in 0..n_channels {
let sc = StoreClient::new(addr.clone())
.await
.context("Could not create store rpc client")?,
)
} else {
None
};
.context("Could not create store rpc client")?;
store.clients.push(sc);
}
}

Ok(Client {
gateway,
Expand All @@ -58,22 +119,18 @@ impl Client {
})
}

pub fn try_p2p(&self) -> Result<&P2pClient> {
self.p2p
.as_ref()
.ok_or_else(|| anyhow!("missing rpc p2p connnection"))
pub fn try_p2p(&self) -> Result<P2pClient> {
self.p2p.get().context("missing rpc p2p connnection")
}

pub fn try_gateway(&self) -> Result<&GatewayClient> {
self.gateway
.as_ref()
.ok_or_else(|| anyhow!("missing rpc gateway connnection"))
.context("missing rpc gateway connnection")
}

pub fn try_store(&self) -> Result<&StoreClient> {
self.store
.as_ref()
.ok_or_else(|| anyhow!("missing rpc store connnection"))
pub fn try_store(&self) -> Result<StoreClient> {
self.store.get().context("missing rpc store connnection")
}

#[cfg(feature = "grpc")]
Expand All @@ -83,12 +140,12 @@ impl Client {
} else {
None
};
let p = if let Some(ref p) = self.p2p {
let p = if let Some(ref p) = self.p2p.get() {
Some(p.check().await)
} else {
None
};
let s = if let Some(ref s) = self.store {
let s = if let Some(ref s) = self.store.get() {
Some(s.check().await)
} else {
None
Expand All @@ -106,11 +163,11 @@ impl Client {
let g = g.watch().await;
streams.push(g.boxed());
}
if let Some(ref p) = self.p2p {
if let Some(ref p) = self.p2p.get() {
let p = p.watch().await;
streams.push(p.boxed());
}
if let Some(ref s) = self.store {
if let Some(ref s) = self.store.get() {
let s = s.watch().await;
streams.push(s.boxed());
}
Expand Down
10 changes: 10 additions & 0 deletions iroh-rpc-client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub struct Config {
pub p2p_addr: Option<P2pClientAddr>,
// store rpc address
pub store_addr: Option<StoreClientAddr>,
// number of concurent channels
pub channels: Option<usize>,
}

impl Source for Config {
Expand All @@ -30,6 +32,9 @@ impl Source for Config {
if let Some(addr) = &self.store_addr {
insert_into_config_map(&mut map, "store_addr", addr.to_string());
}
if let Some(channels) = &self.channels {
insert_into_config_map(&mut map, "channels", channels.to_string());
}
Ok(map)
}
}
Expand All @@ -40,6 +45,7 @@ impl Config {
gateway_addr: Some("grpc://0.0.0.0:4400".parse().unwrap()),
p2p_addr: Some("grpc://0.0.0.0:4401".parse().unwrap()),
store_addr: Some("grpc://0.0.0.0:4402".parse().unwrap()),
channels: Some(16),
}
}
}
Expand All @@ -65,6 +71,10 @@ mod tests {
"store_addr".to_string(),
Value::new(None, default.store_addr.unwrap().to_string()),
);
expect.insert(
"channels".to_string(),
Value::new(None, default.channels.unwrap().to_string()),
);
let got = Config::default().collect().unwrap();
for key in got.keys() {
let left = expect.get(key).unwrap();
Expand Down
4 changes: 3 additions & 1 deletion iroh-share/src/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,13 @@ impl P2pNode {
p2p_addr: Some(rpc_p2p_addr_client.clone()),
store_addr: Some(rpc_store_addr_client.clone()),
gateway_addr: None,
channels: Some(1),
};
let rpc_p2p_client_config = iroh_rpc_client::Config {
p2p_addr: Some(rpc_p2p_addr_client.clone()),
store_addr: Some(rpc_store_addr_client.clone()),
gateway_addr: None,
channels: Some(1),
};
let config = config::Config {
libp2p: config::Libp2pConfig {
Expand Down Expand Up @@ -215,7 +217,7 @@ impl P2pNode {
}

pub async fn close(self) -> Result<()> {
self.rpc.p2p.unwrap().shutdown().await?;
self.rpc.try_p2p().unwrap().shutdown().await?;
self.store_task.abort();
self.p2p_task.await?;
self.store_task.await.ok();
Expand Down
9 changes: 3 additions & 6 deletions iroh-store/benches/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ pub fn put_benchmark(c: &mut Criterion) {
let rpc_ref = &rpc;
b.to_async(&executor).iter(|| async move {
rpc_ref
.store
.as_ref()
.try_store()
.unwrap()
.put(*key, black_box(value.clone()), vec![])
.await
Expand Down Expand Up @@ -148,8 +147,7 @@ pub fn get_benchmark(c: &mut Criterion) {
let key = cid::Cid::new_v1(RAW, hash);
keys.push(key);
rpc_ref
.store
.as_ref()
.try_store()
.unwrap()
.put(key, value.clone(), vec![])
.await
Expand All @@ -166,8 +164,7 @@ pub fn get_benchmark(c: &mut Criterion) {
for i in 0..iters {
let key = keys_ref[(i as usize) % l];
let res = rpc_ref
.store
.as_ref()
.try_store()
.unwrap()
.get(key)
.await
Expand Down

0 comments on commit b499751

Please sign in to comment.