Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Major sync <-> client interactions refactoring (#1572)
Browse files Browse the repository at this point in the history
* chain notify trait

* replaced network service with io service

* fix ethcore crate warnings

* refactored network service without generic

* ethcore fix

* ethsync refactoring

* proper linking of notify

* manage network interface

* rpc crate rebinding

* full rewire

* sync internal io service

* fix deadlock

* fix warnings and removed async io

* sync imported message propagation

* fix rpc warnings

* binart warnings

* test fixes

* rpc mocks and tests

* fix util doctest

* fix message name and removed empty notifier

* pointers mess & dark mode fixed

* fixed sync doctest

* added few warnings

* fix review

* new convention match

* fix error unwraps

* doctest fix
  • Loading branch information
NikVolf authored and arkpar committed Jul 11, 2016
1 parent 2382d77 commit d3695d0
Show file tree
Hide file tree
Showing 23 changed files with 465 additions and 414 deletions.
6 changes: 3 additions & 3 deletions ethcore/src/block_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct VerifyingBlock {
struct QueueSignal {
deleting: Arc<AtomicBool>,
signalled: AtomicBool,
message_channel: IoChannel<NetSyncMessage>,
message_channel: IoChannel<ClientIoMessage>,
}

impl QueueSignal {
Expand All @@ -116,7 +116,7 @@ impl QueueSignal {
}

if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send(UserMessage(SyncMessage::BlockVerified)) {
if let Err(e) = self.message_channel.send(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
Expand All @@ -137,7 +137,7 @@ struct Verification {

impl BlockQueue {
/// Creates a new queue instance.
pub fn new(config: BlockQueueConfig, engine: Arc<Box<Engine>>, message_channel: IoChannel<NetSyncMessage>) -> BlockQueue {
pub fn new(config: BlockQueueConfig, engine: Arc<Box<Engine>>, message_channel: IoChannel<ClientIoMessage>) -> BlockQueue {
let verification = Arc::new(Verification {
unverified: Mutex::new(VecDeque::new()),
verified: Mutex::new(VecDeque::new()),
Expand Down
40 changes: 40 additions & 0 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use util::numbers::*;

/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks
fn new_blocks(&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_sealed: Vec<H256>) {
// does nothing by default
}

/// fires when chain achieves active mode
fn start(&self) {
// does nothing by default
}

/// fires when chain achieves passive mode
fn stop(&self) {
// does nothing by default
}
}
138 changes: 80 additions & 58 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::time::Instant;
// util
use util::numbers::*;
use util::panics::*;
use util::network::*;
use util::io::*;
use util::rlp;
use util::sha3::*;
Expand All @@ -47,7 +46,7 @@ use state::State;
use spec::Spec;
use engine::Engine;
use views::HeaderView;
use service::{NetSyncMessage, SyncMessage};
use service::ClientIoMessage;
use env_info::LastHashes;
use verification;
use verification::{PreverifiedBlock, Verifier};
Expand All @@ -60,7 +59,7 @@ use block_queue::{BlockQueue, BlockQueueInfo};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig,
DatabaseCompactionProfile, BlockChainClient, MiningBlockChainClient,
TraceFilter, CallAnalytics, BlockImportError, Mode};
TraceFilter, CallAnalytics, BlockImportError, Mode, ChainNotify};
use client::Error as ClientError;
use env_info::EnvInfo;
use executive::{Executive, Executed, TransactOptions, contract_address};
Expand Down Expand Up @@ -141,7 +140,8 @@ pub struct Client {
miner: Arc<Miner>,
sleep_state: Mutex<SleepState>,
liveness: AtomicBool,
io_channel: IoChannel<NetSyncMessage>,
io_channel: IoChannel<ClientIoMessage>,
notify: RwLock<Option<Weak<ChainNotify>>>,
queue_transactions: AtomicUsize,
previous_enode: Mutex<Option<String>>,
}
Expand Down Expand Up @@ -178,7 +178,7 @@ impl Client {
spec: Spec,
path: &Path,
miner: Arc<Miner>,
message_channel: IoChannel<NetSyncMessage>
message_channel: IoChannel<ClientIoMessage>,
) -> Result<Arc<Client>, ClientError> {
let path = get_db_path(path, config.pruning, spec.genesis_header().hash());
let gb = spec.genesis_block();
Expand Down Expand Up @@ -228,12 +228,24 @@ impl Client {
trie_factory: TrieFactory::new(config.trie_spec),
miner: miner,
io_channel: message_channel,
notify: RwLock::new(None),
queue_transactions: AtomicUsize::new(0),
previous_enode: Mutex::new(None),
};
Ok(Arc::new(client))
}

/// Sets the actor to be notified on certain events
pub fn set_notify(&self, target: &Arc<ChainNotify>) {
let mut write_lock = self.notify.unwrapped_write();
*write_lock = Some(Arc::downgrade(target));
}

fn notify(&self) -> Option<Arc<ChainNotify>> {
let read_lock = self.notify.unwrapped_read();
read_lock.as_ref().and_then(|weak| weak.upgrade())
}

/// Flush the block import queue.
pub fn flush_queue(&self) {
self.block_queue.flush();
Expand Down Expand Up @@ -327,52 +339,54 @@ impl Client {
}

/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, io: &IoChannel<NetSyncMessage>) -> usize {
pub fn import_verified_blocks(&self, io: &IoChannel<ClientIoMessage>) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, original_best, imported) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_verified_blocks");
let blocks = self.block_queue.drain(max_blocks_to_import);
let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_verified_blocks");
let blocks = self.block_queue.drain(max_blocks_to_import);

let original_best = self.chain_info().best_block_hash;
let original_best = self.chain_info().best_block_hash;

for block in blocks {
let header = &block.header;
for block in blocks {
let header = &block.header;

if invalid_blocks.contains(&header.parent_hash) {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = closed_block.unwrap();
imported_blocks.push(header.hash());
if invalid_blocks.contains(&header.parent_hash) {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = closed_block.unwrap();
imported_blocks.push(header.hash());

let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);
let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);

self.report.unwrapped_write().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}
self.report.unwrapped_write().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}

let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();
let imported = imported_blocks.len();
let invalid_blocks = invalid_blocks.into_iter().collect::<Vec<H256>>();

{
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&imported_blocks);
{
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
if !imported_blocks.is_empty() {
self.block_queue.mark_as_good(&imported_blocks);
}
}
}
(imported_blocks, import_results, invalid_blocks, original_best, imported)
};

{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
Expand All @@ -382,13 +396,15 @@ impl Client {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
imported: imported_blocks,
invalid: invalid_blocks,
enacted: enacted,
retracted: retracted,
sealed: Vec::new(),
})).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
if let Some(notify) = self.notify() {
notify.new_blocks(
imported_blocks,
invalid_blocks,
enacted,
retracted,
Vec::new(),
);
}
}
}

Expand Down Expand Up @@ -566,7 +582,9 @@ impl Client {
fn wake_up(&self) {
if !self.liveness.load(AtomicOrdering::Relaxed) {
self.liveness.store(true, AtomicOrdering::Relaxed);
self.io_channel.send(NetworkIoMessage::User(SyncMessage::StartNetwork)).unwrap();
if let Some(notify) = self.notify() {
notify.start();
}
trace!(target: "mode", "wake_up: Waking.");
}
}
Expand All @@ -576,7 +594,9 @@ impl Client {
// only sleep if the import queue is mostly empty.
if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON {
self.liveness.store(false, AtomicOrdering::Relaxed);
self.io_channel.send(NetworkIoMessage::User(SyncMessage::StopNetwork)).unwrap();
if let Some(notify) = self.notify() {
notify.stop();
}
trace!(target: "mode", "sleep: Sleeping.");
} else {
trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing.");
Expand Down Expand Up @@ -901,7 +921,7 @@ impl BlockChainClient for Client {
debug!("Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewTransactions(transactions))) {
match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}
Expand Down Expand Up @@ -969,13 +989,15 @@ impl MiningBlockChainClient for Client {
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);

self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
imported: vec![h.clone()],
invalid: vec![],
enacted: enacted,
retracted: retracted,
sealed: vec![h.clone()],
})).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
if let Some(notify) = self.notify() {
notify.new_blocks(
vec![h.clone()],
vec![],
enacted,
retracted,
vec![h.clone()],
);
}
}

if self.chain_info().best_block_hash != original_best {
Expand Down
2 changes: 2 additions & 0 deletions ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod config;
mod error;
mod test_client;
mod trace;
mod chain_notify;

pub use self::client::*;
pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType};
Expand All @@ -29,6 +30,7 @@ pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use types::trace_filter::Filter as TraceFilter;
pub use executive::{Executed, Executive, TransactOptions};
pub use env_info::{LastHashes, EnvInfo};
pub use self::chain_notify::ChainNotify;

use util::bytes::Bytes;
use util::hash::{Address, H256, H2048};
Expand Down
Loading

0 comments on commit d3695d0

Please sign in to comment.