From dfa6bd26ec605e8a74b1791beba540a66dcd7fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 20 Jan 2025 16:10:29 +0100 Subject: [PATCH] Extract `vote_rebroadcaster` --- nano/core_test/vote_processor.cpp | 4 +- nano/lib/stats_enums.hpp | 5 ++ nano/lib/thread_roles.cpp | 3 + nano/lib/thread_roles.hpp | 1 + nano/node/CMakeLists.txt | 2 + nano/node/fwd.hpp | 4 +- nano/node/node.cpp | 21 ++--- nano/node/node.hpp | 2 + nano/node/vote_rebroadcaster.cpp | 130 ++++++++++++++++++++++++++++++ nano/node/vote_rebroadcaster.hpp | 49 +++++++++++ 10 files changed, 202 insertions(+), 19 deletions(-) create mode 100644 nano/node/vote_rebroadcaster.cpp create mode 100644 nano/node/vote_rebroadcaster.hpp diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 8f93ab726e..57a3c6a05c 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -248,8 +248,8 @@ TEST (vote_processor, local_broadcast_without_a_representative) ASSERT_NE (votes.end (), existing); ASSERT_EQ (vote->timestamp (), existing->second.timestamp); // Ensure the vote was broadcast - ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out)); + ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); + ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out)); } // Issue that tracks last changes on this test: https://github.com/nanocurrency/nano-node/issues/3485 diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 51d8a9b9ce..86e09190c9 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -24,6 +24,7 @@ enum class type vote_processor, vote_processor_tier, vote_processor_overfill, + vote_rebroadcaster, election, election_cleanup, election_vote, @@ -172,6 +173,7 @@ enum class detail queued, error, failed, + refresh, // processing queue queue, @@ -655,6 +657,9 @@ enum class detail pruned_count, collect_targets, + // vote_rebroadcaster + rebroadcast_hashes, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 5684d7f218..7f5e349e2d 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -37,6 +37,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_cache_processing: thread_role_name_string = "Vote cache proc"; break; + case nano::thread_role::name::vote_rebroadcasting: + thread_role_name_string = "Vote rebroad"; + break; case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 789739881d..2f359177c5 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -17,6 +17,7 @@ enum class name message_processing, vote_processing, vote_cache_processing, + vote_rebroadcasting, block_processing, ledger_notifications, request_loop, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 4a90531d0e..f1d17464a6 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -195,6 +195,8 @@ add_library( vote_generator.cpp vote_processor.hpp vote_processor.cpp + vote_rebroadcaster.hpp + vote_rebroadcaster.cpp vote_router.hpp vote_router.cpp vote_spacing.hpp diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index d9ff17e382..95d1e11536 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -40,10 +40,9 @@ class telemetry; class unchecked_map; class stats; class vote_cache; -enum class vote_code; -enum class vote_source; class vote_generator; class vote_processor; +class vote_rebroadcaster; class vote_router; class vote_spacing; class wallets; @@ -52,6 +51,7 @@ enum class block_source; enum class election_behavior; enum class election_state; enum class vote_code; +enum class vote_source; } namespace nano::scheduler diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 7030b1e56a..8de537f9b1 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -202,6 +203,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy http_callbacks{ *http_callbacks_impl }, pruning_impl{ std::make_unique (config, flags, ledger, stats, logger) }, pruning{ *pruning_impl }, + vote_rebroadcaster_impl{ std::make_unique (vote_router, network, wallets, stats, logger) }, + vote_rebroadcaster{ *vote_rebroadcaster_impl }, startup_time{ std::chrono::steady_clock::now () }, node_seq{ seq } { @@ -223,21 +226,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy } }); - // Republish vote if it is new and the node does not host a principal representative (or close to) - vote_router.vote_processed.add ([this] (std::shared_ptr const & vote, nano::vote_source source, std::unordered_map const & results) { - bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) { - return result.second == nano::vote_code::vote; - }); - if (processed) - { - auto const reps = wallets.reps (); - if (!reps.have_half_rep () && !reps.exists (vote->account)) - { - network.flood_vote (vote, 0.5f, /* rebroadcasted */ true); - } - } - }); - // Do some cleanup due to this block never being processed by confirmation height processor confirming_set.cementing_failed.add ([this] (auto const & hash) { active.recently_confirmed.erase (hash); @@ -575,6 +563,7 @@ void nano::node::start () monitor.start (); http_callbacks.start (); pruning.start (); + vote_rebroadcaster.start (); add_initial_peers (); } @@ -625,6 +614,7 @@ void nano::node::stop () monitor.stop (); http_callbacks.stop (); pruning.stop (); + vote_rebroadcaster.stop (); bootstrap_workers.stop (); wallet_workers.stop (); @@ -993,6 +983,7 @@ nano::container_info nano::node::container_info () const info.add ("bounded_backlog", backlog.container_info ()); info.add ("http_callbacks", http_callbacks.container_info ()); info.add ("pruning", pruning.container_info ()); + info.add ("vote_rebroadcaster", vote_rebroadcaster.container_info ()); return info; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 35e753cbda..0c3a58f3e5 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -203,6 +203,8 @@ class node final : public std::enable_shared_from_this nano::http_callbacks & http_callbacks; std::unique_ptr pruning_impl; nano::pruning & pruning; + std::unique_ptr vote_rebroadcaster_impl; + nano::vote_rebroadcaster & vote_rebroadcaster; public: std::chrono::steady_clock::time_point const startup_time; diff --git a/nano/node/vote_rebroadcaster.cpp b/nano/node/vote_rebroadcaster.cpp new file mode 100644 index 0000000000..c4485f82ad --- /dev/null +++ b/nano/node/vote_rebroadcaster.cpp @@ -0,0 +1,130 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::stats & stats_a, nano::logger & logger_a) : + vote_router{ vote_router_a }, + network{ network_a }, + wallets{ wallets_a }, + stats{ stats_a }, + logger{ logger_a } +{ + vote_router.vote_processed.add ([this] (std::shared_ptr const & vote, nano::vote_source source, std::unordered_map const & results) { + bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) { + return result.second == nano::vote_code::vote; + }); + if (processed && enable) + { + put (vote); + } + }); +} + +nano::vote_rebroadcaster::~vote_rebroadcaster () +{ + debug_assert (!thread.joinable ()); +} + +void nano::vote_rebroadcaster::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::vote_rebroadcasting); + run (); + }); +} + +void nano::vote_rebroadcaster::stop () +{ + { + std::lock_guard guard{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +bool nano::vote_rebroadcaster::put (std::shared_ptr const & vote) +{ + bool added{ false }; + { + std::lock_guard guard{ mutex }; + if (queue.size () < max_queue) + { + if (!reps.exists (vote->account)) + { + queue.push_back (vote); + added = true; + } + } + } + if (added) + { + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::queued); + condition.notify_one (); + } + else + { + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::overfill); + } + return added; +} + +void nano::vote_rebroadcaster::run () +{ + std::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait (lock, [&] { + return stopped || !queue.empty (); + }); + + if (stopped) + { + return; + } + + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::loop); + + if (refresh_interval.elapse (15s)) + { + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::refresh); + + reps = wallets.reps (); + enable = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to) + } + + if (!queue.empty ()) + { + auto vote = queue.front (); + queue.pop_front (); + + lock.unlock (); + + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast); + stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ()); + network.flood_vote (vote, 0.5f, /* rebroadcasted */ true); // TODO: Track number of peers that we sent the vote to + + lock.lock (); + } + } +} + +nano::container_info nano::vote_rebroadcaster::container_info () const +{ + std::lock_guard guard{ mutex }; + + nano::container_info info; + info.put ("queue", queue.size ()); + return info; +} \ No newline at end of file diff --git a/nano/node/vote_rebroadcaster.hpp b/nano/node/vote_rebroadcaster.hpp new file mode 100644 index 0000000000..20513e0a1b --- /dev/null +++ b/nano/node/vote_rebroadcaster.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace nano +{ +class vote_rebroadcaster final +{ +public: + static size_t constexpr max_queue = 1024 * 16; + +public: + vote_rebroadcaster (nano::vote_router &, nano::network &, nano::wallets &, nano::stats &, nano::logger &); + ~vote_rebroadcaster (); + + void start (); + void stop (); + + bool put (std::shared_ptr const &); + + nano::container_info container_info () const; + +public: // Dependencies + nano::vote_router & vote_router; + nano::network & network; + nano::wallets & wallets; + nano::stats & stats; + nano::logger & logger; + +private: + void run (); + + std::atomic enable{ true }; // Enable vote rebroadcasting only if the node does not host a representative + std::deque> queue; + nano::wallet_representatives reps; + nano::interval refresh_interval; + + bool stopped{ false }; + std::condition_variable condition; + mutable std::mutex mutex; + std::thread thread; +}; +} \ No newline at end of file