Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Update sealing just once when externally importing many blocks (#1541)
Browse files Browse the repository at this point in the history
Fixes Issue #1372
  • Loading branch information
nipunn1313 authored and gavofyork committed Jul 6, 2016
1 parent f8b5631 commit 4a9b9dc
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 88 deletions.
26 changes: 5 additions & 21 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use log_entry::LocalizedLogEntry;
use block_queue::{BlockQueue, BlockQueueInfo};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{BlockID, TransactionID, UncleID, TraceId, Mode, ClientConfig, DatabaseCompactionProfile,
BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics, TransactionImportError,
BlockImportError, TransactionImportResult};
BlockChainClient, MiningBlockChainClient, TraceFilter, CallAnalytics,
BlockImportError};
use client::Error as ClientError;
use env_info::EnvInfo;
use executive::{Executive, Executed, TransactOptions, contract_address};
Expand All @@ -52,7 +52,7 @@ use trace;
pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
use evm::Factory as EvmFactory;
use miner::{Miner, MinerService, AccountDetails};
use miner::{Miner, MinerService};

const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_QUEUE_SIZE_TO_SLEEP_ON: usize = 2;
Expand Down Expand Up @@ -409,12 +409,8 @@ impl Client {
pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize {
let _timer = PerfTimer::new("import_queued_transactions");
self.queue_transactions.fetch_sub(transactions.len(), AtomicOrdering::SeqCst);
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
let tx = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
let results = self.miner.import_transactions(self, tx, fetch_account);
let txs = transactions.iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
let results = self.miner.import_external_transactions(self, txs);
results.len()
}

Expand Down Expand Up @@ -861,18 +857,6 @@ impl BlockChainClient for Client {
self.build_last_hashes(self.chain.best_block_hash())
}

fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, TransactionImportError>> {
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};

self.miner.import_transactions(self, transactions, &fetch_account)
.into_iter()
.map(|res| res.map_err(|e| e.into()))
.collect()
}

fn queue_transactions(&self, transactions: Vec<Bytes>) {
if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len());
Expand Down
5 changes: 1 addition & 4 deletions ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use receipt::LocalizedReceipt;
use trace::LocalizedTrace;
use evm::Factory as EvmFactory;
pub use block_import_error::BlockImportError;
pub use transaction_import::{TransactionImportResult, TransactionImportError};
pub use transaction_import::TransactionImportResult;

/// Options concerning what analytics we run on the call.
#[derive(Eq, PartialEq, Default, Clone, Copy, Debug)]
Expand Down Expand Up @@ -192,9 +192,6 @@ pub trait BlockChainClient : Sync + Send {
/// Get last hashes starting from best block.
fn last_hashes(&self) -> LastHashes;

/// import transactions from network/other 3rd party
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, TransactionImportError>>;

/// Queue transactions for importing.
fn queue_transactions(&self, transactions: Vec<Bytes>);

Expand Down
30 changes: 11 additions & 19 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
use blockchain::TreeRoute;
use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID,
TransactionID, UncleID, TraceId, TraceFilter, LastHashes, CallAnalytics,
TransactionImportError, BlockImportError};
BlockImportError};
use header::{Header as BlockHeader, BlockNumber};
use filter::Filter;
use log_entry::LocalizedLogEntry;
Expand All @@ -39,8 +39,6 @@ use executive::Executed;
use error::{ExecutionError};
use trace::LocalizedTrace;

use miner::{TransactionImportResult, AccountDetails};

/// Test client.
pub struct TestBlockChainClient {
/// Blocks.
Expand Down Expand Up @@ -275,6 +273,10 @@ impl BlockChainClient for TestBlockChainClient {
}
}

fn latest_nonce(&self, address: &Address) -> U256 {
self.nonce(address, BlockID::Latest).unwrap()
}

fn code(&self, address: &Address) -> Option<Bytes> {
self.code.read().unwrap().get(address).cloned()
}
Expand All @@ -287,6 +289,10 @@ impl BlockChainClient for TestBlockChainClient {
}
}

fn latest_balance(&self, address: &Address) -> U256 {
self.balance(address, BlockID::Latest).unwrap()
}

fn storage_at(&self, address: &Address, position: &H256, id: BlockID) -> Option<H256> {
if let BlockID::Latest = id {
Some(self.storage.read().unwrap().get(&(address.clone(), position.clone())).cloned().unwrap_or_else(H256::new))
Expand Down Expand Up @@ -488,24 +494,10 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}

fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, TransactionImportError>> {
let nonces = self.nonces.read().unwrap();
let balances = self.balances.read().unwrap();
let fetch_account = |a: &Address| AccountDetails {
nonce: nonces[a],
balance: balances[a],
};

self.miner.import_transactions(self, transactions, &fetch_account)
.into_iter()
.map(|res| res.map_err(|e| e.into()))
.collect()
}

fn queue_transactions(&self, transactions: Vec<Bytes>) {
// import right here
let tx = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.import_transactions(tx);
let txs = transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
self.miner.import_external_transactions(self, txs);
}

fn pending_transactions(&self) -> Vec<SignedTransaction> {
Expand Down
56 changes: 33 additions & 23 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,19 @@ impl Miner {
!have_work
}

fn add_transactions_to_queue(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, origin: TransactionOrigin, transaction_queue: &mut TransactionQueue) ->
Vec<Result<TransactionImportResult, Error>> {

let fetch_account = |a: &Address| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
};

transactions.into_iter()
.map(|tx| transaction_queue.add(tx, &fetch_account, origin))
.collect()
}

/// Are we allowed to do a non-mandatory reseal?
fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() }
}
Expand Down Expand Up @@ -478,35 +491,32 @@ impl MinerService for Miner {
self.gas_range_target.read().unwrap().1
}

fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails {
let results: Vec<Result<TransactionImportResult, Error>> = {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transactions.into_iter()
.map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External))
.collect()
};
if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> {

let mut transaction_queue = self.transaction_queue.lock().unwrap();
let results = self.add_transactions_to_queue(chain, transactions, TransactionOrigin::External,
&mut transaction_queue);

if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
self.update_sealing(chain);
}
results
}

fn import_own_transaction<T>(
fn import_own_transaction(
&self,
chain: &MiningBlockChainClient,
transaction: SignedTransaction,
fetch_account: T
) -> Result<TransactionImportResult, Error> where T: Fn(&Address) -> AccountDetails {
) -> Result<TransactionImportResult, Error> {

let hash = transaction.hash();
trace!(target: "own_tx", "Importing transaction: {:?}", transaction);

let imported = {
// Be sure to release the lock before we call enable_and_prepare_sealing
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let import = transaction_queue.add(transaction, &fetch_account, TransactionOrigin::Local);
let import = self.add_transactions_to_queue(chain, vec![transaction], TransactionOrigin::Local, &mut transaction_queue).pop().unwrap();

match import {
Ok(ref res) => {
Expand Down Expand Up @@ -657,7 +667,12 @@ impl MinerService for Miner {
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
let txs = block.transactions();
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
txs
}

// 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions
Expand All @@ -674,14 +689,9 @@ impl MinerService for Miner {
.par_iter()
.map(|h| fetch_transactions(chain, h));
out_of_chain.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let _ = self.import_transactions(chain, txs, |a| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
});
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let _ = self.add_transactions_to_queue(chain, txs, TransactionOrigin::External,
&mut transaction_queue);
});
}

Expand Down
10 changes: 4 additions & 6 deletions ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,12 @@ pub trait MinerService : Send + Sync {
fn set_tx_gas_limit(&self, limit: U256);

/// Imports transactions to transaction queue.
fn import_transactions<T>(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails, Self: Sized;
fn import_external_transactions(&self, chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>>;

/// Imports own (node owner) transaction to queue.
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails, Self: Sized;
fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
Result<TransactionImportResult, Error>;

/// Returns hashes of transactions currently in pending
fn pending_transactions_hashes(&self) -> Vec<H256>;
Expand Down
9 changes: 2 additions & 7 deletions rpc/src/v1/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub use self::rpc::RpcClient;
use v1::helpers::TransactionRequest;
use v1::types::H256 as NH256;
use ethcore::error::Error as EthcoreError;
use ethcore::miner::{AccountDetails, MinerService};
use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{Action, SignedTransaction, Transaction};
use ethcore::account_provider::{AccountProvider, Error as AccountError};
Expand All @@ -80,12 +80,7 @@ fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: SignedT
where C: MiningBlockChainClient, M: MinerService {
let hash = NH256::from(signed_transaction.hash());

let import = miner.import_own_transaction(client, signed_transaction, |a: &Address| {
AccountDetails {
nonce: client.latest_nonce(&a),
balance: client.latest_balance(&a),
}
});
let import = miner.import_own_transaction(client, signed_transaction);

import
.map_err(transaction_error)
Expand Down
14 changes: 6 additions & 8 deletions rpc/src/v1/tests/helpers/miner_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ethcore::client::{MiningBlockChainClient, Executed, CallAnalytics};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use ethcore::miner::{MinerService, MinerStatus, AccountDetails, TransactionImportResult};
use ethcore::miner::{MinerService, MinerStatus, TransactionImportResult};

/// Test miner service.
pub struct TestMinerService {
Expand Down Expand Up @@ -130,14 +130,13 @@ impl MinerService for TestMinerService {
}

/// Imports transactions to transaction queue.
fn import_transactions<T>(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>, fetch_account: T) ->
Vec<Result<TransactionImportResult, Error>>
where T: Fn(&Address) -> AccountDetails {
fn import_external_transactions(&self, _chain: &MiningBlockChainClient, transactions: Vec<SignedTransaction>) ->
Vec<Result<TransactionImportResult, Error>> {
// lets assume that all txs are valid
self.imported_transactions.lock().unwrap().extend_from_slice(&transactions);

for sender in transactions.iter().filter_map(|t| t.sender().ok()) {
let nonce = self.last_nonce(&sender).unwrap_or(fetch_account(&sender).nonce);
let nonce = self.last_nonce(&sender).expect("last_nonce must be populated in tests");
self.last_nonces.write().unwrap().insert(sender, nonce + U256::from(1));
}
transactions
Expand All @@ -147,9 +146,8 @@ impl MinerService for TestMinerService {
}

/// Imports transactions to transaction queue.
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, _fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails {
fn import_own_transaction(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction) ->
Result<TransactionImportResult, Error> {

// keep the pending nonces up to date
if let Ok(ref sender) = transaction.sender() {
Expand Down

0 comments on commit 4a9b9dc

Please sign in to comment.