
P2P: Prioritize blocks, votes, and all messages over trxs #1090

Merged · 11 commits into main · Jan 3, 2025

Conversation

heifner
Member

@heifner heifner commented Jan 2, 2025

  • Send all messages, including blocks and votes, ahead of transactions. This prevents a large trx, or a large number of trxs, from delaying block or vote propagation.
  • Includes a fix so that buffer_queue.out_callback() is called before clear_out_queue(). This was a long-standing bug: the callbacks were never invoked because clear_out_queue() removed them first.
  • Do not post async_write to the connection strand, as the call is already on the connection strand.
  • Move serialization of votes off the main thread.
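The priority scheme in the first bullet can be sketched as three FIFO queues drained in order, so blocks, votes, and other messages are never stuck behind a backlog of transactions. This is an illustrative standalone sketch; the type and member names are hypothetical, not the actual net_plugin types.

```cpp
#include <cstdint>
#include <deque>
#include <string>

// Hypothetical message classes, ordered by send priority.
enum class msg_class : uint32_t { sync, general, trx };

class write_queues {
   std::deque<std::string> sync_q;  // sync blocks: sent first
   std::deque<std::string> gen_q;   // blocks, votes, all other messages
   std::deque<std::string> trx_q;   // packed transactions: sent last
public:
   void add(msg_class c, std::string payload) {
      switch (c) {
         case msg_class::sync:    sync_q.push_back(std::move(payload)); break;
         case msg_class::general: gen_q.push_back(std::move(payload));  break;
         case msg_class::trx:     trx_q.push_back(std::move(payload));  break;
      }
   }
   // Pop the next message to write; higher-priority queues drain first,
   // so transactions go out only when no block or vote is pending.
   bool next(std::string& out) {
      for (auto* q : {&sync_q, &gen_q, &trx_q}) {
         if (!q->empty()) { out = std::move(q->front()); q->pop_front(); return true; }
      }
      return false;
   }
};
```

Even if transactions were enqueued first, a later block or vote is written before them.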

Resolves #1071

@heifner heifner requested review from greg7mdp and linh2931 January 2, 2025 15:00
@heifner heifner added the OCI Work exclusive to OCI team label Jan 2, 2025
Contributor

@greg7mdp greg7mdp left a comment

Move serialization of votes off the main thread.

Where is that done?

@@ -692,7 +698,7 @@ namespace eosio {
mutable fc::mutex _mtx;
uint32_t _write_queue_size GUARDED_BY(_mtx) {0};
deque<queued_write> _write_queue GUARDED_BY(_mtx);
deque<queued_write> _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue will be sent first
deque<queued_write> _trx_write_queue GUARDED_BY(_mtx); // trx_write_queue will be sent last
Contributor

indentation

Member Author

@@ -638,10 +645,10 @@ namespace eosio {
       // @param callback must not callback into queued_buffer
       bool add_write_queue( const std::shared_ptr<vector<char>>& buff,
                             std::function<void( boost::system::error_code, std::size_t )> callback,
-                            bool to_sync_queue ) {
+                            uint32_t net_message_which ) {
Contributor

@greg7mdp greg7mdp Jan 2, 2025


I feel this uint32_t net_message_which could be replaced with msg_type_t msg_type, where msg_type_t is defined as:

   enum class msg_type_t : uint32_t {
      signed_block       = fc::get_index<net_message, signed_block>(),
      packed_transaction = fc::get_index<net_message, packed_transaction>(),
      vote_message       = fc::get_index<net_message, vote_message>()
   };

Member Author

Comment on lines 1574 to 1575
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
Contributor

I think it could be slightly simplified to:

Suggested change
-            boost::asio::async_write( *c->socket, bufs,
-               boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
+            boost::asio::async_write( *socket, bufs,
+               boost::asio::bind_executor( strand, [c, socket]( boost::system::error_code ec, std::size_t w ) {

Member Author

_write_queue_size = 0;
}

-      void clear_out_queue() {
+      void clear_out_queue(boost::system::error_code ec, std::size_t w) {
Member

What does parameter w mean?

Member Author

Replaced with number_of_bytes_written

Member Author

@@ -3928,15 +3936,18 @@ namespace eosio {
}

    void net_plugin_impl::bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg ) {
-      buffer_factory buff_factory;
-      auto send_buffer = buff_factory.get_send_buffer( *msg );
Member Author

Move the serialization of vote_message off of main thread and move it down below into the net thread pool.

-      my_impl->dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) );
+      boost::asio::post( my_impl->thread_pool.get_executor(), [exclude_peer, msg]() mutable {
+         buffer_factory buff_factory;
+         auto send_buffer = buff_factory.get_send_buffer( *msg );
Contributor

@greg7mdp greg7mdp Jan 2, 2025


Accesses the shared_ptr send_buffer in buffer_factory from multiple threads without synchronization. It is not even clear to me what this variable is for.

Never mind, I see that the buffer_factory is recreated each time.

Contributor

I'm not really seeing why we have the caching in buffer_factory (and pass a buffer_factory to the posted lambdas) rather than just passing the serialized buffer to the posted lambdas.

Member Author

In this particular case the vote_message_ptr is passed to the lambda so it can be serialized in the net thread pool instead of on the calling thread. The buffer_factory caching is not being used for votes; it just provides a convenient interface.
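The pattern being described, capturing the shared pointer and serializing inside the posted task, can be sketched with the standard library alone. Here std::async stands in for posting to the net thread pool, and the message type and serializer are hypothetical stand-ins, not the actual net_plugin code.

```cpp
#include <cstdint>
#include <future>
#include <memory>
#include <string>
#include <vector>

// Hypothetical vote type and serializer (stand-in for buffer_factory).
struct vote_message { std::string producer; uint32_t slot; };
using vote_message_ptr = std::shared_ptr<const vote_message>;

std::vector<char> serialize(const vote_message& msg) {
   std::string s = msg.producer + ":" + std::to_string(msg.slot);
   return {s.begin(), s.end()};
}

// The caller only pays the cost of copying a shared_ptr; the expensive
// serialization runs on the worker thread inside the task.
std::future<std::vector<char>> bcast_vote(vote_message_ptr msg) {
   return std::async(std::launch::async, [msg]() {
      return serialize(*msg);  // off the calling thread
   });
}
```

The shared_ptr capture keeps the message alive until the worker is done with it, which is the same reason the real code captures msg by value in the posted lambda.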

Contributor

I was looking at dispatch_manager::bcast_block() too. What is the benefit of using the buffer_factory there, instead of just calling get_send_buffer before the call to for_each_block_connection and capturing the send_buffer in the lambda instead of the buffer_factory?

Member Author

For bcast_block the benefit is that the node does not have to serialize the block at all if all peers have already received it.
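The caching benefit described here can be sketched as a factory that serializes at most once, and not at all if no peer ends up needing the buffer. This is an illustrative stand-in with hypothetical names, not the actual buffer_factory.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical block type.
struct signed_block { std::string payload; };

// Stand-in for buffer_factory: serialization is deferred until the first
// peer that actually needs the buffer asks for it, and the cached result
// is reused for every subsequent peer in the same broadcast.
class lazy_buffer_factory {
   std::shared_ptr<std::vector<char>> cached;
public:
   int serialize_count = 0;  // instrumentation for this example only
   const std::shared_ptr<std::vector<char>>& get_send_buffer(const signed_block& b) {
      if (!cached) {
         ++serialize_count;  // expensive serialization happens once
         cached = std::make_shared<std::vector<char>>(b.payload.begin(),
                                                      b.payload.end());
      }
      return cached;
   }
};
```

If the per-connection loop skips every peer (all already have the block), get_send_buffer is never called and the serialization cost is avoided entirely.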

@ericpassmore
Contributor

Note:start
category: System Stability
component: P2P
summary: Update net-plugin to prioritize blocks, votes, and other messages to ensure timely arrival of critical information.
Note:end

@heifner heifner merged commit d89dc9f into main Jan 3, 2025
36 checks passed
@heifner heifner deleted the GH-1071-p2p-prioritize-blocks branch January 3, 2025 12:17
Labels: OCI Work exclusive to OCI team

Successfully merging this pull request may close these issues:
P2P: Prioritize blocks over trxs in connection buffer queue

4 participants