Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store io_context as shared pointer #4506

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ using namespace std::chrono_literals;
TEST (network, tcp_connection)
{
nano::test::system system;
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
auto port = system.get_available_port ();
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), port);
acceptor.open (endpoint.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
acceptor.bind (endpoint);
acceptor.listen ();
boost::asio::ip::tcp::socket incoming (system.io_ctx);
boost::asio::ip::tcp::socket incoming (*system.io_ctx);
std::atomic<bool> done1 (false);
std::string message1;
acceptor.async_accept (incoming, [&done1, &message1] (boost::system::error_code const & ec_a) {
Expand All @@ -39,7 +39,7 @@ TEST (network, tcp_connection)
}
done1 = true;
});
boost::asio::ip::tcp::socket connector (system.io_ctx);
boost::asio::ip::tcp::socket connector (*system.io_ctx);
std::atomic<bool> done2 (false);
std::string message2;
connector.async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), acceptor.local_endpoint ().port ()),
Expand Down Expand Up @@ -538,13 +538,13 @@ TEST (network, ipv6_bind_send_ipv4)
std::array<uint8_t, 16> bytes1{};
std::atomic<bool> finish1{ false };
nano::endpoint endpoint3;
boost::asio::ip::udp::socket socket1 (system.io_ctx, endpoint1);
boost::asio::ip::udp::socket socket1 (*system.io_ctx, endpoint1);
socket1.async_receive_from (boost::asio::buffer (bytes1.data (), bytes1.size ()), endpoint3, [&finish1] (boost::system::error_code const & error, size_t size_a) {
ASSERT_FALSE (error);
ASSERT_EQ (16, size_a);
finish1 = true;
});
boost::asio::ip::udp::socket socket2 (system.io_ctx, endpoint2);
boost::asio::ip::udp::socket socket2 (*system.io_ctx, endpoint2);
nano::endpoint endpoint5 (boost::asio::ip::address_v4::loopback (), socket1.local_endpoint ().port ());
nano::endpoint endpoint6 (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4::loopback ()), socket2.local_endpoint ().port ());
socket2.async_send_to (boost::asio::buffer (std::array<uint8_t, 16>{}, 16), endpoint5, [] (boost::system::error_code const & error, size_t size_a) {
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ TEST (node, stop)
nano::test::system system (1);
ASSERT_NE (system.nodes[0]->wallets.items.end (), system.nodes[0]->wallets.items.begin ());
system.nodes[0]->stop ();
system.io_ctx.run ();
system.io_ctx->run ();
ASSERT_TRUE (true);
}

Expand Down Expand Up @@ -68,10 +68,10 @@ TEST (node, work_generate)
TEST (node, block_store_path_failure)
{
nano::test::system system;
auto service (std::make_shared<boost::asio::io_context> ());
auto io_ctx = std::make_shared<boost::asio::io_context> ();
auto path (nano::unique_path ());
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
auto node (std::make_shared<nano::node> (*service, system.get_available_port (), path, pool));
auto node (std::make_shared<nano::node> (io_ctx, system.get_available_port (), path, pool));
ASSERT_TRUE (node->wallets.items.empty ());
node->stop ();
}
Expand All @@ -97,7 +97,7 @@ TEST (node_DeathTest, readonly_block_store_not_exist)
TEST (node, password_fanout)
{
nano::test::system system;
boost::asio::io_context io_ctx;
auto io_ctx = std::make_shared<boost::asio::io_context> ();
auto path (nano::unique_path ());
nano::node_config config;
config.peering_port = system.get_available_port ();
Expand Down
20 changes: 10 additions & 10 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ TEST (socket, drop_policy)
nano::inactive_node inactivenode (nano::unique_path (), node_flags);
auto node = inactivenode.node;

nano::thread_runner runner (node->io_ctx, 1);
nano::thread_runner runner (node->io_ctx_shared, 1);

std::vector<std::shared_ptr<nano::transport::socket>> connections;

Expand Down Expand Up @@ -469,7 +469,7 @@ TEST (socket, concurrent_writes)

// This gives more realistic execution than using system#poll, allowing writes to
// queue up and drain concurrently.
nano::thread_runner runner (node->io_ctx, 1);
nano::thread_runner runner (node->io_ctx_shared, 1);

constexpr size_t max_connections = 4;
constexpr size_t client_count = max_connections;
Expand Down Expand Up @@ -622,13 +622,13 @@ TEST (socket_timeout, read)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and create a newsock and do not send any data
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
});
Expand Down Expand Up @@ -668,13 +668,13 @@ TEST (socket_timeout, write)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and create a newsock and do not receive any data
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
});
Expand Down Expand Up @@ -719,13 +719,13 @@ TEST (socket_timeout, read_overlapped)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and send one byte only
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);

