Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P: Misc improvements #1040

Merged
merged 5 commits into from
Apr 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 54 additions & 34 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,19 @@
#include <atomic>
#include <cmath>
#include <memory>
#include <new>
#include <shared_mutex>

// std::hardware_{constructive,destructive}_interference_size are C++17 library
// features, but some toolchains (clang++16 among them) still do not provide
// them. Detect availability through the feature-test macro and supply local
// stand-ins when the standard names are missing.
#ifndef __cpp_lib_hardware_interference_size
// Fallback: 64 bytes, the value used on x86-64.
[[maybe_unused]] constexpr std::size_t hardware_destructive_interference_size = 64;
[[maybe_unused]] constexpr std::size_t hardware_constructive_interference_size = 64;
#else
// The standard library provides the constants; expose them unqualified so the
// alignas(...) uses below read the same in both configurations.
using std::hardware_destructive_interference_size;
using std::hardware_constructive_interference_size;
#endif

using namespace eosio::chain::plugin_interface;

namespace eosio {
Expand Down Expand Up @@ -122,12 +133,15 @@ namespace eosio {
static constexpr int64_t block_interval_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

mutable std::mutex sync_mtx;
alignas(hardware_destructive_interference_size)
std::mutex sync_mtx;
uint32_t sync_known_lib_num{0};
uint32_t sync_last_requested_num{0};
uint32_t sync_next_expected_num{0};
uint32_t sync_req_span{0};
connection_ptr sync_source;

alignas(hardware_destructive_interference_size)
std::atomic<stages> sync_state{in_sync};

private:
Expand All @@ -150,17 +164,18 @@ namespace eosio {
void sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied );
void recv_handshake( const connection_ptr& c, const handshake_message& msg );
void sync_recv_notice( const connection_ptr& c, const notice_message& msg );
inline std::unique_lock<std::mutex> locked_sync_mutex() {
return std::unique_lock<std::mutex>(sync_mtx);
}
inline void reset_last_requested_num(const std::unique_lock<std::mutex>& lock) {
inline void reset_last_requested_num() {
std::lock_guard g(sync_mtx);
sync_last_requested_num = 0;
}
};

class dispatch_manager {
alignas(hardware_destructive_interference_size)
mutable std::mutex blk_state_mtx;
peer_block_state_index blk_state;

alignas(hardware_destructive_interference_size)
mutable std::mutex local_txns_mtx;
node_transaction_index local_txns;

Expand Down Expand Up @@ -215,7 +230,6 @@ namespace eosio {
class net_plugin_impl : public std::enable_shared_from_this<net_plugin_impl>,
public auto_bp_peering::bp_connection_manager<net_plugin_impl, connection> {
public:
using connection_t = connection;
unique_ptr<tcp::acceptor> acceptor;
std::atomic<uint32_t> current_connection_id{0};

Expand Down Expand Up @@ -265,21 +279,27 @@ namespace eosio {
bool use_socket_read_watermark = false;
/** @} */

alignas(hardware_destructive_interference_size)
mutable std::shared_mutex connections_mtx;
std::set< connection_ptr > connections; // todo: switch to a thread safe container to avoid big mutex over complete collection

alignas(hardware_destructive_interference_size)
std::mutex connector_check_timer_mtx;
unique_ptr<boost::asio::steady_timer> connector_check_timer;
int connector_checks_in_flight{0};

alignas(hardware_destructive_interference_size)
std::mutex expire_timer_mtx;
unique_ptr<boost::asio::steady_timer> expire_timer;

alignas(hardware_destructive_interference_size)
std::mutex keepalive_timer_mtx;
unique_ptr<boost::asio::steady_timer> keepalive_timer;

alignas(hardware_destructive_interference_size)
std::atomic<bool> in_shutdown{false};

alignas(hardware_destructive_interference_size)
compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

uint16_t thread_pool_size = 4;
Expand All @@ -297,6 +317,7 @@ namespace eosio {
};

private:
alignas(hardware_destructive_interference_size)
mutable std::mutex chain_info_mtx; // protects chain_info_t
chain_info_t chain_info;

Expand Down Expand Up @@ -541,6 +562,7 @@ namespace eosio {
std::function<void( boost::system::error_code, std::size_t )> callback;
};

alignas(hardware_destructive_interference_size)
mutable std::mutex _mtx;
uint32_t _write_queue_size{0};
deque<queued_write> _write_queue;
Expand Down Expand Up @@ -616,7 +638,8 @@ namespace eosio {

std::optional<peer_sync_state> peer_requested; // this peer is requesting info from us

std::atomic<bool> socket_open{false};
alignas(hardware_destructive_interference_size)
std::atomic<bool> socket_open{false};

const string peer_addr;
enum connection_types : char {
Expand All @@ -632,7 +655,7 @@ namespace eosio {
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction

fc::message_buffer<1024*1024> pending_message_buffer;
std::atomic<std::size_t> outstanding_read_bytes{0}; // accessed only from strand threads
std::size_t outstanding_read_bytes{0}; // accessed only from strand threads

queued_buffer buffer_queue;

Expand All @@ -646,24 +669,32 @@ namespace eosio {
// kept in sync with last_handshake_recv.last_irreversible_block_num, only accessed from connection strand
uint32_t peer_lib_num = 0;

alignas(hardware_destructive_interference_size)
std::atomic<uint32_t> trx_in_progress_size{0};

fc::time_point last_dropped_trx_msg_time;
const uint32_t connection_id;
int16_t sent_handshake_count = 0;

alignas(hardware_destructive_interference_size)
std::atomic<bool> connecting{true};
std::atomic<bool> syncing{false};
std::atomic<bool> closing{false};

std::atomic<uint16_t> protocol_version = 0;
uint16_t net_version = net_version_max;
std::atomic<uint16_t> consecutive_immediate_connection_close = 0;
std::atomic<bool> is_bp_connection = false;
block_status_monitor block_status_monitor_;

alignas(hardware_destructive_interference_size)
std::mutex response_expected_timer_mtx;
boost::asio::steady_timer response_expected_timer;

alignas(hardware_destructive_interference_size)
std::atomic<go_away_reason> no_retry{no_reason};

alignas(hardware_destructive_interference_size)
mutable std::mutex conn_mtx; //< mtx for last_req .. remote_endpoint_ip
std::optional<request_message> last_req;
handshake_message last_handshake_recv;
Expand Down Expand Up @@ -1000,7 +1031,7 @@ namespace eosio {
}

bool connection::connected() const {
return socket_is_open() && !connecting;
return socket_is_open() && !connecting && !closing;
}

bool connection::current() const {
Expand All @@ -1012,6 +1043,7 @@ namespace eosio {
}

void connection::close( bool reconnect, bool shutdown ) {
closing = true;
strand.post( [self = shared_from_this(), reconnect, shutdown]() {
connection::_close( self.get(), reconnect, shutdown );
});
Expand Down Expand Up @@ -1049,6 +1081,7 @@ namespace eosio {
if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true );
peer_ilog( self, "closing" );
self->cancel_wait();
self->closing = false;

if( reconnect && !shutdown ) {
my_impl->start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() );
Expand Down Expand Up @@ -1145,6 +1178,8 @@ namespace eosio {
}

void connection::send_handshake() {
if (closing)
return;
strand.post( [c = shared_from_this()]() {
std::unique_lock<std::mutex> g_conn( c->conn_mtx );
if( c->populate_handshake( c->last_handshake_sent ) ) {
Expand Down Expand Up @@ -1220,7 +1255,7 @@ namespace eosio {

// called from connection strand
void connection::do_queue_write() {
if( !buffer_queue.ready_to_send() )
if( !buffer_queue.ready_to_send() || closing )
return;
connection_ptr c(shared_from_this());

Expand Down Expand Up @@ -1590,7 +1625,7 @@ namespace eosio {

// if closing the connection we are currently syncing from, then reset our last requested and next expected.
if( c == sync_source ) {
reset_last_requested_num(g);
sync_last_requested_num = 0;
// if starting to sync need to always start from lib as we might be on our own fork
uint32_t lib_num = my_impl->get_chain_lib_num();
sync_next_expected_num = lib_num + 1;
Expand Down Expand Up @@ -1676,7 +1711,7 @@ namespace eosio {
fc_elog( logger, "Unable to continue syncing at this time");
if( !new_sync_source ) sync_source.reset();
sync_known_lib_num = chain_info.lib_num;
reset_last_requested_num(g_sync);
sync_last_requested_num = 0;
set_state( in_sync ); // probably not, but we can't do anything else
return;
}
Expand Down Expand Up @@ -1758,7 +1793,7 @@ namespace eosio {

if( c == sync_source ) {
c->cancel_sync(reason);
reset_last_requested_num(g);
sync_last_requested_num = 0;
request_next_chunk( std::move(g) );
}
}
Expand Down Expand Up @@ -1951,7 +1986,7 @@ namespace eosio {
void sync_manager::rejected_block( const connection_ptr& c, uint32_t blk_num ) {
c->block_status_monitor_.rejected();
std::unique_lock<std::mutex> g( sync_mtx );
reset_last_requested_num(g);
sync_last_requested_num = 0;
if( c->block_status_monitor_.max_events_violated()) {
peer_wlog( c, "block ${bn} not accepted, closing connection", ("bn", blk_num) );
sync_source.reset();
Expand Down Expand Up @@ -2343,8 +2378,7 @@ namespace eosio {
c->send_handshake();
}
} else {
fc_elog( logger, "connection failed to ${host}:${port} ${error}",
("host", endpoint.address().to_string())("port", endpoint.port())( "error", err.message()));
fc_elog( logger, "connection failed to ${a}, ${error}", ("a", c->peer_address())( "error", err.message()));
c->close( false );
}
} ) );
Expand Down Expand Up @@ -2433,9 +2467,8 @@ namespace eosio {
// only called from strand thread
void connection::start_read_message() {
try {
std::size_t minimum_read =
std::atomic_exchange<decltype(outstanding_read_bytes.load())>( &outstanding_read_bytes, 0 );
minimum_read = minimum_read != 0 ? minimum_read : message_header_size;
std::size_t minimum_read = outstanding_read_bytes != 0 ? outstanding_read_bytes : message_header_size;
outstanding_read_bytes = 0;

if (my_impl->use_socket_read_watermark) {
const size_t max_socket_read_watermark = 4096;
Expand Down Expand Up @@ -2622,7 +2655,7 @@ namespace eosio {
peer_ilog( this, "received block ${n} less than ${which}lib ${lib}",
("n", blk_num)("which", blk_num < last_sent_lib ? "sent " : "")
("lib", blk_num < last_sent_lib ? last_sent_lib : lib_num) );
my_impl->sync_master->reset_last_requested_num(my_impl->sync_master->locked_sync_mutex());
my_impl->sync_master->reset_last_requested_num();
enqueue( (sync_request_message) {0, 0} );
send_handshake();
cancel_wait();
Expand Down Expand Up @@ -3252,18 +3285,6 @@ namespace eosio {
// use c in this method instead of this to highlight that all methods called on c-> must be thread safe
connection_ptr c = shared_from_this();

// if we have closed connection then stop processing
if( !c->socket_is_open() ) {
if( bsp ) {
// valid bsp means add_peer_block already called, need to remove it since we are not going to process the block
// call on dispatch strand to serialize with the add_peer_block calls
my_impl->dispatcher->strand.post( [blk_id]() {
my_impl->dispatcher->rm_block( blk_id );
} );
}
return;
}

try {
if( cc.fetch_block_by_id(blk_id) ) {
c->strand.post( [sync_master = my_impl->sync_master.get(),
Expand All @@ -3274,8 +3295,7 @@ namespace eosio {
return;
}
} catch(...) {
// should this even be caught?
fc_elog( logger, "Caught an unknown exception trying to recall block ID" );
fc_elog( logger, "Caught an unknown exception trying to fetch block ${id}", ("id", blk_id) );
}

fc::microseconds age( fc::time_point::now() - msg->timestamp);
Expand Down