Skip to content

Commit

Permalink
Merge pull request #1411 from AntelopeIO/p2p-multiple-listen-addresses
Browse files Browse the repository at this point in the history
Listen on multiple addresses for net_plugin p2p.
  • Loading branch information
jgiszczak authored Jul 25, 2023
2 parents 8068252 + 501056f commit 172eb8d
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class bp_connection_manager {

fc_dlog(self()->get_logger(), "pending_downstream_neighbors: ${pending_downstream_neighbors}",
("pending_downstream_neighbors", to_string(pending_downstream_neighbors)));
for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor]); }
for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor], *self()->p2p_addresses.begin() ); }

pending_neighbors = std::move(pending_downstream_neighbors);
finder.add_upstream_neighbors(pending_neighbors);
Expand Down
13 changes: 9 additions & 4 deletions plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ namespace eosio {

struct connection_status {
string peer;
bool connecting = false;
bool syncing = false;
bool is_bp_peer = false;
string remote_ip;
string remote_port;
bool connecting = false;
bool syncing = false;
bool is_bp_peer = false;
bool is_socket_open = false;
bool is_blocks_only = false;
bool is_transactions_only = false;
handshake_message last_handshake;
};

Expand Down Expand Up @@ -49,4 +54,4 @@ namespace eosio {

}

FC_REFLECT( eosio::connection_status, (peer)(connecting)(syncing)(is_bp_peer)(last_handshake) )
FC_REFLECT( eosio::connection_status, (peer)(remote_ip)(remote_port)(connecting)(syncing)(is_bp_peer)(is_socket_open)(is_blocks_only)(is_transactions_only)(last_handshake) )
151 changes: 100 additions & 51 deletions plugins/net_plugin/net_plugin.cpp

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions plugins/net_plugin/tests/auto_bp_peering_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct mock_connections_manager {
uint32_t max_client_count = 0;
std::vector<mock_connection> connections;

std::function<void(std::string)> connect;
std::function<void(std::string, std::string)> connect;
std::function<void(std::string)> disconnect;

uint32_t get_max_client_count() const { return max_client_count; }
Expand All @@ -36,6 +36,7 @@ struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager<mock_net_

bool is_in_sync = false;
mock_connections_manager connections;
std::vector<std::string> p2p_addresses{"0.0.0.0:9876"};

bool in_sync() { return is_in_sync; }

Expand Down Expand Up @@ -165,7 +166,7 @@ BOOST_AUTO_TEST_CASE(test_on_pending_schedule) {

std::vector<std::string> connected_hosts;

plugin.connections.connect = [&connected_hosts](std::string host) { connected_hosts.push_back(host); };
plugin.connections.connect = [&connected_hosts](std::string host, std::string p2p_address) { connected_hosts.push_back(host); };

// make sure nothing happens when it is not in_sync
plugin.is_in_sync = false;
Expand Down Expand Up @@ -209,7 +210,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule1) {
plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n };

plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n };
plugin.connections.connect = [](std::string host) {};
plugin.connections.connect = [](std::string host, std::string p2p_address) {};

std::vector<std::string> disconnected_hosts;
plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };
Expand Down Expand Up @@ -245,7 +246,7 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule2) {
plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n };

plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n };
plugin.connections.connect = [](std::string host) {};
plugin.connections.connect = [](std::string host, std::string p2p_address) {};
std::vector<std::string> disconnected_hosts;
plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); };

