Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listen on multiple addresses for net_plugin p2p. #1411

Merged
merged 13 commits into from
Jul 25, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 74 additions & 34 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@

using namespace eosio::chain::plugin_interface;

namespace boost
{
/// @brief Overload for boost::lexical_cast to convert vector of strings to string
///
/// Used by boost::program_options to print the default value of an std::vector<std::string> option
///
/// @param v the vector to convert
/// @return the contents of the vector as a comma-separated string
template<>
inline std::string lexical_cast<std::string>(const std::vector<std::string>& v)
{
return boost::join(v, ",");
}
}

namespace eosio {
static auto _net_plugin = application::register_plugin<net_plugin>();

Expand Down Expand Up @@ -395,8 +410,8 @@ namespace eosio {
* Thread safe, only updated in plugin initialize
* @{
*/
string p2p_address;
string p2p_server_address;
vector<string> p2p_addresses;
vector<string> p2p_server_addresses;

vector<chain::public_key_type> allowed_peers; ///< peer keys allowed to connect
std::map<chain::public_key_type,
Expand Down Expand Up @@ -518,7 +533,7 @@ namespace eosio {
bool in_sync() const;
fc::logger& get_logger() { return logger; }

void create_session(tcp::socket&& socket);
void create_session(tcp::socket&& socket, const string& p2p_address);
};

// peer_[x]log must be called from thread in connection strand
Expand Down Expand Up @@ -751,7 +766,10 @@ namespace eosio {
enum class connection_state { connecting, connected, closing, closed };

explicit connection( const string& endpoint );
explicit connection( tcp::socket&& socket );
/// @brief ctor
/// @param socket created by boost::asio in fc::listener
/// @param address identifier of listen socket which accepted this new connection
explicit connection( tcp::socket&& socket, const string& address );
~connection() = default;

connection( const connection& ) = delete;
Expand Down Expand Up @@ -810,6 +828,7 @@ namespace eosio {

queued_buffer buffer_queue;

string p2p_address; // address string used in handshake
fc::sha256 conn_node_id;
string short_conn_node_id;
string log_p2p_address;
Expand Down Expand Up @@ -1004,7 +1023,7 @@ namespace eosio {
return mvo;
}

bool incoming() const { return peer_address().empty(); } // thread safe becuase of peer_address
bool incoming() const { return peer_address().empty(); } // thread safe because of peer_address
bool incoming_and_handshake_received() const {
if (!incoming()) return false;
fc::lock_guard g_conn( conn_mtx );
Expand Down Expand Up @@ -1141,10 +1160,11 @@ namespace eosio {
fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

connection::connection(tcp::socket&& s)
connection::connection(tcp::socket&& s, const string& address)
: peer_addr(),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( std::move(s) ) ),
p2p_address( address),
connection_id( ++my_impl->current_connection_id ),
response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
Expand Down Expand Up @@ -2655,7 +2675,7 @@ namespace eosio {
}


void net_plugin_impl::create_session(tcp::socket&& socket) {
void net_plugin_impl::create_session(tcp::socket&& socket, const string& p2p_address) {
uint32_t visitors = 0;
uint32_t from_addr = 0;
boost::system::error_code rec;
Expand All @@ -2681,7 +2701,7 @@ namespace eosio {
visitors < connections.get_max_client_count())) {
fc_ilog(logger, "Accepted new connection: " + paddr_str);

connection_ptr new_connection = std::make_shared<connection>(std::move(socket));
connection_ptr new_connection = std::make_shared<connection>(std::move(socket), p2p_address);
new_connection->strand.post([new_connection, this]() {
if (new_connection->start_session()) {
connections.add(new_connection);
Expand Down Expand Up @@ -3144,9 +3164,9 @@ namespace eosio {
if (msg.time + c_time <= check_time)
return false;
} else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) {
if (my_impl->p2p_address < msg.p2p_address) {
fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) );
if (p2p_address < msg.p2p_address) {
fc_dlog( logger, "p2p_address '${lhs}' < msg.p2p_address '${rhs}'",
("lhs", p2p_address)( "rhs", msg.p2p_address ) );
// only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate,
// so there is no chance for both connections to be closed
return false;
Expand Down Expand Up @@ -3828,7 +3848,7 @@ namespace eosio {
// If we couldn't sign, don't send a token.
if(hello.sig == chain::signature_type())
hello.token = sha256();
hello.p2p_address = my_impl->p2p_address;
hello.p2p_address = p2p_address;
if( is_transactions_only_connection() ) hello.p2p_address += ":trx";
// if we are not accepting transactions tell peer we are blocks only
if( is_blocks_only_connection() || !my_impl->p2p_accept_transactions ) hello.p2p_address += ":blk";
Expand Down Expand Up @@ -3860,8 +3880,8 @@ namespace eosio {
void net_plugin::set_program_options( options_description& /*cli*/, options_description& cfg )
{
cfg.add_options()
( "p2p-listen-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The actual host:port used to listen for incoming p2p connections.")
( "p2p-server-address", bpo::value<string>(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint.")
( "p2p-listen-endpoint", bpo::value< vector<string> >()->default_value( vector<string>(1, string("0.0.0.0:9876")) ), "The actual host:port used to listen for incoming p2p connections. May be specified multiple times.")
( "p2p-server-address", bpo::value< vector<string> >(), "An externally accessible host:port for identifying this node. Defaults to p2p-listen-endpoint. May be specified as many times as p2p-listen-endpoint")
( "p2p-peer-address", bpo::value< vector<string> >()->composing(),
"The public endpoint of a peer node to connect to. Use multiple p2p-peer-address options as needed to compose a network.\n"
" Syntax: host:port[:<trx>|<blk>]\n"
Expand All @@ -3882,7 +3902,7 @@ namespace eosio {
( "agent-name", bpo::value<string>()->default_value("EOS Test Agent"), "The name supplied to identify this node amongst the peers.")
( "allowed-connection", bpo::value<vector<string>>()->multitoken()->default_value({"any"}, "any"), "Can be 'any' or 'producers' or 'specified' or 'none'. If 'specified', peer-key must be specified at least once. If only 'producers', peer-key is not required. 'producers' and 'specified' may be combined.")
( "peer-key", bpo::value<vector<string>>()->composing()->multitoken(), "Optional public key of peer allowed to connect. May be used multiple times.")
( "peer-private-key", boost::program_options::value<vector<string>>()->composing()->multitoken(),
( "peer-private-key", bpo::value<vector<string>>()->composing()->multitoken(),
"Tuple of [PublicKey, WIF private key] (may specify multiple times)")
( "max-clients", bpo::value<uint32_t>()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit")
( "connection-cleanup-period", bpo::value<int>()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections")
Expand Down Expand Up @@ -3942,16 +3962,31 @@ namespace eosio {
std::chrono::seconds( options.at("connection-cleanup-period").as<int>() ),
options.at("max-clients").as<uint32_t>() );

if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as<string>().length()) {
p2p_address = options.at( "p2p-listen-endpoint" ).as<string>();
EOS_ASSERT( p2p_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p-listen-endpoint too long, must be less than ${m}", ("m", max_p2p_address_length) );
if( options.count( "p2p-listen-endpoint" ) && !options.at("p2p-listen-endpoint").as<vector<string>>().empty()) {
p2p_addresses = options.at( "p2p-listen-endpoint" ).as<vector<string>>();
auto addr_count = p2p_addresses.size();
std::sort(p2p_addresses.begin(), p2p_addresses.end());
std::unique(p2p_addresses.begin(), p2p_addresses.end());
if( size_t addr_diff = addr_count - p2p_addresses.size(); addr_diff != 0) {
fc_ilog( logger, "Removed ${count} duplicate p2p-listen-endpoint entries", ("count", addr_diff));
}
for(auto& addr : p2p_addresses) {
EOS_ASSERT( addr.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p-listen-endpoint ${a} too long, must be less than ${m}",
("a", addr)("m", max_p2p_address_length) );
}
}
if( options.count( "p2p-server-address" ) ) {
p2p_server_address = options.at( "p2p-server-address" ).as<string>();
EOS_ASSERT( p2p_server_address.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p_server_address too long, must be less than ${m}", ("m", max_p2p_address_length) );
p2p_server_addresses = options.at( "p2p-server-address" ).as<vector<string>>();
EOS_ASSERT( p2p_server_addresses.size() <= p2p_addresses.size(), chain::plugin_config_exception,
"p2p-server-address may not be specified more times than p2p-listen-endpoint" );
for( auto& addr: p2p_server_addresses ) {
EOS_ASSERT( addr.length() <= max_p2p_address_length, chain::plugin_config_exception,
"p2p-server-address ${a} too long, must be less than ${m}",
("a", addr)("m", max_p2p_address_length) );
}
}
p2p_server_addresses.resize(p2p_addresses.size()); // extend with empty entries as needed

thread_pool_size = options.at( "net-threads" ).as<uint16_t>();
EOS_ASSERT( thread_pool_size > 0, chain::plugin_config_exception,
Expand Down Expand Up @@ -4044,21 +4079,22 @@ namespace eosio {

dispatcher = std::make_unique<dispatch_manager>( my_impl->thread_pool.get_executor() );

if( !p2p_accept_transactions && p2p_address.size() ) {
if( !p2p_accept_transactions && p2p_addresses.size() ) {
fc_ilog( logger, "\n"
"***********************************\n"
"* p2p-accept-transactions = false *\n"
"* Transactions not forwarded *\n"
"***********************************\n" );
}

std::string listen_address = p2p_address;
std::vector<string> listen_addresses = p2p_addresses;

if( !p2p_address.empty() ) {
auto [host, port] = fc::split_host_port(listen_address);
std::transform(p2p_addresses.begin(), p2p_addresses.end(), p2p_server_addresses.begin(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you do a resize above, but an assert that these are the same size would be good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • it looks better to move line 3991 just above that one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added assert.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimas1185 I'm sorry, the line numbers have moved around too much. I'm not sure what you mean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jgiszczak you can skip it. assert looks good.
I meant this one:

`p2p_server_addresses.resize(p2p_addresses.size()); // extend with empty entries as needed`

p2p_addresses.begin(), [](const string& p2p_address, const string& p2p_server_address) {
auto [host, port] = fc::split_host_port(p2p_address);

if( !p2p_server_address.empty() ) {
p2p_address = p2p_server_address;
return p2p_server_address;
} else if( host.empty() || host == "0.0.0.0" || host == "[::]") {
boost::system::error_code ec;
auto hostname = host_name( ec );
Expand All @@ -4068,9 +4104,10 @@ namespace eosio {
"Unable to retrieve host_name. ${msg}", ("msg", ec.message()));

}
p2p_address = hostname + ":" + port;
return hostname + ":" + port;
}
}
return p2p_address;
});

{
chain::controller& cc = chain_plug->chain();
Expand All @@ -4094,8 +4131,10 @@ namespace eosio {
incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe(
[this](auto&& t) { transaction_ack(std::forward<decltype(t)>(t)); });

app().executor().post(priority::highest, [my=shared_from_this(), address = std::move(listen_address)](){
if (address.size()) {
for(auto listen_itr = listen_addresses.begin(), p2p_iter = p2p_addresses.begin();
listen_itr != listen_addresses.end();
++listen_itr, ++p2p_iter) {
app().executor().post(priority::highest, [my=shared_from_this(), address = std::move(*listen_itr), p2p_addr = *p2p_iter](){
try {
const boost::posix_time::milliseconds accept_timeout(100);

Expand All @@ -4104,15 +4143,16 @@ namespace eosio {

fc::create_listener<tcp>(
my->thread_pool.get_executor(), logger, accept_timeout, address, extra_listening_log_info,
[my = my](tcp::socket&& socket) { my->create_session(std::move(socket)); });
[my = my, addr = p2p_addr](tcp::socket&& socket) { my->create_session(std::move(socket), addr); });
} catch (const std::exception& e) {
fc_elog( logger, "net_plugin::plugin_startup failed to listen on ${addr}, ${what}",
("addr", address)("what", e.what()) );
app().quit();
return;
}
}

});
}
app().executor().post(priority::highest, [my=shared_from_this()](){
my->ticker();
my->start_monitors();
my->update_chain_info();
Expand Down