Skip to content

Commit

Permalink
fix: require passing providers to fetch_bitswap
Browse files Browse the repository at this point in the history
Ref #73
  • Loading branch information
dignifiedquire committed May 24, 2022
1 parent 8f491d5 commit ebd1c8c
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 84 deletions.
40 changes: 1 addition & 39 deletions iroh-gateway/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use axum::body::Body;
use cid::Cid;
use tracing::{error, info};
use tracing::info;

use crate::core::State;
use crate::response::ResponseFormat;
Expand Down Expand Up @@ -58,44 +58,6 @@ impl Client {
state.metrics.bytes_streamed.inc_by(n);
Ok(res.into())
}

#[tracing::instrument()]
pub async fn get_file_by_cid(
&self,
c: Cid,
rpc_client: &iroh_rpc_client::Client,
start_time: std::time::Instant,
) -> Result<Body, String> {
info!("get file {}", c);
let (mut sender, body) = Body::channel();
let rpc_client = rpc_client.clone();
tokio::spawn(async move {
// fetch some providers
// TODO
// let providers = match rpc_client.network.fetch_provider(c.to_bytes().into()).await {
// Ok(providers) => Some(providers),
// Err(e) => {
// error!("failed to fetch providers {:?}", e);
// None
// }
// };
let providers = None;

match rpc_client.p2p.fetch_bitswap(c, providers).await {
Ok(res) => {
if let Err(e) = sender.send_data(res).await {
error!("failed to send data: {:?}", e);
}
}
Err(e) => {
error!("{:?}", e);
sender.abort();
}
}
});

Ok(body)
}
}

#[derive(Debug, Clone)]
Expand Down
46 changes: 13 additions & 33 deletions iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,40 +40,20 @@ impl p2p_server::P2p for P2p {
trace!("received BitswapRequest: {:?}", cid);
let providers = req
.providers
.map(|providers| {
providers
.providers
.into_iter()
.map(|p| {
PeerId::from_bytes(&p).map_err(|e| {
Status::invalid_argument(format!("invalid provider: {:?}", e))
})
})
.collect::<Result<_, Status>>()
.ok_or_else(|| Status::invalid_argument("missing providers"))?;

let providers: HashSet<PeerId> = providers
.providers
.into_iter()
.map(|p| {
PeerId::from_bytes(&p)
.map_err(|e| Status::invalid_argument(format!("invalid provider: {:?}", e)))
})
.transpose()?;

let providers = match providers {
Some(p) => p,
None => {
trace!("looking for providers for {:?}", cid);
let (s, r) = oneshot::channel();
self.sender
.send(RpcMessage::ProviderRequest {
key: cid.to_bytes().into(),
response_channel: s,
})
.await
.unwrap();

let p = r
.await
.expect("sender dropped")
.map_err(|e| Status::internal(format!("failed to get providers: {:?}", e)))?;
trace!("found providers: {:?}", providers);
p
}
};
.collect::<Result<_, Status>>()?;

if providers.is_empty() {
return Err(Status::invalid_argument("missing providers"));
}

let (s, r) = oneshot::channel();
let msg = RpcMessage::BitswapRequest {
Expand Down
2 changes: 1 addition & 1 deletion iroh-resolver/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl Resolver {
}

let providers = self.rpc.p2p.fetch_providers(&cid).await?;
let bytes = self.rpc.p2p.fetch_bitswap(cid, Some(providers)).await?;
let bytes = self.rpc.p2p.fetch_bitswap(cid, providers).await?;

// TODO: is this the right place?
// verify cid
Expand Down
15 changes: 5 additions & 10 deletions iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,15 @@ impl P2pClient {

// Fetches a block directly from the network.
#[tracing::instrument(skip(self))]
pub async fn fetch_bitswap(
&self,
cid: Cid,
providers: Option<HashSet<PeerId>>,
) -> Result<Bytes> {
pub async fn fetch_bitswap(&self, cid: Cid, providers: HashSet<PeerId>) -> Result<Bytes> {
debug!("rpc p2p client fetch_bitswap: {:?}", cid);
let providers = providers.map(|p| {
let list = p.into_iter().map(|id| id.to_bytes()).collect::<Vec<_>>();
Providers { providers: list }
});
let providers = Providers {
providers: providers.into_iter().map(|id| id.to_bytes()).collect(),
};

let req = iroh_metrics::req::trace_tonic_req(BitswapRequest {
cid: cid.to_bytes(),
providers,
providers: Some(providers),
});
let res = self.0.clone().fetch_bitswap(req).await?;
Ok(res.into_inner().data)
Expand Down
2 changes: 1 addition & 1 deletion iroh-rpc-types/proto/p2p.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ message Empty {}
message BitswapRequest {
// Serialized CID of the requested block.
bytes cid = 1;
optional Providers providers = 2;
Providers providers = 2;
}

message BitswapResponse {
Expand Down

0 comments on commit ebd1c8c

Please sign in to comment.