Skip to content

Commit

Permalink
Rework online reps tracking callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Mar 3, 2025
1 parent e992597 commit f9ed20c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 34 deletions.
3 changes: 0 additions & 3 deletions nano/core_test/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,6 @@ TEST (election, quorum_minimum_confirm_fail)
ASSERT_FALSE (election->confirmed ());
}

namespace nano
{
// FIXME: this test fails on rare occasions. It needs a review.
TEST (election, quorum_minimum_update_weight_before_quorum_checks)
{
Expand Down Expand Up @@ -267,7 +265,6 @@ TEST (election, quorum_minimum_update_weight_before_quorum_checks)
ASSERT_TIMELY (5s, election->confirmed ());
ASSERT_NE (nullptr, node1.block (send1->hash ()));
}
}

TEST (election, continuous_voting)
{
Expand Down
10 changes: 6 additions & 4 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,14 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
if (!recently_confirmed.exists (root))
{
result.inserted = true;
auto observe_rep_cb = [&node = node] (auto const & rep_a) {
// TODO: Is this neccessary? Move this outside of the election class
// Representative is defined as online if replying to live votes or rep_crawler queries

// Passing this callback into the election is important
// We need to observe and update the online voting weight *before* election quorum is checked
auto observe_rep_callback = [&node = node] (auto const & rep_a) {
node.online_reps.observe (rep_a);
};
result.election = nano::make_shared<nano::election> (node, block_a, nullptr, observe_rep_cb, election_behavior_a);
result.election = nano::make_shared<nano::election> (node, block_a, nullptr, observe_rep_callback, election_behavior_a);

roots.get<tag_root> ().emplace (entry{ root, result.election, std::move (erased_callback_a) });
node.vote_router.connect (hash, result.election);

Expand Down
18 changes: 11 additions & 7 deletions nano/node/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ std::chrono::milliseconds nano::election::base_latency () const
* election
*/

nano::election::election (nano::node & node_a, std::shared_ptr<nano::block> const & block_a, std::function<void (std::shared_ptr<nano::block> const &)> const & confirmation_action_a, std::function<void (nano::account const &)> const & live_vote_action_a, nano::election_behavior election_behavior_a) :
nano::election::election (nano::node & node_a, std::shared_ptr<nano::block> const & block_a, std::function<void (std::shared_ptr<nano::block> const &)> const & confirmation_action_a, std::function<void (nano::account const &)> const & vote_action_a, nano::election_behavior election_behavior_a) :
confirmation_action (confirmation_action_a),
live_vote_action (live_vote_action_a),
vote_action (vote_action_a),
node (node_a),
behavior_m (election_behavior_a),
status (block_a),
Expand Down Expand Up @@ -502,7 +502,8 @@ std::shared_ptr<nano::block> nano::election::find (nano::block_hash const & hash

nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timestamp_a, nano::block_hash const & block_hash_a, nano::vote_source vote_source_a)
{
auto weight = node.ledger.weight (rep);
auto const weight = node.ledger.weight (rep);

if (!node.network_params.network.is_dev_network () && weight <= node.minimum_principal_weight ())
{
return vote_code::indeterminate;
Expand Down Expand Up @@ -538,11 +539,8 @@ nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timest
}
}

// Update voter list entry
last_votes[rep] = { std::chrono::steady_clock::now (), timestamp_a, block_hash_a };
if (vote_source_a != vote_source::cache)
{
live_vote_action (rep);
}

node.stats.inc (nano::stat::type::election, nano::stat::detail::vote);
node.stats.inc (nano::stat::type::election_vote, to_stat_detail (vote_source_a));
Expand All @@ -565,6 +563,12 @@ nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timest
weight,
to_string (vote_source_a));

// This must execute before calculating the vote tally to ensure accurate online weight and quorum numbers are used
if (vote_action)
{
vote_action (rep);
}

if (!confirmed_locked ())
{
confirm_if_quorum (lock);
Expand Down
4 changes: 3 additions & 1 deletion nano/node/election.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ class election final : public std::enable_shared_from_this<election>
private:
// Minimum time between broadcasts of the current winner of an election, as a backup to requesting confirmations
std::chrono::milliseconds base_latency () const;

// Callbacks
std::function<void (std::shared_ptr<nano::block> const &)> confirmation_action;
std::function<void (nano::account const &)> live_vote_action;
std::function<void (nano::account const &)> vote_action;

private: // State management
static unsigned constexpr passive_duration_factor = 5;
Expand Down
40 changes: 21 additions & 19 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,27 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
}
});

// Representative is defined as online if replying to live votes or rep crawler queries
observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_source source, nano::vote_code code) {
release_assert (vote != nullptr);
release_assert (channel != nullptr);
debug_assert (code != nano::vote_code::invalid);

// Track rep weight voting on live elections
bool should_observe = (code != nano::vote_code::indeterminate);

// Ignore republished votes when rep crawling
if (source == nano::vote_source::live)
{
should_observe |= rep_crawler.process (vote, channel);
}

if (should_observe)
{
online_reps.observe (vote->account);
}
});

if (!init_error ())
{
wallets.observer = [this] (bool active) {
Expand All @@ -265,25 +286,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
network.send_keepalive_self (channel);
});

observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_source source, nano::vote_code code) {
debug_assert (vote != nullptr);
debug_assert (code != nano::vote_code::invalid);
if (channel == nullptr)
{
return; // Channel expired when waiting for vote to be processed
}
// Ignore republished votes
if (source == nano::vote_source::live)
{
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
{
// Representative is defined as online if replying to live votes or rep_crawler queries
online_reps.observe (vote->account);
}
}
});

// Cancelling local work generation
observers.work_cancel.add ([this] (nano::root const & root_a) {
this->work.cancel (root_a);
Expand Down

0 comments on commit f9ed20c

Please sign in to comment.