Skip to content

Commit

Permalink
Bundle protocol and packet_id together in chain sync (openethereum#10315
Browse files Browse the repository at this point in the history
)

Define a new `enum` where devp2p subprotocol packet ids (currently eth and par) are defined. Additionally provide functionality to query id value and protocol of a given id object.
  • Loading branch information
elferdo authored and thefallentree committed Feb 14, 2019
1 parent ba20de0 commit a7ce7a4
Show file tree
Hide file tree
Showing 12 changed files with 369 additions and 208 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ authors = ["Parity Technologies <[email protected]>"]

[dependencies]
common-types = { path = "../types" }
enum_primitive = "0.1.1"
ethcore = { path = ".." }
ethcore-io = { path = "../../util/io" }
ethcore-light = { path = "../light" }
Expand Down
8 changes: 4 additions & 4 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use std::net::{SocketAddr, AddrParseError};
use std::str::FromStr;
use parking_lot::{RwLock, Mutex};
use chain::{ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_62,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3,
PRIVATE_TRANSACTION_PACKET, SIGNED_PRIVATE_TRANSACTION_PACKET};
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_2, PAR_PROTOCOL_VERSION_3};
use chain::sync_packet::SyncPacket::{PrivateTransactionPacket, SignedPrivateTransactionPacket};
use light::client::AsLightClient;
use light::Provider;
use light::net::{
Expand Down Expand Up @@ -579,9 +579,9 @@ impl ChainNotify for EthSync {
match message_type {
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().propagate_consensus_packet(&mut sync_io, message),
ChainMessageType::PrivateTransaction(transaction_hash, message) =>
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, PRIVATE_TRANSACTION_PACKET, message),
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, PrivateTransactionPacket, message),
ChainMessageType::SignedPrivateTransaction(transaction_hash, message) =>
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, WARP_SYNC_PROTOCOL_ID, SIGNED_PRIVATE_TRANSACTION_PACKET, message),
self.eth_handler.sync.write().propagate_private_transaction(&mut sync_io, transaction_hash, SignedPrivateTransactionPacket, message),
}
});
}
Expand Down
87 changes: 48 additions & 39 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use api::WARP_SYNC_PROTOCOL_ID;
use block_sync::{BlockDownloaderImportError as DownloaderImportError, DownloadAction};
use bytes::Bytes;
use enum_primitive::FromPrimitive;
use ethcore::error::{Error as EthcoreError, ErrorKind as EthcoreErrorKind, ImportErrorKind, BlockError};
use ethcore::snapshot::{ManifestData, RestorationStatus};
use ethcore::verification::queue::kind::blocks::Unverified;
Expand All @@ -33,6 +34,20 @@ use types::BlockNumber;
use types::block_status::BlockStatus;
use types::ids::BlockId;

use super::sync_packet::{PacketInfo, SyncPacket};
use super::sync_packet::SyncPacket::{
StatusPacket,
NewBlockHashesPacket,
BlockHeadersPacket,
BlockBodiesPacket,
NewBlockPacket,
ReceiptsPacket,
SnapshotManifestPacket,
SnapshotDataPacket,
PrivateTransactionPacket,
SignedPrivateTransactionPacket,
};

use super::{
BlockSet,
ChainSync,
Expand All @@ -48,16 +63,6 @@ use super::{
MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1,
PAR_PROTOCOL_VERSION_3,
BLOCK_BODIES_PACKET,
BLOCK_HEADERS_PACKET,
NEW_BLOCK_HASHES_PACKET,
NEW_BLOCK_PACKET,
PRIVATE_TRANSACTION_PACKET,
RECEIPTS_PACKET,
SIGNED_PRIVATE_TRANSACTION_PACKET,
SNAPSHOT_DATA_PACKET,
SNAPSHOT_MANIFEST_PACKET,
STATUS_PACKET,
};

/// The Chain Sync Handler: handles responses from peers
Expand All @@ -67,36 +72,40 @@ impl SyncHandler {
/// Handle incoming packet from peer
pub fn on_packet(sync: &mut ChainSync, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = Rlp::new(data);
let result = match packet_id {
STATUS_PACKET => SyncHandler::on_peer_status(sync, io, peer, &rlp),
BLOCK_HEADERS_PACKET => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BLOCK_BODIES_PACKET => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
RECEIPTS_PACKET => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NEW_BLOCK_PACKET => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
SNAPSHOT_MANIFEST_PACKET => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SNAPSHOT_DATA_PACKET => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PRIVATE_TRANSACTION_PACKET => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SIGNED_PRIVATE_TRANSACTION_PACKET => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
Ok(())
}
};
if let Some(packet_id) = SyncPacket::from_u8(packet_id) {
let result = match packet_id {
StatusPacket => SyncHandler::on_peer_status(sync, io, peer, &rlp),
BlockHeadersPacket => SyncHandler::on_peer_block_headers(sync, io, peer, &rlp),
BlockBodiesPacket => SyncHandler::on_peer_block_bodies(sync, io, peer, &rlp),
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
Ok(())
}
};

match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id);
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer);
},
Ok(()) => {
// give a task to the same peer first
sync.sync_peer(io, peer, false);
},
match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
Err(DownloaderImportError::Useless) => {
sync.deactivate_peer(io, peer);
},
Ok(()) => {
// give a task to the same peer first
sync.sync_peer(io, peer, false);
},
}
} else {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
}
}