Expand Down
9 changes: 7 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/large-lib-test.py ${CMAKE_CURRENT_BINARY_DIR}/large-lib-test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_multiple_listen_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_multiple_listen_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_no_listen_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_no_listen_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/compute_transaction_test.py ${CMAKE_CURRENT_BINARY_DIR}/compute_transaction_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/subjective_billing_test.py ${CMAKE_CURRENT_BINARY_DIR}/subjective_billing_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/get_account_test.py ${CMAKE_CURRENT_BINARY_DIR}/get_account_test.py COPYONLY)
Expand Down Expand Up @@ -183,7 +185,10 @@ set_property(TEST nested_container_multi_index_test PROPERTY LABELS nonparalleli
add_test(NAME nodeos_run_check_test COMMAND tests/nodeos_run_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST nodeos_run_check_test PROPERTY LABELS nonparallelizable_tests)


add_test(NAME p2p_multiple_listen_test COMMAND tests/p2p_multiple_listen_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_multiple_listen_test PROPERTY LABELS nonparallelizable_tests)
add_test(NAME p2p_no_listen_test COMMAND tests/p2p_no_listen_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_no_listen_test PROPERTY LABELS nonparallelizable_tests)

# needs iproute-tc or iproute2 depending on platform
#add_test(NAME p2p_high_latency_test COMMAND tests/p2p_high_latency_test.py -v WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
Expand Down Expand Up @@ -263,7 +268,7 @@ set_property(TEST nodeos_repeat_transaction_lr_test PROPERTY LABELS long_running
add_test(NAME light_validation_sync_test COMMAND tests/light_validation_sync_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST light_validation_sync_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME auto_bp_peering_test COMMAND tests/auto_bp_peering_test.py ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
add_test(NAME auto_bp_peering_test COMMAND tests/auto_bp_peering_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST auto_bp_peering_test PROPERTY LABELS long_running_tests)

add_test(NAME gelf_test COMMAND tests/gelf_test.py ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
Expand Down
22 changes: 14 additions & 8 deletions tests/auto_bp_peering_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#!/usr/bin/env python3

import re
import signal
import time
import socket

from TestHarness import Cluster, TestHelper, Utils, WalletMgr, ReturnType
from TestHarness import Cluster, TestHelper, Utils, WalletMgr

###############################################################
# auto_bp_peering_test
Expand Down Expand Up @@ -35,7 +33,7 @@
dumpErrorDetails = args.dump_error_details
keepLogs = args.keep_logs

# Setup cluster and it's wallet manager
# Setup cluster and its wallet manager
walletMgr = WalletMgr(True)
cluster = Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
cluster.setWalletMgr(walletMgr)
Expand All @@ -47,12 +45,17 @@
for nodeId in range(0, producerNodes):
producer_name = "defproducer" + chr(ord('a') + nodeId)
port = cluster.p2pBasePort + nodeId
hostname = "localhost:" + str(port)
if producer_name == 'defproducerf':
hostname = 'ext-ip0:9999'
elif producer_name == 'defproducerk':
hostname = socket.gethostname() + ':9886'
else:
hostname = "localhost:" + str(port)
peer_names[hostname] = producer_name
auto_bp_peer_args += (" --p2p-auto-bp-peer " + producer_name + "," + hostname)


def neigbors_in_schedule(name, schedule):
def neighbors_in_schedule(name, schedule):
index = schedule.index(name)
result = []
num = len(schedule)
Expand All @@ -71,6 +74,9 @@ def neigbors_in_schedule(name, schedule):
for nodeId in range(0, producerNodes):
specificNodeosArgs[nodeId] = auto_bp_peer_args

specificNodeosArgs[5] = specificNodeosArgs[5] + ' --p2p-server-address ext-ip0:9999'
specificNodeosArgs[10] = specificNodeosArgs[10] + ' --p2p-server-address ""'

TestHelper.printSystemInfo("BEGIN")
cluster.launch(
prodCount=producerCountInEachNode,
Expand Down Expand Up @@ -113,7 +119,7 @@ def neigbors_in_schedule(name, schedule):

peers = peers.sort()
name = "defproducer" + chr(ord('a') + nodeId)
expected_peers = neigbors_in_schedule(name, scheduled_producers)
expected_peers = neighbors_in_schedule(name, scheduled_producers)
if peers != expected_peers:
Utils.Print("ERROR: expect {} has connections to {}, got connections to {}".format(
name, expected_peers, peers))
Expand Down
103 changes: 103 additions & 0 deletions tests/p2p_multiple_listen_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/usr/bin/env python3

import signal

from TestHarness import Cluster, TestHelper, Utils, WalletMgr

###############################################################
# p2p_multiple_listen_test
#
# Test nodeos ability to listen on multiple ports for p2p
#
###############################################################

Print=Utils.Print
errorExit=Utils.errorExit

args=TestHelper.parse_args({"-p","-n","-d","--keep-logs"
,"--dump-error-details","-v"
,"--leave-running","--unshared"})
pnodes=args.p
delay=args.d
debug=args.v
total_nodes=5
dumpErrorDetails=args.dump_error_details

Utils.Debug=debug
testSuccessful=False

cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
walletMgr=WalletMgr(True)

try:
TestHelper.printSystemInfo("BEGIN")

cluster.setWalletMgr(walletMgr)

Print(f'producing nodes: {pnodes}, delay between nodes launch: {delay} second{"s" if delay != 1 else ""}')

Print("Stand up cluster")
specificArgs = {
'0': '--agent-name node-00 --p2p-listen-endpoint 0.0.0.0:9876 --p2p-listen-endpoint 0.0.0.0:9779 --p2p-server-address ext-ip0:20000 --p2p-server-address ext-ip1:20001 --plugin eosio::net_api_plugin',
'2': '--agent-name node-02 --p2p-peer-address localhost:9779 --plugin eosio::net_api_plugin',
'4': '--agent-name node-04 --p2p-peer-address localhost:9876 --plugin eosio::net_api_plugin',
}
if cluster.launch(pnodes=pnodes, totalNodes=total_nodes, topo='line', delay=delay,
specificExtraNodeosArgs=specificArgs) is False:
errorExit("Failed to stand up eos cluster.")

# Be sure all nodes start out connected (bios node omitted from diagram for brevity)
# node00 node01 node02 node03 node04
# localhost:9876 -> localhost:9877 -> localhost:9878 -> localhost:9879 -> localhost:9880
# localhost:9779 ^ | |
# ^ +---------------------------+ |
# +------------------------------------------------------------------------+
cluster.waitOnClusterSync(blockAdvancing=5)
# Shut down bios node, which is connected to all other nodes in all topologies
cluster.biosNode.kill(signal.SIGTERM)
# Shut down second node, interrupting the default connections between it and nodes 00 and 02
cluster.getNode(1).kill(signal.SIGTERM)
# Shut down the fourth node, interrupting the default connections between it and nodes 02 and 04
cluster.getNode(3).kill(signal.SIGTERM)
# Be sure all remaining nodes continue to sync via the two listen ports on node 00
# node00 node01 node02 node03 node04
# localhost:9876 offline localhost:9878 offline localhost:9880
# localhost:9779 ^ | |
# ^ +---------------------------+ |
# +------------------------------------------------------------------------+
cluster.waitOnClusterSync(blockAdvancing=5)
connections = cluster.nodes[0].processUrllibRequest('net', 'connections')
open_socket_count = 0
for conn in connections['payload']:
if conn['is_socket_open']:
open_socket_count += 1
if conn['last_handshake']['agent'] == 'node-02':
assert conn['last_handshake']['p2p_address'].split()[0] == 'localhost:9878', f"Connected node is listening on '{conn['last_handshake']['p2p_address'].split()[0]}' instead of port 9878"
elif conn['last_handshake']['agent'] == 'node-04':
assert conn['last_handshake']['p2p_address'].split()[0] == 'localhost:9880', f"Connected node is listening on '{conn['last_handshake']['p2p_address'].split()[0]}' instead of port 9880"
assert open_socket_count == 2, 'Node 0 is expected to have only two open sockets'

connections = cluster.nodes[2].processUrllibRequest('net', 'connections')
open_socket_count = 0
for conn in connections['payload']:
if conn['is_socket_open']:
open_socket_count += 1
assert conn['last_handshake']['agent'] == 'node-00', f"Connected node identifed as '{conn['last_handshake']['agent']}' instead of node-00"
assert conn['last_handshake']['p2p_address'].split()[0] == 'ext-ip0:20000', f"Connected node is advertising '{conn['last_handshake']['p2p_address'].split()[0]}' instead of ext-ip0:20000"
assert open_socket_count == 1, 'Node 2 is expected to have only one open socket'

connections = cluster.nodes[4].processUrllibRequest('net', 'connections')
open_socket_count = 0
for conn in connections['payload']:
if conn['is_socket_open']:
open_socket_count += 1
assert conn['last_handshake']['agent'] == 'node-00', f"Connected node identifed as '{conn['last_handshake']['agent']}' instead of node-00"
assert conn['last_handshake']['p2p_address'].split()[0] == 'ext-ip1:20001', f"Connected node is advertising '{conn['last_handshake']['p2p_address'].split()[0]} 'instead of ext-ip1:20001"
assert open_socket_count == 1, 'Node 4 is expected to have only one open socket'

testSuccessful=True
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)

exitCode = 0 if testSuccessful else 1
exit(exitCode)
76 changes: 76 additions & 0 deletions tests/p2p_no_listen_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python3

import errno
import pathlib
import shutil
import signal
import socket
import time

from TestHarness import Node, TestHelper, Utils

###############################################################
# p2p_no_listen_test
#
# Test nodeos disabling p2p
#
###############################################################

Print=Utils.Print
errorExit=Utils.errorExit

args=TestHelper.parse_args({"--keep-logs","-v","--leave-running","--unshared"})
debug=args.v

Utils.Debug=debug
testSuccessful=False

try:
TestHelper.printSystemInfo("BEGIN")

cmd = [
Utils.EosServerPath,
'-e',
'-p',
'eosio',
'--p2p-listen-endpoint',
'',
'--plugin',
'eosio::chain_api_plugin',
'--config-dir',
Utils.ConfigDir,
'--data-dir',
Utils.DataDir,
'--http-server-address',
'localhost:8888'
]
node = Node('localhost', '8888', '00', data_dir=pathlib.Path(Utils.DataDir),
config_dir=pathlib.Path(Utils.ConfigDir), cmd=cmd)

time.sleep(1)
if not node.verifyAlive():
raise RuntimeError
time.sleep(10)
node.waitForBlock(5)

s = socket.socket()
err = s.connect_ex(('localhost',9876))
assert err == errno.ECONNREFUSED, 'Connection to port 9876 must be refused'

testSuccessful=True
finally:
Utils.ShuttingDown=True

if not args.leave_running:
node.kill(signal.SIGTERM)

if not (args.leave_running or args.keep_logs or not testSuccessful):
shutil.rmtree(Utils.DataPath, ignore_errors=True)

if testSuccessful:
Utils.Print("Test succeeded.")
else:
Utils.Print("Test failed.")

exitCode = 0 if testSuccessful else 1
exit(exitCode)

0 comments on commit 172eb8d

Please sign in to comment.