Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

T1. Fix isolated connection bugs, improve tests, upgrade dependencies #3302

Merged
merged 16 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 184 additions & 150 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ skip-tree = [
# ticket #2998: hdrhistogram dependencies
{ name = "hdrhistogram", version = "=6.3.4" },

# ticket #2999: http dependencies
{ name = "bytes", version = "=0.5.6" },

# ticket #3061: reqwest and minreq dependencies
{ name = "webpki-roots", version = "=0.18.0" },

Expand All @@ -81,6 +78,7 @@ skip-tree = [

# wait for lots of crates in the tokio ecosystem to upgrade
{ name = "redox_syscall", version = "=0.1.57" },
{ name = "rustls", version = "=0.19.1" },
{ name = "socket2", version = "=0.3.16" },
]

Expand Down
159 changes: 52 additions & 107 deletions zebra-network/src/isolated.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,66 @@
//! Code for creating isolated connections to specific peers.
//! Creating isolated connections to specific peers.

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use std::{future::Future, net::SocketAddr};

use futures::future::{FutureExt, TryFutureExt};
use tokio::net::TcpStream;
use futures::future::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{
util::{BoxService, Oneshot},
Service,
ServiceExt,
};

use zebra_chain::chain_tip::NoChainTip;
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};

use crate::{
peer::{self, ConnectedAddr, HandshakeRequest},
peer_set::ActiveConnectionCounter,
BoxError, Config, Request, Response,
};

/// Use the provided TCP connection to create a Zcash connection completely
/// isolated from all other node state.
#[cfg(test)]
mod tests;

/// Creates a Zcash peer connection using the provided data stream.
/// This connection is completely isolated from all other node state.
///
/// The connection pool returned by `init` should be used for all requests that
/// The connection pool returned by [`init`](zebra_network::init)
/// should be used for all requests that
/// don't require isolated state or use of an existing TCP connection. However,
/// this low-level API is useful for custom network crawlers or Tor connections.
///
/// In addition to being completely isolated from all other node state, this
/// method also aims to be minimally distinguishable from other clients.
///
/// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
///
/// Note that this method does not implement any timeout behavior, so callers may
/// want to layer it with a timeout as appropriate for their application.
///
/// # Inputs
///
/// - `conn`: an existing TCP connection to use. Passing an existing TCP
/// connection allows this method to be used with clearnet or Tor transports.
/// - `network`: the Zcash [`Network`] used for this connection.
///
/// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string.
///
/// # Bug
/// - `data_stream`: an existing data stream. This can be a non-anonymised TCP connection,
/// or a Tor client [`DataStream`].
///
/// `connect_isolated` only works on `Mainnet`, see #1687.
pub fn connect_isolated(
conn: TcpStream,
/// - `user_agent`: a valid BIP14 user-agent, e.g., the empty string.
pub fn connect_isolated<AsyncReadWrite>(
network: Network,
data_stream: AsyncReadWrite,
user_agent: String,
) -> impl Future<
Output = Result<
BoxService<Request, Response, Box<dyn std::error::Error + Send + Sync + 'static>>,
Box<dyn std::error::Error + Send + Sync + 'static>,
>,
> {
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
AsyncReadWrite: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let config = Config {
network,
..Config::default()
};

let handshake = peer::Handshake::builder()
.with_config(Config::default())
.with_config(config)
.with_inbound_service(tower::service_fn(|_req| async move {
Ok::<Response, Box<dyn std::error::Error + Send + Sync + 'static>>(Response::Nil)
Ok::<Response, BoxError>(Response::Nil)
}))
.with_user_agent(user_agent)
.with_latest_chain_tip(NoChainTip)
Expand All @@ -70,88 +74,29 @@ pub fn connect_isolated(
Oneshot::new(
handshake,
HandshakeRequest {
tcp_stream: conn,
data_stream,
connected_addr,
connection_tracker,
},
)
.map_ok(|client| BoxService::new(Wrapper(client)))
.map_ok(|client| BoxService::new(client.map_err(Into::into)))
}

// This can be deleted when a new version of Tower with map_err is released.
struct Wrapper(peer::Client);

impl Service<Request> for Wrapper {
type Response = Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: Request) -> Self::Future {
self.0.call(req).map_err(Into::into).boxed()
}
}

#[cfg(test)]
mod tests {

use super::*;

#[tokio::test]
async fn connect_isolated_sends_minimally_distinguished_version_message() {
use std::net::SocketAddr;

use futures::stream::StreamExt;
use tokio_util::codec::Framed;

use crate::{
protocol::external::{AddrInVersion, Codec, Message},
types::PeerServices,
};

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let listen_addr = listener.local_addr().unwrap();

let fixed_isolated_addr: SocketAddr = "0.0.0.0:8233".parse().unwrap();

let conn = tokio::net::TcpStream::connect(listen_addr).await.unwrap();

tokio::spawn(connect_isolated(conn, "".to_string()));

let (conn, _) = listener.accept().await.unwrap();

let mut stream = Framed::new(conn, Codec::builder().finish());
if let Message::Version {
services,
timestamp,
address_from,
user_agent,
start_height,
relay,
..
} = stream
.next()
.await
.expect("stream item")
.expect("item is Ok(msg)")
{
// Check that the version message sent by connect_isolated
// has the fields specified in the Stolon RFC.
assert_eq!(services, PeerServices::empty());
assert_eq!(timestamp.timestamp() % (5 * 60), 0);
assert_eq!(
address_from,
AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()),
);
assert_eq!(user_agent, "");
assert_eq!(start_height.0, 0);
assert!(!relay);
} else {
panic!("handshake did not send version message");
}
}
/// Creates a direct TCP Zcash peer connection to `addr`.
/// This connection is completely isolated from all other node state.
///
/// See [`connect_isolated`] for details.
///
/// # Privacy
///
/// Transactions sent over this connection can be linked to the sending and receiving IP address
/// by passive internet observers.
pub fn connect_isolated_tcp_direct(
network: Network,
addr: SocketAddr,
user_agent: String,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>> {
tokio::net::TcpStream::connect(addr)
.err_into()
.and_then(move |tcp_stream| connect_isolated(network, tcp_stream, user_agent))
}
3 changes: 3 additions & 0 deletions zebra-network/src/isolated/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Tests for isolated Zebra connections.

mod vectors;
Loading