Expand Down
41 changes: 13 additions & 28 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
//! All other messages are ignored.
mod handler;
pub mod sync_packet;
mod propagator;
mod requester;
mod supplier;
Expand All @@ -103,7 +104,7 @@ use fastmap::{H256FastMap, H256FastSet};
use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
use bytes::Bytes;
use rlp::{RlpStream, DecoderError};
use network::{self, PeerId, PacketId, ProtocolId};
use network::{self, PeerId, PacketId};
use network::client_version::ClientVersion;
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, BlockQueueInfo};
use ethcore::snapshot::{RestorationStatus};
Expand All @@ -112,13 +113,19 @@ use super::{WarpSync, SyncConfig};
use block_sync::{BlockDownloader, DownloadAction};
use rand::Rng;
use snapshot::{Snapshot};
use api::{EthProtocolInfo as PeerInfoDigest, ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID, PriorityTask};
use private_tx::PrivateTxHandler;
use transactions_stats::{TransactionsStats, Stats as TransactionStats};
use types::transaction::UnverifiedTransaction;
use types::BlockNumber;

use self::handler::SyncHandler;
use self::sync_packet::{PacketInfo, SyncPacket};
use self::sync_packet::SyncPacket::{
NewBlockPacket,
StatusPacket,
};

use self::propagator::SyncPropagator;
use self::requester::SyncRequester;
pub(crate) use self::supplier::SyncSupplier;
Expand Down Expand Up @@ -154,28 +161,6 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024;
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
const SNAPSHOT_MIN_PEERS: usize = 3;

pub const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
const TRANSACTIONS_PACKET: u8 = 0x02;
pub const GET_BLOCK_HEADERS_PACKET: u8 = 0x03;
pub const BLOCK_HEADERS_PACKET: u8 = 0x04;
pub const GET_BLOCK_BODIES_PACKET: u8 = 0x05;
const BLOCK_BODIES_PACKET: u8 = 0x06;
const NEW_BLOCK_PACKET: u8 = 0x07;

pub const GET_NODE_DATA_PACKET: u8 = 0x0d;
pub const NODE_DATA_PACKET: u8 = 0x0e;
pub const GET_RECEIPTS_PACKET: u8 = 0x0f;
pub const RECEIPTS_PACKET: u8 = 0x10;

pub const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
pub const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
pub const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
pub const SNAPSHOT_DATA_PACKET: u8 = 0x14;
pub const CONSENSUS_DATA_PACKET: u8 = 0x15;
pub const PRIVATE_TRANSACTION_PACKET: u8 = 0x16;
pub const SIGNED_PRIVATE_TRANSACTION_PACKET: u8 = 0x17;

const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;

const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -484,7 +469,7 @@ impl ChainSyncApi {
for peers in sync.get_peers(&chain_info, PeerState::SameBlock).chunks(10) {
check_deadline(deadline)?;
for peer in peers {
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer, NEW_BLOCK_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer, NewBlockPacket, rlp.clone());
if let Some(ref mut peer) = sync.peers.get_mut(peer) {
peer.latest_hash = hash;
}
Expand Down Expand Up @@ -1146,7 +1131,7 @@ impl ChainSync {
}
}
packet.complete_unbounded_list();
io.respond(STATUS_PACKET, packet.out())
io.respond(StatusPacket.id(), packet.out())
}

pub fn maintain_peers(&mut self, io: &mut SyncIo) {
Expand Down Expand Up @@ -1331,8 +1316,8 @@ impl ChainSync {
}

/// Broadcast private transaction message to peers.
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, protocol, packet_id, packet);
pub fn propagate_private_transaction(&mut self, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
SyncPropagator::propagate_private_transaction(self, io, transaction_hash, packet_id, packet);
}
}

Expand Down
34 changes: 19 additions & 15 deletions ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,33 @@
use std::cmp;
use std::collections::HashSet;

