Skip to content

Commit

Permalink
Fix race conditions closing HTTP I/O objects:
Browse files Browse the repository at this point in the history
This fixes a case where stop can sometimes skip calling close on some
I/O objects or crash in a rare circumstance where a connection is in the
process of being torn down at the exact time the server is stopped. When
the acceptor receives errors, it logs the error and continues listening
instead of stopping.
  • Loading branch information
vinniefalco committed Nov 3, 2014
1 parent 35f9499 commit 549ad32
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 109 deletions.
17 changes: 8 additions & 9 deletions src/ripple/app/main/ServerHandlerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,9 @@ ServerHandlerImp::onRequest (HTTP::Session& session)
return;
}

session.detach();

m_jobQueue.addJob (jtCLIENT, "RPC-Client", std::bind (
&ServerHandlerImp::processSession, this, std::placeholders::_1,
std::ref (session)));
session.detach()));
}

void
Expand All @@ -145,18 +143,19 @@ ServerHandlerImp::onStopped (HTTP::Server&)

// Dispatched on the job queue
void
ServerHandlerImp::processSession (Job& job, HTTP::Session& session)
ServerHandlerImp::processSession (Job& job,
std::shared_ptr<HTTP::Session> const& session)
{
session.write (processRequest (to_string(session.body()),
session.remoteAddress().at_port(0)));
session->write (processRequest (to_string(session->body()),
session->remoteAddress().at_port(0)));

if (session.request().keep_alive())
if (session->request().keep_alive())
{
session.complete();
session->complete();
}
else
{
session.close (true);
session->close (true);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/main/ServerHandlerImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class ServerHandlerImp
//--------------------------------------------------------------------------

void
processSession (Job& job, HTTP::Session& session);
processSession (Job& job,
std::shared_ptr<HTTP::Session> const& session);

std::string
createResponse (int statusCode, std::string const& description);
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/http/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <beast/net/IPEndpoint.h>
#include <beast/utility/Journal.h>
#include <beast/module/asio/HTTPRequest.h>
#include <memory>
#include <ostream>

namespace ripple {
Expand Down Expand Up @@ -104,7 +105,7 @@ class Session
will not return until all detached sessions are closed.
*/
virtual
void
std::shared_ptr<Session>
detach() = 0;

/** Indicate that the response is complete.
Expand Down
118 changes: 78 additions & 40 deletions src/ripple/http/impl/Door.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,43 @@ detect_ssl (Socket& socket, StreamBuf& buf, Yield yield)
break;
}

std::size_t const bytes_transferred = boost::asio::async_read (socket,
buf.commit(boost::asio::async_read (socket,
buf.prepare(max - bytes), boost::asio::transfer_at_least(1),
yield[result.first]);
yield[result.first]));
if (result.first)
break;
buf.commit (bytes_transferred);
}
return result;
}

//------------------------------------------------------------------------------

Door::Child::Child(Door& door)
: door_(door)
{
}

Door::Child::~Child()
{
door_.remove(*this);
}

//------------------------------------------------------------------------------

Door::detector::detector (Door& door, socket_type&& socket,
endpoint_type endpoint)
: door_ (door)
: Child(door)
, socket_ (std::move(socket))
, timer_ (socket_.get_io_service())
, remote_endpoint_ (endpoint)
{
door_.add(*this);
}

Door::detector::~detector()
{
door_.remove(*this);
}

void
Door::detector::run()
{
// do_detect must be called before do_timer or else
// the timer can be canceled before it gets set.
boost::asio::spawn (door_.strand_, std::bind (&detector::do_detect,
shared_from_this(), std::placeholders::_1));

Expand Down Expand Up @@ -151,31 +158,55 @@ Door::Door (boost::asio::io_service& io_service,
ServerImpl& server, Port const& port)
: port_(port)
, server_(server)
, acceptor_(io_service, to_asio(port), true)
, acceptor_(io_service)
, strand_(io_service)
{
server_.add (*this);

error_code ec;
endpoint_type const local_address = to_asio(port);

acceptor_.open(local_address.protocol(), ec);
if (ec)
{
if (server_.journal().error) server_.journal().error <<
"Error setting acceptor socket option: " << ec.message();
"Error opening listener: " << ec.message();
throw std::exception();
return;
}

if (! ec)
acceptor_.set_option(
boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
if (ec)
{
if (server_.journal().info) server_.journal().info <<
"Bound to endpoint " << to_string (acceptor_.local_endpoint());
if (server_.journal().error) server_.journal().error <<
"Error setting listener options: " << ec.message();
throw std::exception();
return;
}
else

acceptor_.bind(local_address, ec);
if (ec)
{
if (server_.journal().error) server_.journal().error <<
"Error binding to endpoint " <<
to_string (acceptor_.local_endpoint()) <<
", '" << ec.message() << "'";
"Error binding to endpoint " << local_address <<
", '" << ec.message() << "'";
throw std::exception();
return;
}

acceptor_.listen(boost::asio::socket_base::max_connections, ec);
if (ec)
{
if (server_.journal().error) server_.journal().error <<
"Error on listen: " << local_address <<
", '" << ec.message() << "'";
throw std::exception();
return;
}

if (server_.journal().info) server_.journal().info <<
"Bound to endpoint " << to_string (acceptor_.local_endpoint());
}

Door::~Door()
Expand Down Expand Up @@ -205,26 +236,31 @@ Door::close()
error_code ec;
acceptor_.close(ec);
// Close all detector, Peer objects
std::lock_guard<std::mutex> lock(mutex_);
for(auto& _ : list_)
_.close();
{
auto const peer = _.second.lock();
if (peer != nullptr)
peer->close();
}
}

//------------------------------------------------------------------------------

void
Door::add (Child& c)
Door::remove (Child& c)
{
std::lock_guard<std::mutex> lock(mutex_);
list_.push_back(c);
list_.erase(&c);
if (list_.empty())
cond_.notify_all();
}

//------------------------------------------------------------------------------

void
Door::remove (Child& c)
Door::add (std::shared_ptr<Child> const& child)
{
std::lock_guard<std::mutex> lock(mutex_);
list_.erase(list_.iterator_to(c));
if (list_.empty())
cond_.notify_all();
list_.emplace(child.get(), child);
}

void
Expand Down Expand Up @@ -254,14 +290,16 @@ Door::create (bool ssl, beast::asio::streambuf&& buf,
auto const peer = std::make_shared <SSLPeer> (*this,
server_.journal(), remote_address, buf.data(),
std::move(socket));
peer->accept();
add(peer);
peer->run();
return;
}

auto const peer = std::make_shared <PlainPeer> (*this,
server_.journal(), remote_address, buf.data(),
std::move(socket));
peer->accept();
add(peer);
peer->run();
return;
}
break;
Expand All @@ -282,34 +320,34 @@ Door::do_accept (boost::asio::yield_context yield)
endpoint_type endpoint;
socket_type socket (acceptor_.get_io_service());
acceptor_.async_accept (socket, endpoint, yield[ec]);
if (server_.closed())
if (ec && ec != boost::asio::error::operation_aborted)
if (server_.journal().error) server_.journal().error <<
"accept: " << ec.message();
if (ec == boost::asio::error::operation_aborted || server_.closed())
break;
if (ec)
{
if (ec != boost::asio::error::operation_aborted)
if (server_.journal().error) server_.journal().error <<
"accept: " << ec.message();
break;
}

continue;
if (port_.security == Port::Security::no_ssl)
{
auto const peer = std::make_shared <PlainPeer> (*this,
server_.journal(), endpoint, boost::asio::null_buffers(),
std::move(socket));
peer->accept();
add(peer);
peer->run();
}
else if (port_.security == Port::Security::require_ssl)
{
auto const peer = std::make_shared <SSLPeer> (*this,
server_.journal(), endpoint, boost::asio::null_buffers(),
std::move(socket));
peer->accept();
add(peer);
peer->run();
}
else
{
auto const c = std::make_shared <detector> (
*this, std::move(socket), endpoint);
add(c);
c->run();
}
}
Expand Down
38 changes: 22 additions & 16 deletions src/ripple/http/impl/Door.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/container/flat_map.hpp>
#include <chrono>
#include <condition_variable>
#include <memory>
Expand All @@ -41,6 +41,18 @@ class Door
: public ServerImpl::Child
, public std::enable_shared_from_this <Door>
{
public:
class Child
{
protected:
Door& door_;

public:
Child (Door& door);
virtual ~Child();
virtual void close() = 0;
};

private:
using clock_type = std::chrono::steady_clock;
using timer_type = boost::asio::basic_waitable_timer<clock_type>;
Expand All @@ -53,39 +65,33 @@ class Door

// Detects SSL on a socket
class detector
: public std::enable_shared_from_this <detector>
, public ServerImpl::Child
: public Child
, public std::enable_shared_from_this <detector>
{
private:
Door& door_;
socket_type socket_;
timer_type timer_;
endpoint_type remote_endpoint_;

public:
detector (Door& door, socket_type&& socket,
endpoint_type endpoint);

~detector();

void run();
void close();
void close() override;

private:
void do_timer (yield_context yield);
void do_detect (yield_context yield);
};

using list_type = boost::intrusive::make_list <Child,
boost::intrusive::constant_time_size <false>>::type;

Port port_;
ServerImpl& server_;
acceptor_type acceptor_;
boost::asio::io_service::strand strand_;
std::mutex mutex_;
std::condition_variable cond_;
list_type list_;
boost::container::flat_map<
Child*, std::weak_ptr<Child>> list_;

public:
Door (boost::asio::io_service& io_service,
Expand Down Expand Up @@ -113,10 +119,6 @@ class Door
// Work-around because we can't call shared_from_this in ctor
void run();

void add (Child& c);

void remove (Child& c);

/** Close the Door listening socket and connections.
The listening socket is closed, and all open connections
belonging to the Door are closed.
Expand All @@ -125,7 +127,11 @@ class Door
*/
void close();

void remove (Child& c);

private:
void add (std::shared_ptr<Child> const& child);

void create (bool ssl, beast::asio::streambuf&& buf,
socket_type&& socket, endpoint_type remote_address);

Expand Down
Loading

0 comments on commit 549ad32

Please sign in to comment.