Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P: Misc improvements #1040

Merged
merged 5 commits into from
Apr 17, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,19 @@
#include <atomic>
#include <cmath>
#include <memory>
#include <new>
#include <shared_mutex>

// should be defined for c++17, but clang++16 still has not implemented it
#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
[[maybe_unused]] constexpr std::size_t hardware_constructive_interference_size = 64;
[[maybe_unused]] constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

using namespace eosio::chain::plugin_interface;

namespace eosio {
Expand Down Expand Up @@ -122,12 +133,15 @@ namespace eosio {
static constexpr int64_t block_interval_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

alignas(hardware_destructive_interference_size)
std::mutex sync_mtx;
uint32_t sync_known_lib_num{0};
uint32_t sync_last_requested_num{0};
uint32_t sync_next_expected_num{0};
uint32_t sync_req_span{0};
connection_ptr sync_source;

alignas(hardware_destructive_interference_size)
std::atomic<stages> sync_state{in_sync};

private:
Expand Down Expand Up @@ -157,8 +171,11 @@ namespace eosio {
};

class dispatch_manager {
alignas(hardware_destructive_interference_size)
mutable std::mutex blk_state_mtx;
peer_block_state_index blk_state;

alignas(hardware_destructive_interference_size)
mutable std::mutex local_txns_mtx;
node_transaction_index local_txns;

Expand Down Expand Up @@ -262,21 +279,27 @@ namespace eosio {
bool use_socket_read_watermark = false;
/** @} */

alignas(hardware_destructive_interference_size)
mutable std::shared_mutex connections_mtx;
std::set< connection_ptr > connections; // todo: switch to a thread safe container to avoid big mutex over complete collection

alignas(hardware_destructive_interference_size)
std::mutex connector_check_timer_mtx;
unique_ptr<boost::asio::steady_timer> connector_check_timer;
int connector_checks_in_flight{0};

alignas(hardware_destructive_interference_size)
std::mutex expire_timer_mtx;
unique_ptr<boost::asio::steady_timer> expire_timer;

alignas(hardware_destructive_interference_size)
std::mutex keepalive_timer_mtx;
unique_ptr<boost::asio::steady_timer> keepalive_timer;

alignas(hardware_destructive_interference_size)
std::atomic<bool> in_shutdown{false};

alignas(hardware_destructive_interference_size)
compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

uint16_t thread_pool_size = 4;
Expand All @@ -294,6 +317,7 @@ namespace eosio {
};

private:
alignas(hardware_destructive_interference_size)
mutable std::mutex chain_info_mtx; // protects chain_info_t
chain_info_t chain_info;

Expand Down Expand Up @@ -538,6 +562,7 @@ namespace eosio {
std::function<void( boost::system::error_code, std::size_t )> callback;
};

alignas(hardware_destructive_interference_size)
mutable std::mutex _mtx;
uint32_t _write_queue_size{0};
deque<queued_write> _write_queue;
Expand Down Expand Up @@ -613,7 +638,8 @@ namespace eosio {

std::optional<peer_sync_state> peer_requested; // this peer is requesting info from us

std::atomic<bool> socket_open{false};
alignas(hardware_destructive_interference_size)
std::atomic<bool> socket_open{false};

const string peer_addr;
enum connection_types : char {
Expand All @@ -629,7 +655,7 @@ namespace eosio {
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction

fc::message_buffer<1024*1024> pending_message_buffer;
std::atomic<std::size_t> outstanding_read_bytes{0}; // accessed only from strand threads
std::size_t outstanding_read_bytes{0}; // accessed only from strand threads

queued_buffer buffer_queue;

Expand All @@ -643,10 +669,14 @@ namespace eosio {
// kept in sync with last_handshake_recv.last_irreversible_block_num, only accessed from connection strand
uint32_t peer_lib_num = 0;

alignas(hardware_destructive_interference_size)
std::atomic<uint32_t> trx_in_progress_size{0};

fc::time_point last_dropped_trx_msg_time;
const uint32_t connection_id;
int16_t sent_handshake_count = 0;

alignas(hardware_destructive_interference_size)
std::atomic<bool> connecting{true};
std::atomic<bool> syncing{false};

Expand All @@ -656,11 +686,14 @@ namespace eosio {
std::atomic<bool> is_bp_connection = false;
block_status_monitor block_status_monitor_;

alignas(hardware_destructive_interference_size)
std::mutex response_expected_timer_mtx;
boost::asio::steady_timer response_expected_timer;

alignas(hardware_destructive_interference_size)
std::atomic<go_away_reason> no_retry{no_reason};

alignas(hardware_destructive_interference_size)
mutable std::mutex conn_mtx; //< mtx for last_req .. remote_endpoint_ip
std::optional<request_message> last_req;
handshake_message last_handshake_recv;
Expand Down Expand Up @@ -2430,9 +2463,8 @@ namespace eosio {
// only called from strand thread
void connection::start_read_message() {
try {
std::size_t minimum_read =
std::atomic_exchange<decltype(outstanding_read_bytes.load())>( &outstanding_read_bytes, 0 );
minimum_read = minimum_read != 0 ? minimum_read : message_header_size;
std::size_t minimum_read = outstanding_read_bytes != 0 ? outstanding_read_bytes : message_header_size;
outstanding_read_bytes = 0;

if (my_impl->use_socket_read_watermark) {
const size_t max_socket_read_watermark = 4096;
Expand Down