Keep weak pointer to node in socket objects #4542

Merged
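
This change replaces the socket's `nano::node &` member with a `std::weak_ptr<nano::node>` (`node_w`), so completion handlers that may still run during or after node shutdown lock the weak pointer first and bail out instead of dereferencing a dangling reference. A minimal sketch of that handler pattern, using illustrative stand-in types rather than the actual nano classes:

```cpp
#include <iostream>
#include <memory>

// Illustrative stand-ins; the real nano::node and its stats container are far richer.
struct stats_container
{
	void inc (char const * name)
	{
		std::cout << "stat: " << name << '\n';
	}
};

struct node
{
	stats_container stats;
};

class socket
{
public:
	explicit socket (std::shared_ptr<node> const & node_a) :
		node_w{ node_a } // weak: the socket never keeps the node alive
	{
	}

	// The pattern applied throughout the diff: lock the weak pointer at the top
	// of every completion handler and return early if the node is already gone.
	void on_completion (bool error)
	{
		auto node_l = node_w.lock ();
		if (!node_l)
		{
			return; // node destroyed; nothing safe to touch
		}
		if (error)
		{
			node_l->stats.inc ("tcp_error");
		}
	}

private:
	std::weak_ptr<node> node_w;
};

int main ()
{
	auto node_l = std::make_shared<node> ();
	auto socket_l = std::make_shared<socket> (node_l);
	socket_l->on_completion (true); // node alive: the stat is recorded
	node_l.reset ();
	socket_l->on_completion (true); // node gone: the handler returns early
}
```

With the previous `nano::node & node` member, the second call in `main` would have touched an already-destroyed node.
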
142 changes: 97 additions & 45 deletions nano/node/transport/socket.cpp
@@ -23,7 +23,7 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t
send_queue{ max_queue_size_a },
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
node_w{ node_a.shared () },
endpoint_type_m{ endpoint_type_a },
timeout{ std::numeric_limits<uint64_t>::max () },
last_completion_time_or_init{ nano::seconds_since_epoch () },
@@ -55,10 +55,18 @@ void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint
tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (strand,
[this_l = shared_from_this (), callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
debug_assert (this_l->strand.running_in_this_thread ());

auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}

this_l->remote = endpoint_a;
if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in);
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in);
this_l->close ();
}
else
@@ -69,7 +77,7 @@ void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint
boost::system::error_code ec;
this_l->local = this_l->tcp_socket.local_endpoint (ec);
}
this_l->node.observers.socket_connected.notify (*this_l);
node_l->observers.socket_connected.notify (*this_l);
}
callback (ec);
}));
@@ -90,14 +98,20 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
[this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (this_l->strand.running_in_this_thread ());

auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}

if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
this_l->close ();
}
else
{
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
this_l->set_last_completion ();
this_l->set_last_receive_time ();
}
@@ -116,11 +130,17 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>

void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a, nano::transport::traffic_type traffic_type)
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}

if (closed)
{
if (callback_a)
{
node.background ([callback = std::move (callback_a)] () {
node_l->background ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
@@ -132,7 +152,7 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf
{
if (callback_a)
{
node.background ([callback = std::move (callback_a)] () {
node_l->background ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
@@ -170,15 +190,21 @@ void nano::transport::socket::write_queued_messages ()
boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_l->strand.running_in_this_thread ());

auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}

this_l->write_in_progress = false;
if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_l->close ();
}
else
{
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
this_l->set_last_completion ();
}

@@ -233,56 +259,76 @@ void nano::transport::socket::set_last_receive_time ()

void nano::transport::socket::ongoing_checkup ()
{
std::weak_ptr<nano::transport::socket> this_w (shared_from_this ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [this_w] () {
if (auto this_l = this_w.lock ())
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}

node_l->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () {
auto this_l = this_w.lock ();
if (!this_l)
{
// If the socket is already dead, close just in case, and stop doing checkups
if (!this_l->alive ())
{
this_l->close ();
return;
}
return;
}

nano::seconds_t now = nano::seconds_since_epoch ();
auto condition_to_disconnect{ false };
auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}

// if this is a server socket, and no data is received for silent_connection_tolerance_time seconds then disconnect
if (this_l->endpoint_type () == endpoint_type_t::server && (now - this_l->last_receive_time_or_init) > static_cast<uint64_t> (this_l->silent_connection_tolerance_time.count ()))
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);
// If the socket is already dead, close just in case, and stop doing checkups
if (!this_l->alive ())
{
this_l->close ();
return;
}

condition_to_disconnect = true;
}
nano::seconds_t now = nano::seconds_since_epoch ();
auto condition_to_disconnect{ false };

// if there is no activity for timeout seconds then disconnect
if ((now - this_l->last_completion_time_or_init) > this_l->timeout)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, this_l->endpoint_type () == endpoint_type_t::server ? nano::stat::dir::in : nano::stat::dir::out);
// if this is a server socket, and no data is received for silent_connection_tolerance_time seconds then disconnect
if (this_l->endpoint_type () == endpoint_type_t::server && (now - this_l->last_receive_time_or_init) > static_cast<uint64_t> (this_l->silent_connection_tolerance_time.count ()))
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);

condition_to_disconnect = true;
}
condition_to_disconnect = true;
}

if (condition_to_disconnect)
{
this_l->node.logger.debug (nano::log::type::tcp_server, "Closing socket due to timeout ({})", nano::util::to_str (this_l->remote));
// if there is no activity for timeout seconds then disconnect
if ((now - this_l->last_completion_time_or_init) > this_l->timeout)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, this_l->endpoint_type () == endpoint_type_t::server ? nano::stat::dir::in : nano::stat::dir::out);

this_l->timed_out = true;
this_l->close ();
}
else if (!this_l->closed)
{
this_l->ongoing_checkup ();
}
condition_to_disconnect = true;
}

if (condition_to_disconnect)
{
node_l->logger.debug (nano::log::type::tcp_server, "Closing socket due to timeout ({})", nano::util::to_str (this_l->remote));

this_l->timed_out = true;
this_l->close ();
}
else if (!this_l->closed)
{
this_l->ongoing_checkup ();
}
});
}

void nano::transport::socket::read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}

// Increase timeout to receive TCP header (idle server socket)
auto const prev_timeout = get_default_timeout_value ();
set_default_timeout_value (node.network_params.network.idle_timeout);
set_default_timeout_value (node_l->network_params.network.idle_timeout);
async_read (data_a, size_a, [callback_l = std::move (callback_a), prev_timeout, this_l = shared_from_this ()] (boost::system::error_code const & ec_a, std::size_t size_a) {
this_l->set_default_timeout_value (prev_timeout);
callback_l (ec_a, size_a);
@@ -314,6 +360,12 @@ void nano::transport::socket::close ()
// This must be called from a strand or the destructor
void nano::transport::socket::close_internal ()
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}

if (closed.exchange (true))
{
return;
Expand All @@ -330,8 +382,8 @@ void nano::transport::socket::close_internal ()

if (ec)
{
node.stats.inc (nano::stat::type::socket, nano::stat::detail::error_socket_close);
node.logger.error (nano::log::type::socket, "Failed to close socket gracefully: {} ({})", ec.message (), nano::util::to_str (remote));
node_l->stats.inc (nano::stat::type::socket, nano::stat::detail::error_socket_close);
node_l->logger.error (nano::log::type::socket, "Failed to close socket gracefully: {} ({})", ec.message (), nano::util::to_str (remote));
}
}

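
The reworked `ongoing_checkup` above has to guard two independent lifetimes: the queued timed task must not keep the socket alive (hence the `weak_from_this ()` capture), and once it fires it must also re-check the node before touching its stats or logger. A sketch of that double weak-pointer guard, with a hypothetical `add_timed_task` helper standing in for the node's worker pool:

```cpp
#include <chrono>
#include <functional>
#include <memory>
#include <thread>

struct node
{
};

// Hypothetical helper standing in for node_l->workers.add_timed_task ():
// run the task once after a delay on a detached thread.
void add_timed_task (std::chrono::milliseconds delay, std::function<void ()> task)
{
	std::thread ([delay, task = std::move (task)] () {
		std::this_thread::sleep_for (delay);
		task ();
	}).detach ();
}

class socket : public std::enable_shared_from_this<socket>
{
public:
	explicit socket (std::shared_ptr<node> const & node_a) :
		node_w{ node_a }
	{
	}

	void ongoing_checkup ()
	{
		// Capture only a weak pointer to the socket so the queued task does not
		// extend its lifetime; re-validate both socket and node when it fires.
		add_timed_task (std::chrono::milliseconds{ 100 }, [this_w = weak_from_this ()] () {
			auto this_l = this_w.lock ();
			if (!this_l)
			{
				return; // socket destroyed while the task was queued
			}
			auto node_l = this_l->node_w.lock ();
			if (!node_l)
			{
				return; // node shut down; stop re-arming checkups
			}
			this_l->ongoing_checkup (); // re-arm, as the loop in the diff does
		});
	}

private:
	std::weak_ptr<node> node_w;
};
```

Locking inside the callback rather than capturing a `shared_ptr` keeps a pending checkup from prolonging either object's lifetime.
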
3 changes: 2 additions & 1 deletion nano/node/transport/socket.hpp
@@ -153,7 +153,8 @@ class socket final : public std::enable_shared_from_this<nano::transport::socket
protected:
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::socket tcp_socket;
nano::node & node;

std::weak_ptr<nano::node> node_w;

/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;
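
On the header side, the `nano::node & node` member becomes `std::weak_ptr<nano::node> node_w`, and the constructor in the .cpp diff initializes it from `node_a.shared ()`, which presumably returns a `std::shared_ptr<nano::node>`. A sketch of how that pairing works, with illustrative types (the real `nano::node` declaration is not shown in this diff):

```cpp
#include <memory>

// Illustrative stand-in: a node that can hand out shared ownership of itself,
// assuming something like enable_shared_from_this backs nano::node::shared ().
struct node : std::enable_shared_from_this<node>
{
	std::shared_ptr<node> shared ()
	{
		return shared_from_this (); // valid only once the node is owned by a shared_ptr
	}
};

class socket
{
public:
	explicit socket (node & node_a) :
		node_w{ node_a.shared () } // weak ownership from the start
	{
	}

private:
	std::weak_ptr<node> node_w;
};

int main ()
{
	auto node_l = std::make_shared<node> (); // the node must already be shared-owned
	socket socket_l{ *node_l }; // mirrors the socket (nano::node & node_a, ...) signature in the diff
}
```

Holding a `weak_ptr` rather than a `shared_ptr` keeps the dependency one-way: a lingering socket or queued handler can no longer keep a stopped node alive, nor dereference it after it is gone.
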