Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fix pubsub new_blocks notifications to include all blocks #9987

Merged
merged 20 commits into from
Dec 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions ethcore/private-tx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ pub use error::{Error, ErrorKind};

use std::sync::{Arc, Weak};
use std::collections::{HashMap, HashSet, BTreeMap};
use std::time::Duration;
use ethereum_types::{H128, H256, U256, Address};
use hash::keccak;
use rlp::*;
Expand All @@ -82,7 +81,7 @@ use ethcore::executed::{Executed};
use transaction::{SignedTransaction, Transaction, Action, UnverifiedTransaction};
use ethcore::{contract_address as ethcore_contract_address};
use ethcore::client::{
Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId,
Client, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage, BlockId,
CallContract, Call, BlockInfo
};
use ethcore::account_provider::AccountProvider;
Expand Down Expand Up @@ -733,12 +732,11 @@ fn find_account_password(passwords: &Vec<Password>, account_provider: &AccountPr
}

impl ChainNotify for Provider {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _route: ChainRoute, _sealed: Vec<H256>, _proposed: Vec<Bytes>, _duration: Duration) {
if !imported.is_empty() {
trace!(target: "privatetx", "New blocks imported, try to prune the queue");
if let Err(err) = self.process_verification_queue() {
warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err);
}
fn new_blocks(&self, new_blocks: NewBlocks) {
if new_blocks.imported.is_empty() || new_blocks.has_more_blocks_to_import { return }
trace!(target: "privatetx", "New blocks imported, try to prune the queue");
if let Err(err) = self.process_verification_queue() {
warn!(target: "privatetx", "Cannot prune private transactions queue. error: {:?}", err);
}
}
}
52 changes: 42 additions & 10 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,51 @@ impl ChainRoute {
}
}

/// Used by `ChainNotify` `new_blocks()`
pub struct NewBlocks {
/// Imported blocks
pub imported: Vec<H256>,
/// Invalid blocks
pub invalid: Vec<H256>,
/// Route
pub route: ChainRoute,
/// Sealed
pub sealed: Vec<H256>,
/// Block bytes.
pub proposed: Vec<Bytes>,
/// Duration
pub duration: Duration,
/// Has more blocks to import
pub has_more_blocks_to_import: bool,
}

impl NewBlocks {
/// Constructor
pub fn new (
imported: Vec<H256>,
invalid: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
duration: Duration,
has_more_blocks_to_import: bool,
) -> NewBlocks {
NewBlocks {
imported,
invalid,
route,
sealed,
proposed,
duration,
has_more_blocks_to_import,
}
}
}