Expand Down Expand Up @@ -777,13 +777,13 @@ TEST (socket_timeout, write_overlapped)

// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);

// asynchronously accept an incoming connection and read 2 bytes only
boost::asio::ip::tcp::socket newsock (system.io_ctx);
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
Expand Down
12 changes: 8 additions & 4 deletions nano/lib/thread_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
* thread_runner
*/

nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned num_threads, const nano::thread_role::name thread_role_a) :
io_guard{ boost::asio::make_work_guard (io_ctx_a) },
nano::thread_runner::thread_runner (std::shared_ptr<boost::asio::io_context> io_ctx_a, unsigned num_threads, const nano::thread_role::name thread_role_a) :
io_ctx{ io_ctx_a },
io_guard{ boost::asio::make_work_guard (*io_ctx_a) },
role{ thread_role_a }
{
debug_assert (io_ctx != nullptr);

for (auto i (0u); i < num_threads; ++i)
{
threads.emplace_back (nano::thread_attributes::get_default (), [this, &io_ctx_a] () {
threads.emplace_back (nano::thread_attributes::get_default (), [this] () {
nano::thread_role::set (role);
try
{
run (io_ctx_a);
run (*io_ctx);
}
catch (std::exception const & ex)
{
Expand Down Expand Up @@ -78,6 +81,7 @@ void nano::thread_runner::join ()
i.join ();
}
}
io_ctx.reset ();
}

void nano::thread_runner::stop_event_processing ()
Expand Down
6 changes: 4 additions & 2 deletions nano/lib/thread_runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ namespace nano
class thread_runner final
{
public:
thread_runner (boost::asio::io_context &, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io);
thread_runner (std::shared_ptr<boost::asio::io_context>, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io);
~thread_runner ();

/** Tells the IO context to stop processing events.*/
void stop_event_processing ();

/** Wait for IO threads to complete */
void join ();

private:
std::shared_ptr<boost::asio::io_context> io_ctx;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_guard;
nano::thread_role::name const role;
std::vector<boost::thread> threads;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_guard;

private:
void run (boost::asio::io_context &);
Expand Down
6 changes: 4 additions & 2 deletions nano/load_test/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ int main (int argc, char * const * argv)
std::this_thread::sleep_for (std::chrono::seconds (7));
std::cout << "Connecting nodes..." << std::endl;

boost::asio::io_context ioc;
std::shared_ptr<boost::asio::io_context> ioc_shared = std::make_shared<boost::asio::io_context> ();
boost::asio::io_context & ioc{ *ioc_shared };

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [&ioc] () {
Expand Down Expand Up @@ -715,7 +716,8 @@ int main (int argc, char * const * argv)
// Stop main node
stop_rpc (ioc, primary_node_results);
});
nano::thread_runner runner (ioc, simultaneous_process_calls);

nano::thread_runner runner (ioc_shared, simultaneous_process_calls);
t.join ();
runner.join ();

Expand Down
22 changes: 15 additions & 7 deletions nano/nano_node/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
config.node.websocket_config.tls_config = tls_config;
}

boost::asio::io_context io_ctx;
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();

auto opencl = nano::opencl_work::create (config.opencl_enable, config.opencl, logger, config.node.network_params.work);
nano::opencl_work_func_t opencl_work_func;
if (opencl)
Expand Down Expand Up @@ -132,7 +133,7 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
config.node.peering_port = network_params.network.default_node_port;
}

auto node (std::make_shared<nano::node> (io_ctx, data_path, config.node, opencl_work, flags));
auto node = std::make_shared<nano::node> (io_ctx, data_path, config.node, opencl_work, flags);
if (!node->init_error ())
{
auto network_label = node->network_params.network.get_current_network_as_string ();
Expand Down Expand Up @@ -165,10 +166,14 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
}

rpc_config.tls_config = tls_config;
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc, [&ipc_server, &workers = node->workers, &io_ctx] () {
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc,
[&ipc_server, &workers = node->workers, io_ctx_w = std::weak_ptr{ io_ctx }] () {
ipc_server.stop ();
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [&io_ctx] () {
io_ctx.stop ();
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [io_ctx_w] () {
if (auto io_ctx_l = io_ctx_w.lock ())
{
io_ctx_l->stop ();
}
});
});
rpc = nano::get_rpc (io_ctx, rpc_config, *rpc_handler);
Expand All @@ -189,10 +194,13 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
}

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [this, &io_ctx] () {
nano::signal_handler_impl = [this, io_ctx_w = std::weak_ptr{ io_ctx }] () {
logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping...");

io_ctx.stop ();
if (auto io_ctx_l = io_ctx_w.lock ())
{
io_ctx_l->stop ();
}
sig_int_or_term = 1;
};

Expand Down
8 changes: 4 additions & 4 deletions nano/nano_node/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,8 @@ int main (int argc, char * const * argv)
}
}
std::cout << boost::str (boost::format ("Starting generating %1% blocks...\n") % (count * 2));
boost::asio::io_context io_ctx1;
boost::asio::io_context io_ctx2;
auto io_ctx1 = std::make_shared<boost::asio::io_context> ();
auto io_ctx2 = std::make_shared<boost::asio::io_context> ();
nano::work_pool work{ network_params.network, std::numeric_limits<unsigned>::max () };
auto path1 (nano::unique_path ());
auto path2 (nano::unique_path ());
Expand Down Expand Up @@ -1283,8 +1283,8 @@ int main (int argc, char * const * argv)
auto end (std::chrono::high_resolution_clock::now ());
auto time (std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count ());
std::cout << boost::str (boost::format ("%|1$ 12d| us \n%2% frontiers per second\n") % time % ((count + 1) * 1000000 / time));
io_ctx1.stop ();
io_ctx2.stop ();
io_ctx1->stop ();
io_ctx2->stop ();
runner1.join ();
runner2.join ();
node1->stop ();
Expand Down
14 changes: 10 additions & 4 deletions nano/nano_rpc/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,23 @@ void run (std::filesystem::path const & data_path, std::vector<std::string> cons
rpc_config.tls_config = tls_config;
}