use api::{ETH_PROTOCOL, WARP_SYNC_PROTOCOL_ID};
use bytes::Bytes;
use ethereum_types::H256;
use fastmap::H256FastSet;
use network::{PeerId, PacketId, ProtocolId};
use network::client_version::ClientCapabilities;
use network::PeerId;
use rand::Rng;
use rlp::{Encodable, RlpStream};
use sync_io::SyncIo;
use types::transaction::SignedTransaction;
use types::BlockNumber;
use types::blockchain_info::BlockChainInfo;

use super::sync_packet::SyncPacket;
use super::sync_packet::SyncPacket::{
NewBlockHashesPacket,
TransactionsPacket,
NewBlockPacket,
ConsensusDataPacket,
};

use super::{
random,
ChainSync,
MAX_TRANSACTION_PACKET_SIZE,
MAX_PEER_LAG_PROPAGATION,
MAX_PEERS_PROPAGATION,
MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET,
NEW_BLOCK_PACKET,
TRANSACTIONS_PACKET,
};

/// The Chain Sync Propagator: propagates data to peers
Expand All @@ -53,7 +56,8 @@ impl SyncPropagator {
let sent = peers.len();
let mut send_packet = |io: &mut SyncIo, rlp: Bytes| {
for peer_id in peers {
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());

if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = chain_info.best_block_hash.clone();
}
Expand Down Expand Up @@ -88,7 +92,7 @@ impl SyncPropagator {
if let Some(ref mut peer) = sync.peers.get_mut(peer_id) {
peer.latest_hash = best_block_hash;
}
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer_id, NewBlockHashesPacket, rlp.clone());
}
sent
}
Expand Down Expand Up @@ -156,7 +160,7 @@ impl SyncPropagator {

let send_packet = |io: &mut SyncIo, peer_id: PeerId, sent: usize, rlp: Bytes| {
let size = rlp.len();
SyncPropagator::send_packet(io, ETH_PROTOCOL, peer_id, TRANSACTIONS_PACKET, rlp);
SyncPropagator::send_packet(io, peer_id, TransactionsPacket, rlp);
trace!(target: "sync", "{:02} <- Transactions ({} entries; {} bytes)", peer_id, sent, size);
};

Expand Down Expand Up @@ -275,7 +279,7 @@ impl SyncPropagator {
io.chain().chain_info().total_difficulty
);
for peer_id in &peers {
SyncPropagator::send_packet(io, ETH_PROTOCOL, *peer_id, NEW_BLOCK_PACKET, rlp.clone());
SyncPropagator::send_packet(io, *peer_id, NewBlockPacket, rlp.clone());
}
}
}
Expand All @@ -285,12 +289,12 @@ impl SyncPropagator {
let lucky_peers = ChainSync::select_random_peers(&sync.get_consensus_peers());
trace!(target: "sync", "Sending consensus packet to {:?}", lucky_peers);
for peer_id in lucky_peers {
SyncPropagator::send_packet(io, WARP_SYNC_PROTOCOL_ID, peer_id, CONSENSUS_DATA_PACKET, packet.clone());
SyncPropagator::send_packet(io, peer_id, ConsensusDataPacket, packet.clone());
}
}

/// Broadcast private transaction message to peers.
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, protocol: ProtocolId, packet_id: PacketId, packet: Bytes) {
pub fn propagate_private_transaction(sync: &mut ChainSync, io: &mut SyncIo, transaction_hash: H256, packet_id: SyncPacket, packet: Bytes) {
let lucky_peers = ChainSync::select_random_peers(&sync.get_private_transaction_peers(&transaction_hash));
if lucky_peers.is_empty() {
error!(target: "privatetx", "Cannot propagate the packet, no peers with private tx enabled connected");
Expand All @@ -300,7 +304,7 @@ impl SyncPropagator {
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
peer.last_sent_private_transactions.insert(transaction_hash);
}
SyncPropagator::send_packet(io, protocol, peer_id, packet_id, packet.clone());
SyncPropagator::send_packet(io, peer_id, packet_id, packet.clone());
}
}
}
Expand All @@ -321,8 +325,8 @@ impl SyncPropagator {
}

/// Generic packet sender
pub fn send_packet(sync: &mut SyncIo, protocol: ProtocolId, peer_id: PeerId, packet_id: PacketId, packet: Bytes) {
if let Err(e) = sync.send_protocol(protocol, peer_id, packet_id, packet) {
pub fn send_packet(sync: &mut SyncIo, peer_id: PeerId, packet_id: SyncPacket, packet: Bytes) {
if let Err(e) = sync.send(peer_id, packet_id, packet) {
debug!(target:"sync", "Error sending packet: {:?}", e);
sync.disconnect_peer(peer_id);
}
Expand Down
Loading

0 comments on commit a7ce7a4

Please sign in to comment.