
P2P: Prioritize blocks, votes, and all messages over trxs #1090

Merged: 11 commits, Jan 3, 2025
213 changes: 112 additions & 101 deletions plugins/net_plugin/net_plugin.cpp
@@ -598,18 +598,25 @@ namespace eosio {
// thread safe
class queued_buffer : boost::noncopyable {
public:
void reset() {
fc::lock_guard g( _mtx );
_write_queue.clear();
_trx_write_queue.clear();
_write_queue_size = 0;
_out_queue.clear();
}

void clear_write_queue() {
fc::lock_guard g( _mtx );
_write_queue.clear();
_sync_write_queue.clear();
_trx_write_queue.clear();
_write_queue_size = 0;
}

void clear_out_queue() {
void clear_out_queue(boost::system::error_code ec, std::size_t w) {
Member: What does parameter w mean?

Member Author: Replaced with number_of_bytes_written

fc::lock_guard g( _mtx );
while ( !_out_queue.empty() ) {
_out_queue.pop_front();
}
out_callback( ec, w );
_out_queue.clear();
}

uint32_t write_queue_size() const {
@@ -627,7 +634,7 @@ namespace eosio {
fc::unique_lock g( _mtx );
// if out_queue is not empty then async_write is in progress
bool async_write_in_progress = !_out_queue.empty();
bool ready = ((!_sync_write_queue.empty() || !_write_queue.empty()) && !async_write_in_progress);
bool ready = ((!_trx_write_queue.empty() || !_write_queue.empty()) && !async_write_in_progress);
g.unlock();
if (async_write_in_progress) {
fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id));
@@ -638,10 +645,10 @@ namespace eosio {
// @param callback must not callback into queued_buffer
bool add_write_queue( const std::shared_ptr<vector<char>>& buff,
std::function<void( boost::system::error_code, std::size_t )> callback,
bool to_sync_queue ) {
uint32_t net_message_which ) {
Contributor @greg7mdp (Jan 2, 2025): I feel this uint32_t net_message_which could be replaced with msg_type_t msg_type, where msg_type_t is defined as:

   enum class msg_type_t : uint32_t {
      signed_block       = fc::get_index<net_message, signed_block>(),
      packed_transaction = fc::get_index<net_message, packed_transaction>(),
      vote_message       = fc::get_index<net_message, vote_message>()
   };
fc::lock_guard g( _mtx );
if( to_sync_queue ) {
_sync_write_queue.push_back( {buff, std::move(callback)} );
if( net_message_which == packed_transaction_which ) {
_trx_write_queue.push_back( {buff, std::move(callback)} );
} else {
_write_queue.push_back( {buff, std::move(callback)} );
}
@@ -654,18 +661,11 @@ namespace eosio {

   void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
      fc::lock_guard g( _mtx );
-     if( !_sync_write_queue.empty() ) { // always send msgs from sync_write_queue first
-        fill_out_buffer( bufs, _sync_write_queue );
-     } else { // postpone real_time write_queue if sync queue is not empty
+     if( !_write_queue.empty() ) { // always send msgs from write_queue first
         fill_out_buffer( bufs, _write_queue );
-        EOS_ASSERT( _write_queue_size == 0, plugin_exception, "write queue size expected to be zero" );
-     }
-  }
-
-  void out_callback( boost::system::error_code ec, std::size_t w ) {
-     fc::lock_guard g( _mtx );
-     for( auto& m : _out_queue ) {
-        m.callback( ec, w );
+     } else {
+        fill_out_buffer( bufs, _trx_write_queue );
+        assert(_trx_write_queue.empty() && _write_queue.empty() && _write_queue_size == 0);
      }
   }

@@ -682,6 +682,12 @@ namespace eosio {
}
}

void out_callback( boost::system::error_code ec, std::size_t w ) REQUIRES(_mtx) {
for( auto& m : _out_queue ) {
m.callback( ec, w );
}
}

private:
struct queued_write {
std::shared_ptr<vector<char>> buff;
@@ -692,7 +698,7 @@ namespace eosio {
mutable fc::mutex _mtx;
uint32_t _write_queue_size GUARDED_BY(_mtx) {0};
deque<queued_write> _write_queue GUARDED_BY(_mtx);
deque<queued_write> _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue will be sent first
deque<queued_write> _trx_write_queue GUARDED_BY(_mtx); // trx_write_queue will be sent last
Contributor: indentation
deque<queued_write> _out_queue GUARDED_BY(_mtx);

}; // queued_buffer
@@ -967,10 +973,11 @@ namespace eosio {
void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num );

void enqueue( const net_message &msg );
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num, bool to_sync_queue = false);
void enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
go_away_reason close_after_send,
bool to_sync_queue = false);
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num );
void enqueue_buffer( uint32_t net_message_which,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num,
go_away_reason close_after_send);
void cancel_sync();
void flush_queues();
bool enqueue_sync_block();
@@ -979,9 +986,9 @@ namespace eosio {
void cancel_sync_wait();
void sync_wait();

void queue_write(const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue = false);
void queue_write(uint32_t net_message_which,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback);
void do_queue_write();

bool is_valid( const handshake_message& msg ) const;
@@ -1544,10 +1551,10 @@ namespace eosio {
}

// called from connection strand
void connection::queue_write(const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue) {
if( !buffer_queue.add_write_queue( buff, std::move(callback), to_sync_queue )) {
void connection::queue_write(uint32_t net_message_which,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback) {
if( !buffer_queue.add_write_queue( buff, std::move(callback), net_message_which )) {
peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) );
close();
return;
@@ -1564,55 +1571,54 @@ namespace eosio {
std::vector<boost::asio::const_buffer> bufs;
buffer_queue.fill_out_buffer( bufs );

boost::asio::post(strand, [c{std::move(c)}, bufs{std::move(bufs)}]() {
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
try {
c->buffer_queue.clear_out_queue();
// May have closed connection and cleared buffer_queue
if (!c->socket->is_open() && c->socket_is_open()) { // if socket_open then close not called
peer_ilog(c, "async write socket closed before callback");
c->close();
return;
}
if (socket != c->socket ) { // different socket, c must have created a new socket, make sure previous is closed
peer_ilog( c, "async write socket changed before callback");
boost::system::error_code ec;
socket->shutdown( tcp::socket::shutdown_both, ec );
socket->close( ec );
return;
}
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
Contributor: I think it could be slightly simplified to:

Suggested change — from:
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
to:
boost::asio::async_write( *socket, bufs,
boost::asio::bind_executor( strand, [c, socket]( boost::system::error_code ec, std::size_t w ) {
try {
peer_dlog(c, "async write complete");
// May have closed connection and cleared buffer_queue
if (!c->socket->is_open() && c->socket_is_open()) { // if socket_open then close not called
peer_ilog(c, "async write socket closed before callback");
c->buffer_queue.clear_out_queue(ec, w);
c->close();
return;
}
if (socket != c->socket ) { // different socket, c must have created a new socket, make sure previous is closed
peer_ilog( c, "async write socket changed before callback");
c->buffer_queue.clear_out_queue(ec, w);
boost::system::error_code ignore_ec;
socket->shutdown( tcp::socket::shutdown_both, ignore_ec );
socket->close( ignore_ec );
return;
}

if( ec ) {
if( ec.value() != boost::asio::error::eof ) {
peer_wlog( c, "Error sending to peer: ${i}", ( "i", ec.message() ) );
} else {
peer_wlog( c, "connection closure detected on write" );
}
c->close();
return;
if( ec ) {
if( ec.value() != boost::asio::error::eof ) {
peer_wlog( c, "Error sending to peer: ${i}", ( "i", ec.message() ) );
} else {
peer_wlog( c, "connection closure detected on write" );
}
peer_dlog(c, "async write complete");
c->bytes_sent += w;
c->last_bytes_sent = c->get_time();

c->buffer_queue.out_callback( ec, w );

c->enqueue_sync_block();
c->do_queue_write();
} catch ( const std::bad_alloc& ) {
throw;
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch( const fc::exception& ex ) {
peer_elog( c, "fc::exception in do_queue_write: ${s}", ("s", ex.to_string()) );
} catch( const std::exception& ex ) {
peer_elog( c, "std::exception in do_queue_write: ${s}", ("s", ex.what()) );
} catch( ... ) {
peer_elog( c, "Unknown exception in do_queue_write" );
c->close();
return;
}
}));
});
c->bytes_sent += w;
c->last_bytes_sent = c->get_time();

c->buffer_queue.clear_out_queue(ec, w);

c->enqueue_sync_block();
c->do_queue_write();
} catch ( const std::bad_alloc& ) {
throw;
} catch ( const boost::interprocess::bad_alloc& ) {
throw;
} catch( const fc::exception& ex ) {
peer_elog( c, "fc::exception in do_queue_write: ${s}", ("s", ex.to_string()) );
} catch( const std::exception& ex ) {
peer_elog( c, "std::exception in do_queue_write: ${s}", ("s", ex.what()) );
} catch( ... ) {
peer_elog( c, "Unknown exception in do_queue_write" );
}
}));
}

// called from connection strand
@@ -1664,7 +1670,7 @@ namespace eosio {
}
}
block_sync_throttling = false;
auto sent = enqueue_block( sb, num, true );
auto sent = enqueue_block( sb, num );
block_sync_total_bytes_sent += sent;
block_sync_frame_bytes_sent += sent;
++peer_requested->last;
@@ -1820,38 +1826,43 @@ namespace eosio {

buffer_factory buff_factory;
const auto& send_buffer = buff_factory.get_send_buffer( m );
enqueue_buffer( send_buffer, close_after_send );
enqueue_buffer( m.index(), send_buffer, 0, close_after_send );
}

// called from connection strand
size_t connection::enqueue_block( const std::vector<char>& b, uint32_t block_num, bool to_sync_queue ) {
size_t connection::enqueue_block( const std::vector<char>& b, uint32_t block_num ) {
peer_dlog( this, "enqueue block ${num}", ("num", block_num) );
verify_strand_in_this_thread( strand, __func__, __LINE__ );

block_buffer_factory buff_factory;
const auto& sb = buff_factory.get_send_buffer( b );
latest_blk_time = std::chrono::steady_clock::now();
enqueue_buffer( sb, no_reason, to_sync_queue);
enqueue_buffer( signed_block_which, sb, block_num, no_reason);
return sb->size();
}

// called from connection strand
void connection::enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
go_away_reason close_after_send,
bool to_sync_queue)
void connection::enqueue_buffer( uint32_t net_message_which,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num, // only valid for net_message_which == signed_block_which
go_away_reason close_after_send)
{
connection_ptr self = shared_from_this();
queue_write(send_buffer,
[conn{std::move(self)}, close_after_send](boost::system::error_code ec, std::size_t ) {
if (ec) return;
queue_write(net_message_which, send_buffer,
[conn{std::move(self)}, close_after_send, net_message_which, block_num](boost::system::error_code ec, std::size_t ) {
if (ec) {
fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.to_string()));
return;
}
if (net_message_which == signed_block_which)
fc_dlog(logger, "Connection - ${cid} - done sending block ${bn}", ("cid", conn->connection_id)("bn", block_num));
if (close_after_send != no_reason) {
fc_ilog( logger, "sent a go away message: ${r}, closing connection ${cid}",
("r", reason_str(close_after_send))("cid", conn->connection_id) );
conn->close();
return;
}
},
to_sync_queue);
});
}

// thread safe
@@ -2674,24 +2685,21 @@ namespace eosio {
bool has_block = cp->peer_fork_db_root_num >= bnum;
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
cp->enqueue_buffer( sb, no_reason );
cp->enqueue_buffer( signed_block_which, sb, bnum, no_reason );
}
});
} );
}

