Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listen on multiple addresses for net_plugin p2p. #1411

Merged
merged 13 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class bp_connection_manager {

fc_dlog(self()->get_logger(), "pending_downstream_neighbors: ${pending_downstream_neighbors}",
("pending_downstream_neighbors", to_string(pending_downstream_neighbors)));
for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor]); }
for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor], *self()->p2p_addresses.begin() ); }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed please update help and PR description indicating that the first configured server_address is reported in the handshake message to peers. Also guard against p2p_addresses being empty here.


pending_neighbors = std::move(pending_downstream_neighbors);
finder.add_upstream_neighbors(pending_neighbors);
Expand Down
146 changes: 94 additions & 52 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@

using namespace eosio::chain::plugin_interface;

namespace boost
{
/// @brief Overload for boost::lexical_cast to convert vector of strings to string
///
/// Used by boost::program_options to print the default value of an std::vector<std::string> option
///
/// @param v the vector to convert
/// @return the contents of the vector as a comma-separated string
template<>
inline std::string lexical_cast<std::string>(const std::vector<std::string>& v)
{
return boost::join(v, ",");
}
}

namespace eosio {
static auto _net_plugin = application::register_plugin<net_plugin>();

Expand Down Expand Up @@ -335,7 +350,7 @@ namespace eosio {
private: // must call with held mutex
connection_ptr find_connection_i(const string& host) const;
void add_i(connection_ptr&& c);
void connect_i(const string& peer);
void connect_i(const string& peer, const string& p2p_address);

void connection_monitor(const std::weak_ptr<connection>& from_connection);

Expand All @@ -355,14 +370,14 @@ namespace eosio {

void register_update_p2p_connection_metrics(std::function<void(net_plugin::p2p_connections_metrics)>&& fun);

void connect_supplied_peers();
void connect_supplied_peers(const string& p2p_address);

void start_conn_timer();
void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
void stop_conn_timer();

void add(connection_ptr c);
string connect(const string& host);
string connect(const string& host, const string& p2p_address);
string disconnect(const string& host);
void close_all();

Expand Down Expand Up @@ -395,8 +410,8 @@ namespace eosio {
* Thread safe, only updated in plugin initialize
* @{
*/
string p2p_address;
string p2p_server_address;
vector<string> p2p_addresses;
vector<string> p2p_server_addresses;

vector<chain::public_key_type> allowed_peers; ///< peer keys allowed to connect
std::map<chain::public_key_type,
Expand Down Expand Up @@ -518,7 +533,7 @@ namespace eosio {
bool in_sync() const;
fc::logger& get_logger() { return logger; }

void create_session(tcp::socket&& socket);
void create_session(tcp::socket&& socket, const string p2p_address);
};

// peer_[x]log must be called from thread in connection strand
Expand Down Expand Up @@ -750,8 +765,11 @@ namespace eosio {
public:
enum class connection_state { connecting, connected, closing, closed };

explicit connection( const string& endpoint );
explicit connection( tcp::socket&& socket );
explicit connection( const string& endpoint, const string& address );
/// @brief ctor
/// @param socket created by boost::asio in fc::listener
/// @param address identifier of listen socket which accepted this new connection
explicit connection( tcp::socket&& socket, const string& address );
~connection() = default;

connection( const connection& ) = delete;
Expand Down Expand Up @@ -790,6 +808,7 @@ namespace eosio {

std::atomic<connection_state> conn_state{connection_state::connecting};

string p2p_address; // address string used in handshake
const string peer_addr;
enum connection_types : char {
both,
Expand Down Expand Up @@ -1004,7 +1023,7 @@ namespace eosio {
return mvo;
}

bool incoming() const { return peer_address().empty(); } // thread safe becuase of peer_address
bool incoming() const { return peer_address().empty(); } // thread safe because of peer_address
bool incoming_and_handshake_received() const {
if (!incoming()) return false;
fc::lock_guard g_conn( conn_mtx );
Expand Down Expand Up @@ -1127,8 +1146,9 @@ namespace eosio {

//---------------------------------------------------------------------------

connection::connection( const string& endpoint )
: peer_addr( endpoint ),
connection::connection( const string& endpoint, const string& address )
: p2p_address( address ),
peer_addr( endpoint ),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ),
log_p2p_address( endpoint ),
Expand All @@ -1141,16 +1161,18 @@ namespace eosio {
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

connection::connection(tcp::socket&& s)
: peer_addr(),
connection::connection(tcp::socket&& s, const string& address)
: p2p_address( address),
peer_addr(),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( std::move(s) ) ),
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent()
{
fc_dlog( logger, "new connection object created" );
update_endpoints();
fc_dlog( logger, "new connection object created for peer ${address}:${port} from listener ${addr}", ("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", p2p_address) );
}

// called from connection strand
Expand Down Expand Up @@ -1224,7 +1246,6 @@ namespace eosio {
bool connection::start_session() {
verify_strand_in_this_thread( strand, __func__, __LINE__ );

update_endpoints();
boost::asio::ip::tcp::no_delay nodelay( true );
boost::system::error_code ec;
socket->set_option( nodelay, ec );
Expand Down Expand Up @@ -2655,7 +2676,7 @@ namespace eosio {
}


void net_plugin_impl::create_session(tcp::socket&& socket) {
void net_plugin_impl::create_session(tcp::socket&& socket, const string p2p_address) {
uint32_t visitors = 0;
uint32_t from_addr = 0;
boost::system::error_code rec;
Expand All @@ -2680,8 +2701,8 @@ namespace eosio {
(auto_bp_peering_enabled() || connections.get_max_client_count() == 0 ||
visitors < connections.get_max_client_count())) {
fc_ilog(logger, "Accepted new connection: " + paddr_str);

connection_ptr new_connection = std::make_shared<connection>(std::move(socket));
fc_dlog(logger, "Instantiating connection with listener address ${addr}", ("addr", p2p_address));
connection_ptr new_connection = std::make_shared<connection>(std::move(socket), p2p_address);
new_connection->strand.post([new_connection, this]() {
if (new_connection->start_session()) {
connections.add(new_connection);
Expand Down Expand Up @@ -3144,9 +3165,9 @@ namespace eosio {
if (msg.time + c_time <= check_time)
return false;
} else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) {
if (my_impl->p2p_address < msg.p2p_address) {
fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) );
if (p2p_address < msg.p2p_address) {
fc_dlog( logger, "p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
("lhs", p2p_address)( "rhs", msg.p2p_address ) );
// only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate,
// so there is no chance for both connections to be closed
return false;
Expand Down Expand Up @@ -3828,7 +3849,8 @@ namespace eosio {
// If we couldn't sign, don't send a token.
if(hello.sig == chain::signature_type())
hello.token = sha256();
hello.p2p_address = my_impl->p2p_address;
peer_dlog( this, "populated handshake with address ${addr}", ("addr", p2p_address));
hello.p2p_address = p2p_address;
if( is_transactions_only_connection() ) hello.p2p_address += ":trx";
// if we are not accepting transactions tell peer we are blocks only
if( is_blocks_only_connection() || !my_impl->p2p_accept_transactions ) hello.p2p_address += ":blk";
Expand Down Expand Up @@ -3860,8 +3882,8 @@ namespace eosio {
void net_plugin::set_program_options( options_description& /*cli*/, options_description& cfg )
{
cfg.add_options()
( "p2p-listen-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The actual host:port used to listen for incoming p2p connections.")
( "p2p-server-address", bpo::value<string>(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint.")
( "p2p-listen-endpoint", bpo::value< vector<string> >()->default_value( vector<string>(1, string("0.0.0.0:9876")) ), "The actual host:port used to listen for incoming p2p connections. May be specified multiple times.")
( "p2p-server-address", bpo::value< vector<string> >(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint. May be specified as many times as p2p-listen-endpoint")
( "p2p-peer-address", bpo::value< vector<string> >()->composing(),
"The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n"
" Syntax: host:port[:<trx>|<blk>]\n"
Expand All @@ -3882,7 +3904,7 @@ namespace eosio {
( "agent-name", bpo::value<string>()->default_value("EOS Test Agent"), "The name supplied to identify this node amongst the peers.")
( "allowed-connection", bpo::value<vector<string>>()->multitoken()->default_value({"any"}, "any"), "Can be 'any' or 'producers' or 'specified' or 'none'. If 'specified', peer-key must be specified at least once. If only 'producers', peer-key is not required. 'producers' and 'specified' may be combined.")
( "peer-key", bpo::value<vector<string>>()->composing()->multitoken(), "Optional public key of peer allowed to connect. May be used multiple times.")
( "peer-private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(),
( "peer-private-key", bpo::value<vector<string>>()->composing()->multitoken(),
"Tuple of [PublicKey, WIF private key] (may specify multiple times)")
( "max-clients", bpo::value<uint32_t>()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit")
( "connection-cleanup-period", bpo::value<int>()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections")
Expand Down Expand Up @@ -3942,16 +3964,31 @@ namespace eosio {
std::chrono::seconds( options.at("connection-cleanup-period").as<int>() ),
options.at("max-clients").as<uint32_t>() );

if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as<string>().length()) {
p2p_address = options.at( "p2p-listen-endpoint" ).as<string>();
EOS_ASSERT( p2p_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p-listen-endpoint too long, must be less than ${m}", ("m", max_p2p_address_length) );
if( options.count( "p2p-listen-endpoint" ) && !options.at("p2p-listen-endpoint").as<vector<string>>().empty() && options.at("p2p-listen-endpoint").as<vector<string>>()[0].length()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thinks && !options.at("p2p-listen-endpoint").as<vector<string>>()[0].empty() is easier to understand.

Copy link
Contributor

@dimas1185 dimas1185 Jul 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove vector checks at all. code below should perfectly handle empty vector. Or you can add a check later if vector is empty - looks more succinct, easier to understand and more optimal since you do not construct vector multiple times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd like to retain the checks in case the default value is ever removed. I figured it was ok to reconstruct the vector because this is startup code executed only once and it would be very unusual for the vector size to exceed even 10 elements.

Copy link
Contributor

@dimas1185 dimas1185 Jul 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my main point was having concise and easy to read code at first. I agree that performance could be neglected here if it adds readability or simplicity.
So if you remove it or at least split that huge if statement - that will improve readability.
technically I do not see any difference in functionality if you'll leave it like that:

if( options.count( "p2p-listen-endpoint" ) ) {
}

in that case p2p_addresses will be empty anyways and sort /erase won't crash on empty vector. If I missed something you could just split the if like that:

if( options.count( "p2p-listen-endpoint" ) ) {
   auto p2ps = options.at( "p2p-listen-endpoint" ).as<vector<string>>();
   if (!p2ps.empty() && !p2ps.front().empty()) { 
      p2p_addresses = p2ps;
       /*...*/
   }
}

I'm not hard on this, just trying to keep code reasonably simple. net_plugin code is complicated enough already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated for readability.

p2p_addresses = options.at( "p2p-listen-endpoint" ).as<vector<string>>();
auto addr_count = p2p_addresses.size();
std::sort(p2p_addresses.begin(), p2p_addresses.end());
std::unique(p2p_addresses.begin(), p2p_addresses.end());
if( size_t addr_diff = addr_count - p2p_addresses.size(); addr_diff != 0) {
fc_ilog( logger, "Removed ${count} duplicate p2p-listen-endpoint entries", ("count", addr_diff));
}
for(auto& addr : p2p_addresses) {
EOS_ASSERT( addr.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p-listen-endpoint ${a} too long, must be less than ${m}",
("a", addr)("m", max_p2p_address_length) );
}
}
if( options.count( "p2p-server-address" ) ) {
p2p_server_address = options.at( "p2p-server-address" ).as<string>();
EOS_ASSERT( p2p_server_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p_server_address too long, must be less than ${m}", ("m", max_p2p_address_length) );
p2p_server_addresses = options.at( "p2p-server-address" ).as<vector<string>>();
EOS_ASSERT( p2p_server_addresses.size() <= p2p_addresses.size(), chain::plugin_config_exception,
"p2p-server-address may not be specified more times than p2p-listen-endpoint" );
for( auto& addr: p2p_server_addresses ) {
EOS_ASSERT( addr.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p-server-address ${a} too long, must be less than ${m}",
("a", addr)("m", max_p2p_address_length) );
}
}
p2p_server_addresses.resize(p2p_addresses.size()); // extend with empty entries as needed

thread_pool_size = options.at( "net-threads" ).as<uint16_t>();
EOS_ASSERT( thread_pool_size > 0, chain::plugin_config_exception,
Expand Down Expand Up @@ -4044,21 +4081,22 @@ namespace eosio {

dispatcher = std::make_unique<dispatch_manager>( my_impl->thread_pool.get_executor() );

if( !p2p_accept_transactions && p2p_address.size() ) {
if( !p2p_accept_transactions && p2p_addresses.size() ) {
fc_ilog( logger, "\n"
"***********************************\n"
"* p2p-accept-transactions = false *\n"
"* Transactions not forwarded *\n"
"***********************************\n" );
}

std::string listen_address = p2p_address;
std::vector<string> listen_addresses = p2p_addresses;

if( !p2p_address.empty() ) {
auto [host, port] = fc::split_host_port(listen_address);
std::transform(p2p_addresses.begin(), p2p_addresses.end(), p2p_server_addresses.begin(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you do a resize above, but an assert that these are the same size would be good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • it looks better to move line 3991 just above that one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added assert.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimas1185 I'm sorry, the line numbers have moved around too much. I'm not sure what you mean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jgiszczak you can skip it. assert looks good.
I meant this one:

`p2p_server_addresses.resize(p2p_addresses.size()); // extend with empty entries as needed`

p2p_addresses.begin(), [](const string& p2p_address, const string& p2p_server_address) {
auto [host, port] = fc::split_host_port(p2p_address);

if( !p2p_server_address.empty() ) {
p2p_address = p2p_server_address;
return p2p_server_address;
} else if( host.empty() || host == "0.0.0.0" || host == "[::]") {
boost::system::error_code ec;
auto hostname = host_name( ec );
Expand All @@ -4068,9 +4106,10 @@ namespace eosio {
"Unable to retrieve host_name. ${msg}", ("msg", ec.message()));

}
p2p_address = hostname + ":" + port;
return hostname + ":" + port;
}
}
return p2p_address;
});

{
chain::controller& cc = chain_plug->chain();
Expand All @@ -4094,8 +4133,10 @@ namespace eosio {
incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe(
[this](auto&& t) { transaction_ack(std::forward<decltype(t)>(t)); });

app().executor().post(priority::highest, [my=shared_from_this(), address = std::move(listen_address)](){
if (address.size()) {
for(auto listen_itr = listen_addresses.begin(), p2p_iter = p2p_addresses.begin();
listen_itr != listen_addresses.end();
++listen_itr, ++p2p_iter) {
app().executor().post(priority::highest, [my=shared_from_this(), address = std::move(*listen_itr), p2p_addr = *p2p_iter](){
try {
const boost::posix_time::milliseconds accept_timeout(100);

Expand All @@ -4104,19 +4145,20 @@ namespace eosio {

fc::create_listener<tcp>(
my->thread_pool.get_executor(), logger, accept_timeout, address, extra_listening_log_info,
[my = my](tcp::socket&& socket) { my->create_session(std::move(socket)); });
[my = my, addr = p2p_addr](tcp::socket&& socket) { my->create_session(std::move(socket), addr); });
} catch (const std::exception& e) {
fc_elog( logger, "net_plugin::plugin_startup failed to listen on ${addr}, ${what}",
("addr", address)("what", e.what()) );
app().quit();
return;
}
}

});
}
app().executor().post(priority::highest, [my=shared_from_this()](){
my->ticker();
my->start_monitors();
my->update_chain_info();
my->connections.connect_supplied_peers();
my->connections.connect_supplied_peers(*my->p2p_addresses.begin()); // attribute every outbound connection to the first listen port
});
}

Expand Down Expand Up @@ -4153,7 +4195,7 @@ namespace eosio {

/// RPC API
string net_plugin::connect( const string& host ) {
return my->connections.connect( host );
return my->connections.connect( host, *my->p2p_addresses.begin() );
}

/// RPC API
Expand Down Expand Up @@ -4227,10 +4269,10 @@ namespace eosio {
update_p2p_connection_metrics = std::move(fun);
}

void connections_manager::connect_supplied_peers() {
void connections_manager::connect_supplied_peers(const string& p2p_address) {
std::lock_guard g(connections_mtx);
for (const auto& peer : supplied_peers) {
connect_i(peer);
connect_i(peer, p2p_address);
}
}

Expand All @@ -4240,12 +4282,12 @@ namespace eosio {
}

// called by API
string connections_manager::connect( const string& host ) {
string connections_manager::connect( const string& host, const string& p2p_address ) {
std::lock_guard g( connections_mtx );
if( find_connection_i( host ) )
return "already connected";

connect_i( host );
connect_i( host, p2p_address );
supplied_peers.insert(host);
return "added connection";
}
Expand Down Expand Up @@ -4302,8 +4344,8 @@ namespace eosio {
}

// call with connections_mtx
void connections_manager::connect_i( const string& host ) {
connection_ptr c = std::make_shared<connection>( host );
void connections_manager::connect_i( const string& host, const string& p2p_address ) {
connection_ptr c = std::make_shared<connection>( host, p2p_address );
fc_dlog( logger, "calling active connector: ${h}", ("h", host) );
if( c->resolve_and_connect() ) {
fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) );
Expand Down
Loading