Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Reduce locking
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar committed Jun 20, 2016
1 parent 75a3850 commit 1ffe0c1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 34 deletions.
56 changes: 34 additions & 22 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ impl ChainSync {
Ok(Some((RECEIPTS_PACKET, rlp_result)))
}

fn return_rlp<FRlp, FError>(&self, io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
FError : FnOnce(UtilError) -> String
{
Expand All @@ -1114,37 +1114,48 @@ impl ChainSync {
}

/// Dispatch incoming requests and responses
pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
pub fn dispatch_packet(sync: &RwLock<ChainSync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = UntrustedRlp::new(data);

if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return;
}
let result = match packet_id {
STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp),
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),

GET_BLOCK_BODIES_PACKET => self.return_rlp(io, &rlp, peer,
GET_BLOCK_BODIES_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_block_bodies,
|e| format!("Error sending block bodies: {:?}", e)),

GET_BLOCK_HEADERS_PACKET => self.return_rlp(io, &rlp, peer,
GET_BLOCK_HEADERS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_block_headers,
|e| format!("Error sending block headers: {:?}", e)),

GET_RECEIPTS_PACKET => self.return_rlp(io, &rlp, peer,
GET_RECEIPTS_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_receipts,
|e| format!("Error sending receipts: {:?}", e)),

GET_NODE_DATA_PACKET => self.return_rlp(io, &rlp, peer,
GET_NODE_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer,
ChainSync::return_node_data,
|e| format!("Error sending nodes: {:?}", e)),

_ => {
sync.write().unwrap().on_packet(io, peer, packet_id, data);
Ok(())
}
};
result.unwrap_or_else(|e| {
debug!(target:"sync", "{} -> Malformed packet {} : {}", peer, packet_id, e);
})
}

pub fn on_packet(&mut self, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) {
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return;
}
let rlp = UntrustedRlp::new(data);
let result = match packet_id {
STATUS_PACKET => self.on_peer_status(io, peer, &rlp),
TRANSACTIONS_PACKET => self.on_peer_transactions(io, peer, &rlp),
BLOCK_HEADERS_PACKET => self.on_peer_block_headers(io, peer, &rlp),
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
_ => {
debug!(target: "sync", "Unknown packet {}", packet_id);
Ok(())
Expand Down Expand Up @@ -1424,7 +1435,7 @@ mod tests {
fn return_receipts() {
let mut client = TestBlockChainClient::new();
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(H256::new(), &client);
let sync = dummy_sync_with_peer(H256::new(), &client);
let mut io = TestIo::new(&mut client, &mut queue, None);

let mut receipt_list = RlpStream::new_list(4);
Expand All @@ -1445,7 +1456,7 @@ mod tests {
assert_eq!(603, rlp_result.unwrap().1.out().len());

io.sender = Some(2usize);
sync.on_packet(&mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_RECEIPTS_PACKET, &receipts_request);
assert_eq!(1, io.queue.len());
}

Expand Down Expand Up @@ -1517,7 +1528,7 @@ mod tests {
fn return_nodes() {
let mut client = TestBlockChainClient::new();
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(H256::new(), &client);
let sync = dummy_sync_with_peer(H256::new(), &client);
let mut io = TestIo::new(&mut client, &mut queue, None);

let mut node_list = RlpStream::new_list(3);
Expand All @@ -1537,7 +1548,8 @@ mod tests {
assert_eq!(34, rlp_result.unwrap().1.out().len());

io.sender = Some(2usize);
sync.on_packet(&mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);

ChainSync::dispatch_packet(&RwLock::new(sync), &mut io, 0usize, super::GET_NODE_DATA_PACKET, &node_request);
assert_eq!(1, io.queue.len());
}

Expand Down
2 changes: 1 addition & 1 deletion sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
}

fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {
self.sync.write().unwrap().on_packet(&mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()) , *peer, packet_id, data);
}

fn connected(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId) {
Expand Down
6 changes: 3 additions & 3 deletions sync/src/tests/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ fn status_after_sync() {
net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle);
net.sync();
let status = net.peer(0).sync.status();
let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::Idle);
}

Expand Down Expand Up @@ -107,14 +107,14 @@ fn restart() {
assert!(net.peer(0).chain.chain_info().best_block_number > 100);
net.restart_peer(0);

let status = net.peer(0).sync.status();
let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::ChainHead);
}

#[test]
fn status_empty() {
let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.status().state, SyncState::Idle);
assert_eq!(net.peer(0).sync.read().unwrap().status().state, SyncState::Idle);
}

#[test]
Expand Down
16 changes: 8 additions & 8 deletions sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct TestPacket {

pub struct TestPeer {
pub chain: TestBlockChainClient,
pub sync: ChainSync,
pub sync: RwLock<ChainSync>,
pub queue: VecDeque<TestPacket>,
}

Expand All @@ -97,7 +97,7 @@ impl TestNet {
let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), &chain);
net.peers.push(TestPeer {
sync: sync,
sync: RwLock::new(sync),
chain: chain,
queue: VecDeque::new(),
});
Expand All @@ -118,7 +118,7 @@ impl TestNet {
for client in 0..self.peers.len() {
if peer != client {
let mut p = self.peers.get_mut(peer).unwrap();
p.sync.on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId);
p.sync.write().unwrap().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId);
}
}
}
Expand All @@ -129,22 +129,22 @@ impl TestNet {
if let Some(packet) = self.peers[peer].queue.pop_front() {
let mut p = self.peers.get_mut(packet.recipient).unwrap();
trace!("--- {} -> {} ---", peer, packet.recipient);
p.sync.on_packet(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
trace!("----------------");
}
let mut p = self.peers.get_mut(peer).unwrap();
p.sync.maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
p.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
}
}

pub fn sync_step_peer(&mut self, peer_num: usize) {
let mut peer = self.peer_mut(peer_num);
peer.sync.maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
peer.sync.write().unwrap().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
}

pub fn restart_peer(&mut self, i: usize) {
let peer = self.peer_mut(i);
peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
peer.sync.write().unwrap().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
}

pub fn sync(&mut self) -> u32 {
Expand Down Expand Up @@ -173,6 +173,6 @@ impl TestNet {

pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id);
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
}
}

0 comments on commit 1ffe0c1

Please sign in to comment.