-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Listen on multiple addresses for net_plugin p2p. #1411
Changes from 4 commits
3a7c072
d5fdd64
1c81166
db93c63
4d71d2a
3c10138
a4f9399
1bbb36a
374f8a3
2f55f15
4a4875e
31f3889
501056f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,21 @@ | |
|
||
using namespace eosio::chain::plugin_interface; | ||
|
||
namespace boost | ||
{ | ||
/// @brief Overload for boost::lexical_cast to convert vector of strings to string | ||
/// | ||
/// Used by boost::program_options to print the default value of an std::vector<std::string> option | ||
/// | ||
/// @param v the vector to convert | ||
/// @return the contents of the vector as a comma-separated string | ||
template<> | ||
inline std::string lexical_cast<std::string>(const std::vector<std::string>& v) | ||
{ | ||
return boost::join(v, ","); | ||
} | ||
} | ||
|
||
namespace eosio { | ||
static auto _net_plugin = application::register_plugin<net_plugin>(); | ||
|
||
|
@@ -335,7 +350,7 @@ namespace eosio { | |
private: // must call with held mutex | ||
connection_ptr find_connection_i(const string& host) const; | ||
void add_i(connection_ptr&& c); | ||
void connect_i(const string& peer); | ||
void connect_i(const string& peer, const string& p2p_address); | ||
|
||
void connection_monitor(const std::weak_ptr<connection>& from_connection); | ||
|
||
|
@@ -355,14 +370,14 @@ namespace eosio { | |
|
||
void register_update_p2p_connection_metrics(std::function<void(net_plugin::p2p_connections_metrics)>&& fun); | ||
|
||
void connect_supplied_peers(); | ||
void connect_supplied_peers(const string& p2p_address); | ||
|
||
void start_conn_timer(); | ||
void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection); | ||
void stop_conn_timer(); | ||
|
||
void add(connection_ptr c); | ||
string connect(const string& host); | ||
string connect(const string& host, const string& p2p_address); | ||
string disconnect(const string& host); | ||
void close_all(); | ||
|
||
|
@@ -395,8 +410,8 @@ namespace eosio { | |
* Thread safe, only updated in plugin initialize | ||
* @{ | ||
*/ | ||
string p2p_address; | ||
string p2p_server_address; | ||
vector<string> p2p_addresses; | ||
vector<string> p2p_server_addresses; | ||
|
||
vector<chain::public_key_type> allowed_peers; ///< peer keys allowed to connect | ||
std::map<chain::public_key_type, | ||
|
@@ -518,7 +533,7 @@ namespace eosio { | |
bool in_sync() const; | ||
fc::logger& get_logger() { return logger; } | ||
|
||
void create_session(tcp::socket&& socket); | ||
void create_session(tcp::socket&& socket, const string p2p_address); | ||
}; | ||
|
||
// peer_[x]log must be called from thread in connection strand | ||
|
@@ -750,8 +765,11 @@ namespace eosio { | |
public: | ||
enum class connection_state { connecting, connected, closing, closed }; | ||
|
||
explicit connection( const string& endpoint ); | ||
explicit connection( tcp::socket&& socket ); | ||
explicit connection( const string& endpoint, const string& address ); | ||
/// @brief ctor | ||
/// @param socket created by boost::asio in fc::listener | ||
/// @param address identifier of listen socket which accepted this new connection | ||
explicit connection( tcp::socket&& socket, const string& address ); | ||
heifner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
~connection() = default; | ||
|
||
connection( const connection& ) = delete; | ||
|
@@ -790,6 +808,7 @@ namespace eosio { | |
|
||
std::atomic<connection_state> conn_state{connection_state::connecting}; | ||
|
||
string p2p_address; // address string used in handshake | ||
const string peer_addr; | ||
enum connection_types : char { | ||
both, | ||
|
@@ -1004,7 +1023,7 @@ namespace eosio { | |
return mvo; | ||
} | ||
|
||
bool incoming() const { return peer_address().empty(); } // thread safe becuase of peer_address | ||
bool incoming() const { return peer_address().empty(); } // thread safe because of peer_address | ||
bool incoming_and_handshake_received() const { | ||
if (!incoming()) return false; | ||
fc::lock_guard g_conn( conn_mtx ); | ||
|
@@ -1127,8 +1146,9 @@ namespace eosio { | |
|
||
//--------------------------------------------------------------------------- | ||
|
||
connection::connection( const string& endpoint ) | ||
: peer_addr( endpoint ), | ||
connection::connection( const string& endpoint, const string& address ) | ||
: p2p_address( address ), | ||
peer_addr( endpoint ), | ||
strand( my_impl->thread_pool.get_executor() ), | ||
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ), | ||
log_p2p_address( endpoint ), | ||
|
@@ -1141,16 +1161,18 @@ namespace eosio { | |
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) ); | ||
} | ||
|
||
connection::connection(tcp::socket&& s) | ||
: peer_addr(), | ||
connection::connection(tcp::socket&& s, const string& address) | ||
: p2p_address( address), | ||
peer_addr(), | ||
strand( my_impl->thread_pool.get_executor() ), | ||
socket( new tcp::socket( std::move(s) ) ), | ||
connection_id( ++my_impl->current_connection_id ), | ||
response_expected_timer( my_impl->thread_pool.get_executor() ), | ||
last_handshake_recv(), | ||
last_handshake_sent() | ||
{ | ||
fc_dlog( logger, "new connection object created" ); | ||
update_endpoints(); | ||
fc_dlog( logger, "new connection object created for peer ${address}:${port} from listener ${addr}", ("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", p2p_address) ); | ||
} | ||
|
||
// called from connection strand | ||
|
@@ -1224,7 +1246,6 @@ namespace eosio { | |
bool connection::start_session() { | ||
verify_strand_in_this_thread( strand, __func__, __LINE__ ); | ||
|
||
update_endpoints(); | ||
boost::asio::ip::tcp::no_delay nodelay( true ); | ||
boost::system::error_code ec; | ||
socket->set_option( nodelay, ec ); | ||
|
@@ -2655,7 +2676,7 @@ namespace eosio { | |
} | ||
|
||
|
||
void net_plugin_impl::create_session(tcp::socket&& socket) { | ||
void net_plugin_impl::create_session(tcp::socket&& socket, const string p2p_address) { | ||
uint32_t visitors = 0; | ||
uint32_t from_addr = 0; | ||
boost::system::error_code rec; | ||
|
@@ -2680,8 +2701,8 @@ namespace eosio { | |
(auto_bp_peering_enabled() || connections.get_max_client_count() == 0 || | ||
visitors < connections.get_max_client_count())) { | ||
fc_ilog(logger, "Accepted new connection: " + paddr_str); | ||
|
||
connection_ptr new_connection = std::make_shared<connection>(std::move(socket)); | ||
fc_dlog(logger, "Instantiating connection with listener address ${addr}", ("addr", p2p_address)); | ||
connection_ptr new_connection = std::make_shared<connection>(std::move(socket), p2p_address); | ||
new_connection->strand.post([new_connection, this]() { | ||
if (new_connection->start_session()) { | ||
connections.add(new_connection); | ||
|
@@ -3144,9 +3165,9 @@ namespace eosio { | |
if (msg.time + c_time <= check_time) | ||
return false; | ||
} else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) { | ||
if (my_impl->p2p_address < msg.p2p_address) { | ||
fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'", | ||
("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) ); | ||
if (p2p_address < msg.p2p_address) { | ||
fc_dlog( logger, "p2p_address '${lhs}' < msg.p2p_address '${rhs}'", | ||
("lhs", p2p_address)( "rhs", msg.p2p_address ) ); | ||
// only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate, | ||
// so there is no chance for both connections to be closed | ||
return false; | ||
|
@@ -3828,7 +3849,8 @@ namespace eosio { | |
// If we couldn't sign, don't send a token. | ||
if(hello.sig == chain::signature_type()) | ||
hello.token = sha256(); | ||
hello.p2p_address = my_impl->p2p_address; | ||
peer_dlog( this, "populated handshake with address ${addr}", ("addr", p2p_address)); | ||
hello.p2p_address = p2p_address; | ||
if( is_transactions_only_connection() ) hello.p2p_address += ":trx"; | ||
// if we are not accepting transactions tell peer we are blocks only | ||
if( is_blocks_only_connection() || !my_impl->p2p_accept_transactions ) hello.p2p_address += ":blk"; | ||
|
@@ -3860,8 +3882,8 @@ namespace eosio { | |
void net_plugin::set_program_options( options_description& /*cli*/, options_description& cfg ) | ||
{ | ||
cfg.add_options() | ||
( "p2p-listen-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The actual host:port used to listen for incoming p2p connections.") | ||
( "p2p-server-address", bpo::value<string>(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint.") | ||
( "p2p-listen-endpoint", bpo::value< vector<string> >()->default_value( vector<string>(1, string("0.0.0.0:9876")) ), "The actual host:port used to listen for incoming p2p connections. May be specified multiple times.") | ||
( "p2p-server-address", bpo::value< vector<string> >(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint. May be specified as many times as p2p-listen-endpoint") | ||
( "p2p-peer-address", bpo::value< vector<string> >()->composing(), | ||
"The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n" | ||
" Syntax: host:port[:<trx>|<blk>]\n" | ||
|
@@ -3882,7 +3904,7 @@ namespace eosio { | |
( "agent-name", bpo::value<string>()->default_value("EOS Test Agent"), "The name supplied to identify this node amongst the peers.") | ||
( "allowed-connection", bpo::value<vector<string>>()->multitoken()->default_value({"any"}, "any"), "Can be 'any' or 'producers' or 'specified' or 'none'. If 'specified', peer-key must be specified at least once. If only 'producers', peer-key is not required. 'producers' and 'specified' may be combined.") | ||
( "peer-key", bpo::value<vector<string>>()->composing()->multitoken(), "Optional public key of peer allowed to connect. May be used multiple times.") | ||
( "peer-private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(), | ||
( "peer-private-key", bpo::value<vector<string>>()->composing()->multitoken(), | ||
"Tuple of [PublicKey, WIF private key] (may specify multiple times)") | ||
( "max-clients", bpo::value<uint32_t>()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") | ||
( "connection-cleanup-period", bpo::value<int>()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections") | ||
|
@@ -3942,16 +3964,31 @@ namespace eosio { | |
std::chrono::seconds( options.at("connection-cleanup-period").as<int>() ), | ||
options.at("max-clients").as<uint32_t>() ); | ||
|
||
if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as<string>().length()) { | ||
p2p_address = options.at( "p2p-listen-endpoint" ).as<string>(); | ||
EOS_ASSERT( p2p_address.length() <= max_p2p_address_length, chain::plugin_config_exception, | ||
"p2p-listen-endpoint too long, must be less than ${m}", ("m", max_p2p_address_length) ); | ||
if( options.count( "p2p-listen-endpoint" ) && !options.at("p2p-listen-endpoint").as<vector<string>>().empty() && options.at("p2p-listen-endpoint").as<vector<string>>()[0].length()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thinks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can remove vector checks at all. code below should perfectly handle empty vector. Or you can add a check later if vector is empty - looks more succinct, easier to understand and more optimal since you do not construct vector multiple times There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'd like to retain the checks in case the default value is ever removed. I figured it was ok to reconstruct the vector because this is startup code executed only once and it would be very unusual for the vector size to exceed even 10 elements. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my main point was having concise and easy to read code at first. I agree that performance could be neglected here if it adds readability or simplicity. if( options.count( "p2p-listen-endpoint" ) ) {
} in that case if( options.count( "p2p-listen-endpoint" ) ) {
auto p2ps = options.at( "p2p-listen-endpoint" ).as<vector<string>>();
if (!p2ps.empty() && !p2ps.front().empty()) {
p2p_addresses = p2ps;
/*...*/
}
} I'm not hard on this, just trying to keep code reasonably simple. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated for readability. |
||
p2p_addresses = options.at( "p2p-listen-endpoint" ).as<vector<string>>(); | ||
auto addr_count = p2p_addresses.size(); | ||
std::sort(p2p_addresses.begin(), p2p_addresses.end()); | ||
std::unique(p2p_addresses.begin(), p2p_addresses.end()); | ||
heifner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if( size_t addr_diff = addr_count - p2p_addresses.size(); addr_diff != 0) { | ||
fc_ilog( logger, "Removed ${count} duplicate p2p-listen-endpoint entries", ("count", addr_diff)); | ||
heifner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
for(auto& addr : p2p_addresses) { | ||
heifner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
EOS_ASSERT( addr.length() <= max_p2p_address_length, chain::plugin_config_exception, | ||
"p2p-listen-endpoint ${a} too long, must be less than ${m}", | ||
("a", addr)("m", max_p2p_address_length) ); | ||
} | ||
} | ||
if( options.count( "p2p-server-address" ) ) { | ||
p2p_server_address = options.at( "p2p-server-address" ).as<string>(); | ||
EOS_ASSERT( p2p_server_address.length() <= max_p2p_address_length, chain::plugin_config_exception, | ||
"p2p_server_address too long, must be less than ${m}", ("m", max_p2p_address_length) ); | ||
p2p_server_addresses = options.at( "p2p-server-address" ).as<vector<string>>(); | ||
EOS_ASSERT( p2p_server_addresses.size() <= p2p_addresses.size(), chain::plugin_config_exception, | ||
"p2p-server-address may not be specified more times than p2p-listen-endpoint" ); | ||
for( auto& addr: p2p_server_addresses ) { | ||
EOS_ASSERT( addr.length() <= max_p2p_address_length, chain::plugin_config_exception, | ||
"p2p-server-address ${a} too long, must be less than ${m}", | ||
("a", addr)("m", max_p2p_address_length) ); | ||
} | ||
} | ||
p2p_server_addresses.resize(p2p_addresses.size()); // extend with empty entries as needed | ||
|
||
thread_pool_size = options.at( "net-threads" ).as<uint16_t>(); | ||
EOS_ASSERT( thread_pool_size > 0, chain::plugin_config_exception, | ||
|
@@ -4044,21 +4081,22 @@ namespace eosio { | |
|
||
dispatcher = std::make_unique<dispatch_manager>( my_impl->thread_pool.get_executor() ); | ||
|
||
if( !p2p_accept_transactions && p2p_address.size() ) { | ||
if( !p2p_accept_transactions && p2p_addresses.size() ) { | ||
fc_ilog( logger, "\n" | ||
"***********************************\n" | ||
"* p2p-accept-transactions = false *\n" | ||
"* Transactions not forwarded *\n" | ||
"***********************************\n" ); | ||
} | ||
|
||
std::string listen_address = p2p_address; | ||
std::vector<string> listen_addresses = p2p_addresses; | ||
|
||
if( !p2p_address.empty() ) { | ||
auto [host, port] = fc::split_host_port(listen_address); | ||
std::transform(p2p_addresses.begin(), p2p_addresses.end(), p2p_server_addresses.begin(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know you do a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added assert. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dimas1185 I'm sorry, the line numbers have moved around too much. I'm not sure what you mean. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jgiszczak you can skip it. assert looks good. `p2p_server_addresses.resize(p2p_addresses.size()); // extend with empty entries as needed` |
||
p2p_addresses.begin(), [](const string& p2p_address, const string& p2p_server_address) { | ||
auto [host, port] = fc::split_host_port(p2p_address); | ||
|
||
if( !p2p_server_address.empty() ) { | ||
p2p_address = p2p_server_address; | ||
return p2p_server_address; | ||
} else if( host.empty() || host == "0.0.0.0" || host == "[::]") { | ||
boost::system::error_code ec; | ||
auto hostname = host_name( ec ); | ||
|
@@ -4068,9 +4106,10 @@ namespace eosio { | |
"Unable to retrieve host_name. ${msg}", ("msg", ec.message())); | ||
|
||
} | ||
p2p_address = hostname + ":" + port; | ||
return hostname + ":" + port; | ||
} | ||
} | ||
return p2p_address; | ||
}); | ||
heifner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
{ | ||
chain::controller& cc = chain_plug->chain(); | ||
|
@@ -4094,8 +4133,10 @@ namespace eosio { | |
incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe( | ||
[this](auto&& t) { transaction_ack(std::forward<decltype(t)>(t)); }); | ||
|
||
app().executor().post(priority::highest, [my=shared_from_this(), address = std::move(listen_address)](){ | ||
if (address.size()) { | ||
for(auto listen_itr = listen_addresses.begin(), p2p_iter = p2p_addresses.begin(); | ||
listen_itr != listen_addresses.end(); | ||
++listen_itr, ++p2p_iter) { | ||
app().executor().post(priority::highest, [my=shared_from_this(), address = std::move(*listen_itr), p2p_addr = *p2p_iter](){ | ||
try { | ||
const boost::posix_time::milliseconds accept_timeout(100); | ||
|
||
|
@@ -4104,19 +4145,20 @@ namespace eosio { | |
|
||
fc::create_listener<tcp>( | ||
my->thread_pool.get_executor(), logger, accept_timeout, address, extra_listening_log_info, | ||
[my = my](tcp::socket&& socket) { my->create_session(std::move(socket)); }); | ||
[my = my, addr = p2p_addr](tcp::socket&& socket) { my->create_session(std::move(socket), addr); }); | ||
} catch (const std::exception& e) { | ||
fc_elog( logger, "net_plugin::plugin_startup failed to listen on ${addr}, ${what}", | ||
("addr", address)("what", e.what()) ); | ||
app().quit(); | ||
return; | ||
} | ||
} | ||
|
||
}); | ||
} | ||
app().executor().post(priority::highest, [my=shared_from_this()](){ | ||
my->ticker(); | ||
my->start_monitors(); | ||
my->update_chain_info(); | ||
my->connections.connect_supplied_peers(); | ||
my->connections.connect_supplied_peers(*my->p2p_addresses.begin()); // attribute every outbound connection to the first listen port | ||
}); | ||
} | ||
|
||
|
@@ -4153,7 +4195,7 @@ namespace eosio { | |
|
||
/// RPC API | ||
string net_plugin::connect( const string& host ) { | ||
return my->connections.connect( host ); | ||
return my->connections.connect( host, *my->p2p_addresses.begin() ); | ||
} | ||
|
||
/// RPC API | ||
|
@@ -4227,10 +4269,10 @@ namespace eosio { | |
update_p2p_connection_metrics = std::move(fun); | ||
} | ||
|
||
void connections_manager::connect_supplied_peers() { | ||
void connections_manager::connect_supplied_peers(const string& p2p_address) { | ||
std::lock_guard g(connections_mtx); | ||
for (const auto& peer : supplied_peers) { | ||
connect_i(peer); | ||
connect_i(peer, p2p_address); | ||
} | ||
} | ||
|
||
|
@@ -4240,12 +4282,12 @@ namespace eosio { | |
} | ||
|
||
// called by API | ||
string connections_manager::connect( const string& host ) { | ||
string connections_manager::connect( const string& host, const string& p2p_address ) { | ||
std::lock_guard g( connections_mtx ); | ||
if( find_connection_i( host ) ) | ||
return "already connected"; | ||
|
||
connect_i( host ); | ||
connect_i( host, p2p_address ); | ||
supplied_peers.insert(host); | ||
return "added connection"; | ||
} | ||
|
@@ -4302,8 +4344,8 @@ namespace eosio { | |
} | ||
|
||
// call with connections_mtx | ||
void connections_manager::connect_i( const string& host ) { | ||
connection_ptr c = std::make_shared<connection>( host ); | ||
void connections_manager::connect_i( const string& host, const string& p2p_address ) { | ||
connection_ptr c = std::make_shared<connection>( host, p2p_address ); | ||
fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); | ||
if( c->resolve_and_connect() ) { | ||
fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed please update help and PR description indicating that the first configured server_address is reported in the handshake message to peers. Also guard against
p2p_addresses
being empty here.