From 4ba62629f6acca7a18a23546e3a925e86259acb2 Mon Sep 17 00:00:00 2001 From: Keith Wansbrough Date: Thu, 26 Apr 2018 08:43:05 +0100 Subject: [PATCH] feat(client): support local bind for HttpConnector Add `set_local_address` to the `HttpConnector`. This configures the client to bind the socket to a local address of the host before it connects to the destination. This is useful on hosts which have multiple network interfaces, to ensure the request is issued over a specific interface. Closes #1498 --- Cargo.toml | 5 +++ examples/bound_client.rs | 56 +++++++++++++++++++++++++++ src/client/connect.rs | 81 +++++++++++++++++++++++++--------------- 3 files changed, 112 insertions(+), 30 deletions(-) create mode 100644 examples/bound_client.rs diff --git a/Cargo.toml b/Cargo.toml index f3e47952d7..bd33c274b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,11 @@ runtime = [ "tokio-tcp", ] +[[example]] +name = "bound_client" +path = "examples/bound_client.rs" +required-features = ["runtime"] + [[example]] name = "client" path = "examples/client.rs" diff --git a/examples/bound_client.rs b/examples/bound_client.rs new file mode 100644 index 0000000000..8ed688bf36 --- /dev/null +++ b/examples/bound_client.rs @@ -0,0 +1,56 @@ +#![deny(warnings)] +extern crate hyper; +extern crate pretty_env_logger; + +use std::env; +use std::net::IpAddr; +use std::io::{self, Write}; + +use hyper::{Body, Client, Request}; +use hyper::rt::{self, Future, Stream}; +use hyper::client::connect::HttpConnector; + +fn main() { + pretty_env_logger::init(); + + let url = match env::args().nth(1) { + Some(url) => url, + None => { + println!("Usage: client []"); + return; + } + }; + + let url = url.parse::().unwrap(); + if url.scheme_part().map(|s| s.as_ref()) != Some("http") { + println!("This example only works with 'http' URLs."); + return; + } + + let bind_addr = env::args().nth(2); + + let bind_addr: Option = bind_addr.map(|s| s.parse::().unwrap()); + + rt::run(rt::lazy(move || { + let mut connector = HttpConnector::new(4); + connector.set_local_address(bind_addr); + let client = Client::builder().build(connector); + + let mut req = Request::new(Body::empty()); + *req.uri_mut() = url; + + client.request(req).and_then(|res| { + println!("Response: {}", res.status()); + println!("Headers: {:#?}", res.headers()); + + res.into_body().for_each(|chunk| { + io::stdout().write_all(&chunk) + .map_err(|e| panic!("example expects stdout is open, error={}", e)) + }) + }).map(|_| { + println!("\n\nDone."); + }).map_err(|err| { + eprintln!("Error {}", err); + }) + })); +} diff --git a/src/client/connect.rs b/src/client/connect.rs index 83dd1b676f..0fbb5b5afd 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -6,6 +6,7 @@ //! establishes connections over TCP. //! - The [`Connect`](Connect) trait and related types to build custom connectors. use std::error::Error as StdError; +use std::borrow::Cow; use futures::Future; use http::Uri; @@ -128,7 +129,7 @@ mod http { use std::fmt; use std::io; use std::mem; - use std::net::SocketAddr; + use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::Duration; @@ -146,30 +147,35 @@ mod http { use self::http_connector::HttpConnectorBlockingTask; - fn connect(addr: &SocketAddr, handle: &Option) -> io::Result { - if let Some(ref handle) = *handle { - let builder = match addr { - &SocketAddr::V4(_) => TcpBuilder::new_v4()?, - &SocketAddr::V6(_) => TcpBuilder::new_v6()?, + fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option) -> io::Result { + let builder = match addr { + &SocketAddr::V4(_) => TcpBuilder::new_v4()?, + &SocketAddr::V6(_) => TcpBuilder::new_v6()?, + }; + + if let Some(ref local_addr) = *local_addr { + // Caller has requested this socket be bound before calling connect + builder.bind(SocketAddr::new(local_addr.clone(), 0))?; + } + else if cfg!(windows) { + // Windows requires a socket be bound before calling connect + let any: SocketAddr = match addr { + &SocketAddr::V4(_) => { + ([0, 0, 0, 0], 0).into() + }, + &SocketAddr::V6(_) => { + ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() + } }; + builder.bind(any)?; + } - if cfg!(windows) { - // Windows requires a socket be bound before calling connect - let any: SocketAddr = match addr { - &SocketAddr::V4(_) => { - ([0, 0, 0, 0], 0).into() - }, - &SocketAddr::V6(_) => { - ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() - } - }; - builder.bind(any)?; - } + let handle = match *handle { + Some(ref handle) => Cow::Borrowed(handle), + None => Cow::Owned(Handle::current()), + }; - Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle)) - } else { - Ok(TcpStream::connect(addr)) - } + Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle)) } /// A connector for the `http` scheme. @@ -182,6 +188,7 @@ mod http { handle: Option, keep_alive_timeout: Option, nodelay: bool, + local_address: Option, } impl HttpConnector { @@ -218,6 +225,7 @@ mod http { handle, keep_alive_timeout: None, nodelay: false, + local_address: None, } } @@ -246,6 +254,16 @@ mod http { pub fn set_nodelay(&mut self, nodelay: bool) { self.nodelay = nodelay; } + + /// Set that all sockets are bound to the configured address before connection. + /// + /// If `None`, the sockets will not be bound. + /// + /// Default is `None`. + #[inline] + pub fn set_local_address(&mut self, addr: Option) { + self.local_address = addr; + } } impl fmt::Debug for HttpConnector { @@ -287,7 +305,7 @@ mod http { }; HttpConnecting { - state: State::Lazy(self.executor.clone(), host.into(), port), + state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address), handle: self.handle.clone(), keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -337,8 +355,8 @@ mod http { } enum State { - Lazy(HttpConnectExecutor, String, u16), - Resolving(oneshot::SpawnHandle), + Lazy(HttpConnectExecutor, String, u16, Option), + Resolving(oneshot::SpawnHandle, Option), Connecting(ConnectingTcp), Error(Option), } @@ -351,26 +369,28 @@ mod http { loop { let state; match self.state { - State::Lazy(ref executor, ref mut host, port) => { + State::Lazy(ref executor, ref mut host, port, local_addr) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { state = State::Connecting(ConnectingTcp { addrs: addrs, + local_addr: local_addr, current: None }) } else { let host = mem::replace(host, String::new()); let work = dns::Work::new(host, port); - state = State::Resolving(oneshot::spawn(work, executor)); + state = State::Resolving(oneshot::spawn(work, executor), local_addr); } }, - State::Resolving(ref mut future) => { + State::Resolving(ref mut future, local_addr) => { match try!(future.poll()) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { state = State::Connecting(ConnectingTcp { addrs: addrs, + local_addr: local_addr, current: None, }) } @@ -402,6 +422,7 @@ mod http { struct ConnectingTcp { addrs: dns::IpAddrs, + local_addr: Option, current: Option, } @@ -418,14 +439,14 @@ mod http { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = connect(&addr, handle)?; + *current = connect(&addr, &self.local_addr, handle)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(connect(&addr, handle)?); + self.current = Some(connect(&addr, &self.local_addr, handle)?); continue; }