/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks.
fn new_blocks(
&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
_route: ChainRoute,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
_duration: Duration,
) {
fn new_blocks( &self, _new_blocks: NewBlocks) {
Copy link
Collaborator

@sorpaas sorpaas Dec 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn new_blocks( &self, _new_blocks: NewBlocks) {
fn new_blocks(&self, _new_blocks: NewBlocks) {

// does nothing by default
}

Expand Down
57 changes: 33 additions & 24 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use client::{
use client::{
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
TraceFilter, CallAnalytics, Mode,
ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType,
ChainNotify, NewBlocks, ChainRoute, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType,
IoClient, BadBlocks,
};
use client::bad_blocks;
Expand Down Expand Up @@ -268,7 +268,7 @@ impl Importer {
}

let max_blocks_to_import = client.config.max_round_blocks_to_import;
let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, is_empty) = {
let (imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration, has_more_blocks_to_import) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut proposed_blocks = Vec::with_capacity(max_blocks_to_import);
Expand Down Expand Up @@ -322,26 +322,29 @@ impl Importer {
if !invalid_blocks.is_empty() {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), is_empty)
let has_more_blocks_to_import = !self.block_queue.mark_as_good(&imported_blocks);
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), has_more_blocks_to_import)
};

{
if !imported_blocks.is_empty() && is_empty {
if !imported_blocks.is_empty() {
let route = ChainRoute::from(import_results.as_ref());

if is_empty {
if !has_more_blocks_to_import {
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
}

client.notify(|notify| {
notify.new_blocks(
imported_blocks.clone(),
invalid_blocks.clone(),
route.clone(),
Vec::new(),
proposed_blocks.clone(),
duration,
NewBlocks::new(
imported_blocks.clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we pass references here instead of cloning the Vec every time?
Or maybe better pass Arc<NewBlocks> to avoid cloning the content if reference is not possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I was thinking about too (comment above). There might be some more improvements to make, but I was wondering where to draw the line with this PR. I think this is worth doing this however.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with a separate PR improving this, just make sure to create an issue for this and add patch labels.

invalid_blocks.clone(),
route.clone(),
Vec::new(),
proposed_blocks.clone(),
duration,
has_more_blocks_to_import,
)
);
});
}
Expand Down Expand Up @@ -2342,12 +2345,15 @@ impl ImportSealedBlock for Client {
);
self.notify(|notify| {
notify.new_blocks(
vec![hash],
vec![],
route.clone(),
vec![hash],
vec![],
start.elapsed(),
NewBlocks::new(
vec![hash],
vec![],
route.clone(),
vec![hash],
vec![],
start.elapsed(),
false
)
);
});
self.db.read().key_value().flush().expect("DB flush failed.");
Expand All @@ -2360,12 +2366,15 @@ impl BroadcastProposalBlock for Client {
const DURATION_ZERO: Duration = Duration::from_millis(0);
self.notify(|notify| {
notify.new_blocks(
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![block.rlp_bytes()],
DURATION_ZERO,
NewBlocks::new(
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![block.rlp_bytes()],
DURATION_ZERO,
false
)
);
});
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult};
pub use self::io_message::ClientIoMessage;
#[cfg(any(test, feature = "test-helpers"))]
pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::chain_notify::{ChainNotify, NewBlocks, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::traits::{
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter, BadBlocks,
Expand Down
28 changes: 10 additions & 18 deletions ethcore/src/snapshot/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
//! Watcher for snapshot-related chain events.

use parking_lot::Mutex;
use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage};
use client::{BlockInfo, Client, ChainNotify, NewBlocks, ClientIoMessage};
use ids::BlockId;

use io::IoChannel;
use ethereum_types::H256;
use bytes::Bytes;

use std::{sync::Arc, time::Duration};
use std::sync::Arc;

// helper trait for transforming hashes to numbers and checking if syncing.
trait Oracle: Send + Sync {
Expand Down Expand Up @@ -99,20 +98,12 @@ impl Watcher {
}

impl ChainNotify for Watcher {
fn new_blocks(
&self,
imported: Vec<H256>,
_: Vec<H256>,
_: ChainRoute,
_: Vec<H256>,
_: Vec<Bytes>,
_duration: Duration)
{
if self.oracle.is_major_importing() { return }
fn new_blocks(&self, new_blocks: NewBlocks) {
if self.oracle.is_major_importing() || new_blocks.has_more_blocks_to_import { return }

trace!(target: "snapshot_watcher", "{} imported", imported.len());
trace!(target: "snapshot_watcher", "{} imported", new_blocks.imported.len());

let highest = imported.into_iter()
let highest = new_blocks.imported.into_iter()
.filter_map(|h| self.oracle.to_number(h))
.filter(|&num| num >= self.period + self.history)
.map(|num| num - self.history)
Expand All @@ -130,7 +121,7 @@ impl ChainNotify for Watcher {
mod tests {
use super::{Broadcast, Oracle, Watcher};

use client::{ChainNotify, ChainRoute};
use client::{ChainNotify, NewBlocks, ChainRoute};

use ethereum_types::{H256, U256};

Expand Down Expand Up @@ -170,14 +161,15 @@ mod tests {
history: history,
};

watcher.new_blocks(
watcher.new_blocks(NewBlocks::new(
hashes,
vec![],
ChainRoute::default(),
vec![],
vec![],
DURATION_ZERO,
);
false
));
}

// helper
Expand Down
23 changes: 9 additions & 14 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use types::pruning_info::PruningInfo;
use ethereum_types::{H256, H512, U256};
use io::{TimerToken};
use ethcore::ethstore::ethkey::Secret;
use ethcore::client::{BlockChainClient, ChainNotify, ChainRoute, ChainMessageType};
use ethcore::client::{BlockChainClient, ChainNotify, NewBlocks, ChainMessageType};
use ethcore::snapshot::SnapshotService;
use ethcore::header::BlockNumber;
use sync_io::NetSyncIo;
Expand Down Expand Up @@ -498,27 +498,22 @@ impl ChainNotify for EthSync {
}
}

fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: Duration)
fn new_blocks(&self, new_blocks: NewBlocks)
{
if new_blocks.has_more_blocks_to_import { return }
use light::net::Announcement;

self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service,
&self.eth_handler.overlay);
self.eth_handler.sync.write().chain_new_blocks(
&mut sync_io,
&imported,
&invalid,
route.enacted(),
route.retracted(),
&sealed,
&proposed);
&new_blocks.imported,
&new_blocks.invalid,
new_blocks.route.enacted(),
new_blocks.route.retracted(),
&new_blocks.sealed,
&new_blocks.proposed);
});

self.network.with_context(self.light_subprotocol_name, |context| {
Expand Down
22 changes: 8 additions & 14 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@

use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use std::time::Duration;
use ethereum_types::H256;
use parking_lot::{RwLock, Mutex};
use bytes::Bytes;
use network::{self, PeerId, ProtocolId, PacketId, SessionInfo};
use tests::snapshot::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient, Client as EthcoreClient,
ClientConfig, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage};
ClientConfig, ChainNotify, NewBlocks, ChainMessageType, ClientIoMessage};
use ethcore::header::BlockNumber;
use ethcore::snapshot::SnapshotService;
use ethcore::spec::Spec;
Expand Down Expand Up @@ -535,23 +534,18 @@ impl IoHandler<ClientIoMessage> for TestIoHandler {
}

impl ChainNotify for EthPeer<EthcoreClient> {
fn new_blocks(&self,
imported: Vec<H256>,
invalid: Vec<H256>,
route: ChainRoute,
sealed: Vec<H256>,
proposed: Vec<Bytes>,
_duration: Duration)
fn new_blocks(&self, new_blocks: NewBlocks)
{
let (enacted, retracted) = route.into_enacted_retracted();
if new_blocks.has_more_blocks_to_import { return }
let (enacted, retracted) = new_blocks.route.into_enacted_retracted();

self.new_blocks_queue.write().push_back(NewBlockMessage {
imported,
invalid,
imported: new_blocks.imported,
invalid: new_blocks.invalid,
enacted,
retracted,
sealed,
proposed,
sealed: new_blocks.sealed,
proposed: new_blocks.proposed,
});
}

Expand Down
Loading