Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Allow remotes to not open a legacy substream (#7075)
Browse files Browse the repository at this point in the history
* Allow remotes to not open a legacy substream

* Misc fixes

* Special case first protocol as the one bearing the handshake
  • Loading branch information
tomaka authored and bkchr committed Sep 18, 2020
1 parent c071d06 commit 72ea91f
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 154 deletions.
235 changes: 117 additions & 118 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sp_consensus::{
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, Encode};
use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
Expand All @@ -53,7 +53,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;

Expand Down Expand Up @@ -275,8 +275,6 @@ struct Peer<B: BlockT, H: ExHashT> {
pub struct PeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
Expand Down Expand Up @@ -395,14 +393,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};

let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let mut behaviour = GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
);

let mut legacy_equiv_by_name = HashMap::new();

Expand All @@ -413,7 +403,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/transactions/1");
proto
});
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);

let block_announces_protocol: Cow<'static, str> = Cow::from({
Expand All @@ -423,12 +412,24 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/block-announces/1");
proto
});
behaviour.register_notif_protocol(
block_announces_protocol.clone(),
BlockAnnouncesHandshake::build(&config, &chain).encode()
);
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

let behaviour = {
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode();
GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
// As documented in `GenericProto`, the first protocol in the list is always the
// one carrying the handshake reported in the `CustomProtocolOpen` event.
iter::once((block_announces_protocol.clone(), block_announces_handshake))
.chain(iter::once((transactions_protocol.clone(), vec![]))),
)
};

let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
Expand Down Expand Up @@ -839,99 +840,86 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(
/// Called on the first connection between two peers, after their exchange of handshake.
fn on_peer_connected(
&mut self,
who: PeerId,
status: message::Status<B>,
status: BlockAnnouncesHandshake<B>,
notifications_sink: NotificationsSink,
) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);

if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
}
if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);

return CustomMessageOutcome::None;
}
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
log!(
if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer {:?} using unsupported protocol version {}", who, status.version
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}

if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
return CustomMessageOutcome::None;
}

// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}

let peer = Peer {
info: PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
}

debug!(target: "sync", "Connected {}", who);
status.version
let peer = Peer {
info: PeerInfo {
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);

debug!(target: "sync", "Connected {}", who);

let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
Expand Down Expand Up @@ -1161,20 +1149,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if inserted || force {
let message = message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
Some(message::BlockState::Best)
} else {
Some(message::BlockState::Normal)
}
} else {
None
},
data: if peer.info.protocol_version >= 4 {
Some(data.clone())
state: if is_best {
Some(message::BlockState::Best)
} else {
None
Some(message::BlockState::Normal)
},
data: Some(data.clone()),
};

self.behaviour.write_notification(
Expand Down Expand Up @@ -1620,9 +1600,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) =>
self.on_peer_connected(peer_id, handshake, notifications_sink),
// `received_handshake` can be either a `Status` message if received from the
// legacy substream ,or a `BlockAnnouncesHandshake` if received from the block
// announces substream.
match <Message<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => {
let handshake = BlockAnnouncesHandshake {
roles: handshake.roles,
best_number: handshake.best_number,
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};

self.on_peer_connected(peer_id, handshake, notifications_sink)
},
Ok(msg) => {
debug!(
target: "sync",
Expand All @@ -1634,15 +1625,23 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
CustomMessageOutcome::None
}
Err(err) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
peer_id,
received_handshake,
err.what()
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(handshake) => {
self.on_peer_connected(peer_id, handshake, notifications_sink)
}
Err(err2) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {} & {}",
peer_id,
received_handshake,
err.what(),
err2,
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
}
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,21 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.collect::<Vec<_>>();

assert!(!notif_protocols.is_empty());

let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
local_peer_id,
legacy_protocol,
notif_protocols: Vec::new(),
notif_protocols,
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
Expand Down
Loading

0 comments on commit 72ea91f

Please sign in to comment.