diff --git a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp index 7f138ef1d7..a394312669 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp @@ -145,7 +145,7 @@ class bp_connection_manager { // Only called from connection strand std::size_t num_established_clients() const { uint32_t num_clients = 0; - self()->for_each_connection([&num_clients](auto&& conn) { + self()->connections.for_each_connection([&num_clients](auto&& conn) { if (established_client_connection(conn)) { ++num_clients; } @@ -158,8 +158,8 @@ class bp_connection_manager { // This should only be called after the first handshake message is received to check if an incoming connection // has exceeded the pre-configured max_client_count limit. bool exceeding_connection_limit(Connection* new_connection) const { - return auto_bp_peering_enabled() && self()->max_client_count != 0 && - established_client_connection(new_connection) && num_established_clients() > self()->max_client_count; + return auto_bp_peering_enabled() && self()->connections.get_max_client_count() != 0 && + established_client_connection(new_connection) && num_established_clients() > self()->connections.get_max_client_count(); } // Only called from main thread @@ -182,7 +182,7 @@ class bp_connection_manager { fc_dlog(self()->get_logger(), "pending_downstream_neighbors: ${pending_downstream_neighbors}", ("pending_downstream_neighbors", to_string(pending_downstream_neighbors))); - for (auto neighbor : pending_downstream_neighbors) { self()->connect(config.bp_peer_addresses[neighbor]); } + for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor]); } pending_neighbors = std::move(pending_downstream_neighbors); finder.add_upstream_neighbors(pending_neighbors); @@ -222,7 +222,7 @@ class bp_connection_manager { std::back_inserter(peers_to_drop)); fc_dlog(self()->get_logger(), "peers to drop: ${peers_to_drop}", ("peers_to_drop", to_string(peers_to_drop))); - for (auto account : peers_to_drop) { self()->disconnect(config.bp_peer_addresses[account]); } + for (auto account : peers_to_drop) { self()->connections.disconnect(config.bp_peer_addresses[account]); } active_schedule_version = schedule.version; } } diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 5e873aa03f..dfa1e47d85 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -316,6 +316,75 @@ namespace eosio { constexpr uint32_t signed_block_which = fc::get_index(); // see protocol net_message constexpr uint32_t packed_transaction_which = fc::get_index(); // see protocol net_message + class connections_manager { + alignas(hardware_destructive_interference_size) + mutable std::shared_mutex connections_mtx; + chain::flat_set connections; + chain::flat_set supplied_peers; + + alignas(hardware_destructive_interference_size) + std::mutex connector_check_timer_mtx; + unique_ptr connector_check_timer; + + /// thread safe, only modified on startup + std::chrono::milliseconds heartbeat_timeout{def_keepalive_interval*2}; + fc::microseconds max_cleanup_time; + boost::asio::steady_timer::duration connector_period{0}; + uint32_t max_client_count{def_max_clients}; + std::function update_p2p_connection_metrics; + + private: // must call with held mutex + connection_ptr find_connection_i(const string& host) const; + void add_i(connection_ptr&& c); + void connect_i(const string& peer); + + void connection_monitor(const std::weak_ptr& from_connection); + + public: + void add_supplied_peers(const vector& peers ); + + // not thread safe, only call on startup + void init(std::chrono::milliseconds heartbeat_timeout_ms, + fc::microseconds conn_max_cleanup_time, + boost::asio::steady_timer::duration conn_period, + uint32_t maximum_client_count); + + uint32_t get_max_client_count() const { return max_client_count; } + + fc::microseconds get_connector_period() const; + + void register_update_p2p_connection_metrics(std::function&& fun); + + void connect_supplied_peers(); + + void start_conn_timer(); + void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); + void stop_conn_timer(); + + void add(connection_ptr c); + string connect(const string& host); + string disconnect(const string& host); + void close_all(); + + std::optional status(const string& host) const; + vector connection_statuses() const; + + // return the next connection after current in collection that has blocks above + connection_ptr round_robin_next(const connection_ptr& current, uint32_t sync_known_lib_num) const; + + template + void for_each_connection(Function&& f) const; + + template + void for_each_block_connection(Function&& f) const; + + template + bool any_of_connections(UnaryPredicate&& p) const; + + template + bool any_of_block_connections(UnaryPredicate&& p) const; + }; + class net_plugin_impl : public std::enable_shared_from_this, public auto_bp_peering::bp_connection_manager { public: @@ -323,6 +392,7 @@ namespace eosio { unique_ptr< sync_manager > sync_master; unique_ptr< dispatch_manager > dispatcher; + connections_manager connections; /** * Thread safe, only updated in plugin initialize @@ -331,7 +401,6 @@ namespace eosio { string p2p_address; string p2p_server_address; - chain::flat_set supplied_peers; vector allowed_peers; ///< peer keys allowed to connect std::map private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes @@ -343,21 +412,14 @@ namespace eosio { }; possible_connections allowed_connections{None}; - boost::asio::steady_timer::duration connector_period{0}; boost::asio::steady_timer::duration txn_exp_period{0}; boost::asio::steady_timer::duration resp_expected_period{0}; std::chrono::milliseconds keepalive_interval{std::chrono::milliseconds{def_keepalive_interval}}; - std::chrono::milliseconds heartbeat_timeout{keepalive_interval * 2}; - int max_cleanup_time_ms = 0; - uint32_t max_client_count = 0; uint32_t max_nodes_per_host = 1; bool p2p_accept_transactions = true; fc::microseconds p2p_dedup_cache_expire_time_us{}; - /// Peer clock may be no more than 1 second skewed from our clock, including network latency. - const std::chrono::system_clock::duration peer_authentication_interval{std::chrono::seconds{1}}; - chain_id_type chain_id; fc::sha256 node_id; string user_agent_name; @@ -367,15 +429,6 @@ namespace eosio { bool use_socket_read_watermark = false; /** @} */ - alignas(hardware_destructive_interference_size) - mutable std::shared_mutex connections_mtx; - std::set< connection_ptr > connections; // todo: switch to a thread safe container to avoid big mutex over complete collection - - alignas(hardware_destructive_interference_size) - std::mutex connector_check_timer_mtx; - unique_ptr connector_check_timer; - int connector_checks_in_flight{0}; - alignas(hardware_destructive_interference_size) std::mutex expire_timer_mtx; unique_ptr expire_timer; @@ -404,7 +457,6 @@ namespace eosio { }; - std::function update_p2p_connection_metrics; std::function increment_failed_p2p_connections; std::function increment_dropped_trxs; @@ -425,12 +477,10 @@ namespace eosio { void transaction_ack(const std::pair&); void on_irreversible_block( const block_state_ptr& block ); - void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); void start_expire_timer(); void start_monitors(); void expire(); - void connection_monitor(const std::weak_ptr& from_connection, bool reschedule); /** \name Peer Timestamps * Time message handling * @{ @@ -465,13 +515,6 @@ namespace eosio { constexpr static uint16_t to_protocol_version(uint16_t v); - connection_ptr find_connection(const string& host)const; // must call with held mutex - string connect( const string& host ); - string disconnect( const string& host ); - - template - void for_each_connection(Function&& fun) const; - void plugin_shutdown(); bool in_sync() const; fc::logger& get_logger() { return logger; } @@ -866,7 +909,7 @@ namespace eosio { /** @} */ void blk_send_branch( const block_id_type& msg_head_id ); - void blk_send_branch_impl( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ); + void blk_send_branch( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ); void blk_send(const block_id_type& blkid); void stop_send(); @@ -940,7 +983,7 @@ namespace eosio { std::lock_guard g_conn( conn_mtx ); return !last_handshake_recv.p2p_address.empty(); } - }; + }; // class connection const string connection::unknown = ""; @@ -1021,27 +1064,38 @@ namespace eosio { template - void net_plugin_impl::for_each_connection( Function&& f ) const { - std::shared_lock g( connections_mtx ); - for( auto& c :connections ) { - if( !f( c ) ) return; - } + void connections_manager::for_each_connection( Function&& f ) const { + std::shared_lock g( connections_mtx ); + std::for_each(connections.begin(), connections.end(), std::forward(f)); } template - void for_each_connection( Function&& f ) { - my_impl->for_each_connection(std::forward(f)); + void connections_manager::for_each_block_connection( Function&& f ) const { + std::shared_lock g( connections_mtx ); + for( auto& c : connections ) { + if( c->is_transactions_only_connection() ) continue; + f( c ); + } } - template - void for_each_block_connection( Function f ) { - std::shared_lock g( my_impl->connections_mtx ); - for( auto& c : my_impl->connections ) { + template + bool connections_manager::any_of_connections(UnaryPredicate&& p) const { + std::shared_lock g(connections_mtx); + return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); + } + + template + bool connections_manager::any_of_block_connections(UnaryPredicate&& p) const { + std::shared_lock g( connections_mtx ); + for( auto& c : connections ) { if( c->is_transactions_only_connection() ) continue; - if( !f( c ) ) return; + if (p(c)) + return true; } + return false; } + //--------------------------------------------------------------------------- connection::connection( const string& endpoint ) @@ -1067,7 +1121,6 @@ namespace eosio { last_handshake_recv(), last_handshake_sent() { - set_heartbeat_timeout(my_impl->heartbeat_timeout); fc_dlog( logger, "new connection object created" ); } @@ -1187,7 +1240,7 @@ namespace eosio { self->closing = false; if( reconnect && !shutdown ) { - my_impl->start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() ); + my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() ); } } @@ -1235,12 +1288,12 @@ namespace eosio { } else { if( on_fork ) msg_head_num = 0; // if peer on fork, start at their last lib, otherwise we can start at msg_head+1 - blk_send_branch_impl( msg_head_num, lib_num, head_num ); + blk_send_branch( msg_head_num, lib_num, head_num ); } } // called from connection strand - void connection::blk_send_branch_impl( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ) { + void connection::blk_send_branch( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ) { if( !peer_requested ) { auto last = msg_head_num != 0 ? msg_head_num : lib_num; peer_requested = peer_sync_state( last+1, head_num, last ); @@ -1717,12 +1770,11 @@ namespace eosio { // Closing connection, therefore its view of LIB can no longer be considered as we will no longer be connected. // Determine current LIB of remaining peers as our sync_known_lib_num. uint32_t highest_lib_num = 0; - for_each_block_connection( [&highest_lib_num]( const auto& cc ) { + my_impl->connections.for_each_block_connection( [&highest_lib_num]( const auto& cc ) { std::lock_guard g_conn( cc->conn_mtx ); if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) { highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num; } - return true; } ); sync_known_lib_num = highest_lib_num; @@ -1761,52 +1813,7 @@ namespace eosio { if (conn && conn->current() ) { new_sync_source = conn; } else { - std::shared_lock g( my_impl->connections_mtx ); - if( my_impl->connections.empty() ) { - new_sync_source.reset(); - } else if( my_impl->connections.size() == 1 ) { - if (!new_sync_source) { - new_sync_source = *my_impl->connections.begin(); - } - } else { - // init to a linear array search - auto cptr = my_impl->connections.begin(); - auto cend = my_impl->connections.end(); - // do we remember the previous source? - if (new_sync_source) { - //try to find it in the list - cptr = my_impl->connections.find( new_sync_source ); - cend = cptr; - if( cptr == my_impl->connections.end() ) { - //not there - must have been closed! cend is now connections.end, so just flatten the ring. - new_sync_source.reset(); - cptr = my_impl->connections.begin(); - } else { - //was found - advance the start to the next. cend is the old source. - if( ++cptr == my_impl->connections.end() && cend != my_impl->connections.end() ) { - cptr = my_impl->connections.begin(); - } - } - } - - //scan the list of peers looking for another able to provide sync blocks. - if( cptr != my_impl->connections.end() ) { - auto cstart_it = cptr; - do { - //select the first one which is current and has valid lib and break out. - if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { - std::lock_guard g_conn( (*cptr)->conn_mtx ); - if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { - new_sync_source = *cptr; - break; - } - } - if( ++cptr == my_impl->connections.end() ) - cptr = my_impl->connections.begin(); - } while( cptr != cstart_it ); - } - // no need to check the result, either source advanced or the whole list was checked and the old source is reused. - } + new_sync_source = my_impl->connections.round_robin_next(new_sync_source, sync_known_lib_num); } // verify there is an available source @@ -1844,11 +1851,10 @@ namespace eosio { // static, thread safe void sync_manager::send_handshakes() { - for_each_connection( []( auto& ci ) { + my_impl->connections.for_each_connection( []( auto& ci ) { if( ci->current() ) { ci->send_handshake(); } - return true; } ); } @@ -2016,14 +2022,17 @@ namespace eosio { bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) { request_message req; req.req_blocks.mode = catch_up; - for_each_block_connection( [num, &id, &req]( const auto& cc ) { + auto is_fork_head_greater = [num, &id, &req]( const auto& cc ) { std::lock_guard g_conn( cc->conn_mtx ); if( cc->fork_head_num > num || cc->fork_head == id ) { req.req_blocks.mode = none; - return false; + return true; } - return true; - } ); + return false; + }; + if (my_impl->connections.any_of_block_connections(is_fork_head_greater)) { + req.req_blocks.mode = none; + } if( req.req_blocks.mode == catch_up ) { { std::lock_guard g( sync_mtx ); @@ -2136,7 +2145,7 @@ namespace eosio { block_id_type null_id; bool set_state_to_head_catchup = false; - for_each_block_connection( [&null_id, blk_num, &blk_id, &c, &set_state_to_head_catchup]( const auto& cp ) { + my_impl->connections.for_each_block_connection( [&null_id, blk_num, &blk_id, &c, &set_state_to_head_catchup]( const auto& cp ) { std::unique_lock g_cp_conn( cp->conn_mtx ); uint32_t fork_head_num = cp->fork_head_num; block_id_type fork_head_id = cp->fork_head; @@ -2150,7 +2159,6 @@ namespace eosio { } else { set_state_to_head_catchup = true; } - return true; } ); if( set_state_to_head_catchup ) { @@ -2269,14 +2277,14 @@ namespace eosio { block_buffer_factory buff_factory; const auto bnum = b->block_num(); - for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { + my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}", ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) ); - if( !cp->current() ) return true; + if( !cp->current() ) return; if( !add_peer_block( id, cp->connection_id ) ) { fc_dlog( logger, "not bcast block ${b} to connection ${cid}", ("b", bnum)("cid", cp->connection_id) ); - return true; + return; } send_buffer_type sb = buff_factory.get_send_buffer( b ); @@ -2289,7 +2297,6 @@ namespace eosio { cp->enqueue_buffer( sb, no_reason ); } }); - return true; } ); } @@ -2318,12 +2325,12 @@ namespace eosio { void dispatch_manager::bcast_transaction(const packed_transaction_ptr& trx) { trx_buffer_factory buff_factory; const fc::time_point_sec now{fc::time_point::now()}; - for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { + my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { if( cp->is_blocks_only_connection() || !cp->current() ) { - return true; + return; } if( !add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) { - return true; + return; } send_buffer_type sb = buff_factory.get_send_buffer( trx ); @@ -2331,7 +2338,6 @@ namespace eosio { cp->strand.post( [cp, sb{std::move(sb)}]() { cp->enqueue_buffer( sb, no_reason ); } ); - return true; } ); } @@ -2381,14 +2387,14 @@ namespace eosio { } last_req = *c->last_req; } - for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) { + auto request_from_peer = [this, &c, &last_req, &bid]( auto& conn ) { if( conn == c ) - return true; + return false; { std::lock_guard guard( conn->conn_mtx ); if( conn->last_req ) { - return true; + return false; } } @@ -2400,16 +2406,18 @@ namespace eosio { std::lock_guard g_conn_conn( conn->conn_mtx ); conn->last_req = last_req; } ); - return false; + return true; } - return true; - } ); + return false; + }; - // at this point no other peer has it, re-request or do nothing? - peer_wlog( c, "no peer has last_req" ); - if( c->connected() ) { - c->enqueue( last_req ); - c->fetch_wait(); + if (!my_impl->connections.any_of_block_connections(request_from_peer)) { + // at this point no other peer has it, re-request or do nothing? + peer_wlog(c, "no peer has last_req"); + if (c->connected()) { + c->enqueue(last_req); + c->fetch_wait(); + } } } @@ -2437,9 +2445,9 @@ namespace eosio { connection_ptr c = shared_from_this(); if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { - auto connector_period_us = std::chrono::duration_cast( my_impl->connector_period ); + fc::microseconds connector_period = my_impl->connections.get_connector_period(); std::lock_guard g( c->conn_mtx ); - if( last_close == fc::time_point() || last_close > fc::time_point::now() - fc::microseconds( connector_period_us.count() ) ) { + if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) { return true; // true so doesn't remove from valid connections } } @@ -2501,7 +2509,7 @@ namespace eosio { state_(impl) {} std::string extra_listening_log_info() { - return ", max clients is " + std::to_string(state_->max_client_count); + return ", max clients is " + std::to_string(state_->connections.get_max_client_count()); } void create_session(tcp::socket&& socket) { @@ -2514,7 +2522,7 @@ namespace eosio { fc_elog(logger, "Error getting remote endpoint: ${m}", ("m", rec.message())); } else { paddr_str = paddr_add.to_string(); - state_->for_each_connection([&visitors, &from_addr, &paddr_str](auto& conn) { + state_->connections.for_each_connection([&visitors, &from_addr, &paddr_str](auto& conn) { if (conn->socket_is_open()) { if (conn->peer_address().empty()) { ++visitors; @@ -2524,18 +2532,16 @@ namespace eosio { } } } - return true; }); if (from_addr < state_->max_nodes_per_host && - (state_->auto_bp_peering_enabled() || state_->max_client_count == 0 || - visitors < state_->max_client_count)) { + (state_->auto_bp_peering_enabled() || state_->connections.get_max_client_count() == 0 || + visitors < state_->connections.get_max_client_count())) { fc_ilog(logger, "Accepted new connection: " + paddr_str); connection_ptr new_connection = std::make_shared(std::move(socket)); new_connection->strand.post([new_connection, state = state_]() { if (new_connection->start_session()) { - std::lock_guard g_unique(state->connections_mtx); - state->connections.insert(new_connection); + state->connections.add(new_connection); } }); @@ -2544,7 +2550,7 @@ namespace eosio { fc_dlog(logger, "Number of connections (${n}) from ${ra} exceeds limit ${l}", ("n", from_addr + 1)("ra", paddr_str)("l", state_->max_nodes_per_host)); } else { - fc_dlog(logger, "max_client_count ${m} exceeded", ("m", state_->max_client_count)); + fc_dlog(logger, "max_client_count ${m} exceeded", ("m", state_->connections.get_max_client_count())); } // new_connection never added to connections and start_session not called, lifetime will end boost::system::error_code ec; @@ -2825,11 +2831,8 @@ namespace eosio { void net_plugin_impl::plugin_shutdown() { in_shutdown = true; - { - std::lock_guard g( connector_check_timer_mtx ); - if( connector_check_timer ) - connector_check_timer->cancel(); - } + + connections.stop_conn_timer(); { std::lock_guard g( expire_timer_mtx ); if( expire_timer ) @@ -2841,16 +2844,7 @@ namespace eosio { keepalive_timer->cancel(); } - { - fc_ilog( logger, "close ${s} connections", ("s", connections.size()) ); - std::lock_guard g( connections_mtx ); - for( auto& con : connections ) { - fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); - con->close( false, true ); - } - connections.clear(); - } - + connections.close_all(); thread_pool.stop(); } @@ -2960,8 +2954,8 @@ namespace eosio { // from, but it would be different from the address it is listening. The only way to make sure is when the // first handshake message is received with the p2p_address information in the message. Thus the connection // limit checking has to be here when auto bp peering is enabled. - fc_dlog(logger, "max_client_count ${m} exceeded", ("m", my_impl->max_client_count)); - my_impl->disconnect(peer_address()); + fc_dlog(logger, "max_client_count ${m} exceeded", ("m", my_impl->connections.get_max_client_count())); + my_impl->connections.disconnect(peer_address()); return; } @@ -2976,10 +2970,9 @@ namespace eosio { auto c_time = last_handshake_sent.time; g_conn.unlock(); peer_dlog( this, "checking for duplicate" ); - std::shared_lock g_cnts( my_impl->connections_mtx ); - for(const auto& check : my_impl->connections) { + auto is_duplicate = [&](const auto& check) { if(check.get() == this) - continue; + return false; std::unique_lock g_check_conn( check->conn_mtx ); fc_dlog( logger, "dup check: connected ${c}, ${l} =? ${r}", ("c", check->connected())("l", check->last_handshake_recv.node_id)("r", msg.node_id) ); @@ -2993,31 +2986,33 @@ namespace eosio { auto check_time = check->last_handshake_sent.time + check->last_handshake_recv.time; g_check_conn.unlock(); if (msg.time + c_time <= check_time) - continue; + return false; } else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) { if (my_impl->p2p_address < msg.p2p_address) { fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'", ("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) ); // only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate, // so there is no chance for both connections to be closed - continue; + return false; } } else if (my_impl->node_id < msg.node_id) { fc_dlog( logger, "not duplicate, my_impl->node_id '${lhs}' < msg.node_id '${rhs}'", ("lhs", my_impl->node_id)("rhs", msg.node_id) ); // only the connection from lower node_id to higher node_id will be considered as a duplicate, // so there is no chance for both connections to be closed - continue; + return false; } - - g_cnts.unlock(); - peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) ); - go_away_message gam(duplicate); - gam.node_id = conn_node_id; - enqueue(gam); - no_retry = duplicate; - return; + return true; } + return false; + }; + if (my_impl->connections.any_of_connections(std::move(is_duplicate))) { + peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) ); + go_away_message gam(duplicate); + gam.node_id = conn_node_id; + enqueue(gam); + no_retry = duplicate; + return; } } else { peer_dlog( this, "skipping duplicate check, addr == ${pa}, id = ${ni}", @@ -3452,28 +3447,6 @@ namespace eosio { } } - // called from any thread - void net_plugin_impl::start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection) { - if( in_shutdown ) return; - std::lock_guard g( connector_check_timer_mtx ); - ++connector_checks_in_flight; - connector_check_timer->expires_from_now( du ); - connector_check_timer->async_wait( [my = shared_from_this(), from_connection{std::move(from_connection)}](boost::system::error_code ec) mutable { - std::unique_lock g( my->connector_check_timer_mtx ); - int num_in_flight = --my->connector_checks_in_flight; - g.unlock(); - if( !ec ) { - my->connection_monitor(from_connection, num_in_flight == 0 ); - } else { - if( num_in_flight == 0 ) { - if( my->in_shutdown ) return; - fc_elog( logger, "Error from connection check monitor: ${m}", ("m", ec.message())); - my->start_conn_timer( my->connector_period, std::weak_ptr() ); - } - } - }); - } - // thread safe void net_plugin_impl::start_expire_timer() { if( in_shutdown ) return; @@ -3503,27 +3476,22 @@ namespace eosio { } tstamp current_time = connection::get_time(); - my->for_each_connection( [current_time]( auto& c ) { + my->connections.for_each_connection( [current_time]( auto& c ) { if( c->socket_is_open() ) { c->strand.post([c, current_time]() { c->check_heartbeat(current_time); } ); } - return true; } ); } ); } void net_plugin_impl::start_monitors() { - { - std::lock_guard g( connector_check_timer_mtx ); - connector_check_timer = std::make_unique( my_impl->thread_pool.get_executor() ); - } { std::lock_guard g( expire_timer_mtx ); expire_timer = std::make_unique( my_impl->thread_pool.get_executor() ); } - start_conn_timer(connector_period, std::weak_ptr()); + connections.start_conn_timer(); start_expire_timer(); } @@ -3537,64 +3505,6 @@ namespace eosio { start_expire_timer(); } - // called from any thread - void net_plugin_impl::connection_monitor(const std::weak_ptr& from_connection, bool reschedule ) { - auto max_time = fc::time_point::now(); - max_time += fc::milliseconds(max_cleanup_time_ms); - auto from = from_connection.lock(); - std::unique_lock g( connections_mtx ); - auto it = (from ? connections.find(from) : connections.begin()); - if (it == connections.end()) it = connections.begin(); - size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0; - while (it != connections.end()) { - if (fc::time_point::now() >= max_time) { - connection_wptr wit = *it; - g.unlock(); - fc_dlog( logger, "Exiting connection monitor early, ran out of time: ${t}", ("t", max_time - fc::time_point::now()) ); - fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}", - ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) ); - if( reschedule ) { - start_conn_timer( std::chrono::milliseconds( 1 ), wit ); // avoid exhausting - } - return; - } - if ((*it)->is_bp_connection) - ++num_bp_peers; - else if ((*it)->incoming()) - ++num_clients; - else - ++num_peers; - - if( !(*it)->socket_is_open() && !(*it)->connecting) { - if( !(*it)->incoming() ) { - if( !(*it)->resolve_and_connect() ) { - it = connections.erase(it); - --num_peers; ++num_rm; - continue; - } - } else { - --num_clients; ++num_rm; - it = connections.erase(it); - continue; - } - } - ++it; - } - g.unlock(); - - if (update_p2p_connection_metrics) { - update_p2p_connection_metrics({.num_peers = num_peers, .num_clients = num_clients}); - } - - if( num_clients > 0 || num_peers > 0 ) - fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}, block producer peers: ${num_bp_peers}", - ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size())("num_bp_peers", num_bp_peers) ); - fc_dlog( logger, "connection monitor, removed ${n} connections", ("n", num_rm) ); - if( reschedule ) { - start_conn_timer( connector_period, std::weak_ptr()); - } - } - // called from application thread void net_plugin_impl::on_accepted_block_header(const block_state_ptr& bs) { update_chain_info(); @@ -3765,9 +3675,9 @@ namespace eosio { ( "peer-key", bpo::value>()->composing()->multitoken(), "Optional public key of peer allowed to connect. May be used multiple times.") ( "peer-private-key", boost::program_options::value>()->composing()->multitoken(), "Tuple of [PublicKey, WIF private key] (may specify multiple times)") - ( "max-clients", bpo::value()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") + ( "max-clients", bpo::value()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") ( "connection-cleanup-period", bpo::value()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections") - ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in milliseconds") + ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in milliseconds") ( "p2p-dedup-cache-expire-time-sec", bpo::value()->default_value(10), "Maximum time to track transaction for duplicate optimization") ( "net-threads", bpo::value()->default_value(my->thread_pool_size), "Number of worker threads in net_plugin thread pool" ) @@ -3803,12 +3713,9 @@ namespace eosio { my->sync_master = std::make_unique( options.at( "sync-fetch-span" ).as()); - my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as()); - my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as(); my->txn_exp_period = def_txn_expire_wait; my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as() ); my->resp_expected_period = def_resp_expected_wait; - my->max_client_count = options.at( "max-clients" ).as(); my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as(); my->p2p_accept_transactions = options.at( "p2p-accept-transactions" ).as(); @@ -3817,9 +3724,10 @@ namespace eosio { EOS_ASSERT( my->keepalive_interval.count() > 0, chain::plugin_config_exception, "p2p-keepalive_interval-ms must be greater than 0" ); - if( options.count( "p2p-keepalive-interval-ms" )) { - my->heartbeat_timeout = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as() * 2 ); - } + my->connections.init( std::chrono::milliseconds( options.at("p2p-keepalive-interval-ms").as() * 2 ), + fc::microseconds( options.at("max-cleanup-time-msec").as() ), + std::chrono::seconds( options.at("connection-cleanup-period").as() ), + options.at("max-clients").as() ); if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as().length()) { my->p2p_address = options.at( "p2p-listen-endpoint" ).as(); @@ -3836,9 +3744,10 @@ namespace eosio { EOS_ASSERT( my->thread_pool_size > 0, chain::plugin_config_exception, "net-threads ${num} must be greater than 0", ("num", my->thread_pool_size) ); + std::vector peers; if( options.count( "p2p-peer-address" )) { - auto v = options.at( "p2p-peer-address" ).as >(); - my->supplied_peers.insert(v.begin(), v.end()); + peers = options.at( "p2p-peer-address" ).as>(); + my->connections.add_supplied_peers(peers); } if( options.count( "agent-name" )) { my->user_agent_name = options.at( "agent-name" ).as(); @@ -3848,8 +3757,8 @@ namespace eosio { if ( options.count( "p2p-auto-bp-peer")) { my->set_bp_peers(options.at( "p2p-auto-bp-peer" ).as>()); - my->for_each_bp_peer_address([this](const auto& addr) { - EOS_ASSERT(my->supplied_peers.count(addr) == 0, chain::plugin_config_exception, + my->for_each_bp_peer_address([&peers](const auto& addr) { + EOS_ASSERT(std::find(peers.begin(), peers.end(), addr) == peers.end(), chain::plugin_config_exception, "\"${addr}\" should only appear in either p2p-peer-address or p2p-auto-bp-peer option, not both.", ("addr",addr)); }); @@ -3989,9 +3898,7 @@ namespace eosio { my->ticker(); my->start_monitors(); my->update_chain_info(); - for( const auto& seed_node : my->supplied_peers ) { - my->connect( seed_node ); - } + my->connections.connect_supplied_peers(); }); } catch( ... ) { @@ -4016,92 +3923,301 @@ namespace eosio { FC_CAPTURE_AND_RETHROW() } - /** - * Used to trigger a new connection from RPC API - */ + /// RPC API string net_plugin::connect( const string& host ) { - return my->connect( host ); + return my->connections.connect( host ); } - string net_plugin_impl::connect( const string& host ) { - std::lock_guard g( connections_mtx ); - if( find_connection( host ) ) - return "already connected"; + /// RPC API + string net_plugin::disconnect( const string& host ) { + return my->connections.disconnect(host); + } - connection_ptr c = std::make_shared( host ); - fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); - if( c->resolve_and_connect() ) { - fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); - c->set_heartbeat_timeout( heartbeat_timeout ); - connections.insert( c ); + /// RPC API + std::optional net_plugin::status( const string& host )const { + return my->connections.status(host); + } + + /// RPC API + vector net_plugin::connections()const { + return my->connections.connection_statuses(); + } + + constexpr uint16_t net_plugin_impl::to_protocol_version(uint16_t v) { + if (v >= net_version_base) { + v -= net_version_base; + return (v > net_version_range) ? 0 : v; + } + return 0; + } + + bool net_plugin_impl::in_sync() const { + return sync_master->is_in_sync(); + } + + void net_plugin::register_update_p2p_connection_metrics(std::function&& fun){ + my->connections.register_update_p2p_connection_metrics(std::move(fun)); + } + + void net_plugin::register_increment_failed_p2p_connections(std::function&& fun){ + my->increment_failed_p2p_connections = std::move(fun); + } + + void net_plugin::register_increment_dropped_trxs(std::function&& fun){ + my->increment_dropped_trxs = std::move(fun); + } + + //---------------------------------------------------------------------------- + + void connections_manager::add_supplied_peers(const vector& peers ) { + std::lock_guard g(connections_mtx); + supplied_peers.insert( peers.begin(), peers.end() ); + } + + // not thread safe, only call on startup + void connections_manager::init( std::chrono::milliseconds heartbeat_timeout_ms, + fc::microseconds conn_max_cleanup_time, + boost::asio::steady_timer::duration conn_period, + uint32_t maximum_client_count ) { + heartbeat_timeout = heartbeat_timeout_ms; + max_cleanup_time = conn_max_cleanup_time; + connector_period = conn_period; + max_client_count = maximum_client_count; + } + + fc::microseconds connections_manager::get_connector_period() const { + auto connector_period_us = std::chrono::duration_cast( connector_period ); + return fc::microseconds{ connector_period_us.count() }; + } + + void connections_manager::register_update_p2p_connection_metrics(std::function&& fun){ + update_p2p_connection_metrics = std::move(fun); + } + + void connections_manager::connect_supplied_peers() { + std::lock_guard g(connections_mtx); + for (const auto& peer : supplied_peers) { + connect_i(peer); } + } + + void connections_manager::add( connection_ptr c ) { + std::lock_guard g( connections_mtx ); + add_i( std::move(c) ); + } + + // called by API + string connections_manager::connect( const string& host ) { + std::lock_guard g( connections_mtx ); + if( find_connection_i( host ) ) + return "already connected"; + + connect_i( host ); + supplied_peers.insert(host); return "added connection"; } - string net_plugin_impl::disconnect( const string& host ) { - std::lock_guard g( connections_mtx ); - for( auto itr = connections.begin(); itr != connections.end(); ++itr ) { - if( (*itr)->peer_address() == host ) { - fc_ilog( logger, "disconnecting: ${cid}", ("cid", (*itr)->connection_id) ); - (*itr)->close(); - connections.erase(itr); - return "connection removed"; - } + // called by API + string connections_manager::disconnect( const string& host ) { + std::lock_guard g( connections_mtx ); + if( auto c = find_connection_i( host ) ) { + fc_ilog( logger, "disconnecting: ${cid}", ("cid", c->connection_id) ); + c->close(); + connections.erase(c); + supplied_peers.erase(host); + return "connection removed"; } return "no known connection for host"; } - string net_plugin::disconnect( const string& host ) { - return my->disconnect(host); + void connections_manager::close_all() { + fc_ilog( logger, "close all ${s} connections", ("s", connections.size()) ); + std::lock_guard g( connections_mtx ); + for( auto& con : connections ) { + fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); + con->close( false, true ); + } + connections.clear(); } - std::optional net_plugin::status( const string& host )const { - std::shared_lock g( my->connections_mtx ); - auto con = my->find_connection( host ); - if( con ) + connection_ptr connections_manager::round_robin_next( const connection_ptr& current, uint32_t sync_known_lib_num ) const { + connection_ptr new_sync_source = current; + std::shared_lock g( connections_mtx ); + if( connections.empty() ) { + new_sync_source.reset(); + } else if( connections.size() == 1 ) { + if (!new_sync_source) { + new_sync_source = *connections.begin(); + } + } else { + // init to a linear array search + auto cptr = connections.begin(); + auto cend = connections.end(); + // do we remember the previous source? + if (current) { + //try to find it in the list + cptr = connections.find( current ); + cend = cptr; + if( cptr == connections.end() ) { + // not there - must have been closed! cend is now connections.end, so just flatten the ring. + new_sync_source.reset(); + cptr = connections.begin(); + } else { + // was found - advance the start to the next. cend is the old source. + if( ++cptr == connections.end() ) { + cptr = connections.begin(); + } + } + } + + // scan the list of peers looking for another able to provide sync blocks. + if( cptr != connections.end() ) { + auto cstart_it = cptr; + do { + // select the first one which is current and has lib above current and break out. + if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { + std::lock_guard g_conn( (*cptr)->conn_mtx ); + // TODO: change to a better heuristic than lib >= sync_known_lib_num + if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { + new_sync_source = *cptr; + break; + } + } + if( ++cptr == connections.end() ) + cptr = connections.begin(); + } while( cptr != cstart_it ); + } + + // no need to check the result, either source advanced or the whole list was checked and the old source is reused. + } + return new_sync_source; + } + + std::optional connections_manager::status( const string& host )const { + std::shared_lock g( connections_mtx ); + auto con = find_connection_i( host ); + if( con ) { return con->get_status(); + } return {}; } - vector net_plugin::connections()const { + vector connections_manager::connection_statuses()const { vector result; - std::shared_lock g( my->connections_mtx ); - result.reserve( my->connections.size() ); - for( const auto& c : my->connections ) { + std::shared_lock g( connections_mtx ); + result.reserve( connections.size() ); + for( const auto& c : connections ) { result.push_back( c->get_status() ); } return result; } // call with connections_mtx - connection_ptr net_plugin_impl::find_connection( const string& host )const { - for( const auto& c : connections ) - if( c->peer_address() == host ) return c; + connection_ptr connections_manager::find_connection_i( const string& host )const { + for( const auto& c : connections ) { + if (c->peer_address() == host) + return c; + } return {}; } - constexpr uint16_t net_plugin_impl::to_protocol_version(uint16_t v) { - if (v >= net_version_base) { - v -= net_version_base; - return (v > net_version_range) ? 0 : v; + // call with connections_mtx + void connections_manager::connect_i( const string& host ) { + connection_ptr c = std::make_shared( host ); + fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); + if( c->resolve_and_connect() ) { + fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); + add_i( std::move(c) ); } - return 0; } - bool net_plugin_impl::in_sync() const { - return sync_master->is_in_sync(); + // call with connections_mtx + void connections_manager::add_i(connection_ptr&& c) { + c->set_heartbeat_timeout( heartbeat_timeout ); + connections.insert( std::move(c) ); } - void net_plugin::register_update_p2p_connection_metrics(std::function&& fun){ - my->update_p2p_connection_metrics = std::move(fun); + // called from any thread + void connections_manager::start_conn_timer() { + start_conn_timer(connector_period, {}); // this locks mutex } - void net_plugin::register_increment_failed_p2p_connections(std::function&& fun){ - my->increment_failed_p2p_connections = std::move(fun); + // called from any thread + void connections_manager::start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection) { + std::lock_guard g( connector_check_timer_mtx ); + if (!connector_check_timer) { + connector_check_timer = std::make_unique( my_impl->thread_pool.get_executor() ); + } + connector_check_timer->expires_from_now( du ); + connector_check_timer->async_wait( [this, from_connection{std::move(from_connection)}](boost::system::error_code ec) mutable { + if( !ec ) { + connection_monitor(from_connection); + } + }); } - void net_plugin::register_increment_dropped_trxs(std::function&& fun){ - my->increment_dropped_trxs = std::move(fun); + void connections_manager::stop_conn_timer() { + std::lock_guard g( connector_check_timer_mtx ); + if (connector_check_timer) { + connector_check_timer->cancel(); + } + } + + // called from any thread + void connections_manager::connection_monitor(const std::weak_ptr& from_connection) { + auto max_time = fc::time_point::now().safe_add(max_cleanup_time); + auto from = from_connection.lock(); + std::unique_lock g( connections_mtx ); + auto it = (from ? connections.find(from) : connections.begin()); + if (it == connections.end()) it = connections.begin(); + size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0; + while (it != connections.end()) { + if (fc::time_point::now() >= max_time) { + connection_wptr wit = *it; + g.unlock(); + fc_dlog( logger, "Exiting connection monitor early, ran out of time: ${t}", ("t", max_time - fc::time_point::now()) ); + fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}", + ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) ); + start_conn_timer( std::chrono::milliseconds( 1 ), wit ); // avoid exhausting + return; + } + if ((*it)->is_bp_connection) { + ++num_bp_peers; + } else if ((*it)->incoming()) { + ++num_clients; + } else { + ++num_peers; + } + + if (!(*it)->socket_is_open() && !(*it)->connecting) { + if (!(*it)->incoming()) { + if (!(*it)->resolve_and_connect()) { + it = connections.erase(it); + --num_peers; + ++num_rm; + continue; + } + } else { + --num_clients; + ++num_rm; + it = connections.erase(it); + continue; + } + } + ++it; + } + g.unlock(); + + if (update_p2p_connection_metrics) { + update_p2p_connection_metrics({.num_peers = num_peers, .num_clients = num_clients}); + } + + if( num_clients > 0 || num_peers > 0 ) { + fc_ilog(logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}, block producer peers: ${num_bp_peers}", + ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size())("num_bp_peers", num_bp_peers)); + } + fc_dlog( logger, "connection monitor, removed ${n} connections", ("n", num_rm) ); + start_conn_timer( connector_period, {}); } -} +} // namespace eosio diff --git a/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp b/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp index 485234e4d8..93ac898a5b 100644 --- a/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp +++ b/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp @@ -14,13 +14,14 @@ struct mock_connection { using namespace eosio::chain::literals; using namespace std::literals::string_literals; -struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager { - - uint32_t max_client_count; - bool is_in_sync = false; +struct mock_connections_manager { + uint32_t max_client_count = 0; std::vector connections; - bool in_sync() { return is_in_sync; } + std::function connect; + std::function disconnect; + + uint32_t get_max_client_count() const { return max_client_count; } template void for_each_connection(Function&& func) const { @@ -29,9 +30,14 @@ struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager connect; - std::function disconnect; +struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager { + + bool is_in_sync = false; + mock_connections_manager connections; + + bool in_sync() { return is_in_sync; } void setup_test_peers() { set_bp_peers({ "proda,127.0.0.1:8001:blk"s, "prodb,127.0.0.1:8002:trx"s, "prodc,127.0.0.1:8003"s, @@ -159,7 +165,7 @@ BOOST_AUTO_TEST_CASE(test_on_pending_schedule) { std::vector connected_hosts; - plugin.connect = [&connected_hosts](std::string host) { connected_hosts.push_back(host); }; + plugin.connections.connect = [&connected_hosts](std::string host) { connected_hosts.push_back(host); }; // make sure nothing happens when it is not in_sync plugin.is_in_sync = false; @@ -203,10 +209,10 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule1) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n }; - plugin.connect = [](std::string host) {}; + plugin.connections.connect = [](std::string host) {}; std::vector disconnected_hosts; - plugin.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; + plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; // make sure nothing happens when it is not in_sync plugin.is_in_sync = false; @@ -239,9 +245,9 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule2) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n }; - plugin.connect = [](std::string host) {}; + plugin.connections.connect = [](std::string host) {}; std::vector disconnected_hosts; - plugin.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; + plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; // when pending and active schedules are changed simultaneosly plugin.is_in_sync = true; @@ -263,8 +269,8 @@ BOOST_AUTO_TEST_CASE(test_exceeding_connection_limit) { mock_net_plugin plugin; plugin.setup_test_peers(); plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; - plugin.max_client_count = 1; - plugin.connections = { + plugin.connections.max_client_count = 1; + plugin.connections.connections = { { .is_bp_connection = true, .is_open = true, .handshake_received = true }, // 0 { .is_bp_connection = true, .is_open = true, .handshake_received = false }, // 1 { .is_bp_connection = true, .is_open = false, .handshake_received = true }, // 2 @@ -277,12 +283,12 @@ BOOST_AUTO_TEST_CASE(test_exceeding_connection_limit) { BOOST_CHECK_EQUAL(plugin.num_established_clients(), 2); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[0])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[1])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[2])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[3])); - BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections[4])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[5])); - BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections[6])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[7])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[0])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[1])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[2])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[3])); + BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections.connections[4])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[5])); + BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections.connections[6])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[7])); } \ No newline at end of file