void dispatch_manager::bcast_vote_msg( uint32_t exclude_peer, send_buffer_type msg ) {
if (my_impl->sync_master->syncing_from_peer())
return;

my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) {
if( !cp->current() ) return true;
if( cp->connection_id == exclude_peer ) return true;
boost::asio::post(cp->strand, [cp, msg]() {
if (cp->protocol_version >= proto_savanna) {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
cp->enqueue_buffer( msg, no_reason );
cp->enqueue_buffer( vote_message_which, msg, 0, no_reason );
}
});
return true;
Expand All @@ -2713,7 +2721,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) );
boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( sb, no_reason );
cp->enqueue_buffer( packed_transaction_which, sb, 0, no_reason );
} );
} );
}
@@ -2745,7 +2753,7 @@ namespace eosio {
void connection::connect( const tcp::resolver::results_type& endpoints ) {
set_state(connection_state::connecting);
pending_message_buffer.reset();
buffer_queue.clear_out_queue();
buffer_queue.reset();
boost::asio::async_connect( *socket, endpoints,
boost::asio::bind_executor( strand,
[c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
@@ -3928,15 +3936,18 @@ namespace eosio {
}

void net_plugin_impl::bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg ) {
buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( *msg );
Member Author: Move the serialization of vote_message off of the main thread, down below into the net thread pool.

if (my_impl->sync_master->syncing_from_peer())
return;

fc_dlog(vote_logger, "bcast ${t} vote: block #${bn} ${id}.., ${v}, key ${k}..",
("t", exclude_peer ? "received" : "our")("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16))
("v", msg->strong ? "strong" : "weak")("k", msg->finalizer_key.to_string().substr(8,16)));

boost::asio::post( my_impl->thread_pool.get_executor(), [exclude_peer, msg{std::move(send_buffer)}]() mutable {
my_impl->dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) );
boost::asio::post( my_impl->thread_pool.get_executor(), [exclude_peer, msg]() mutable {
buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( *msg );
Contributor @greg7mdp (Jan 2, 2025): Accesses the shared_ptr send_buffer in buffer_factory from multiple threads without synchronization. It is not even clear to me what this variable is for?

Never mind, I see that the buffer_factory is recreated each time.

Contributor: I'm not really seeing why we have the caching in buffer_factory (and pass a buffer_factory to the posted lambdas) rather than just passing the serialized buffer to the posted lambdas.

Member Author: In this particular case the vote_message_ptr is passed to the lambda so it can be serialized in the net thread pool instead of on the calling thread. The buffer_factory caching is not being used for votes; it just provides the convenient interface.

Contributor: I was looking at dispatch_manager::bcast_block() too. What is the benefit of using the buffer_factory there, instead of just calling get_send_buffer before the call to for_each_block_connection and capturing the send_buffer in the lambda instead of the buffer_factory?

Member Author: For bcast_block the benefit is that the node does not have to serialize the block if all peers have already received the block.
my_impl->dispatcher.bcast_vote_msg( exclude_peer, std::move(send_buffer) );
});
}
