Skip to content

Commit

Permalink
Merge pull request #4855 from gr0vity-dev/prs/remove_websocket_confir…
Browse files Browse the repository at this point in the history
…mation_topic_cache

Fix websocket confirmation options handling
  • Loading branch information
pwojcikdev authored Mar 6, 2025
2 parents ef5f90c + 0c6645a commit 2c7c736
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 44 deletions.
103 changes: 103 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1146,3 +1146,106 @@ TEST (websocket, new_unconfirmed_block)
ASSERT_EQ ("state", message_contents.get<std::string> ("type"));
ASSERT_EQ ("send", message_contents.get<std::string> ("subtype"));
}

// Test verifying that multiple subscribers with different options receive messages with their correct
// individual settings applied (specifically targeting the bug that was fixed)
TEST (websocket, confirmation_options_independent)
{
nano::test::system system;
nano::node_config config = system.default_config ();
config.websocket_config.enabled = true;
config.websocket_config.port = system.get_available_port ();
auto node1 (system.add_node (config));

// First prepare a block we'll confirm later
nano::keypair key;
nano::state_block_builder builder;
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
auto prev_balance = nano::dev::constants.genesis_amount;
auto send_amount = node1->online_reps.delta () + 1;
auto new_balance = prev_balance - send_amount;
nano::block_hash previous (node1->latest (nano::dev::genesis_key.pub));

auto send = builder
.account (nano::dev::genesis_key.pub)
.previous (previous)
.representative (nano::dev::genesis_key.pub)
.balance (new_balance)
.link (key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (previous))
.build ();

// Set up two concurrent tasks to subscribe with different options and wait for responses
std::atomic<bool> client1_done{ false };
std::atomic<bool> client2_done{ false };
boost::optional<std::string> client1_response;
boost::optional<std::string> client2_response;

// Client 1: Subscribe with include_block = true but no sideband
auto client1_task = ([&client1_done, &client1_response, &node1] () {
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"include_block": "true", "include_sideband_info": "false"}})json");
client.await_ack ();
auto response = client.get_response ();
client1_response = response;
client1_done = true;
});

// Client 2: Subscribe with include_block = true AND include_sideband_info = true
auto client2_task = ([&client2_done, &client2_response, &node1] () {
fake_websocket_client client (node1->websocket.server->listening_port ());
client.send_message (R"json({"action": "subscribe", "topic": "confirmation", "ack": "true", "options": {"include_block": "true", "include_sideband_info": "true"}})json");
client.await_ack ();
auto response = client.get_response ();
client2_response = response;
client2_done = true;
});

// Start both client tasks concurrently
auto future1 = std::async (std::launch::async, client1_task);
auto future2 = std::async (std::launch::async, client2_task);

// Wait for both clients to be set up (both awaiting notifications)
ASSERT_TIMELY (5s, node1->websocket.server->subscriber_count (nano::websocket::topic::confirmation) == 2);

// Now process the block to trigger notifications to both clients
node1->process_active (send);

// Wait for both clients to receive their responses
ASSERT_TIMELY (5s, client1_done && client2_done);

// Verify both clients got responses
ASSERT_TRUE (client1_response.has_value ());
ASSERT_TRUE (client2_response.has_value ());

// Parse and check client1 response (should have block but no sideband)
boost::property_tree::ptree event1;
std::stringstream stream1;
stream1 << client1_response.get ();
boost::property_tree::read_json (stream1, event1);
ASSERT_EQ (event1.get<std::string> ("topic"), "confirmation");

auto & message1 = event1.get_child ("message");
ASSERT_EQ (1, message1.count ("block"));
ASSERT_EQ (0, message1.count ("sideband"));

// Parse and check client2 response (should have both block AND sideband)
boost::property_tree::ptree event2;
std::stringstream stream2;
stream2 << client2_response.get ();
boost::property_tree::read_json (stream2, event2);
ASSERT_EQ (event2.get<std::string> ("topic"), "confirmation");

auto & message2 = event2.get_child ("message");
ASSERT_EQ (1, message2.count ("block"));

// With the old caching code, this would fail because client2 would receive the same
// message as client1 (with no sideband info) despite requesting it
ASSERT_EQ (1, message2.count ("sideband"));

// Verify sideband contains expected fields
auto & sideband = message2.get_child ("sideband");
ASSERT_EQ (1, sideband.count ("height"));
ASSERT_EQ (1, sideband.count ("local_timestamp"));
}
61 changes: 25 additions & 36 deletions nano/node/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,11 @@ void nano::websocket::listener::on_accept (boost::system::error_code ec)
}
}

