Skip to content

Commit

Permalink
GH-1519 Use a std::variant for hotstuff messages
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Aug 28, 2023
1 parent 32e9243 commit 6af3110
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 167 deletions.
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ namespace eosio::chain {
quorum_certificate high_qc; //justification
};

using hs_message = std::variant<hs_vote_message, hs_proposal_message, hs_new_block_message, hs_new_view_message>;

struct finalizer_state {

bool chained_mode = false;
Expand Down
34 changes: 16 additions & 18 deletions libraries/hotstuff/chain_pacemaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,10 @@ namespace eosio { namespace hotstuff {
{
}

void chain_pacemaker::register_bcast_functions(
std::function<void(const chain::hs_proposal_message&)> on_proposal_message,
std::function<void(const chain::hs_vote_message&)> on_vote_message,
std::function<void(const chain::hs_new_block_message&)> on_new_block_message,
std::function<void(const chain::hs_new_view_message&)> on_new_view_message
) {
FC_ASSERT(on_proposal_message, "on_proposal_message must be provided");
FC_ASSERT(on_vote_message, "on_proposal_message must be provided");
FC_ASSERT(on_new_block_message, "on_proposal_message must be provided");
FC_ASSERT(on_new_view_message, "on_proposal_message must be provided");
void chain_pacemaker::register_bcast_function(std::function<void(const chain::hs_message&)> on_hs_message) {
FC_ASSERT(on_hs_message, "on_hs_message must be provided");
std::lock_guard g( _hotstuff_global_mutex ); // not actually needed but doesn't hurt
bcast_proposal_message = std::move(on_proposal_message);
bcast_vote_message = std::move(on_vote_message);
bcast_new_block_message = std::move(on_new_block_message);
bcast_new_view_message = std::move(on_new_view_message);
bcast_hs_message = std::move(on_hs_message);
}

// Called internally by the chain_pacemaker to decide whether it should do something or not, based on feature activation.
Expand Down Expand Up @@ -282,19 +271,28 @@ namespace eosio { namespace hotstuff {
}

void chain_pacemaker::send_hs_proposal_msg(const hs_proposal_message& msg, name id) {
bcast_proposal_message(msg);
bcast_hs_message(msg);
}

void chain_pacemaker::send_hs_vote_msg(const hs_vote_message& msg, name id) {
bcast_vote_message(msg);
bcast_hs_message(msg);
}

void chain_pacemaker::send_hs_new_block_msg(const hs_new_block_message& msg, name id) {
bcast_new_block_message(msg);
bcast_hs_message(msg);
}

void chain_pacemaker::send_hs_new_view_msg(const hs_new_view_message& msg, name id) {
bcast_new_view_message(msg);
bcast_hs_message(msg);
}

void chain_pacemaker::on_hs_msg(const eosio::chain::hs_message &msg) {
std::visit(overloaded{
[this](const hs_vote_message& m) { on_hs_vote_msg(m); },
[this](const hs_proposal_message& m) { on_hs_proposal_msg(m); },
[this](const hs_new_block_message& m) { on_hs_new_block_msg(m); },
[this](const hs_new_view_message& m) { on_hs_new_view_msg(m); },
}, msg);
}

// called from net threads
Expand Down
22 changes: 8 additions & 14 deletions libraries/hotstuff/include/eosio/hotstuff/chain_pacemaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,11 @@ namespace eosio::hotstuff {
//class-specific functions

chain_pacemaker(controller* chain, std::set<account_name> my_producers, fc::logger& logger);
void register_bcast_functions(
std::function<void(const chain::hs_proposal_message&)> on_proposal_message,
std::function<void(const chain::hs_vote_message&)> on_vote_message,
std::function<void(const chain::hs_new_block_message&)> on_new_block_message,
std::function<void(const chain::hs_new_view_message&)> on_new_view_message
);
void register_bcast_function(std::function<void(const chain::hs_message&)> on_hs_message);

void beat();

void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler
void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler
void on_hs_msg(const hs_message& msg);

void get_state(finalizer_state& fs) const;

Expand Down Expand Up @@ -57,6 +49,11 @@ namespace eosio::hotstuff {
// Check if consensus upgrade feature is activated
bool enabled() const;

void on_hs_proposal_msg(const hs_proposal_message& msg); //consensus msg event handler
void on_hs_vote_msg(const hs_vote_message& msg); //confirmation msg event handler
void on_hs_new_view_msg(const hs_new_view_message& msg); //new view msg event handler
void on_hs_new_block_msg(const hs_new_block_message& msg); //new block msg event handler

// This serializes all messages (high-level requests) to the qc_chain core.
// For maximum safety, the qc_chain core will only process one request at a time.
// These requests can come directly from the net threads, or indirectly from a
Expand All @@ -73,10 +70,7 @@ namespace eosio::hotstuff {
chain::controller* _chain = nullptr;

qc_chain _qc_chain;
std::function<void(const chain::hs_proposal_message&)> bcast_proposal_message;
std::function<void(const chain::hs_vote_message&)> bcast_vote_message;
std::function<void(const chain::hs_new_block_message&)> bcast_new_block_message;
std::function<void(const chain::hs_new_view_message&)> bcast_new_view_message;
std::function<void(const chain::hs_message&)> bcast_hs_message;

uint32_t _quorum_threshold = 15; //FIXME/TODO: calculate from schedule
fc::logger& _logger;
Expand Down
31 changes: 4 additions & 27 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1119,17 +1119,9 @@ void chain_plugin::create_pacemaker(std::set<chain::account_name> my_producers)
my->_chain_pacemaker.emplace(&chain(), std::move(my_producers), hotstuff_logger);
}

void chain_plugin::register_pacemaker_bcast_functions(
std::function<void(const chain::hs_proposal_message&)> on_proposal_message,
std::function<void(const chain::hs_vote_message&)> on_vote_message,
std::function<void(const chain::hs_new_block_message&)> on_new_block_message,
std::function<void(const chain::hs_new_view_message&)> on_new_view_message) {
void chain_plugin::register_pacemaker_bcast_function(std::function<void(const chain::hs_message&)> on_hs_message) {
EOS_ASSERT( my->_chain_pacemaker, plugin_config_exception, "chain_pacemaker not created" );
my->_chain_pacemaker->register_bcast_functions(
std::move(on_proposal_message),
std::move(on_vote_message),
std::move(on_new_block_message),
std::move(on_new_view_message));
my->_chain_pacemaker->register_bcast_function(std::move(on_hs_message));
}


Expand Down Expand Up @@ -2691,23 +2683,8 @@ read_only::get_finalizer_state(const get_finalizer_state_params&, const fc::time
} // namespace chain_apis

// called from net threads
void chain_plugin::notify_hs_vote_message( const hs_vote_message& msg ) {
my->_chain_pacemaker->on_hs_vote_msg(msg);
};

// called from net threads
void chain_plugin::notify_hs_proposal_message( const hs_proposal_message& msg ) {
my->_chain_pacemaker->on_hs_proposal_msg(msg);
};

// called from net threads
void chain_plugin::notify_hs_new_view_message( const hs_new_view_message& msg ) {
my->_chain_pacemaker->on_hs_new_view_msg(msg);
};

// called from net threads
void chain_plugin::notify_hs_new_block_message( const hs_new_block_message& msg ) {
my->_chain_pacemaker->on_hs_new_block_msg(msg);
void chain_plugin::notify_hs_message( const hs_message& msg ) {
my->_chain_pacemaker->on_hs_msg(msg);
};

void chain_plugin::notify_hs_block_produced() {
Expand Down
12 changes: 2 additions & 10 deletions plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1032,16 +1032,8 @@ class chain_plugin : public plugin<chain_plugin> {
const controller& chain() const;

void create_pacemaker(std::set<chain::account_name> my_producers);
void register_pacemaker_bcast_functions(
std::function<void(const chain::hs_proposal_message&)> on_proposal_message,
std::function<void(const chain::hs_vote_message&)> on_vote_message,
std::function<void(const chain::hs_new_block_message&)> on_new_block_message,
std::function<void(const chain::hs_new_view_message&)> on_new_view_message
);
void notify_hs_vote_message( const chain::hs_vote_message& msg );
void notify_hs_proposal_message( const chain::hs_proposal_message& msg );
void notify_hs_new_view_message( const chain::hs_new_view_message& msg );
void notify_hs_new_block_message( const chain::hs_new_block_message& msg );
void register_pacemaker_bcast_function(std::function<void(const chain::hs_message&)> on_hs_message);
void notify_hs_message( const chain::hs_message& msg );
void notify_hs_block_produced();

chain::chain_id_type get_chain_id() const;
Expand Down
5 changes: 1 addition & 4 deletions plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ namespace eosio {
sync_request_message,
signed_block,
packed_transaction,
hs_vote_message,
hs_proposal_message,
hs_new_view_message,
hs_new_block_message>;
hs_message>;

} // namespace eosio

Expand Down
105 changes: 11 additions & 94 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,7 @@ namespace eosio {
void transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>&);
void on_irreversible_block( const block_state_ptr& block );

void on_hs_proposal_message( const hs_proposal_message& msg );
void on_hs_vote_message( const hs_vote_message& msg );
void on_hs_new_view_message( const hs_new_view_message& msg );
void on_hs_new_block_message( const hs_new_block_message& msg );
void on_hs_message( const hs_message& msg );

void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
void start_expire_timer();
Expand Down Expand Up @@ -1038,15 +1035,11 @@ namespace eosio {
void handle_message( const block_id_type& id, signed_block_ptr ptr );
void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead
void handle_message( packed_transaction_ptr trx );
void handle_message( const hs_message& msg );

// returns calculated number of blocks combined latency
uint32_t calc_block_latency();

void handle_message( const hs_vote_message& msg );
void handle_message( const hs_proposal_message& msg );
void handle_message( const hs_new_view_message& msg );
void handle_message( const hs_new_block_message& msg );

void process_signed_block( const block_id_type& id, signed_block_ptr block, block_state_ptr bsp );

fc::variant_object get_logger_variant() const {
Expand Down Expand Up @@ -1124,30 +1117,12 @@ namespace eosio {
c->handle_message( msg );
}

void operator()( const hs_vote_message& msg ) const {
void operator()( const hs_message& msg ) const {
// continue call to handle_message on connection strand
peer_dlog( c, "handle hs_vote_message" );
c->handle_message( msg );
}
void operator()( const hs_proposal_message& msg ) const {
// continue call to handle_message on connection strand
peer_dlog( c, "handle hs_proposal_message" );
c->handle_message( msg );
}
void operator()( const hs_new_view_message& msg ) const {
// continue call to handle_message on connection strand
peer_dlog( c, "handle hs_new_view_message" );
c->handle_message( msg );
}
void operator()( const hs_new_block_message& msg ) const {
// continue call to handle_message on connection strand
peer_dlog( c, "handle hs_new_block_message" );
c->handle_message( msg );
}


};



std::tuple<std::string, std::string, std::string> split_host_port_type(const std::string& peer_add) {
Expand Down Expand Up @@ -3586,24 +3561,9 @@ namespace eosio {
}
}

void connection::handle_message( const hs_vote_message& msg ) {
peer_dlog(this, "received vote: ${msg}", ("msg", msg));
my_impl->chain_plug->notify_hs_vote_message(msg);
}

void connection::handle_message( const hs_proposal_message& msg ) {
peer_dlog(this, "received proposal: ${msg}", ("msg", msg));
my_impl->chain_plug->notify_hs_proposal_message(msg);
}

void connection::handle_message( const hs_new_view_message& msg ) {
peer_dlog(this, "received new view: ${msg}", ("msg", msg));
my_impl->chain_plug->notify_hs_new_view_message(msg);
}

void connection::handle_message( const hs_new_block_message& msg ) {
peer_dlog(this, "received new block msg: ${msg}", ("msg", msg));
my_impl->chain_plug->notify_hs_new_block_message(msg);
void connection::handle_message( const hs_message& msg ) {
peer_dlog(this, "received hs: ${msg}", ("msg", msg));
my_impl->chain_plug->notify_hs_message(msg);
}

size_t calc_trx_size( const packed_transaction_ptr& trx ) {
Expand Down Expand Up @@ -3857,41 +3817,8 @@ namespace eosio {
on_active_schedule(chain_plug->chain().active_producers());
}

void net_plugin_impl::on_hs_proposal_message( const hs_proposal_message& msg ) {
fc_dlog(logger, "sending proposal msg: ${msg}", ("msg", msg));

buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( msg );

dispatcher->strand.post( [this, msg{std::move(send_buffer)}]() mutable {
dispatcher->bcast_msg( std::move(msg) );
});
}

void net_plugin_impl::on_hs_vote_message( const hs_vote_message& msg ) {
fc_dlog(logger, "sending vote msg: ${msg}", ("msg", msg));

buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( msg );

dispatcher->strand.post( [this, msg{std::move(send_buffer)}]() mutable {
dispatcher->bcast_msg( std::move(msg) );
});
}

void net_plugin_impl::on_hs_new_view_message( const hs_new_view_message& msg ) {
fc_dlog(logger, "sending new_view msg: ${msg}", ("msg", msg));

buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( msg );

dispatcher->strand.post( [this, msg{std::move(send_buffer)}]() mutable {
dispatcher->bcast_msg( std::move(msg) );
});
}

void net_plugin_impl::on_hs_new_block_message( const hs_new_block_message& msg ) {
fc_dlog(logger, "sending new_block msg: ${msg}", ("msg", msg));
void net_plugin_impl::on_hs_message( const hs_message& msg ) {
fc_dlog(logger, "sending hs msg: ${msg}", ("msg", msg));

buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( msg );
Expand Down Expand Up @@ -4225,21 +4152,11 @@ namespace eosio {
chain_plug->enable_accept_transactions();
}

chain_plug->register_pacemaker_bcast_functions(
[my = shared_from_this()](const hs_proposal_message& s) {
my->on_hs_proposal_message(s);
},
[my = shared_from_this()](const hs_vote_message& s) {
my->on_hs_vote_message(s);
},
[my = shared_from_this()](const hs_new_block_message& s) {
my->on_hs_new_block_message(s);
},
[my = shared_from_this()](const hs_new_view_message& s) {
my->on_hs_new_view_message(s);
chain_plug->register_pacemaker_bcast_function(
[my = shared_from_this()](const hs_message& s) {
my->on_hs_message(s);
} );


} FC_LOG_AND_RETHROW()
}

Expand Down

0 comments on commit 6af3110

Please sign in to comment.