diff --git a/Cargo.lock b/Cargo.lock index ecf3900df1..2ffaca6a22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3825,6 +3825,21 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "nimiq-dht" +version = "1.0.0-rc.2" +dependencies = [ + "nimiq-blockchain-interface", + "nimiq-blockchain-proxy", + "nimiq-keys", + "nimiq-log", + "nimiq-network-libp2p", + "nimiq-serde", + "nimiq-utils", + "nimiq-validator-network", + "tracing", +] + [[package]] name = "nimiq-fuzz" version = "1.0.0-rc.2" @@ -4060,6 +4075,7 @@ dependencies = [ "nimiq-bls", "nimiq-consensus", "nimiq-database", + "nimiq-dht", "nimiq-genesis", "nimiq-hash", "nimiq-jsonrpc-core", @@ -4284,8 +4300,8 @@ dependencies = [ "instant", "ip_network", "libp2p", - "nimiq-bls", "nimiq-hash", + "nimiq-keys", "nimiq-macros", "nimiq-network-interface", "nimiq-serde", @@ -4862,8 +4878,9 @@ version = "1.0.0-rc.2" dependencies = [ "async-trait", "futures-util", - "nimiq-bls", + "nimiq-keys", "nimiq-network-interface", + "nimiq-primitives", "nimiq-serde", "nimiq-utils", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index c9b8fe357e..340e9cf9a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "database", "database/database-value", "database/database-value-derive", + "dht", "fuzz", "genesis", "genesis-builder", @@ -189,6 +190,7 @@ nimiq-consensus = { path = "consensus", default-features = false } nimiq-database = { path = "database", default-features = false } nimiq-database-value = { path = "database/database-value", default-features = false } nimiq-database-value-derive = { path = "database/database-value-derive", default-features = false } +nimiq-dht = { path = "dht", default-features = false } nimiq-genesis = { path = "genesis", default-features = false } nimiq-genesis-builder = { path = "genesis-builder", default-features = false } nimiq-handel = { path = "handel", default-features = false } diff --git a/dht/Cargo.toml b/dht/Cargo.toml new file mode 100644 index 0000000000..a65f4321a3 --- /dev/null +++ b/dht/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "nimiq-dht" +version.workspace = true +authors.workspace = true +edition.workspace = true +description = "Nimiq Dht verifier implementation." +homepage.workspace = true +repository.workspace = true +license.workspace = true +categories.workspace = true +keywords.workspace = true + +[badges] +travis-ci = { repository = "nimiq/core-rs", branch = "master" } +is-it-maintained-issue-resolution = { repository = "nimiq/core-rs" } +is-it-maintained-open-issues = { repository = "nimiq/core-rs" } +maintenance = { status = "experimental" } + +[lints] +workspace = true + +[dependencies] +log = { workspace = true } + +nimiq-blockchain-interface = { workspace = true } +nimiq-blockchain-proxy = { workspace = true } +nimiq-keys = { workspace = true } +nimiq-log = { workspace = true, optional = true } +nimiq-network-libp2p = { workspace = true } +nimiq-serde = { workspace = true } +nimiq-utils = { workspace = true } +nimiq-validator-network = { workspace = true } + diff --git a/dht/src/lib.rs b/dht/src/lib.rs new file mode 100644 index 0000000000..c16ff9d80f --- /dev/null +++ b/dht/src/lib.rs @@ -0,0 +1,85 @@ +use nimiq_blockchain_proxy::BlockchainProxy; +use nimiq_keys::{Address, KeyPair}; +use nimiq_network_libp2p::{ + dht::{DhtRecord, DhtVerifierError, Verifier as DhtVerifier}, + libp2p::kad::Record, + PeerId, +}; +use nimiq_serde::Deserialize; +use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; +use nimiq_validator_network::validator_record::ValidatorRecord; + +pub struct Verifier { + blockchain: BlockchainProxy, +} + +impl Verifier { + pub fn new(blockchain: BlockchainProxy) -> Self { + Self { blockchain } + } + + fn verify_validator_record(&self, record: &Record) -> Result { + // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. + let validator_record = + TaggedSigned::, KeyPair>::deserialize_from_vec(&record.value) + .map_err(DhtVerifierError::MalformedValue)?; + + // Deserialize the key of the record which is an Address. If it fails return an error. + let validator_address = Address::deserialize_from_vec(record.key.as_ref()) + .map_err(DhtVerifierError::MalformedKey)?; + + // Acquire blockchain read access. For now exclude Light clients. + let blockchain = match self.blockchain { + BlockchainProxy::Light(ref _light_blockchain) => { + return Err(DhtVerifierError::UnknownTag) + } + BlockchainProxy::Full(ref full_blockchain) => full_blockchain, + }; + let blockchain_read = blockchain.read(); + + // Get the staking contract to retrieve the public key for verification. + let staking_contract = blockchain_read + .get_staking_contract_if_complete(None) + .ok_or(DhtVerifierError::StateIncomplete)?; + + // Get the public key needed for verification. + let data_store = blockchain_read.get_staking_contract_store(); + let txn = blockchain_read.read_transaction(); + let public_key = staking_contract + .get_validator(&data_store.read(&txn), &validator_address) + .ok_or(DhtVerifierError::UnknownValidator(validator_address))? + .signing_key; + + // Verify the record. + validator_record + .verify(&public_key) + .then(|| { + DhtRecord::Validator( + record.publisher.unwrap(), + validator_record.record, + record.clone(), + ) + }) + .ok_or(DhtVerifierError::InvalidSignature) + } +} + +impl DhtVerifier for Verifier { + fn verify(&self, record: &Record) -> Result { + // Peek the tag to know what kind of record this is. + let Some(tag) = TaggedSigned::, KeyPair>::peek_tag(&record.value) + else { + log::warn!(?record, "DHT Tag not peekable."); + return Err(DhtVerifierError::MalformedTag); + }; + + // Depending on tag perform the verification. + match tag { + ValidatorRecord::::TAG => self.verify_validator_record(record), + _ => { + log::error!(tag, "DHT invalid record tag received"); + Err(DhtVerifierError::UnknownTag) + } + } + } +} diff --git a/lib/Cargo.toml b/lib/Cargo.toml index f48d49e2bd..b0b19ea103 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -57,6 +57,7 @@ nimiq-blockchain-proxy = { workspace = true, default-features = false } nimiq-bls = { workspace = true } nimiq-consensus = { workspace = true, default-features = false } nimiq-database = { workspace = true, optional = true } +nimiq-dht = { workspace = true, optional = true } nimiq-genesis = { workspace = true, default-features = false } nimiq-hash = { workspace = true } nimiq-jsonrpc-core = { workspace = true, optional = true } @@ -94,6 +95,8 @@ full-consensus = [ "database-storage", "nimiq-blockchain", "nimiq-consensus/full", + "nimiq-dht", + "nimiq-network-libp2p/kad", ] launcher = [] logging = ["nimiq-log", "serde_json", "tokio", "tracing-subscriber"] diff --git a/lib/src/client.rs b/lib/src/client.rs index 79332ce4ed..e39ec45c21 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -13,6 +13,8 @@ use nimiq_consensus::{ sync::syncer_proxy::SyncerProxy, Consensus as AbstractConsensus, ConsensusProxy as AbstractConsensusProxy, }; +#[cfg(feature = "full-consensus")] +use nimiq_dht::Verifier; #[cfg(feature = "zkp-prover")] use nimiq_genesis::NetworkId; use nimiq_genesis::NetworkInfo; @@ -345,11 +347,6 @@ impl ClientInner { "Advertised addresses", ); - let network = Arc::new(Network::new(network_config).await); - - // Start buffering network events as early as possible - let network_events = network.subscribe_events(); - // We update the services flags depending on the pre-genesis database file being present #[cfg(feature = "database-storage")] let pre_genesis_environment = if config.storage.has_pre_genesis_database(config.network_id) @@ -388,7 +385,7 @@ impl ClientInner { #[cfg(not(feature = "database-storage"))] let zkp_storage = None; - let (blockchain_proxy, syncer_proxy, zkp_component) = match config.consensus.sync_mode { + let blockchain_proxy = match config.consensus.sync_mode { #[cfg(not(feature = "full-consensus"))] SyncMode::History => { panic!("Can't build a history node without the full-consensus feature enabled") @@ -398,8 +395,8 @@ impl ClientInner { panic!("Can't build a full node without the full-consensus feature enabled") } #[cfg(feature = "full-consensus")] - SyncMode::History => { - blockchain_config.keep_history = true; + SyncMode::History | SyncMode::Full => { + blockchain_config.keep_history = config.consensus.sync_mode == SyncMode::History; blockchain_config.index_history = config.consensus.index_history; let blockchain = match Blockchain::new_merged( environment.clone(), @@ -413,8 +410,41 @@ impl ClientInner { return Err(Error::Consensus(BlockchainError(err))); } }; + BlockchainProxy::from(&blockchain) + } + SyncMode::Light => BlockchainProxy::from(&Arc::new(RwLock::new(LightBlockchain::new( + config.network_id, + )))), + }; + + // Create the Dht verifier + #[cfg(feature = "full-consensus")] + let dht_verifier = Verifier::new(blockchain_proxy.clone()); + + // Create the network. + let network = Arc::new( + Network::new( + network_config, + #[cfg(feature = "full-consensus")] + dht_verifier, + ) + .await, + ); + + // Start buffering network events as early as possible + let network_events = network.subscribe_events(); - let blockchain_proxy = BlockchainProxy::from(&blockchain); + let (syncer_proxy, zkp_component) = match config.consensus.sync_mode { + #[cfg(not(feature = "full-consensus"))] + SyncMode::History => { + panic!("Can't build a history node without the full-consensus feature enabled") + } + #[cfg(not(feature = "full-consensus"))] + SyncMode::Full => { + panic!("Can't build a full node without the full-consensus feature enabled") + } + #[cfg(feature = "full-consensus")] + SyncMode::History => { #[cfg(feature = "zkp-prover")] let zkp_component = if let Some(zk_prover_config) = config.zk_prover { ZKPComponent::with_prover( @@ -441,27 +471,10 @@ impl ClientInner { network_events, ) .await; - (blockchain_proxy, syncer, zkp_component) + (syncer, zkp_component) } #[cfg(feature = "full-consensus")] SyncMode::Full => { - blockchain_config.keep_history = false; - blockchain_config.index_history = config.consensus.index_history; - - let blockchain = match Blockchain::new_merged( - environment.clone(), - pre_genesis_environment, - blockchain_config, - config.network_id, - time, - ) { - Ok(blockchain) => Arc::new(RwLock::new(blockchain)), - Err(err) => { - return Err(Error::Consensus(BlockchainError(err))); - } - }; - - let blockchain_proxy = BlockchainProxy::from(&blockchain); #[cfg(feature = "zkp-prover")] let zkp_component = if let Some(zk_prover_config) = config.zk_prover { ZKPComponent::with_prover( @@ -491,11 +504,9 @@ impl ClientInner { config.consensus.full_sync_threshold, ) .await; - (blockchain_proxy, syncer, zkp_component) + (syncer, zkp_component) } SyncMode::Light => { - let blockchain = Arc::new(RwLock::new(LightBlockchain::new(config.network_id))); - let blockchain_proxy = BlockchainProxy::from(&blockchain); let zkp_component = ZKPComponent::new(blockchain_proxy.clone(), Arc::clone(&network), zkp_storage) .await; @@ -507,7 +518,7 @@ impl ClientInner { network_events, ) .await; - (blockchain_proxy, syncer, zkp_component) + (syncer, zkp_component) } }; diff --git a/log/src/lib.rs b/log/src/lib.rs index 586af6d74b..e731889331 100644 --- a/log/src/lib.rs +++ b/log/src/lib.rs @@ -25,6 +25,7 @@ pub static NIMIQ_MODULES: &[&str] = &[ "nimiq_collections", "nimiq_consensus", "nimiq_database", + "nimiq_dht", "nimiq_genesis", "nimiq_genesis_builder", "nimiq_handel", diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml index dc3ebdf2d8..7e2a2bbc6a 100644 --- a/network-libp2p/Cargo.toml +++ b/network-libp2p/Cargo.toml @@ -42,7 +42,7 @@ tokio-stream = "0.1" unsigned-varint = "0.8" void = "1.0" -nimiq-bls = { workspace = true } +nimiq-keys = { workspace = true } nimiq-macros = { workspace = true } nimiq-network-interface = { workspace = true } nimiq-hash = { workspace = true } @@ -92,5 +92,6 @@ nimiq-test-log = { workspace = true } nimiq-test-utils = { workspace = true } [features] +kad = [] metrics = ["prometheus-client"] tokio-websocket = ["libp2p/dns", "libp2p/tcp", "libp2p/tokio", "libp2p/websocket"] diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index f69c620fc8..15e01a31ac 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -37,6 +37,7 @@ pub struct Behaviour { pub discovery: discovery::Behaviour, pub autonat_server: autonat::server::Behaviour, pub autonat_client: autonat::client::Behaviour, + #[cfg(feature = "kad")] pub dht: kad::Behaviour, pub gossipsub: gossipsub::Behaviour, pub ping: ping::Behaviour, @@ -55,7 +56,9 @@ impl Behaviour { // DHT behaviour let store = MemoryStore::new(peer_id); + #[cfg(feature = "kad")] let mut dht = kad::Behaviour::with_config(peer_id, store, config.kademlia); + #[cfg(feature = "kad")] if force_dht_server_mode { dht.set_mode(Some(kad::Mode::Server)); } @@ -125,6 +128,7 @@ impl Behaviour { let connection_limits = connection_limits::Behaviour::new(limits); Self { + #[cfg(feature = "kad")] dht, discovery, gossipsub, @@ -140,17 +144,20 @@ impl Behaviour { /// Adds a peer address into the DHT pub fn add_peer_address(&mut self, peer_id: PeerId, address: Multiaddr) { // Add address to the DHT + #[cfg(feature = "kad")] self.dht.add_address(&peer_id, address); } /// Removes a peer from the DHT pub fn remove_peer(&mut self, peer_id: PeerId) { + #[cfg(feature = "kad")] self.dht.remove_peer(&peer_id); } /// Removes a peer address from the DHT pub fn remove_peer_address(&mut self, peer_id: PeerId, address: Multiaddr) { // Remove address from the DHT + #[cfg(feature = "kad")] self.dht.remove_address(&peer_id, &address); } diff --git a/network-libp2p/src/dht.rs b/network-libp2p/src/dht.rs new file mode 100644 index 0000000000..5ad307bc87 --- /dev/null +++ b/network-libp2p/src/dht.rs @@ -0,0 +1,32 @@ +use libp2p::{kad::Record, PeerId}; +use nimiq_keys::Address; +use nimiq_serde::DeserializeError; +use nimiq_validator_network::validator_record::ValidatorRecord; + +pub use crate::network_types::DhtRecord; + +#[derive(Debug)] +pub enum DhtVerifierError { + MalformedTag, + MalformedKey(DeserializeError), + MalformedValue(DeserializeError), + UnknownTag, + UnknownValidator(Address), + StateIncomplete, + InvalidSignature, +} + +pub trait Verifier: Send + Sync { + fn verify(&self, record: &Record) -> Result; +} + +/// Dummy implementation for testcases +impl Verifier for () { + fn verify(&self, record: &Record) -> Result { + Ok(DhtRecord::Validator( + PeerId::random(), + ValidatorRecord::::new(PeerId::random(), 0u64), + record.clone(), + )) + } +} diff --git a/network-libp2p/src/lib.rs b/network-libp2p/src/lib.rs index 6782864674..21bb6d35dc 100644 --- a/network-libp2p/src/lib.rs +++ b/network-libp2p/src/lib.rs @@ -5,6 +5,7 @@ mod autonat; mod behaviour; mod config; mod connection_pool; +pub mod dht; pub mod discovery; pub mod dispatch; mod error; diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index 486c03ea0d..e60e8d471f 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -37,6 +37,7 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; #[cfg(feature = "metrics")] use crate::network_metrics::NetworkMetrics; use crate::{ + dht, discovery::peer_contacts::PeerContactBook, network_types::{GossipsubId, NetworkAction, ValidateMessage}, rate_limiting::RateLimitConfig, @@ -74,8 +75,12 @@ impl Network { /// # Arguments /// /// - `config`: The network configuration, containing key pair, and other behavior-specific configuration. + /// - `dht_verifier`: The verifier used to verify all Dht records. /// - pub async fn new(config: Config) -> Self { + pub async fn new( + config: Config, + #[cfg(feature = "kad")] dht_verifier: impl dht::Verifier + 'static, + ) -> Self { let required_services = config.required_services; // TODO: persist to disk let own_peer_contact = config.peer_contact.clone(); @@ -122,6 +127,8 @@ impl Network { Arc::clone(&connected_peers), update_scores, Arc::clone(&contacts), + #[cfg(feature = "kad")] + dht_verifier, force_dht_server_mode, dht_quorum, #[cfg(feature = "metrics")] diff --git a/network-libp2p/src/network_types.rs b/network-libp2p/src/network_types.rs index d0bc80833c..0579b1c8bf 100644 --- a/network-libp2p/src/network_types.rs +++ b/network-libp2p/src/network_types.rs @@ -10,7 +10,7 @@ use libp2p::{ swarm::NetworkInfo, Multiaddr, PeerId, }; -use nimiq_bls::KeyPair; +use nimiq_keys::KeyPair; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, PubsubId, Topic}, peer_info::Services, @@ -140,7 +140,7 @@ pub(crate) enum DhtBootStrapState { /// Enum over all of the possible DHT records values #[derive(Clone, PartialEq)] -pub(crate) enum DhtRecord { +pub enum DhtRecord { /// Validator record with its publisher Peer ID, /// the decoded validator record and the original serialized record. Validator(PeerId, ValidatorRecord, Record), @@ -174,7 +174,7 @@ impl PartialOrd for DhtRecord { /// DHT record decoding errors #[derive(Debug, Error)] -pub(crate) enum DhtRecordError { +pub enum DhtRecordError { /// Tag is unknown #[error("Unknown record tag")] UnknownTag, diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index c1d1959ebd..2d58c925d7 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -30,16 +30,13 @@ use libp2p::{ #[cfg(feature = "tokio-websocket")] use libp2p::{dns, tcp, websocket}; use log::Instrument; -use nimiq_bls::{CompressedPublicKey, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, NetworkEvent}, peer_info::PeerInfo, request::{peek_type, InboundRequestError, OutboundRequestError, RequestError}, }; -use nimiq_serde::{Deserialize, Serialize}; +use nimiq_serde::Serialize; use nimiq_time::Interval; -use nimiq_utils::tagged_signing::{TaggedSignable, TaggedSigned}; -use nimiq_validator_network::validator_record::ValidatorRecord; use parking_lot::RwLock; use tokio::sync::{broadcast, mpsc}; @@ -47,7 +44,7 @@ use tokio::sync::{broadcast, mpsc}; use crate::network_metrics::NetworkMetrics; use crate::{ autonat::NatStatus, - behaviour, + behaviour, dht, discovery::{self, peer_contacts::PeerContactBook}, network_types::{ DhtBootStrapState, DhtRecord, DhtResults, GossipsubTopicInfo, NetworkAction, TaskState, @@ -65,6 +62,8 @@ struct EventInfo<'a> { state: &'a mut TaskState, connected_peers: &'a RwLock>, rate_limiting: &'a mut RateLimits, + #[cfg(feature = "kad")] + dht_verifier: &'a dyn dht::Verifier, #[cfg(feature = "metrics")] metrics: &'a Arc, } @@ -115,6 +114,7 @@ pub(crate) async fn swarm_task( connected_peers: Arc>>, mut update_scores: Interval, contacts: Arc>, + #[cfg(feature = "kad")] dht_verifier: impl dht::Verifier, force_dht_server_mode: bool, dht_quorum: NonZeroU8, #[cfg(feature = "metrics")] metrics: Arc, @@ -161,6 +161,8 @@ pub(crate) async fn swarm_task( state: &mut task_state, connected_peers: &connected_peers, rate_limiting: &mut rate_limiting, + #[cfg(feature = "kad")] + dht_verifier: &dht_verifier, #[cfg( feature = "metrics")] metrics: &metrics, }, ); @@ -330,6 +332,7 @@ fn handle_event(event: SwarmEvent, event_info: EventI .add_peer_address(peer_id, listen_addr.clone()); // Bootstrap Kademlia if we're performing our first connection + #[cfg(feature = "kad")] if event_info.state.dht_bootstrap_state == DhtBootStrapState::NotStarted { debug!("Bootstrapping DHT"); if event_info.swarm.behaviour_mut().dht.bootstrap().is_err() { @@ -445,6 +448,7 @@ fn handle_behaviour_event(event: behaviour::BehaviourEvent, event_info: EventInf } behaviour::BehaviourEvent::ConnectionLimits(event) => match event {}, behaviour::BehaviourEvent::Pool(event) => match event {}, + #[cfg(feature = "kad")] behaviour::BehaviourEvent::Dht(event) => handle_dht_event(event, event_info), behaviour::BehaviourEvent::Discovery(event) => handle_discovery_event(event, event_info), behaviour::BehaviourEvent::Gossipsub(event) => handle_gossipsup_event(event, event_info), @@ -473,6 +477,7 @@ fn handle_autonat_server_event(event: autonat::v2::server::Event, _event_info: E log::trace!(?event, "AutoNAT inbound probe"); } +#[cfg(feature = "kad")] fn handle_dht_event(event: kad::Event, event_info: EventInfo) { match event { kad::Event::OutboundQueryProgressed { @@ -511,6 +516,7 @@ fn handle_dht_event(event: kad::Event, event_info: EventInfo) { } } +#[cfg(feature = "kad")] fn handle_dht_get( id: QueryId, result: Result, @@ -520,9 +526,13 @@ fn handle_dht_get( ) { match result { Ok(GetRecordOk::FoundRecord(record)) => { - let Some(dht_record) = verify_record(&record.record) else { - warn!("DHT record verification failed: Invalid public key received"); - return; + // Verify incoming record + let dht_record = match event_info.dht_verifier.verify(&record.record) { + Ok(record) => record, + Err(error) => { + warn!(?error, "DHT record verification failed"); + return; + } }; if step.count.get() == 1_usize { @@ -614,6 +624,7 @@ fn handle_dht_get( } } +#[cfg(feature = "kad")] fn handle_dht_put_record( id: QueryId, result: Result, @@ -631,6 +642,7 @@ fn handle_dht_put_record( } } +#[cfg(feature = "kad")] fn handle_dht_bootstrap( _id: QueryId, result: Result, @@ -653,6 +665,7 @@ fn handle_dht_bootstrap( } } +#[cfg(feature = "kad")] fn handle_dht_inbound_put( _source: PeerId, _connection: ConnectionId, @@ -660,10 +673,14 @@ fn handle_dht_inbound_put( event_info: EventInfo, ) { // Verify incoming record - let Some(dht_record) = verify_record(&record) else { - warn!("DHT record verification failed: Invalid public key received"); - return; + let dht_record = match event_info.dht_verifier.verify(&record) { + Ok(record) => record, + Err(error) => { + warn!(?error, "DHT record verification failed"); + return; + } }; + // Now verify that we should overwrite it because it's better than the one we have let mut overwrite = true; let store = event_info.swarm.behaviour_mut().dht.store_mut(); @@ -679,6 +696,7 @@ fn handle_dht_inbound_put( } } +#[cfg(feature = "kad")] fn handle_dht_mode_change(new_mode: Mode, event_info: EventInfo) { debug!(%new_mode, "DHT mode changed"); if new_mode == Mode::Server { @@ -730,6 +748,7 @@ fn handle_discovery_event(event: discovery::Event, event_info: EventInfo) { .add_peer_address(peer_id, peer_address); // Bootstrap Kademlia if we're adding our first address + #[cfg(feature = "kad")] if event_info.state.dht_bootstrap_state == DhtBootStrapState::NotStarted { debug!("Bootstrapping DHT"); if event_info.swarm.behaviour_mut().dht.bootstrap().is_err() { @@ -1023,7 +1042,9 @@ fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut Tas output.send(result).ok(); } NetworkAction::DhtGet { key, output } => { + #[cfg(feature = "kad")] let query_id = swarm.behaviour_mut().dht.get_record(key.into()); + #[cfg(feature = "kad")] state.dht_gets.insert(query_id, output); } NetworkAction::DhtPut { key, value, output } => { @@ -1036,6 +1057,7 @@ fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut Tas expires: None, // This only affects local storage. Records are replicated with configured TTL. }; + #[cfg(feature = "kad")] match swarm.behaviour_mut().dht.put_record(record, Quorum::One) { Ok(query_id) => { // Remember put operation to resolve when we receive a `QueryResult::PutRecord` @@ -1243,46 +1265,6 @@ fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut Tas } } -/// Returns a DHT record if the record decoding and verification was successful, None otherwise -pub(crate) fn verify_record(record: &Record) -> Option { - let Some(tag) = TaggedSigned::, KeyPair>::peek_tag(&record.value) - else { - log::warn!(?record, "DHT Tag not peekable."); - return None; - }; - - if tag != ValidatorRecord::::TAG { - log::error!(tag, "DHT invalid record tag received"); - return None; - } - - let Ok(validator_record) = - TaggedSigned::, KeyPair>::deserialize_from_vec(&record.value) - else { - log::warn!(?record.value, "Failed to deserialize dht value"); - return None; - }; - - // In this type of messages we assume the record key is also the public key used to verify these records - let Ok(compressed_pk) = CompressedPublicKey::deserialize_from_vec(record.key.as_ref()) else { - log::warn!(?record.key, "Failed to deserialize dht key"); - return None; - }; - - let Ok(pk) = compressed_pk.uncompress() else { - log::warn!(%compressed_pk, "Failed to uncompress public key"); - return None; - }; - - validator_record.verify(&pk).then(|| { - DhtRecord::Validator( - record.publisher.unwrap(), - validator_record.record, - record.clone(), - ) - }) -} - fn to_response_error(error: OutboundFailure) -> RequestError { match error { OutboundFailure::ConnectionClosed => { diff --git a/network-libp2p/tests/network.rs b/network-libp2p/tests/network.rs index d3dc62f34a..2b2a525304 100644 --- a/network-libp2p/tests/network.rs +++ b/network-libp2p/tests/network.rs @@ -1,4 +1,4 @@ -use std::{num::NonZeroU8, time::Duration}; +use std::{collections::BTreeMap, num::NonZeroU8, sync::Arc, time::Duration}; use futures::{Stream, StreamExt}; use instant::SystemTime; @@ -8,22 +8,28 @@ use libp2p::{ multiaddr::{multiaddr, Multiaddr}, PeerId, }; -use nimiq_bls::KeyPair; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, Topic}, peer_info::Services, }; use nimiq_network_libp2p::{ + dht, discovery::{self, peer_contacts::PeerContact}, Config, Network, }; +use nimiq_serde::{Deserialize, Serialize}; use nimiq_test_log::test; use nimiq_test_utils::test_rng::test_rng; use nimiq_time::{sleep, timeout}; -use nimiq_utils::{key_rng::SecureGenerate, spawn, tagged_signing::TaggedSignable}; +use nimiq_utils::{ + key_rng::SecureGenerate, + spawn, + tagged_signing::{TaggedKeyPair, TaggedSignable, TaggedSigned}, +}; use nimiq_validator_network::validator_record::ValidatorRecord; +use parking_lot::RwLock; use rand::{thread_rng, Rng}; -use serde::{Deserialize, Serialize}; mod helper; @@ -94,7 +100,7 @@ impl TestNetwork { let address = multiaddr![Memory(self.next_address)]; self.next_address += 1; - let net = Network::new(network_config(address.clone())).await; + let net = Network::new(network_config(address.clone()), ()).await; net.listen_on(vec![address.clone()]).await; log::debug!(address = %address, peer_id = %net.get_local_peer_id(), "Creating node"); @@ -122,10 +128,10 @@ async fn create_connected_networks() -> (Network, Network) { let addr1 = multiaddr![Memory(rng.gen::())]; let addr2 = multiaddr![Memory(rng.gen::())]; - let net1 = Network::new(network_config(addr1.clone())).await; + let net1 = Network::new(network_config(addr1.clone()), ()).await; net1.listen_on(vec![addr1.clone()]).await; - let net2 = Network::new(network_config(addr2.clone())).await; + let net2 = Network::new(network_config(addr2.clone()), ()).await; net2.listen_on(vec![addr2.clone()]).await; log::debug!(address = %addr1, peer_id = %net1.get_local_peer_id(), "Network 1"); @@ -156,10 +162,10 @@ async fn create_double_connected_networks() -> (Network, Network) { let addr1 = multiaddr![Memory(rng.gen::())]; let addr2 = multiaddr![Memory(rng.gen::())]; - let net1 = Network::new(network_config(addr1.clone())).await; + let net1 = Network::new(network_config(addr1.clone()), ()).await; net1.listen_on(vec![addr1.clone()]).await; - let net2 = Network::new(network_config(addr2.clone())).await; + let net2 = Network::new(network_config(addr2.clone()), ()).await; net2.listen_on(vec![addr2.clone()]).await; log::debug!(address = %addr1, peer_id = %net1.get_local_peer_id(), "Network 1"); @@ -184,12 +190,75 @@ async fn create_double_connected_networks() -> (Network, Network) { (net1, net2) } -async fn create_network_with_n_peers(n_peers: usize) -> Vec { +struct Verifier { + keys: Arc::PublicKey>>>, +} +impl Verifier { + pub fn new( + keys: &Arc::PublicKey>>>, + ) -> Self { + Self { + keys: Arc::clone(&keys), + } + } +} + +impl dht::Verifier for Verifier { + fn verify( + &self, + record: &libp2p::kad::Record, + ) -> Result { + // Peek the tag to know what kind of record this is. + let Some(tag) = TaggedSigned::, KeyPair>::peek_tag(&record.value) + else { + log::warn!(?record, "DHT Tag not peekable."); + return Err(dht::DhtVerifierError::MalformedTag); + }; + + if tag != ValidatorRecord::::TAG { + return Err(dht::DhtVerifierError::UnknownTag); + } + + // Deserialize the value of the record, which is a ValidatorRecord. If it fails return an error. + let validator_record = + TaggedSigned::, KeyPair>::deserialize_from_vec(&record.value) + .map_err(dht::DhtVerifierError::MalformedValue)?; + + // Deserialize the key of the record which is an Address. If it fails return an error. + let validator_address = Address::deserialize_from_vec(record.key.as_ref()) + .map_err(dht::DhtVerifierError::MalformedKey)?; + + let keys = self.keys.read(); + let public_key = keys + .get(&validator_address) + .ok_or(dht::DhtVerifierError::UnknownValidator(validator_address))?; + + validator_record + .verify(&public_key) + .then(|| { + dht::DhtRecord::Validator( + record.publisher.unwrap(), + validator_record.record, + record.clone(), + ) + }) + .ok_or(dht::DhtVerifierError::InvalidSignature) + } +} + +async fn create_network_with_n_peers( + n_peers: usize, +) -> ( + Vec, + Arc::PublicKey>>>, +) { let mut networks = Vec::new(); let mut addresses = Vec::new(); let mut events = Vec::new(); let mut rng = thread_rng(); + let keys = Arc::new(RwLock::new(BTreeMap::default())); + // Create all the networks and addresses for peer in 0..n_peers { let addr = multiaddr![Memory(rng.gen::())]; @@ -198,7 +267,7 @@ async fn create_network_with_n_peers(n_peers: usize) -> Vec { addresses.push(addr.clone()); - let network = Network::new(network_config(addr.clone())).await; + let network = Network::new(network_config(addr.clone()), Verifier::new(&keys)).await; network.listen_on(vec![addr.clone()]).await; log::debug!(address = %addr, peer_id = %network.get_local_peer_id(), "Network {}", peer); @@ -224,7 +293,10 @@ async fn create_network_with_n_peers(n_peers: usize) -> Vec { // Wait for all PeerJoined events let all_joined = futures::stream::select_all(events) - .take(n_peers * (n_peers - 1 + 1/*1 x DHT bootstrapped*/)) + .filter(|(local_peer_id, event)| { + std::future::ready(matches!(event, Ok(NetworkEvent::PeerJoined(..)))) + }) + .take(n_peers * (n_peers - 1)) .for_each(|(local_peer_id, event)| async move { match event { Ok(NetworkEvent::PeerJoined(peer_id, _)) => { @@ -319,13 +391,13 @@ async fn create_network_with_n_peers(n_peers: usize) -> Vec { ); } - networks + (networks, keys) } #[test(tokio::test)] async fn connections_stress_and_reconnect() { let peers: usize = 5; - let networks = create_network_with_n_peers(peers).await; + let (networks, _) = create_network_with_n_peers(peers).await; assert_eq!(peers, networks.len()); } @@ -408,9 +480,10 @@ impl TaggedSignable for TestRecord { } #[test(tokio::test)] +#[cfg(feature = "kad")] async fn dht_put_and_get() { // We have a quorum of 3 for getting DHT records, so we need at least 3 peers - let networks = create_network_with_n_peers(3).await; + let (networks, keys) = create_network_with_n_peers(3).await; let net1 = &networks[0]; let net2 = &networks[1]; @@ -422,13 +495,18 @@ async fn dht_put_and_get() { timestamp: 0x42u64, }; + // Generate a key let mut rng = test_rng(false); let keypair = KeyPair::generate(&mut rng); - let key = keypair.public_key.compress(); + // Put it into the keys collection. + let key: Address = (&keypair.public).into(); + assert!(keys.write().insert(key.clone(), keypair.public).is_none()); + // Put the record into the dht, keyed by the address. net1.dht_put(&key, &put_record, &keypair).await.unwrap(); + // Fetch the record. and make sure they are identical. let fetched_record = net2 .dht_get::<_, ValidatorRecord, KeyPair>(&key) .await diff --git a/network-libp2p/tests/request_response.rs b/network-libp2p/tests/request_response.rs index 579f35454b..50525d8915 100644 --- a/network-libp2p/tests/request_response.rs +++ b/network-libp2p/tests/request_response.rs @@ -113,10 +113,10 @@ impl TestNetwork { let addr1 = multiaddr![Memory(rng.gen::())]; let addr2 = multiaddr![Memory(rng.gen::())]; - let net1 = Network::new(network_config(addr1.clone())).await; + let net1 = Network::new(network_config(addr1.clone()), ()).await; net1.listen_on(vec![addr1.clone()]).await; - let net2 = Network::new(network_config(addr2.clone())).await; + let net2 = Network::new(network_config(addr2.clone()), ()).await; net2.listen_on(vec![addr2.clone()]).await; log::debug!(address = %addr1, peer_id = %net1.get_local_peer_id(), "Network 1"); @@ -154,16 +154,16 @@ impl TestNetwork { let addr3 = multiaddr![Memory(rng.gen::())]; let addr4 = multiaddr![Memory(rng.gen::())]; - let net1 = Network::new(network_config(addr1.clone())).await; + let net1 = Network::new(network_config(addr1.clone()), ()).await; net1.listen_on(vec![addr1.clone()]).await; - let net2 = Network::new(network_config(addr2.clone())).await; + let net2 = Network::new(network_config(addr2.clone()), ()).await; net2.listen_on(vec![addr2.clone()]).await; - let net3 = Network::new(network_config(addr3.clone())).await; + let net3 = Network::new(network_config(addr3.clone()), ()).await; net3.listen_on(vec![addr3.clone()]).await; - let net4 = Network::new(network_config(addr4.clone())).await; + let net4 = Network::new(network_config(addr4.clone()), ()).await; net4.listen_on(vec![addr4.clone()]).await; log::debug!(address = %addr1, peer_id = %net1.get_local_peer_id(), "Network 1"); diff --git a/network-mock/src/lib.rs b/network-mock/src/lib.rs index 328fd6c06c..3f2916c3b1 100644 --- a/network-mock/src/lib.rs +++ b/network-mock/src/lib.rs @@ -152,6 +152,7 @@ pub mod tests { } #[test(tokio::test)] + #[cfg(feature = "kad")] async fn dht_put_and_get() { let mut hub = MockHub::new(); let mut rng = test_rng(false); diff --git a/test-utils/src/test_network.rs b/test-utils/src/test_network.rs index 3a60e170af..17817bbcd1 100644 --- a/test-utils/src/test_network.rs +++ b/test-utils/src/test_network.rs @@ -82,7 +82,7 @@ impl TestNetwork for Network { true, NonZeroU8::new(1).unwrap(), ); - let network = Arc::new(Network::new(config).await); + let network = Arc::new(Network::new(config, ()).await); network.listen_on(vec![peer_address]).await; network } diff --git a/validator-network/Cargo.toml b/validator-network/Cargo.toml index 2b95860266..66f16c95d4 100644 --- a/validator-network/Cargo.toml +++ b/validator-network/Cargo.toml @@ -29,7 +29,8 @@ thiserror = "1.0" time = { version = "0.3" } tokio = { version = "1.41", features = ["rt"] } -nimiq-bls = { workspace = true, features = ["lazy", "serde-derive"] } +nimiq-keys = { workspace = true } nimiq-network-interface = { workspace = true } +nimiq-primitives = { workspace = true } nimiq-serde = { workspace = true } nimiq-utils = { workspace = true, features = ["futures", "spawn", "tagged-signing"] } diff --git a/validator-network/src/lib.rs b/validator-network/src/lib.rs index 4c32ba3988..58a5e035bc 100644 --- a/validator-network/src/lib.rs +++ b/validator-network/src/lib.rs @@ -5,11 +5,12 @@ pub mod validator_record; use async_trait::async_trait; use futures::stream::BoxStream; -use nimiq_bls::{lazy::LazyPublicKey, CompressedPublicKey, SecretKey}; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network, SubscribeEvents, Topic}, request::{Message, Request, RequestCommon}, }; +use nimiq_primitives::slots_allocation::Validators; pub use crate::error::NetworkError; @@ -28,9 +29,9 @@ pub trait ValidatorNetwork: Send + Sync { /// `None`, otherwise. fn set_validator_id(&self, validator_id: Option); - /// Tells the validator network the validator keys for the current set of active validators. + /// Tells the validator network the validator addresses for the current set of active validators. /// The keys must be ordered, such that the k-th entry is the validator with ID k. - async fn set_validators(&self, validator_keys: Vec); + fn set_validators(&self, validators: &Validators); /// Sends a message to a validator identified by its ID (position) in the `validator keys`. /// It must make a reasonable effort to establish a connection to the peer denoted with `validator_id` @@ -79,8 +80,8 @@ pub trait ValidatorNetwork: Send + Sync { /// Sets this node peer ID using its secret key and public key. async fn set_public_key( &self, - public_key: &CompressedPublicKey, - secret_key: &SecretKey, + validator_address: &Address, + signing_key_pair: &KeyPair, ) -> Result<(), Self::Error>; /// Closes the connection to the peer with `peer_id` with the given `close_reason`. diff --git a/validator-network/src/network_impl.rs b/validator-network/src/network_impl.rs index fb060acafc..6b8497755e 100644 --- a/validator-network/src/network_impl.rs +++ b/validator-network/src/network_impl.rs @@ -3,11 +3,12 @@ use std::{collections::BTreeMap, error::Error, fmt::Debug, sync::Arc}; use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt, TryFutureExt}; use log::warn; -use nimiq_bls::{lazy::LazyPublicKey, CompressedPublicKey, KeyPair, SecretKey}; +use nimiq_keys::{Address, KeyPair}; use nimiq_network_interface::{ network::{CloseReason, MsgAcceptance, Network, SubscribeEvents, Topic}, request::{InboundRequestError, Message, Request, RequestCommon, RequestError}, }; +use nimiq_primitives::slots_allocation::{Validator, Validators}; use nimiq_serde::{Deserialize, Serialize}; use nimiq_utils::spawn; use parking_lot::RwLock; @@ -60,10 +61,10 @@ where network: Arc, /// Own validator ID if active, `None` otherwise. own_validator_id: Arc>>, - /// Set of public keys for each of the validators - validator_keys: Arc>>, + /// Per validator_id contains the validator_address for each of the validators + validators: Arc>>, /// Cache for mapping validator public keys to peer IDs - validator_peer_id_cache: Arc>>>, + validator_peer_id_cache: Arc>>>, } impl ValidatorNetworkImpl @@ -76,7 +77,7 @@ where Self { network, own_validator_id: Arc::new(RwLock::new(None)), - validator_keys: Arc::new(RwLock::new(vec![])), + validators: Arc::new(RwLock::new(None)), validator_peer_id_cache: Arc::new(RwLock::new(BTreeMap::new())), } } @@ -86,7 +87,7 @@ where ValidatorNetworkImpl { network: Arc::clone(&self.network), own_validator_id: Arc::clone(&self.own_validator_id), - validator_keys: Arc::clone(&self.validator_keys), + validators: Arc::clone(&self.validators), validator_peer_id_cache: Arc::clone(&self.validator_peer_id_cache), } } @@ -96,13 +97,23 @@ where self.own_validator_id.read().ok_or(NetworkError::NotElected) } - /// Looks up the peer ID for a validator public key in the DHT. + /// Given the Validators and a validator_id, returns the Validator represented by the id if it exists. + /// None otherwise. + fn get_validator(validators: Option<&Validators>, validator_id: u16) -> Option<&Validator> { + // Acquire read on the validators and make sure they have been set. Return None otherwise. + validators.and_then(|validators| { + (usize::from(validator_id) < validators.num_validators()) + .then(|| validators.get_validator_by_slot_band(validator_id)) + }) + } + + /// Looks up the peer ID for a validator address in the DHT. async fn resolve_peer_id( network: &N, - public_key: &LazyPublicKey, + validator_address: &Address, ) -> Result, NetworkError> { if let Some(record) = network - .dht_get::<_, ValidatorRecord, KeyPair>(public_key.compressed()) + .dht_get::<_, ValidatorRecord, KeyPair>(validator_address) .await? { Ok(Some(record.peer_id)) @@ -111,31 +122,33 @@ where } } - /// Looks up the peer ID for a validator public key in the DHT and updates + /// Looks up the peer ID for a validator address in the DHT and updates /// the internal cache. /// /// Assumes that the cache entry has been set to `InProgress` by the /// caller, will panic otherwise. - async fn update_peer_id_cache(&self, validator_id: u16, public_key: &LazyPublicKey) { - let cache_value = match Self::resolve_peer_id(&self.network, public_key).await { + /// + /// The given `validator_id` is used for logging purposes only. + async fn update_peer_id_cache(&self, validator_id: u16, validator_address: &Address) { + let cache_value = match Self::resolve_peer_id(&self.network, validator_address).await { Ok(Some(peer_id)) => { log::trace!( %peer_id, validator_id, - %public_key, + %validator_address, "Resolved validator peer ID" ); Ok(peer_id) } Ok(None) => { - log::debug!(validator_id, %public_key, "Unable to resolve validator peer ID: Entry not found in DHT"); + log::debug!(validator_id, %validator_address, "Unable to resolve validator peer ID: Entry not found in DHT"); Err(()) } Err(error) => { log::debug!( validator_id, ?error, - %public_key, + %validator_address, "Unable to resolve validator peer ID: Network error" ); Err(()) @@ -145,7 +158,7 @@ where match self .validator_peer_id_cache .write() - .get_mut(public_key.compressed()) + .get_mut(validator_address) { Some(cache_entry) => { if let CacheState::InProgress(prev_peer_id) = *cache_entry { @@ -163,16 +176,12 @@ where /// Look up the peer ID for a validator ID. fn get_validator_cache(&self, validator_id: u16) -> CacheState { - let public_key = match self.validator_keys.read().get(usize::from(validator_id)) { - Some(pk) => pk.clone(), - None => return CacheState::Error(None), + let validators = self.validators.read(); + let Some(validator) = Self::get_validator(validators.as_ref(), validator_id) else { + return CacheState::Error(None); }; - if let Some(cache_state) = self - .validator_peer_id_cache - .read() - .get(public_key.compressed()) - { + if let Some(cache_state) = self.validator_peer_id_cache.read().get(&validator.address) { match *cache_state { CacheState::Resolved(..) => return *cache_state, CacheState::Error(..) => {} @@ -189,7 +198,7 @@ where { // Re-check the validator Peer ID cache with the write lock taken and update it if necessary let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - if let Some(cache_state) = validator_peer_id_cache.get_mut(public_key.compressed()) { + if let Some(cache_state) = validator_peer_id_cache.get_mut(&validator.address) { new_cache_state = match *cache_state { CacheState::Resolved(..) => return *cache_state, CacheState::Error(prev_peer_id) => { @@ -209,17 +218,19 @@ where } else { new_cache_state = CacheState::InProgress(None); // No cache entry for this validator ID: we are going to perform the DHT query - validator_peer_id_cache.insert(public_key.compressed().clone(), new_cache_state); + validator_peer_id_cache.insert(validator.address.clone(), new_cache_state); log::debug!( - ?public_key, + ?validator.address, validator_id, "No cache entry found, querying DHT", ); } } + let self_ = self.arc_clone(); + let validator_address = validator.address.clone(); spawn(async move { - Self::update_peer_id_cache(&self_, validator_id, &public_key).await; + Self::update_peer_id_cache(&self_, validator_id, &validator_address).await; }); new_cache_state } @@ -232,30 +243,28 @@ where error: &RequestError, peer_id: &N::PeerId, ) { - match error { - // The no receiver is not an error since the peer might not be aggregating - RequestError::InboundRequest(InboundRequestError::NoReceiver) => {} - // In all other cases, clear the Peer ID cache for this validator - _ => { - // Make sure to drop the `self.validator_keys` lock after this line. - let validator_key = self - .validator_keys - .read() - .get(usize::from(validator_id)) - .cloned(); - if let Some(validator_key) = validator_key { - // Clear the peer ID cache only if the error happened for the same Peer ID that we have cached - let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); - if let Some(cache_entry) = - validator_peer_id_cache.get_mut(validator_key.compressed()) - { - if let CacheState::Resolved(cached_peer_id) = *cache_entry { - if cached_peer_id == *peer_id { - *cache_entry = CacheState::Empty(cached_peer_id); - } - } - } - } + // The no receiver is not an error since the peer might not be aggregating. + if *error == RequestError::InboundRequest(InboundRequestError::NoReceiver) { + return; + } + + // Fetch the validator from the validators. If it does not exist that peer_id is not + // assigned in this epoch and there is no cached entry to clear. + let validators = self.validators.read(); + let Some(validator) = Self::get_validator(validators.as_ref(), validator_id) else { + return; + }; + + // Fetch the cache. If it does not exist there is no need to clear. + let mut validator_peer_id_cache = self.validator_peer_id_cache.write(); + let Some(cache_entry) = validator_peer_id_cache.get_mut(&validator.address) else { + return; + }; + + // Clear the peer ID cache only if the error happened for the same Peer ID that we have cached. + if let CacheState::Resolved(cached_peer_id) = *cache_entry { + if cached_peer_id == *peer_id { + *cache_entry = CacheState::Empty(cached_peer_id); } } } @@ -299,19 +308,20 @@ where *self.own_validator_id.write() = validator_id; } - /// Tells the validator network the validator keys for the current set of active validators. The keys must be - /// ordered, such that the k-th entry is the validator with ID k. - async fn set_validators(&self, validator_keys: Vec) { - log::trace!(?validator_keys, "Setting validators for ValidatorNetwork"); + fn set_validators(&self, validators: &Validators) { + log::trace!(?validators, "Setting validators for ValidatorNetwork"); - // Put the `validator_keys` into the same order as the + // Put the `validator_addresses` into the same order as the // `self.validator_peer_id_cache` so that we can simultaneously iterate - // over them. Note that `LazyPublicKey::cmp` forwards to - // `CompressedPublicKey::cmp`. - let mut sorted_validator_keys: Vec<_> = validator_keys.iter().collect(); - sorted_validator_keys.sort_unstable(); - let mut sorted_validator_keys = sorted_validator_keys.into_iter(); - let mut cur_key = sorted_validator_keys.next(); + // over them. + let mut sorted_validator_addresses: Vec<_> = validators + .validators + .iter() + .map(|validator| &validator.address) + .collect(); + sorted_validator_addresses.sort_unstable(); + let mut sorted_validator_addresses = sorted_validator_addresses.into_iter(); + let mut cur_key = sorted_validator_addresses.next(); // Drop peer ID cache, but keep validators that are still active and // validators who are currently being resolved. @@ -326,13 +336,13 @@ where return true; } // Move `cur_key` until we're greater or equal to `key`. - while cur_key.map(|k| k.compressed() < key).unwrap_or(false) { - cur_key = sorted_validator_keys.next(); + while cur_key.map(|k| k < key).unwrap_or(false) { + cur_key = sorted_validator_addresses.next(); } - Some(key) == cur_key.map(LazyPublicKey::compressed) + Some(key) == cur_key }); - *self.validator_keys.write() = validator_keys; + *self.validators.write() = Some(validators.clone()); } async fn send_to(&self, validator_id: u16, msg: M) -> Result<(), Self::Error> { @@ -477,8 +487,8 @@ where async fn set_public_key( &self, - public_key: &CompressedPublicKey, - secret_key: &SecretKey, + validator_address: &Address, + signing_key_pair: &KeyPair, ) -> Result<(), Self::Error> { let peer_id = self.network.get_local_peer_id(); let record = ValidatorRecord::new( @@ -486,7 +496,7 @@ where (OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as u64, ); self.network - .dht_put(public_key, &record, &KeyPair::from(*secret_key)) + .dht_put(validator_address, &record, signing_key_pair) .await?; Ok(()) diff --git a/validator/src/validator.rs b/validator/src/validator.rs index ab0d8de29c..3ade892a99 100644 --- a/validator/src/validator.rs +++ b/validator/src/validator.rs @@ -15,7 +15,7 @@ use nimiq_account::Validator as ValidatorAccount; use nimiq_block::{Block, BlockType, EquivocationProof}; use nimiq_blockchain::{interface::HistoryInterface, BlockProducer, Blockchain}; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent, ForkEvent}; -use nimiq_bls::{lazy::LazyPublicKey, KeyPair as BlsKeyPair}; +use nimiq_bls::KeyPair as BlsKeyPair; use nimiq_consensus::{ messages::{BlockBodyTopic, BlockHeaderMessage, BlockHeaderTopic}, Consensus, ConsensusEvent, ConsensusProxy, @@ -353,16 +353,8 @@ where // Inform the network about the current validator ID. self.network.set_validator_id(*self.slot_band.read()); - let voting_keys: Vec = validators - .iter() - .map(|validator| validator.voting_key.clone()) - .collect(); - let network = Arc::clone(&self.network); - - // TODO might better be done without the task. - spawn(async move { - network.set_validators(voting_keys).await; - }); + // Set the elected validators of the current epoch in the network as well. + self.network.set_validators(validators); // Check validator configuration if let Some(validator) = self.get_validator(&blockchain) { @@ -628,14 +620,12 @@ where /// Publish our own validator record to the DHT. fn publish_dht(&self) { - let key = self.voting_key(); + let key_pair = self.signing_key(); + let validator_address = self.validator_address(); let network = Arc::clone(&self.network); spawn(async move { - if let Err(err) = network - .set_public_key(&key.public_key.compress(), &key.secret_key) - .await - { + if let Err(err) = network.set_public_key(&validator_address, &key_pair).await { error!("could not set up DHT record: {:?}", err); } });