diff --git a/parity/configuration.rs b/parity/configuration.rs index 3f820202108..e6e6e36a17c 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -200,7 +200,10 @@ impl Configuration { net_path.push("network"); ret.config_path = Some(net_path.to_str().unwrap().to_owned()); ret.reserved_nodes = self.init_reserved_nodes(); - ret.reserved_only = self.args.flag_reserved_only; + + if self.args.flag_reserved_only { + ret.non_reserved_mode = ::util::network::NonReservedPeerMode::Deny; + } ret } diff --git a/parity/main.rs b/parity/main.rs index dc5457aa129..a44cdf6d30d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -230,6 +230,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) logger: logger.clone(), settings: network_settings.clone(), allow_pending_receipt_query: !conf.args.flag_geth, + net_service: service.network(), }); let dependencies = rpc::Dependencies { @@ -315,11 +316,11 @@ fn execute_export(conf: Configuration) { udp_port: None, nat_enabled: false, discovery_enabled: false, - reserved_only: true, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 0, reserved_nodes: Vec::new(), + non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, }; let client_config = conf.client_config(&spec); @@ -387,11 +388,11 @@ fn execute_import(conf: Configuration) { udp_port: None, nat_enabled: false, discovery_enabled: false, - reserved_only: true, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 0, reserved_nodes: Vec::new(), + non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, }; let client_config = conf.client_config(&spec); diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index bf21181a109..251c24d85fd 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -25,6 +25,7 @@ use ethcore::client::Client; use util::RotatingLogger; use ethcore::account_provider::AccountProvider; use util::network_settings::NetworkSettings; +use util::network::NetworkService; #[cfg(feature="rpc")] pub use ethcore_rpc::ConfirmationsQueue; @@ -89,6 +90,7 @@ pub struct Dependencies { pub logger: Arc, pub settings: Arc, pub allow_pending_receipt_query: bool, + pub net_service: Arc>, } fn to_modules(apis: &[Api]) -> BTreeMap { @@ -163,7 +165,7 @@ pub fn setup_rpc(server: T, deps: Arc, apis: ApiSet server.add_delegate(EthcoreClient::new(&deps.client, &deps.miner, deps.logger.clone(), deps.settings.clone()).to_delegate()) }, Api::EthcoreSet => { - server.add_delegate(EthcoreSetClient::new(&deps.miner).to_delegate()) + server.add_delegate(EthcoreSetClient::new(&deps.miner, &deps.net_service).to_delegate()) }, Api::Traces => { server.add_delegate(TracesClient::new(&deps.client, &deps.miner).to_delegate()) diff --git a/rpc/src/v1/impls/ethcore_set.rs b/rpc/src/v1/impls/ethcore_set.rs index cbd9c4309d6..66f1a34aa28 100644 --- a/rpc/src/v1/impls/ethcore_set.rs +++ b/rpc/src/v1/impls/ethcore_set.rs @@ -16,9 +16,11 @@ /// Ethcore-specific rpc interface for operations altering the settings. use util::{U256, Address}; +use util::network::{NetworkService, NonReservedPeerMode}; use std::sync::{Arc, Weak}; use jsonrpc_core::*; use ethcore::miner::MinerService; +use ethcore::service::SyncMessage; use v1::traits::EthcoreSet; use v1::types::{Bytes}; @@ -27,13 +29,15 @@ pub struct EthcoreSetClient where M: MinerService { miner: Weak, + net: Weak>, } impl EthcoreSetClient where M: MinerService { /// Creates new `EthcoreSetClient`. - pub fn new(miner: &Arc) -> Self { + pub fn new(miner: &Arc, net: &Arc>) -> Self { EthcoreSetClient { miner: Arc::downgrade(miner), + net: Arc::downgrade(net), } } } @@ -74,4 +78,32 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { to_value(&true) }) } + + fn add_reserved_peer(&self, params: Params) -> Result { + from_params::<(String,)>(params).and_then(|(peer,)| { + match take_weak!(self.net).add_reserved_peer(&peer) { + Ok(()) => to_value(&true), + Err(_) => Err(Error::invalid_params()), + } + }) + } + + fn remove_reserved_peer(&self, params: Params) -> Result { + from_params::<(String,)>(params).and_then(|(peer,)| { + match take_weak!(self.net).remove_reserved_peer(&peer) { + Ok(()) => to_value(&true), + Err(_) => Err(Error::invalid_params()), + } + }) + } + + fn drop_non_reserved_peers(&self, _: Params) -> Result { + take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny); + to_value(&true) + } + + fn accept_non_reserved_peers(&self, _: Params) -> Result { + take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept); + to_value(&true) + } } diff --git a/rpc/src/v1/tests/mocked/ethcore.rs b/rpc/src/v1/tests/mocked/ethcore.rs index 68c33ecce39..a096543e34f 100644 --- a/rpc/src/v1/tests/mocked/ethcore.rs +++ b/rpc/src/v1/tests/mocked/ethcore.rs @@ -19,11 +19,13 @@ use std::str::FromStr; use jsonrpc_core::IoHandler; use v1::{Ethcore, EthcoreClient, EthcoreSet, EthcoreSetClient}; use ethcore::miner::MinerService; +use ethcore::service::SyncMessage; use v1::tests::helpers::TestMinerService; use ethcore::client::{TestBlockChainClient}; use util::numbers::*; use rustc_serialize::hex::FromHex; use util::log::RotatingLogger; +use util::network::{NetworkConfiguration, NetworkService}; use util::network_settings::NetworkSettings; fn miner_service() -> Arc { @@ -50,21 +52,26 @@ fn settings() -> Arc { }) } +fn network_service() -> Arc> { + Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap()) +} + fn ethcore_client(client: &Arc, miner: &Arc) -> EthcoreClient { EthcoreClient::new(client, miner, logger(), settings()) } -fn ethcore_set_client(miner: &Arc) -> EthcoreSetClient { - EthcoreSetClient::new(miner) +fn ethcore_set_client(miner: &Arc, net: &Arc>) -> EthcoreSetClient { + EthcoreSetClient::new(miner, net) } #[test] fn rpc_ethcore_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_extraData", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x01020304","id":1}"#; @@ -79,9 +86,10 @@ fn rpc_ethcore_default_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_defaultExtraData", "params": [], "id": 1}"#; let response = format!(r#"{{"jsonrpc":"2.0","result":"0x{}","id":1}}"#, misc::version_data().to_hex()); @@ -93,9 +101,10 @@ fn rpc_ethcore_default_extra_data() { fn rpc_ethcore_gas_floor_target() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_gasFloorTarget", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x3039","id":1}"#; @@ -107,9 +116,10 @@ fn rpc_ethcore_gas_floor_target() { fn rpc_ethcore_min_gas_price() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_minGasPrice", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x01312d00","id":1}"#; @@ -121,9 +131,10 @@ fn rpc_ethcore_min_gas_price() { fn rpc_ethcore_set_min_gas_price() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -136,9 +147,10 @@ fn rpc_ethcore_set_min_gas_price() { fn rpc_ethcore_set_gas_floor_target() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setGasFloorTarget", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -151,9 +163,10 @@ fn rpc_ethcore_set_gas_floor_target() { fn rpc_ethcore_set_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setExtraData", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -166,9 +179,10 @@ fn rpc_ethcore_set_extra_data() { fn rpc_ethcore_set_author() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setAuthor", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -181,13 +195,14 @@ fn rpc_ethcore_set_author() { fn rpc_ethcore_dev_logs() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let logger = logger(); logger.append("a".to_owned()); logger.append("b".to_owned()); let ethcore = EthcoreClient::new(&client, &miner, logger.clone(), settings()).to_delegate(); let io = IoHandler::new(); io.add_delegate(ethcore); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogs", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":["b","a"],"id":1}"#; @@ -199,9 +214,10 @@ fn rpc_ethcore_dev_logs() { fn rpc_ethcore_dev_logs_levels() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogsLevels", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"rpc=trace","id":1}"#; @@ -212,9 +228,10 @@ fn rpc_ethcore_dev_logs_levels() { fn rpc_ethcore_set_transactions_limit() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -227,9 +244,10 @@ fn rpc_ethcore_set_transactions_limit() { fn rpc_ethcore_transactions_limit() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_transactionsLimit", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":1024,"id":1}"#; @@ -241,9 +259,10 @@ fn rpc_ethcore_transactions_limit() { fn rpc_ethcore_net_chain() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netChain", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"testchain","id":1}"#; @@ -255,9 +274,10 @@ fn rpc_ethcore_net_chain() { fn rpc_ethcore_net_max_peers() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netMaxPeers", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":25,"id":1}"#; @@ -269,9 +289,10 @@ fn rpc_ethcore_net_max_peers() { fn rpc_ethcore_net_port() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netPort", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":30303,"id":1}"#; @@ -283,9 +304,10 @@ fn rpc_ethcore_net_port() { fn rpc_ethcore_rpc_settings() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_rpcSettings", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":{"enabled":true,"interface":"all","port":8545},"id":1}"#; @@ -297,9 +319,10 @@ fn rpc_ethcore_rpc_settings() { fn rpc_ethcore_node_name() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_nodeName", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"mynode","id":1}"#; diff --git a/rpc/src/v1/traits/ethcore_set.rs b/rpc/src/v1/traits/ethcore_set.rs index 332c505b62c..ed4303be17f 100644 --- a/rpc/src/v1/traits/ethcore_set.rs +++ b/rpc/src/v1/traits/ethcore_set.rs @@ -37,6 +37,18 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { /// Sets the limits for transaction queue. fn set_transactions_limit(&self, _: Params) -> Result; + /// Add a reserved peer. + fn add_reserved_peer(&self, _: Params) -> Result; + + /// Remove a reserved peer. + fn remove_reserved_peer(&self, _: Params) -> Result; + + /// Drop all non-reserved peers. + fn drop_non_reserved_peers(&self, _: Params) -> Result; + + /// Accept non-reserved peers (default behavior) + fn accept_non_reserved_peers(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); @@ -45,6 +57,10 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { delegate.add_method("ethcore_setExtraData", EthcoreSet::set_extra_data); delegate.add_method("ethcore_setAuthor", EthcoreSet::set_author); delegate.add_method("ethcore_setTransactionsLimit", EthcoreSet::set_transactions_limit); + delegate.add_method("ethcore_addReservedPeer", EthcoreSet::add_reserved_peer); + delegate.add_method("ethcore_removeReservedPeer", EthcoreSet::remove_reserved_peer); + delegate.add_method("ethcore_dropNonReservedPeers", EthcoreSet::drop_non_reserved_peers); + delegate.add_method("ethcore_acceptNonReservedPeers", EthcoreSet::accept_non_reserved_peers); delegate } diff --git a/util/src/network/host.rs b/util/src/network/host.rs index aacfc3fcb33..03ba07544a6 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::net::{SocketAddr}; -use std::collections::{HashMap}; -use std::str::{FromStr}; +use std::net::SocketAddr; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::sync::*; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::ops::*; @@ -35,7 +35,7 @@ use rlp::*; use network::session::{Session, SessionData}; use error::*; use io::*; -use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; +use network::{NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION}; use network::node_table::*; use network::stats::NetworkStats; use network::error::{NetworkError, DisconnectReason}; @@ -65,8 +65,6 @@ pub struct NetworkConfiguration { pub nat_enabled: bool, /// Enable discovery pub discovery_enabled: bool, - /// Pin to reserved nodes only - pub reserved_only: bool, /// List of initial node addresses pub boot_nodes: Vec, /// Use provided node key instead of default @@ -75,6 +73,8 @@ pub struct NetworkConfiguration { pub ideal_peers: u32, /// List of reserved node addresses. pub reserved_nodes: Vec, + /// The non-reserved peer mode. + pub non_reserved_mode: NonReservedPeerMode, } impl Default for NetworkConfiguration { @@ -93,11 +93,11 @@ impl NetworkConfiguration { udp_port: None, nat_enabled: true, discovery_enabled: true, - reserved_only: false, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 25, reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Accept, } } @@ -191,13 +191,15 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta sessions: Arc>>, session: Option, session_id: Option, + reserved_peers: &'s HashSet, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. fn new(io: &'s IoContext>, protocol: ProtocolId, - session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + session: Option, sessions: Arc>>, + reserved_peers: &'s HashSet) -> NetworkContext<'s, Message> { let id = session.as_ref().map(|s| s.lock().unwrap().token()); NetworkContext { io: io, @@ -205,6 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone session_id: id, session: session, sessions: sessions, + reserved_peers: reserved_peers, } } @@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } - /// Send an IO message + /// Get an IoChannel. pub fn io_channel(&self) -> IoChannel> { self.io.channel() } @@ -335,7 +338,7 @@ pub struct Host where Message: Send + Sync + Clone { timers: RwLock>, timer_counter: RwLock, stats: Arc, - pinned_nodes: Vec, + reserved_nodes: RwLock>, num_sessions: AtomicUsize, stopping: AtomicBool, } @@ -390,28 +393,28 @@ impl Host where Message: Send + Sync + Clone { timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(USER_TIMER), stats: stats, - pinned_nodes: Vec::new(), + reserved_nodes: RwLock::new(HashSet::new()), num_sessions: AtomicUsize::new(0), stopping: AtomicBool::new(false), }; for n in boot_nodes { - // don't pin boot nodes. - host.add_node(&n, false); + host.add_node(&n); } for n in reserved_nodes { - host.add_node(&n, true); + if let Err(e) = host.add_reserved_node(&n) { + debug!(target: "network", "Error parsing node id: {}: {:?}", n, e); + } } Ok(host) } - pub fn add_node(&mut self, id: &str, pin: bool) { + pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - if pin { self.pinned_nodes.push(n.id.clone()) } self.nodes.write().unwrap().add_node(n); if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { @@ -421,6 +424,56 @@ impl Host where Message: Send + Sync + Clone { } } + pub fn add_reserved_node(&self, id: &str) -> Result<(), UtilError> { + let n = try!(Node::from_str(id)); + + let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; + self.reserved_nodes.write().unwrap().insert(n.id.clone()); + + if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { + discovery.add_node(entry); + } + + Ok(()) + } + + pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext>) { + let mut info = self.info.write().unwrap(); + + if info.config.non_reserved_mode != mode { + info.config.non_reserved_mode = mode.clone(); + drop(info); + if let NonReservedPeerMode::Deny = mode { + // disconnect all non-reserved peers here. + let reserved: HashSet = self.reserved_nodes.read().unwrap().clone(); + let mut to_kill = Vec::new(); + for e in self.sessions.write().unwrap().iter_mut() { + let mut s = e.lock().unwrap(); + { + let id = s.id(); + if id.is_some() && reserved.contains(id.unwrap()) { + continue; + } + } + + s.disconnect(io, DisconnectReason::ClientQuit); + to_kill.push(s.token()); + } + for p in to_kill { + trace!(target: "network", "Disconnecting on reserved-only mode: {}", p); + self.kill_connection(p, io, false); + } + } + } + } + + pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> { + let n = try!(Node::from_str(id)); + self.reserved_nodes.write().unwrap().remove(&n.id); + + Ok(()) + } + pub fn client_version() -> String { version() } @@ -483,7 +536,7 @@ impl Host where Message: Send + Sync + Clone { // Initialize discovery. let discovery = { let info = self.info.read().unwrap(); - if info.config.discovery_enabled && !info.config.reserved_only { + if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) } else { None } }; @@ -540,17 +593,26 @@ impl Host where Message: Send + Sync + Clone { } fn connect_peers(&self, io: &IoContext>) { - if self.info.read().unwrap().capabilities.is_empty() { - return; - } - let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; - let pin = { self.info.read().unwrap().config.reserved_only }; + let (ideal_peers, mut pin) = { + let info = self.info.read().unwrap(); + if info.capabilities.is_empty() { + return; + } + let config = &info.config; + + (config.ideal_peers, config.non_reserved_mode == NonReservedPeerMode::Deny) + }; + let session_count = self.session_count(); - if session_count >= ideal_peers as usize + self.pinned_nodes.len() { + let reserved_nodes = self.reserved_nodes.read().unwrap(); + if session_count >= ideal_peers as usize + reserved_nodes.len() { // check if all pinned nodes are connected. - if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { + if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { return; } + + // if not, only attempt connect to reserved peers + pin = true; } let handshake_count = self.handshake_count(); @@ -562,7 +624,7 @@ impl Host where Message: Send + Sync + Clone { // iterate over all nodes, reserved ones coming first. // if we are pinned to only reserved nodes, ignore all others. - let nodes = self.pinned_nodes.iter().cloned().chain(if !pin { + let nodes = reserved_nodes.iter().cloned().chain(if !pin { self.nodes.read().unwrap().nodes() } else { Vec::new() @@ -696,11 +758,20 @@ impl Host where Message: Send + Sync + Clone { self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); - let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; - if session_count >= ideal_peers as usize { - s.disconnect(io, DisconnectReason::TooManyPeers); - return; + let reserved_nodes = self.reserved_nodes.read().unwrap(); + let (ideal_peers, reserved_only) = { + let info = self.info.read().unwrap(); + (info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny) + }; + + if session_count >= ideal_peers as usize || reserved_only { + // only proceed if the connecting peer is reserved. + if !reserved_nodes.contains(s.id().unwrap()) { + s.disconnect(io, DisconnectReason::TooManyPeers); + return; + } } + // Add it no node table if let Ok(address) = s.remote_addr() { let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; @@ -735,14 +806,17 @@ impl Host where Message: Send + Sync + Clone { if kill { self.kill_connection(token, io, true); } + let handlers = self.handlers.read().unwrap(); for p in ready_data { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); + let h = handlers.get(p).unwrap().clone(); self.stats.inc_sessions(); - h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); + let reserved = self.reserved_nodes.read().unwrap(); + h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); } for (p, packet_id, data) in packet_data { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); + let h = handlers.get(p).unwrap().clone(); + let reserved = self.reserved_nodes.read().unwrap(); + h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); } } @@ -783,7 +857,8 @@ impl Host where Message: Send + Sync + Clone { } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); + let reserved = self.reserved_nodes.read().unwrap(); + h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); } if deregister { io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); @@ -886,7 +961,10 @@ impl IoHandler> for Host where Messa _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, - Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } + Some(h) => { + let reserved = self.reserved_nodes.read().unwrap(); + h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token); + } }, None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us } @@ -904,7 +982,8 @@ impl IoHandler> for Host where Messa ref versions } => { let h = handler.clone(); - h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone())); + let reserved = self.reserved_nodes.read().unwrap(); + h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved)); self.handlers.write().unwrap().insert(protocol, h); let mut info = self.info.write().unwrap(); for v in versions { @@ -946,8 +1025,9 @@ impl IoHandler> for Host where Messa self.kill_connection(*peer, io, false); }, NetworkIoMessage::User(ref message) => { + let reserved = self.reserved_nodes.read().unwrap(); for (p, h) in self.handlers.read().unwrap().iter() { - h.message(&NetworkContext::new(io, p, None, self.sessions.clone()), &message); + h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message); } } } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index d074e663145..d59ab63b105 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Network and general IO module. -//! +//! Network and general IO module. +//! //! Example usage for craeting a network service and adding an IO handler: //! //! ```rust @@ -112,3 +112,22 @@ pub trait NetworkProtocolHandler: Sync + Send where Message: Send + Syn fn message(&self, _io: &NetworkContext, _message: &Message) {} } +/// Non-reserved peer modes. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum NonReservedPeerMode { + /// Accept them. This is the default. + Accept, + /// Deny them. + Deny, +} + +impl NonReservedPeerMode { + /// Attempt to parse the peer mode from a string. + pub fn parse(s: &str) -> Option { + match s { + "accept" => Some(NonReservedPeerMode::Accept), + "deny" => Some(NonReservedPeerMode::Deny), + _ => None, + } + } +} \ No newline at end of file diff --git a/util/src/network/service.rs b/util/src/network/service.rs index e8db354d44b..353a24bbe8b 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -18,9 +18,9 @@ use std::sync::*; use error::*; use panics::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; -use network::error::{NetworkError}; +use network::error::NetworkError; use network::host::{Host, NetworkIoMessage, ProtocolId}; -use network::stats::{NetworkStats}; +use network::stats::NetworkStats; use io::*; /// IO Service with networking @@ -111,6 +111,35 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat *host = None; Ok(()) } + + /// Try to add a reserved peer. + pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + host.add_reserved_node(peer) + } else { + Ok(()) + } + } + + /// Try to remove a reserved peer. + pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + host.remove_reserved_node(peer) + } else { + Ok(()) + } + } + + /// Set the non-reserved peer mode. + pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + let io_ctxt = IoContext::new(self.io_service.channel(), 0); + host.set_non_reserved_mode(mode, &io_ctxt); + } + } } impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static {