Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Server to use anything that produces a stream of (Io, SocketAddr) #1045

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures::{Future, Map, Stream, Poll, Async, Sink, StartSend, AsyncSink};

use tokio::io::Io;
use tokio::reactor::{Core, Handle, Timeout};
use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use tokio_proto::BindServer;
use tokio_proto::streaming::Message;
use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto};
Expand All @@ -41,15 +41,19 @@ pub struct Http {
keep_alive: bool,
}


type ConnectionStream<I: Io> = Box<Stream<Item=(Box<I>, SocketAddr), Error=io::Error>>;

/// An instance of a server created through `Http::bind`.
///
/// This server is intended as a convenience for creating a TCP listener on an
/// address and then serving TCP connections accepted with the service provided.
pub struct Server<S> {
pub struct Server<S, I: Io + 'static> {
protocol: Http,
new_service: S,
core: Core,
listener: TcpListener,
local_address: SocketAddr,
incoming: ConnectionStream<I>,
shutdown_timeout: Duration,
}

Expand Down Expand Up @@ -80,22 +84,55 @@ impl Http {
///
/// The returned `Server` contains one method, `run`, which is used to
/// actually run the server.
pub fn bind<S>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S>>
pub fn bind<S>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S, TcpStream>>
where S: NewService<Request = Request, Response = Response, Error = ::Error> +
Send + Sync + 'static,
{
let core = try!(Core::new());
let handle = core.handle();
let listener = try!(TcpListener::bind(addr, &handle));
let local_addr = try!(listener.local_addr());

Ok(Server {
new_service: new_service,
core: core,
listener: listener,
local_address: local_addr,
incoming: Box::new(listener.incoming().map(|(s,a)| { (Box::new(s), a) })),
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
/// Bind the provided incoming stream and return a server ready to handle
/// connections.
///
///
/// This method will setup a Server on the provided Core and ConnectionStream
///
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided as well, creating a new service per
/// connection.
///
///
/// The returned `Server` contains one method, `run`, which is used to
/// actually run the server.
pub fn bind_existing<S, I>(&self, local_addr: SocketAddr, core: Core, listener: ConnectionStream<I>, new_service: S) -> ::Result<Server<S, I>>
where S: NewService<Request = Request, Response = Response, Error = ::Error> + Send + Sync + 'static,
I: Io + 'static
{

Ok(Server {
new_service: new_service,
core: core,
local_address: local_addr,
incoming: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}





/// Use this `Http` instance to create a new server task which handles the
/// connection `io` provided.
Expand Down Expand Up @@ -270,13 +307,13 @@ impl<T> Service for HttpService<T>
}
}

impl<S> Server<S>
where S: NewService<Request = Request, Response = Response, Error = ::Error>
+ Send + Sync + 'static,
impl<S, I> Server<S, I>
where S: NewService<Request = Request, Response = Response, Error = ::Error> + Send + Sync + 'static,
I: Io + 'static
{
/// Returns the local address that this server is bound to.
pub fn local_addr(&self) -> ::Result<SocketAddr> {
Ok(try!(self.listener.local_addr()))
Ok(self.local_address)
}

/// Returns a handle to the underlying event loop that this server will be
Expand Down Expand Up @@ -322,23 +359,26 @@ impl<S> Server<S>
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
where F: Future<Item = (), Error = ::Error>,
{
let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self;
let Server { protocol, new_service, mut core, local_address, incoming, shutdown_timeout } = self;
let handle = core.handle();

// Unused :)
{ let _ =local_address; }

// Mini future to track the number of active services
let info = Rc::new(RefCell::new(Info {
active: 0,
blocker: None,
}));

// Future for our server's execution
let srv = listener.incoming().for_each(|(socket, addr)| {
let srv = incoming.for_each(|(socket, addr)| {
let s = NotifyService {
inner: try!(new_service.new_service()),
info: Rc::downgrade(&info),
};
info.borrow_mut().active += 1;
protocol.bind_connection(&handle, socket, addr, s);
protocol.bind_connection(&handle, *socket, addr, s);
Ok(())
});

Expand All @@ -361,6 +401,7 @@ impl<S> Server<S>
//
// Our custom `WaitUntilZero` will resolve once all services constructed
// here have been destroyed.

let timeout = try!(Timeout::new(shutdown_timeout, &handle));
let wait = WaitUntilZero { info: info.clone() };
match core.run(wait.select(timeout)) {
Expand All @@ -370,11 +411,11 @@ impl<S> Server<S>
}
}

impl<S: fmt::Debug> fmt::Debug for Server<S> {
impl<S: fmt::Debug, I: Io> fmt::Debug for Server<S, I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Server")
.field("core", &"...")
.field("listener", &self.listener)
.field("local_address", &self.local_address)
.field("new_service", &self.new_service)
.field("protocol", &self.protocol)
.finish()
Expand Down Expand Up @@ -436,3 +477,4 @@ impl Future for WaitUntilZero {
}
}
}