diff --git a/ethcore/private-tx/src/lib.rs b/ethcore/private-tx/src/lib.rs index 5f7b5cde3aa..4b9f6d33e2c 100644 --- a/ethcore/private-tx/src/lib.rs +++ b/ethcore/private-tx/src/lib.rs @@ -69,7 +69,6 @@ pub use error::{Error, ErrorKind}; use std::sync::{Arc, Weak}; use std::collections::{HashMap, HashSet, BTreeMap}; -use std::time::Duration; use ethereum_types::{H128, H256, U256, Address}; use hash::keccak; use rlp::*; @@ -82,7 +81,7 @@ use ethcore::executed::{Executed}; use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction}; use ethcore::{contract_address as ethcore_contract_address}; use ethcore::client::{ - Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, + Client, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage, BlockId, CallContract, Call, BlockInfo }; use ethcore::account_provider::AccountProvider; @@ -733,12 +732,11 @@ fn find_account_password(passwords: &Vec, account_provider: &AccountPr } impl ChainNotify for Provider { - fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - if !imported.is_empty() { - trace!(target: "privatetx", "New blocks imported, try to prune the queue"); - if let Err(err) = self.process_verification_queue() { - warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err); - } + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.imported.is_empty() || new_blocks.has_more_blocks_to_import { return } + trace!(target: "privatetx", "New blocks imported, try to prune the queue"); + if let Err(err) = self.process_verification_queue() { + warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err); } } } diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 3d576ae12aa..4b599ad171a 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -114,19 +114,51 @@ impl ChainRoute { } } +/// Used by `ChainNotify` `new_blocks()` +pub struct NewBlocks { + /// Imported blocks + pub imported: Vec, + /// Invalid blocks + pub invalid: Vec, + /// Route + pub route: ChainRoute, + /// Sealed + pub sealed: Vec, + /// Block bytes. + pub proposed: Vec, + /// Duration + pub duration: Duration, + /// Has more blocks to import + pub has_more_blocks_to_import: bool, +} + +impl NewBlocks { + /// Constructor + pub fn new ( + imported: Vec, + invalid: Vec, + route: ChainRoute, + sealed: Vec, + proposed: Vec, + duration: Duration, + has_more_blocks_to_import: bool, + ) -> NewBlocks { + NewBlocks { + imported, + invalid, + route, + sealed, + proposed, + duration, + has_more_blocks_to_import, + } + } +} + /// Represents what has to be handled by actor listening to chain events pub trait ChainNotify : Send + Sync { /// fires when chain has new blocks. - fn new_blocks( - &self, - _imported: Vec, - _invalid: Vec, - _route: ChainRoute, - _sealed: Vec, - // Block bytes. - _proposed: Vec, - _duration: Duration, - ) { + fn new_blocks( &self, _new_blocks: NewBlocks) { // does nothing by default } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 00dc9cab53e..1b49d009225 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -44,7 +44,7 @@ use client::{ use client::{ BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient, TraceFilter, CallAnalytics, Mode, - ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType, + ChainNotify, NewBlocks, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType, IoClient, BadBlocks, }; use client::bad_blocks; @@ -268,7 +268,7 @@ impl Importer { } let max_blocks_to_import = client.config.max_round_blocks_to_import; - let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = { + let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, has_more_blocks_to_import) = { let mut imported_blocks = Vec::with_capacity(max_blocks_to_import); let mut invalid_blocks = HashSet::new(); let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import); @@ -322,26 +322,29 @@ impl Importer { if !invalid_blocks.is_empty() { self.block_queue.mark_as_bad(&invalid_blocks); } - let is_empty = self.block_queue.mark_as_good(&imported_blocks); - (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), is_empty) + let has_more_blocks_to_import = !self.block_queue.mark_as_good(&imported_blocks); + (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), has_more_blocks_to_import) }; { - if !imported_blocks.is_empty() && is_empty { + if !imported_blocks.is_empty() { let route = ChainRoute::from(import_results.as_ref()); - if is_empty { + if !has_more_blocks_to_import { self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false); } client.notify(|notify| { notify.new_blocks( - imported_blocks.clone(), - invalid_blocks.clone(), - route.clone(), - Vec::new(), - proposed_blocks.clone(), - duration, + NewBlocks::new( + imported_blocks.clone(), + invalid_blocks.clone(), + route.clone(), + Vec::new(), + proposed_blocks.clone(), + duration, + has_more_blocks_to_import, + ) ); }); } @@ -2342,12 +2345,15 @@ impl ImportSealedBlock for Client { ); self.notify(|notify| { notify.new_blocks( - vec![hash], - vec![], - route.clone(), - vec![hash], - vec![], - start.elapsed(), + NewBlocks::new( + vec![hash], + vec![], + route.clone(), + vec![hash], + vec![], + start.elapsed(), + false + ) ); }); self.db.read().key_value().flush().expect("DB flush failed."); @@ -2360,12 +2366,15 @@ impl BroadcastProposalBlock for Client { const DURATION_ZERO: Duration = Duration::from_millis(0); self.notify(|notify| { notify.new_blocks( - vec![], - vec![], - ChainRoute::default(), - vec![], - vec![block.rlp_bytes()], - DURATION_ZERO, + NewBlocks::new( + vec![], + vec![], + ChainRoute::default(), + vec![], + vec![block.rlp_bytes()], + DURATION_ZERO, + false + ) ); }); } diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index 4148bb58676..bbbbbea8b33 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -34,7 +34,7 @@ pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult}; pub use self::io_message::ClientIoMessage; #[cfg(any(test, feature = "test-helpers"))] pub use self::test_client::{TestBlockChainClient, EachBlockWith}; -pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType}; +pub use self::chain_notify::{ChainNotify, NewBlocks, ChainRoute, ChainRouteType, ChainMessageType}; pub use self::traits::{ Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock, StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, BadBlocks, diff --git a/ethcore/src/snapshot/watcher.rs b/ethcore/src/snapshot/watcher.rs index 68056796276..0eee7133b1c 100644 --- a/ethcore/src/snapshot/watcher.rs +++ b/ethcore/src/snapshot/watcher.rs @@ -17,14 +17,13 @@ //! Watcher for snapshot-related chain events. use parking_lot::Mutex; -use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage}; +use client::{BlockInfo, Client, ChainNotify, NewBlocks, ClientIoMessage}; use ids::BlockId; use io::IoChannel; use ethereum_types::H256; -use bytes::Bytes; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; // helper trait for transforming hashes to numbers and checking if syncing. trait Oracle: Send + Sync { @@ -99,20 +98,12 @@ impl Watcher { } impl ChainNotify for Watcher { - fn new_blocks( - &self, - imported: Vec, - _: Vec, - _: ChainRoute, - _: Vec, - _: Vec, - _duration: Duration) - { - if self.oracle.is_major_importing() { return } + fn new_blocks(&self, new_blocks: NewBlocks) { + if self.oracle.is_major_importing() || new_blocks.has_more_blocks_to_import { return } - trace!(target: "snapshot_watcher", "{} imported", imported.len()); + trace!(target: "snapshot_watcher", "{} imported", new_blocks.imported.len()); - let highest = imported.into_iter() + let highest = new_blocks.imported.into_iter() .filter_map(|h| self.oracle.to_number(h)) .filter(|&num| num >= self.period + self.history) .map(|num| num - self.history) @@ -130,7 +121,7 @@ impl ChainNotify for Watcher { mod tests { use super::{Broadcast, Oracle, Watcher}; - use client::{ChainNotify, ChainRoute}; + use client::{ChainNotify, NewBlocks, ChainRoute}; use ethereum_types::{H256, U256}; @@ -170,14 +161,15 @@ mod tests { history: history, }; - watcher.new_blocks( + watcher.new_blocks(NewBlocks::new( hashes, vec![], ChainRoute::default(), vec![], vec![], DURATION_ZERO, - ); + false + )); } // helper diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 7474d79b37f..7d7a1afc701 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -29,7 +29,7 @@ use types::pruning_info::PruningInfo; use ethereum_types::{H256, H512, U256}; use io::{TimerToken}; use ethcore::ethstore::ethkey::Secret; -use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageType}; +use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainMessageType}; use ethcore::snapshot::SnapshotService; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; @@ -498,14 +498,9 @@ impl ChainNotify for EthSync { } } - fn new_blocks(&self, - imported: Vec, - invalid: Vec, - route: ChainRoute, - sealed: Vec, - proposed: Vec, - _duration: Duration) + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } use light::net::Announcement; self.network.with_context(self.subprotocol_name, |context| { @@ -513,12 +508,12 @@ impl ChainNotify for EthSync { &self.eth_handler.overlay); self.eth_handler.sync.write().chain_new_blocks( &mut sync_io, - &imported, - &invalid, - route.enacted(), - route.retracted(), - &sealed, - &proposed); + &new_blocks.imported, + &new_blocks.invalid, + new_blocks.route.enacted(), + new_blocks.route.retracted(), + &new_blocks.sealed, + &new_blocks.proposed); }); self.network.with_context(self.light_subprotocol_name, |context| { diff --git a/ethcore/sync/src/tests/helpers.rs b/ethcore/sync/src/tests/helpers.rs index 3eac91a0db1..cc4a8ba08d4 100644 --- a/ethcore/sync/src/tests/helpers.rs +++ b/ethcore/sync/src/tests/helpers.rs @@ -16,14 +16,13 @@ use std::collections::{VecDeque, HashSet, HashMap}; use std::sync::Arc; -use std::time::Duration; use ethereum_types::H256; use parking_lot::{RwLock, Mutex}; use bytes::Bytes; use network::{self, PeerId, ProtocolId, PacketId, SessionInfo}; use tests::snapshot::*; use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient, - ClientConfig, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage}; + ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage}; use ethcore::header::BlockNumber; use ethcore::snapshot::SnapshotService; use ethcore::spec::Spec; @@ -535,23 +534,18 @@ impl IoHandler for TestIoHandler { } impl ChainNotify for EthPeer { - fn new_blocks(&self, - imported: Vec, - invalid: Vec, - route: ChainRoute, - sealed: Vec, - proposed: Vec, - _duration: Duration) + fn new_blocks(&self, new_blocks: NewBlocks) { - let (enacted, retracted) = route.into_enacted_retracted(); + if new_blocks.has_more_blocks_to_import { return } + let (enacted, retracted) = new_blocks.route.into_enacted_retracted(); self.new_blocks_queue.write().push_back(NewBlockMessage { - imported, - invalid, + imported: new_blocks.imported, + invalid: new_blocks.invalid, enacted, retracted, - sealed, - proposed, + sealed: new_blocks.sealed, + proposed: new_blocks.proposed, }); } diff --git a/parity/informant.rs b/parity/informant.rs index 8cc37813ce2..5209a855156 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -25,7 +25,7 @@ use std::time::{Instant, Duration}; use atty; use ethcore::client::{ BlockId, BlockChainClient, ChainInfo, BlockInfo, BlockChainInfo, - BlockQueueInfo, ChainNotify, ChainRoute, ClientReport, Client, ClientIoMessage + BlockQueueInfo, ChainNotify, NewBlocks, ClientReport, Client, ClientIoMessage }; use ethcore::header::BlockNumber; use ethcore::snapshot::{RestorationStatus, SnapshotService as SS}; @@ -38,7 +38,6 @@ use number_prefix::{binary_prefix, Standalone, Prefixed}; use parity_rpc::is_major_importing_or_waiting; use parity_rpc::informant::RpcStats; use ethereum_types::H256; -use bytes::Bytes; use parking_lot::{RwLock, Mutex}; /// Format byte counts to standard denominations. @@ -365,29 +364,30 @@ impl Informant { } impl ChainNotify for Informant { - fn new_blocks(&self, imported: Vec, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, duration: Duration) { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } let mut last_import = self.last_import.lock(); let client = &self.target.client; let importing = self.target.is_major_importing(); let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing; - let txs_imported = imported.iter() - .take(imported.len().saturating_sub(if ripe { 1 } else { 0 })) + let txs_imported = new_blocks.imported.iter() + .take(new_blocks.imported.len().saturating_sub(if ripe { 1 } else { 0 })) .filter_map(|h| client.block(BlockId::Hash(*h))) .map(|b| b.transactions_count()) .sum(); if ripe { - if let Some(block) = imported.last().and_then(|h| client.block(BlockId::Hash(*h))) { + if let Some(block) = new_blocks.imported.last().and_then(|h| client.block(BlockId::Hash(*h))) { let header_view = block.header_view(); let size = block.rlp().as_raw().len(); - let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported); + let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + new_blocks.imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported); info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}", Colour::White.bold().paint(format!("#{}", header_view.number())), Colour::White.bold().paint(format!("{}", header_view.hash())), Colour::Yellow.bold().paint(format!("{}", block.transactions_count())), Colour::Yellow.bold().paint(format!("{:.2}", header_view.gas_used().low_u64() as f32 / 1000000f32)), - Colour::Purple.bold().paint(format!("{}", duration.as_milliseconds())), + Colour::Purple.bold().paint(format!("{}", new_blocks.duration.as_milliseconds())), Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)), if skipped > 0 { format!(" + another {} block(s) containing {} tx(s)", @@ -403,7 +403,7 @@ impl ChainNotify for Informant { *last_import = Instant::now(); } } else { - self.skipped.fetch_add(imported.len(), AtomicOrdering::Relaxed); + self.skipped.fetch_add(new_blocks.imported.len(), AtomicOrdering::Relaxed); self.skipped_txs.fetch_add(txs_imported, AtomicOrdering::Relaxed); } } diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 989a43fa5bf..0e39f4e314f 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -18,7 +18,6 @@ use std::sync::{Arc, Weak}; use std::collections::BTreeMap; -use std::time::Duration; use jsonrpc_core::{BoxFuture, Result, Error}; use jsonrpc_core::futures::{self, Future, IntoFuture}; @@ -34,14 +33,13 @@ use v1::types::{pubsub, RichHeader, Log}; use ethcore::encoded; use ethcore::filter::Filter as EthFilter; -use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainRouteType, BlockId}; +use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainRouteType, BlockId}; use sync::LightSync; use light::cache::Cache; use light::on_demand::OnDemand; use light::client::{LightChainClient, LightChainNotify}; use parity_runtime::Executor; use ethereum_types::H256; -use bytes::Bytes; use parking_lot::{RwLock, Mutex}; type Client = Sink; @@ -220,18 +218,10 @@ impl LightChainNotify for ChainNotificationHandler { } impl ChainNotify for ChainNotificationHandler { - fn new_blocks( - &self, - _imported: Vec, - _invalid: Vec, - route: ChainRoute, - _sealed: Vec, - // Block bytes. - _proposed: Vec, - _duration: Duration, - ) { + fn new_blocks(&self, new_blocks: NewBlocks) { + if self.heads_subscribers.read().is_empty() && self.logs_subscribers.read().is_empty() { return } const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; - let headers = route.route() + let headers = new_blocks.route.route() .iter() .filter_map(|&(hash, ref typ)| { match typ { @@ -249,7 +239,7 @@ impl ChainNotify for ChainNotificationHandler { self.notify_heads(&headers); // We notify logs enacting and retracting as the order in route. - self.notify_logs(route.route(), |filter, ex| { + self.notify_logs(new_blocks.route.route(), |filter, ex| { match ex { &ChainRouteType::Enacted => Ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect()), diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index e363b91356f..2eb1295d835 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -24,7 +24,7 @@ use std::time::Duration; use v1::{EthPubSub, EthPubSubClient, Metadata}; -use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, ChainRoute, ChainRouteType}; +use ethcore::client::{TestBlockChainClient, EachBlockWith, ChainNotify, NewBlocks, ChainRoute, ChainRouteType}; use parity_runtime::Runtime; const DURATION_ZERO: Duration = Duration::from_millis(0); @@ -57,13 +57,13 @@ fn should_subscribe_to_new_heads() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Check notifications - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, true)); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"author":"0x0000000000000000000000000000000000000000","difficulty":"0x1","extraData":"0x","gasLimit":"0xf4240","gasUsed":"0x0","hash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0x0000000000000000000000000000000000000000","number":"0x1","parentHash":"0x0cd786a2425d16f152c658316c423e6ce1181e15c3295826d7c9904cba9ce303","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":[],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x1c9","stateRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","timestamp":"0x0","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},"subscription":"0x416d77337e24399d"}}"#; assert_eq!(res, Some(response.into())); // Notify about two blocks - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h2, ChainRouteType::Enacted), (h3, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, true)); // Receive both let (res, receiver) = receiver.into_future().wait().unwrap(); @@ -129,7 +129,7 @@ fn should_subscribe_to_logs() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Check notifications (enacted) - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO, false)); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":false,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() + &format!("0x{:x}", tx_hash) @@ -137,7 +137,7 @@ fn should_subscribe_to_logs() { assert_eq!(res, Some(response.into())); // Check notifications (retracted) - handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO); + handler.new_blocks(NewBlocks::new(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO, false)); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":true,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() + &format!("0x{:x}", tx_hash) diff --git a/secret_store/src/acl_storage.rs b/secret_store/src/acl_storage.rs index 6bc9aa0e651..5704d42b3f8 100644 --- a/secret_store/src/acl_storage.rs +++ b/secret_store/src/acl_storage.rs @@ -16,12 +16,10 @@ use std::sync::Arc; use std::collections::{HashMap, HashSet}; -use std::time::Duration; use parking_lot::{Mutex, RwLock}; -use ethcore::client::{BlockId, ChainNotify, ChainRoute, CallContract}; -use ethereum_types::{H256, Address}; +use ethcore::client::{BlockId, ChainNotify, NewBlocks, CallContract}; +use ethereum_types::Address; use ethabi::FunctionOutputDecoder; -use bytes::Bytes; use trusted_client::TrustedClient; use types::{Error, ServerKeyId, ContractAddress}; @@ -77,8 +75,9 @@ impl AclStorage for OnChainAclStorage { } impl ChainNotify for OnChainAclStorage { - fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - if !route.enacted().is_empty() || !route.retracted().is_empty() { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } + if !new_blocks.route.enacted().is_empty() || !new_blocks.route.retracted().is_empty() { self.contract.lock().update_contract_address() } } diff --git a/secret_store/src/key_server_set.rs b/secret_store/src/key_server_set.rs index 128d865b89e..2e5e6816d4f 100644 --- a/secret_store/src/key_server_set.rs +++ b/secret_store/src/key_server_set.rs @@ -17,10 +17,9 @@ use std::sync::Arc; use std::net::SocketAddr; use std::collections::{BTreeMap, HashSet}; -use std::time::Duration; use parking_lot::Mutex; use ethabi::FunctionOutputDecoder; -use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, ChainRoute, CallContract}; +use ethcore::client::{Client, BlockChainClient, BlockId, ChainNotify, NewBlocks, CallContract}; use ethereum_types::{H256, Address}; use ethkey::public_to_address; use bytes::Bytes; @@ -151,8 +150,9 @@ impl KeyServerSet for OnChainKeyServerSet { } impl ChainNotify for OnChainKeyServerSet { - fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - let (enacted, retracted) = route.into_enacted_retracted(); + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } + let (enacted, retracted) = new_blocks.route.into_enacted_retracted(); if !enacted.is_empty() || !retracted.is_empty() { self.contract.lock().update(enacted, retracted) diff --git a/secret_store/src/listener/service_contract_listener.rs b/secret_store/src/listener/service_contract_listener.rs index dc2c8a38e1c..e26d3d9250d 100644 --- a/secret_store/src/listener/service_contract_listener.rs +++ b/secret_store/src/listener/service_contract_listener.rs @@ -17,10 +17,9 @@ use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; use std::thread; use parking_lot::Mutex; -use ethcore::client::{ChainNotify, ChainRoute}; +use ethcore::client::{ChainNotify, NewBlocks}; use ethkey::{Public, public_to_address}; use bytes::Bytes; use ethereum_types::{H256, U256, Address}; @@ -435,9 +434,10 @@ impl Drop for ServiceContractListener { } impl ChainNotify for ServiceContractListener { - fn new_blocks(&self, _imported: Vec, _invalid: Vec, route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { - let enacted_len = route.enacted().len(); - if enacted_len == 0 && route.retracted().is_empty() { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } + let enacted_len = new_blocks.route.enacted().len(); + if enacted_len == 0 && new_blocks.route.retracted().is_empty() { return; } diff --git a/updater/src/updater.rs b/updater/src/updater.rs index 1e939b97eb5..4b968625a4d 100644 --- a/updater/src/updater.rs +++ b/updater/src/updater.rs @@ -25,9 +25,8 @@ use parking_lot::{Mutex, MutexGuard}; use rand::{self, Rng}; use target_info::Target; -use bytes::Bytes; use ethcore::BlockNumber; -use ethcore::client::{BlockId, BlockChainClient, ChainNotify, ChainRoute}; +use ethcore::client::{BlockId, BlockChainClient, ChainNotify, NewBlocks}; use ethcore::filter::Filter; use ethereum_types::H256; use hash_fetch::{self as fetch, HashFetch}; @@ -669,7 +668,8 @@ impl Updater, _invalid: Vec, _route: ChainRoute, _sealed: Vec, _proposed: Vec, _duration: Duration) { + fn new_blocks(&self, new_blocks: NewBlocks) { + if new_blocks.has_more_blocks_to_import { return } match (self.client.upgrade(), self.sync.as_ref().and_then(Weak::upgrade)) { (Some(ref c), Some(ref s)) if !s.status().is_syncing(c.queue_info()) => self.poll(), _ => {},