Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Less cloning when importing blocks #11531

Merged
merged 19 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ethcore/blockchain/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,8 +1069,8 @@ impl BlockChain {
}

/// Write a pending epoch transition by block hash.
pub fn insert_pending_transition(&self, batch: &mut DBTransaction, hash: H256, t: PendingEpochTransition) {
batch.write(db::COL_EXTRA, &hash, &t);
pub fn insert_pending_transition(&self, batch: &mut DBTransaction, hash: &H256, t: PendingEpochTransition) {
batch.write(db::COL_EXTRA, hash, &t);
}

/// Get a pending epoch transition by block hash.
Expand Down
5 changes: 3 additions & 2 deletions ethcore/engines/authority-round/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2552,14 +2552,15 @@ mod tests {

// step 3
let mut b2 = OpenBlock::new(engine, Default::default(), false, db2, &genesis_header, last_hashes.clone(), addr2, (3141562.into(), 31415620.into()), vec![], false).unwrap();
b2.push_transaction(Transaction {
let signed_tx = Transaction {
action: Action::Create,
nonce: U256::from(0),
gas_price: U256::from(3000),
gas: U256::from(53_000),
value: U256::from(1),
data: vec![],
}.fake_sign(addr2)).unwrap();
}.fake_sign(addr2);
b2.push_transaction(&signed_tx).unwrap();
let b2 = b2.close_and_lock().unwrap();

// we will now seal a block with 1tx and include the accumulated empty step message
Expand Down
8 changes: 1 addition & 7 deletions ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,14 +476,12 @@ impl<T: ChainDataFetcher> Client<T> {
fn check_epoch_signal(&self, verified_header: &Header) -> Result<Option<Proof>, T::Error> {
use common_types::engines::machine::{AuxiliaryRequest, AuxiliaryData};

let mut block: Option<Vec<u8>> = None;
let mut receipts: Option<Vec<_>> = None;

loop {

let is_signal = {
let auxiliary = AuxiliaryData {
bytes: block.as_ref().map(|x| &x[..]),
receipts: receipts.as_ref().map(|x| &x[..]),
};

Expand All @@ -495,7 +493,7 @@ impl<T: ChainDataFetcher> Client<T> {
EpochChange::No => return Ok(None),
EpochChange::Yes(proof) => return Ok(Some(proof)),
EpochChange::Unsure(unsure) => {
let (b, r) = match unsure {
let (_, r) = match unsure {
AuxiliaryRequest::Body =>
(Some(self.fetcher.block_body(verified_header)), None),
AuxiliaryRequest::Receipts =>
Expand All @@ -506,10 +504,6 @@ impl<T: ChainDataFetcher> Client<T> {
),
};

if let Some(b) = b {
block = Some(b.into_future().wait()?.into_inner());
}

if let Some(r) = r {
receipts = Some(r.into_future().wait()?);
}
Expand Down
33 changes: 17 additions & 16 deletions ethcore/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,17 @@ impl<'x> OpenBlock<'x> {
/// Push a transaction into the block.
///
/// If valid, it will be executed, and archived together with the receipt.
pub fn push_transaction(&mut self, t: SignedTransaction) -> Result<&Receipt, Error> {
pub fn push_transaction(&mut self, t: &SignedTransaction) -> Result<&Receipt, Error> {
Copy link
Collaborator

@niklasad1 niklasad1 Mar 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale behind this change?

The transaction is still cloned if the else branch is taken, are you doing this because of saving an allocation if the if branch is taken?

Not sure how this plays along w.r.t. to speculative execution on modern CPUs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, which else-branch do you mean?

Copy link
Collaborator

@niklasad1 niklasad1 Mar 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, well there's that also, but mainly passing the tx by ref is fallout from a borrow from higher up in the call chain, in enact_verified(). (I was also unsure wth the into() was doing).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The into() was not doing anything

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure why we need this change, if anything, it makes more allocations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in import_verified_blocks() we call check_and_lock_block() passing in an owned PreverifiedBlock, so to make the rest of the code work there we do some cloning that didn't seem necessary and that's where this PR started.
When we call enact_verified() we move the block but what is actually needed for enact() is actually the header, transactions and the (max) 2 uncles.
What I'd really would like to do is have a way to split up the PreverifiedBlock into (owned) components and be able to move them independently. I don't know of a way to do that, and that's why I ended up with this: it avoids cloning the block bytes in import_verified_blocks() and a few Header structs, and passes references around instead. Maybe this mean more clones, as each transaction that is added to the block is cloned, but they are also smaller (I hope).

Copy link
Collaborator

@niklasad1 niklasad1 Mar 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you could use some mem::take tricks, for example, replace Bytes with an empty Vec.

But, why can't you take PreverifiedBlock by value instead in enact? As I did here: https://github.com/OpenEthereum/open-ethereum/compare/na-ethcore-blockerror#diff-e2e171033e005c3b329818aaff213767R411-R564

I'm not saying that it is elegant but it worked for me at least :)

Copy link
Collaborator

@ordian ordian Mar 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole thing reminds me of something :P

The real problem is that programmers have spent far too much time worrying about efficiency in the wrong places and at the wrong times; premature optimization is the root of all evil (or at least most of it) in programming.”

Seriously though,

What I'd really would like to do is have a way to split up the PreverifiedBlock into (owned) components and be able to move them independently. I don't know of a way to do that

create a type for parts or pass them directly instead of PreferifiedBlock? or what Niklas suggested

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thank you for the input, much appreciated. At the end I went with mem::take()-ing the block RLP bytes and after some further shuffling things around I think I have something decent.

if self.block.transactions_set.contains(&t.hash()) {
return Err(TransactionError::AlreadyImported.into());
}

let env_info = self.block.env_info();
let outcome = self.block.state.apply(&env_info, self.engine.machine(), &t, self.block.traces.is_enabled())?;
let outcome = self.block.state.apply(&env_info, self.engine.machine(), t, self.block.traces.is_enabled())?;

self.block.transactions_set.insert(t.hash());
self.block.transactions.push(t.into());
// self.block.transactions.push(t.into());
self.block.transactions.push(t.clone());
if let Tracing::Enabled(ref mut traces) = self.block.traces {
traces.push(outcome.trace.into());
}
Expand All @@ -187,7 +188,7 @@ impl<'x> OpenBlock<'x> {

/// Push transactions onto the block.
#[cfg(not(feature = "slow-blocks"))]
fn push_transactions(&mut self, transactions: Vec<SignedTransaction>) -> Result<(), Error> {
fn push_transactions(&mut self, transactions: &Vec<SignedTransaction>) -> Result<(), Error> {
for t in transactions {
self.push_transaction(t)?;
}
Expand All @@ -196,7 +197,7 @@ impl<'x> OpenBlock<'x> {

/// Push transactions onto the block.
#[cfg(feature = "slow-blocks")]
fn push_transactions(&mut self, transactions: Vec<SignedTransaction>) -> Result<(), Error> {
fn push_transactions(&mut self, transactions: &Vec<SignedTransaction>) -> Result<(), Error> {
use std::time;

let slow_tx = option_env!("SLOW_TX_DURATION").and_then(|v| v.parse().ok()).unwrap_or(100);
Expand Down Expand Up @@ -407,9 +408,9 @@ impl Drain for SealedBlock {

/// Enact the block given by block header, transactions and uncles
pub(crate) fn enact(
header: Header,
transactions: Vec<SignedTransaction>,
uncles: Vec<Header>,
header: &Header,
transactions: &Vec<SignedTransaction>,
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
uncles: &Vec<Header>,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
engine: &dyn Engine,
tracing: bool,
db: StateDB,
Expand All @@ -434,7 +435,7 @@ pub(crate) fn enact(
last_hashes,
// Engine such as Clique will calculate author from extra_data.
// this is only important for executing contracts as the 'executive_author'.
engine.executive_author(&header)?,
engine.executive_author(header)?,
(3141562.into(), 31415620.into()),
vec![],
is_epoch_begin,
Expand All @@ -448,19 +449,19 @@ pub(crate) fn enact(
b.block.header.number(), root, env.author, author_balance);
}

b.populate_from(&header);
b.populate_from(header);
b.push_transactions(transactions)?;

for u in uncles {
b.push_uncle(u)?;
b.push_uncle(u.clone())?;
}

b.close_and_lock()
}

/// Enact the block given by `block_bytes` using `engine` on the database `db` with the given `parent` block header
pub fn enact_verified(
block: PreverifiedBlock,
block: &PreverifiedBlock,
engine: &dyn Engine,
tracing: bool,
db: StateDB,
Expand All @@ -471,9 +472,9 @@ pub fn enact_verified(
) -> Result<LockedBlock, Error> {

enact(
block.header,
block.transactions,
block.uncles,
&block.header,
&block.transactions,
&block.uncles,
engine,
tracing,
db,
Expand Down Expand Up @@ -547,7 +548,7 @@ mod tests {
)?;

b.populate_from(&header);
b.push_transactions(transactions)?;
b.push_transactions(&transactions)?;

for u in block.uncles {
b.push_uncle(u)?;
Expand Down
66 changes: 33 additions & 33 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,9 @@ impl Importer {
}

let max_blocks_to_import = client.config.max_round_blocks_to_import;
let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, has_more_blocks_to_import) = {
let (imported_blocks, import_results, invalid_blocks, imported, duration, has_more_blocks_to_import) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let proposed_blocks = Vec::with_capacity(max_blocks_to_import);
Copy link
Collaborator

@niklasad1 niklasad1 Mar 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was actually not ever used? Nice catch 👍

let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
Expand All @@ -298,26 +297,25 @@ impl Importer {
let start = Instant::now();

for block in blocks {
let header = block.header.clone();
let bytes = block.bytes.clone();
let hash = header.hash();
let hash = block.header.hash();

let is_invalid = invalid_blocks.contains(header.parent_hash());
let is_invalid = invalid_blocks.contains(block.header.parent_hash());
if is_invalid {
invalid_blocks.insert(hash);
continue;
}

match self.check_and_lock_block(&bytes, block, client) {
Ok((closed_block, pending)) => {
match self.check_and_lock_block(&block, client) {
Ok((locked_block, pending)) => {
imported_blocks.push(hash);
let transactions_len = closed_block.transactions.len();
let route = self.commit_block(closed_block, &header, encoded::Block::new(bytes), pending, client);
let transactions_len = locked_block.transactions.len();
let gas_used = locked_block.header.gas_used().clone();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
let route = self.commit_block(locked_block, encoded::Block::new(block.bytes), pending, client);
import_results.push(route);
client.report.write().accrue_block(&header, transactions_len);
},
client.report.write().accrue_block(&gas_used, transactions_len);
}
Err(err) => {
self.bad_blocks.report(bytes, format!("{:?}", err));
self.bad_blocks.report(block.bytes, format!("{:?}", err));
invalid_blocks.insert(hash);
},
}
Expand All @@ -330,9 +328,9 @@ impl Importer {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let has_more_blocks_to_import = !self.block_queue.mark_as_good(&imported_blocks);
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), has_more_blocks_to_import)
(imported_blocks, import_results, invalid_blocks, imported, start.elapsed(), has_more_blocks_to_import)
};

trace!(target: "dp", "[import_verified_blocks] {} blocks processed in {:?}", imported, duration);
{
if !imported_blocks.is_empty() {
let route = ChainRoute::from(import_results.as_ref());
Expand All @@ -348,23 +346,27 @@ impl Importer {
invalid_blocks.clone(),
route.clone(),
Vec::new(),
proposed_blocks.clone(),
Vec::new(),
duration,
has_more_blocks_to_import,
)
);
});
}
}

let start = Instant::now();
let db = client.db.read();
db.key_value().flush().expect("DB flush failed.");
trace!(target: "dp", "[import_verified_blocks] flushed the db in: {:?}",
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
start.elapsed()
);

imported
}

fn check_and_lock_block(&self, bytes: &[u8], block: PreverifiedBlock, client: &Client) -> EthcoreResult<(LockedBlock, Option<PendingTransition>)> {
fn check_and_lock_block(&self, block: &PreverifiedBlock, client: &Client) -> EthcoreResult<(LockedBlock, Option<PendingTransition>)> {
let engine = &*self.engine;
let header = block.header.clone();
let header = &block.header;

// Check the block isn't so old we won't be able to enact it.
let best_block_number = client.chain.read().best_block_number();
Expand All @@ -389,7 +391,7 @@ impl Importer {
&parent,
engine,
verification::FullFamilyParams {
block: &block,
block,
block_provider: &**chain,
client
},
Expand Down Expand Up @@ -448,7 +450,6 @@ impl Importer {

let pending = self.check_epoch_end_signal(
&header,
bytes,
&locked_block.receipts,
locked_block.state.db(),
client
Expand Down Expand Up @@ -490,22 +491,22 @@ impl Importer {
fn commit_block<B>(
&self,
block: B,
header: &Header,
block_data: encoded::Block,
pending: Option<PendingTransition>,
client: &Client
) -> ImportRoute
where B: Drain
{
let block = block.drain();
let header = block.header;
let hash = &header.hash();
let number = header.number();
let parent = header.parent_hash();
let chain = client.chain.read();
let mut is_finalized = false;

// Commit results
let block = block.drain();
debug_assert_eq!(header.hash(), block_data.header_view().hash());
debug_assert_eq!(*hash, block_data.header_view().hash());

let mut batch = DBTransaction::new();

Expand Down Expand Up @@ -543,15 +544,15 @@ impl Importer {
// check epoch end signal, potentially generating a proof on the current
// state.
if let Some(pending) = pending {
chain.insert_pending_transition(&mut batch, header.hash(), pending);
chain.insert_pending_transition(&mut batch, hash, pending);
}

state.journal_under(&mut batch, number, hash).expect("DB commit failed");

let finalized: Vec<_> = ancestry_actions.into_iter().map(|ancestry_action| {
let AncestryAction::MarkFinalized(a) = ancestry_action;

if a != header.hash() {
if a != *hash {
chain.mark_finalized(&mut batch, a).expect("Engine's ancestry action must be known blocks; qed");
} else {
// we're finalizing the current block
Expand Down Expand Up @@ -596,7 +597,6 @@ impl Importer {
fn check_epoch_end_signal(
&self,
header: &Header,
block_bytes: &[u8],
receipts: &[Receipt],
state_db: &StateDB,
client: &Client,
Expand All @@ -605,7 +605,6 @@ impl Importer {

let hash = header.hash();
let auxiliary = AuxiliaryData {
bytes: Some(block_bytes),
receipts: Some(&receipts),
};

Expand Down Expand Up @@ -1065,14 +1064,15 @@ impl Client {
};

self.block_header(id).and_then(|header| {
let db = self.state_db.read().boxed_clone();

let state_db = self.state_db.read();
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
// early exit for pruned blocks
if db.is_prunable() && self.pruning_info().earliest_state > block_number {
// if db.is_prunable() && self.pruning_info().earliest_state > block_number {
if state_db.is_prunable() && self.pruning_info().earliest_state > block_number {
trace!(target: "client", "State for block #{} is pruned. Earliest state: {:?}", block_number, self.pruning_info().earliest_state);
return None;
}

let db = state_db.boxed_clone();
let root = header.state_root();
State::from_existing(db, root, self.engine.account_start_nonce(block_number), self.factories.clone()).ok()
})
Expand Down Expand Up @@ -2402,14 +2402,14 @@ impl ImportSealedBlock for Client {

let pending = self.importer.check_epoch_end_signal(
&header,
&block_bytes,
// &block_bytes,
&block.receipts,
block.state.db(),
self
)?;
let route = self.importer.commit_block(
block,
&header,
// &header,
encoded::Block::new(block_bytes),
pending,
self
Expand Down
3 changes: 2 additions & 1 deletion ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ impl Miner {
let block_start = Instant::now();
debug!(target: "miner", "Attempting to push {} transactions.", engine_txs.len() + queue_txs.len());

// for transaction in engine_txs.into_iter().chain(queue_txs.into_iter().map(|tx| tx.signed().clone())) {
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
for transaction in engine_txs.into_iter().chain(queue_txs.into_iter().map(|tx| tx.signed().clone())) {
let start = Instant::now();

Expand All @@ -535,7 +536,7 @@ impl Miner {
let result = client.verify_for_pending_block(&transaction, &open_block.header)
.map_err(|e| e.into())
.and_then(|_| {
open_block.push_transaction(transaction)
open_block.push_transaction(&transaction)
});

let took = start.elapsed();
Expand Down
Loading