void nano::websocket::listener::broadcast_confirmation (std::shared_ptr<nano::block> const & block_a, nano::account const & account_a, nano::amount const & amount_a, std::string const & subtype, nano::election_status const & election_status_a, std::vector<nano::vote_with_weight_info> const & election_votes_a)
void nano::websocket::listener::broadcast_confirmation (std::shared_ptr<nano::block> const & block, nano::account const & account, nano::amount const & amount, std::string const & subtype, nano::election_status const & election_status, std::vector<nano::vote_with_weight_info> const & election_votes)
{
nano::websocket::message_builder builder{ node.ledger };

nano::lock_guard<nano::mutex> lk (sessions_mutex);
boost::optional<nano::websocket::message> msg_with_block;
boost::optional<nano::websocket::message> msg_without_block;
for (auto & weak_session : sessions)
{
auto session_ptr (weak_session.lock ());
Expand All @@ -680,18 +678,9 @@ void nano::websocket::listener::broadcast_confirmation (std::shared_ptr<nano::bl
{
conf_options = &default_options;
}
auto include_block (conf_options == nullptr ? true : conf_options->get_include_block ());

if (include_block && !msg_with_block)
{
msg_with_block = builder.block_confirmed (block_a, account_a, amount_a, subtype, include_block, election_status_a, election_votes_a, *conf_options);
}
else if (!include_block && !msg_without_block)
{
msg_without_block = builder.block_confirmed (block_a, account_a, amount_a, subtype, include_block, election_status_a, election_votes_a, *conf_options);
}

session_ptr->write (include_block ? msg_with_block.get () : msg_without_block.get ());
auto message = builder.block_confirmed (block, account, amount, subtype, election_status, election_votes, *conf_options);
session_ptr->write (message);
}
}
}
Expand Down Expand Up @@ -751,19 +740,19 @@ nano::websocket::message nano::websocket::message_builder::stopped_election (nan
return message_l;
}

nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> const & block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block_a, nano::election_status const & election_status_a, std::vector<nano::vote_with_weight_info> const & election_votes_a, nano::websocket::confirmation_options const & options_a)
nano::websocket::message nano::websocket::message_builder::block_confirmed (std::shared_ptr<nano::block> const & block, nano::account const & account, nano::amount const & amount, std::string subtype, nano::election_status const & election_status, std::vector<nano::vote_with_weight_info> const & election_votes, nano::websocket::confirmation_options const & options)
{
nano::websocket::message message_l (nano::websocket::topic::confirmation);
set_common_fields (message_l);

// Block confirmation properties
boost::property_tree::ptree message_node_l;
message_node_l.add ("account", account_a.to_account ());
message_node_l.add ("amount", amount_a.to_string_dec ());
message_node_l.add ("hash", block_a->hash ().to_string ());
message_node_l.add ("account", account.to_account ());
message_node_l.add ("amount", amount.to_string_dec ());
message_node_l.add ("hash", block->hash ().to_string ());

std::string confirmation_type = "unknown";
switch (election_status_a.type)
switch (election_status.type)
{
case nano::election_status_type::active_confirmed_quorum:
confirmation_type = "active_quorum";
Expand All @@ -779,20 +768,20 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
};
message_node_l.add ("confirmation_type", confirmation_type);

if (options_a.get_include_election_info () || options_a.get_include_election_info_with_votes ())
if (options.get_include_election_info () || options.get_include_election_info_with_votes ())
{
boost::property_tree::ptree election_node_l;
election_node_l.add ("duration", election_status_a.election_duration.count ());
election_node_l.add ("time", election_status_a.election_end.count ());
election_node_l.add ("tally", election_status_a.tally.to_string_dec ());
election_node_l.add ("final", election_status_a.final_tally.to_string_dec ());
election_node_l.add ("blocks", std::to_string (election_status_a.block_count));
election_node_l.add ("voters", std::to_string (election_status_a.voter_count));
election_node_l.add ("request_count", std::to_string (election_status_a.confirmation_request_count));
if (options_a.get_include_election_info_with_votes ())
election_node_l.add ("duration", election_status.election_duration.count ());
election_node_l.add ("time", election_status.election_end.count ());
election_node_l.add ("tally", election_status.tally.to_string_dec ());
election_node_l.add ("final", election_status.final_tally.to_string_dec ());
election_node_l.add ("blocks", std::to_string (election_status.block_count));
election_node_l.add ("voters", std::to_string (election_status.voter_count));
election_node_l.add ("request_count", std::to_string (election_status.confirmation_request_count));
if (options.get_include_election_info_with_votes ())
{
boost::property_tree::ptree election_votes_l;
for (auto const & vote_l : election_votes_a)
for (auto const & vote_l : election_votes)
{
boost::property_tree::ptree entry;
entry.put ("representative", vote_l.representative.to_account ());
Expand All @@ -806,13 +795,13 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
message_node_l.add_child ("election_info", election_node_l);
}

if (include_block_a)
if (options.get_include_block ())
{
boost::property_tree::ptree block_node_l;
block_a->serialize_json (block_node_l);
if (options_a.get_include_linked_account ())
block->serialize_json (block_node_l);
if (options.get_include_linked_account ())
{
auto linked_account = ledger.linked_account (ledger.tx_begin_read (), *block_a);
auto linked_account = ledger.linked_account (ledger.tx_begin_read (), *block);
if (linked_account.has_value ())
{
block_node_l.add ("linked_account", linked_account.value ().to_account ());
Expand All @@ -829,11 +818,11 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std:
message_node_l.add_child ("block", block_node_l);
}

if (options_a.get_include_sideband_info ())
if (options.get_include_sideband_info ())
{
boost::property_tree::ptree sideband_node_l;
sideband_node_l.add ("height", std::to_string (block_a->sideband ().height));
sideband_node_l.add ("local_timestamp", std::to_string (block_a->sideband ().timestamp));
sideband_node_l.add ("height", std::to_string (block->sideband ().height));
sideband_node_l.add ("local_timestamp", std::to_string (block->sideband ().timestamp));
message_node_l.add_child ("sideband", sideband_node_l);
}

Expand Down
16 changes: 8 additions & 8 deletions nano/node/websocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ namespace websocket
public:
message_builder (nano::ledger & ledger);

message block_confirmed (std::shared_ptr<nano::block> const & block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block, nano::election_status const & election_status_a, std::vector<nano::vote_with_weight_info> const & election_votes_a, nano::websocket::confirmation_options const & options_a);
message started_election (nano::block_hash const & hash_a);
message stopped_election (nano::block_hash const & hash_a);
message vote_received (std::shared_ptr<nano::vote> const & vote_a, nano::vote_code code_a);
message work_generation (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const work_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::string const & peer_a, std::vector<std::string> const & bad_peers_a, bool const completed_a = true, bool const cancelled_a = false);
message work_cancelled (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message work_failed (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message block_confirmed (std::shared_ptr<nano::block> const & block, nano::account const & account, nano::amount const & amount, std::string subtype, nano::election_status const & election_status, std::vector<nano::vote_with_weight_info> const & election_votes, nano::websocket::confirmation_options const & options);
message started_election (nano::block_hash const & hash);
message stopped_election (nano::block_hash const & hash);
message vote_received (std::shared_ptr<nano::vote> const & vote, nano::vote_code code);
message work_generation (nano::work_version const version, nano::block_hash const & root, uint64_t work, uint64_t difficulty, uint64_t publish_threshold, std::chrono::milliseconds const & duration, std::string const & peer, std::vector<std::string> const & bad_peers, bool completed = true, bool cancelled = false);
message work_cancelled (nano::work_version const version, nano::block_hash const & root, uint64_t difficulty, uint64_t publish_threshold, std::chrono::milliseconds const & duration, std::vector<std::string> const & bad_peers);
message work_failed (nano::work_version const version, nano::block_hash const & root, uint64_t difficulty, uint64_t publish_threshold, std::chrono::milliseconds const & duration, std::vector<std::string> const & bad_peers);
message bootstrap_started (std::string const & id_a, std::string const & mode_a);
message bootstrap_exited (std::string const & id_a, std::string const & mode_a, std::chrono::steady_clock::time_point const start_time_a, uint64_t const total_blocks_a);
message telemetry_received (nano::telemetry_data const &, nano::endpoint const &);
Expand Down Expand Up @@ -325,7 +325,7 @@ namespace websocket
void stop ();

/** Broadcast block confirmation. The content of the message depends on subscription options (such as "include_block") */
void broadcast_confirmation (std::shared_ptr<nano::block> const & block_a, nano::account const & account_a, nano::amount const & amount_a, std::string const & subtype, nano::election_status const & election_status_a, std::vector<nano::vote_with_weight_info> const & election_votes_a);
void broadcast_confirmation (std::shared_ptr<nano::block> const & block, nano::account const & account, nano::amount const & amount, std::string const & subtype, nano::election_status const & election_status, std::vector<nano::vote_with_weight_info> const & election_votes);

/** Broadcast \p message to all session subscribing to the message topic. */
void broadcast (nano::websocket::message message_a);
Expand Down

0 comments on commit 2c7c736

Please sign in to comment.