boost::asio::io_context io_ctx;
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();

nano::signal_manager sigman;
try
{
nano::ipc_rpc_processor ipc_rpc_processor (io_ctx, rpc_config);
nano::ipc_rpc_processor ipc_rpc_processor (*io_ctx, rpc_config);
auto rpc = nano::get_rpc (io_ctx, rpc_config, ipc_rpc_processor);
rpc->start ();

debug_assert (!nano::signal_handler_impl);
nano::signal_handler_impl = [&io_ctx] () {
io_ctx.stop ();
nano::signal_handler_impl = [io_ctx_w = std::weak_ptr{ io_ctx }] () {
logger.warn (nano::log::type::daemon, "Interrupt signal received, stopping...");

if (auto io_ctx_l = io_ctx_w.lock ())
{
io_ctx_l->stop ();
}
sig_int_or_term = 1;
};

Expand Down
3 changes: 2 additions & 1 deletion nano/nano_wallet/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ int run_wallet (QApplication & application, int argc, char * const * argv, std::
config.node.websocket_config.tls_config = tls_config;
}

boost::asio::io_context io_ctx;
std::shared_ptr<boost::asio::io_context> io_ctx = std::make_shared<boost::asio::io_context> ();

nano::thread_runner runner (io_ctx, config.node.io_threads);

std::shared_ptr<nano::node> node;
Expand Down
9 changes: 5 additions & 4 deletions nano/node/ipc/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,13 @@ class socket_transport : public nano::ipc::transport
{
public:
socket_transport (nano::ipc::ipc_server & server_a, ENDPOINT_TYPE endpoint_a, nano::ipc::ipc_config_transport & config_transport_a, int concurrency_a) :
server (server_a), config_transport (config_transport_a)
server (server_a),
config_transport (config_transport_a)
{
// Using a per-transport event dispatcher?
if (concurrency_a > 0)
{
io_ctx = std::make_unique<boost::asio::io_context> ();
io_ctx = std::make_shared<boost::asio::io_context> ();
}

boost::asio::socket_base::reuse_address option (true);
Expand All @@ -482,7 +483,7 @@ class socket_transport : public nano::ipc::transport
// A separate io_context for domain sockets may facilitate better performance on some systems.
if (concurrency_a > 0)
{
runner = std::make_unique<nano::thread_runner> (*io_ctx, static_cast<unsigned> (concurrency_a));
runner = std::make_unique<nano::thread_runner> (io_ctx, static_cast<unsigned> (concurrency_a));
}
}

Expand Down Expand Up @@ -544,7 +545,7 @@ class socket_transport : public nano::ipc::transport
nano::ipc::ipc_server & server;
nano::ipc::ipc_config_transport & config_transport;
std::unique_ptr<nano::thread_runner> runner;
std::unique_ptr<boost::asio::io_context> io_ctx;
std::shared_ptr<boost::asio::io_context> io_ctx;
std::unique_ptr<ACCEPTOR_TYPE> acceptor;
};

Expand Down
Loading
Loading