Skip to content

Commit

Permalink
A0-3584: Create NetworkEventStream earlier (#1519)
Browse files Browse the repository at this point in the history
# Description

There was a race condition, making our e2e test fail randomly because
one node was not finalizing. Namely:
```
    task_manager.spawn_essential_handle().spawn_blocking(
        "aleph",
        None,
        run_validator_node(aleph_config), // registers notification channel from substrate network
    );
    network_starter.start_network(); // start substrate network
```
And the registration could have executed after starting the network,
causing missing some notifications.

This PR moves registering `NetworkEventStream` rx so that now it is
always created before starting the network, and we get all notifications
about `StreamOpened`.

In `run_validator_node` we create also other notification channels from
other network components, but I think the only event that cannot be
missed is the mentioned `StreamOpened` event.

Since my proposed solution exposes our facade for `SubstrateNetwork` and
creates that component in `bin/`, I also simplified `AlephConfig` a bit.

## Type of change

Please delete options that are not relevant.

- Bug fix (non-breaking change which fixes an issue)

# Checklist:

# Notes:
After changes, the finalization likely goes smooth (at least my local
~11~ quite a few runs were all healthy). However, validators still
cannot be boot nodes at the same time - in such cases we still get
random stalls.

![image](https://github.com/Cardinal-Cryptography/aleph-node/assets/31205678/5d37b285-7024-4b62-a2c2-ae14491d35de)
  • Loading branch information
ggawryal authored Dec 4, 2023
1 parent a892e48 commit 685e33f
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 91 deletions.
58 changes: 25 additions & 33 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ use finality_aleph::{
run_validator_node, AlephBlockImport, AlephConfig, AllBlockMetrics, BlockImporter,
DefaultClock, Justification, JustificationTranslator, MillisecsPerBlock, Protocol,
ProtocolNaming, RateLimiterConfig, RedirectingBlockImport, SessionPeriod, SubstrateChainStatus,
SyncOracle, TimingBlockMetrics, TracingBlockImport, ValidatorAddressCache,
SubstrateNetwork, SyncOracle, TimingBlockMetrics, TracingBlockImport, ValidatorAddressCache,
};
use futures::channel::mpsc;
use log::warn;
use sc_client_api::{BlockBackend, HeaderBackend};
use sc_consensus::ImportQueue;
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_slots::BackoffAuthoringBlocksStrategy;
use sc_network::NetworkService;
use sc_network_sync::SyncingService;
use sc_service::{
error::Error as ServiceError, Configuration, KeystoreContainer, NetworkStarter, RpcHandlers,
TFullClient, TaskManager,
Expand Down Expand Up @@ -218,9 +216,7 @@ fn setup(
) -> Result<
(
RpcHandlers,
Arc<NetworkService<Block, BlockHash>>,
Arc<SyncingService<Block>>,
ProtocolNaming,
SubstrateNetwork<Block, BlockHash>,
NetworkStarter,
SyncOracle,
Option<ValidatorAddressCache>,
Expand Down Expand Up @@ -302,11 +298,11 @@ fn setup(
telemetry: telemetry.as_mut(),
})?;

let substrate_network = SubstrateNetwork::new(network, sync_network, protocol_naming);

Ok((
rpc_handlers,
network,
sync_network,
protocol_naming,
substrate_network,
network_starter,
sync_oracle,
validator_address_cache,
Expand Down Expand Up @@ -351,27 +347,20 @@ pub fn new_authority(

let collect_extra_debugging_data = !aleph_config.no_collection_of_extra_debugging_data();

let (
_rpc_handlers,
network,
sync_network,
protocol_naming,
network_starter,
sync_oracle,
validator_address_cache,
) = setup(
config,
backend,
chain_status.clone(),
&keystore_container,
import_queue,
transaction_pool.clone(),
&mut task_manager,
client.clone(),
&mut telemetry,
justification_tx,
collect_extra_debugging_data,
)?;
let (_rpc_handlers, substrate_network, network_starter, sync_oracle, validator_address_cache) =
setup(
config,
backend,
chain_status.clone(),
&keystore_container,
import_queue,
transaction_pool.clone(),
&mut task_manager,
client.clone(),
&mut telemetry,
justification_tx,
collect_extra_debugging_data,
)?;

let mut proposer_factory = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
Expand Down Expand Up @@ -429,9 +418,13 @@ pub fn new_authority(
.unwrap_or(usize::MAX),
};

// Network event stream needs to be created before starting the network,
// otherwise some events might be missed.
let network_event_stream = substrate_network.event_stream();

let aleph_config = AlephConfig {
network,
sync_network,
network: substrate_network,
network_event_stream,
client,
chain_status,
import_queue_handle,
Expand All @@ -448,7 +441,6 @@ pub fn new_authority(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
rate_limiter_config,
sync_oracle,
validator_address_cache,
Expand Down
9 changes: 3 additions & 6 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, Finalizer, LockImportRun, StorageProvider,
};
use sc_consensus::BlockImport;
use sc_network::NetworkService;
use sc_network_sync::SyncingService;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_keystore::Keystore;
Expand Down Expand Up @@ -72,7 +70,7 @@ pub use crate::{
metrics::{AllBlockMetrics, DefaultClock, TimingBlockMetrics},
network::{
address_cache::{ValidatorAddressCache, ValidatorAddressingInfo},
Protocol, ProtocolNaming,
Protocol, ProtocolNaming, SubstrateNetwork, SubstrateNetworkEventStream,
},
nodes::run_validator_node,
session::SessionPeriod,
Expand Down Expand Up @@ -251,8 +249,8 @@ pub struct RateLimiterConfig {
}

pub struct AlephConfig<C, SC> {
pub network: Arc<NetworkService<AlephBlock, AlephHash>>,
pub sync_network: Arc<SyncingService<AlephBlock>>,
pub network: SubstrateNetwork<AlephBlock, AlephHash>,
pub network_event_stream: SubstrateNetworkEventStream<AlephBlock, AlephHash>,
pub client: Arc<C>,
pub chain_status: SubstrateChainStatus,
pub import_queue_handle: BlockImporter,
Expand All @@ -269,7 +267,6 @@ pub struct AlephConfig<C, SC> {
pub backup_saving_path: Option<PathBuf>,
pub external_addresses: Vec<String>,
pub validator_port: u16,
pub protocol_naming: ProtocolNaming,
pub rate_limiter_config: RateLimiterConfig,
pub sync_oracle: SyncOracle,
pub validator_address_cache: Option<ValidatorAddressCache>,
Expand Down
21 changes: 10 additions & 11 deletions finality-aleph/src/network/gossip/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,6 @@ impl RawNetwork for MockRawNetwork {
type SenderError = MockSenderError;
type NetworkSender = MockNetworkSender;
type PeerId = MockPublicKey;
type EventStream = MockEventStream;

fn event_stream(&self) -> Self::EventStream {
let (tx, rx) = mpsc::unbounded();
self.event_sinks.lock().push(tx);
// Necessary for tests to detect when service takes event_stream
if let Some(tx) = self.event_stream_taken_oneshot.lock().take() {
tx.send(()).unwrap();
}
MockEventStream(rx)
}

fn sender(
&self,
Expand Down Expand Up @@ -113,6 +102,16 @@ impl MockRawNetwork {
}
}

pub fn event_stream(&self) -> MockEventStream {
let (tx, rx) = mpsc::unbounded();
self.event_sinks.lock().push(tx);
// Necessary for tests to detect when service takes event_stream
if let Some(tx) = self.event_stream_taken_oneshot.lock().take() {
tx.send(()).unwrap();
}
MockEventStream(rx)
}

pub fn emit_event(&mut self, event: MockEvent) {
for sink in &*self.event_sinks.lock() {
sink.unbounded_send(event.clone()).unwrap();
Expand Down
4 changes: 0 additions & 4 deletions finality-aleph/src/network/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ pub trait RawNetwork: Clone + Send + Sync + 'static {
type SenderError: std::error::Error;
type NetworkSender: NetworkSender;
type PeerId: Clone + Debug + Eq + Hash + Send + 'static;
type EventStream: EventStream<Self::PeerId>;

/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Self::EventStream;

/// Returns a sender to the given peer using a given protocol. Returns Error if not connected to the peer.
fn sender(
Expand Down
27 changes: 15 additions & 12 deletions finality-aleph/src/network/gossip/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
/// 1. Messages are forwarded to the user.
/// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes.
/// 3. Outgoing messages, sending them out, using 1.2. to broadcast.
pub struct Service<N: RawNetwork, AD: Data, BSD: Data> {
pub struct Service<N: RawNetwork, ES: EventStream<N::PeerId>, AD: Data, BSD: Data> {
network: N,
messages_from_authentication_user: mpsc::UnboundedReceiver<Command<AD, N::PeerId>>,
messages_from_block_sync_user: mpsc::UnboundedReceiver<Command<BSD, N::PeerId>>,
Expand All @@ -52,6 +52,7 @@ pub struct Service<N: RawNetwork, AD: Data, BSD: Data> {
spawn_handle: SpawnHandle,
metrics: Metrics,
timestamp_of_last_log_that_channel_is_full: HashMap<(N::PeerId, Protocol), Instant>,
network_event_stream: ES,
}

struct ServiceInterface<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
Expand Down Expand Up @@ -117,9 +118,10 @@ enum SendError {
SendingFailed,
}

impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
impl<N: RawNetwork, ES: EventStream<N::PeerId>, AD: Data, BSD: Data> Service<N, ES, AD, BSD> {
pub fn new(
network: N,
network_event_stream: ES,
spawn_handle: SpawnHandle,
metrics_registry: Option<Registry>,
) -> (
Expand Down Expand Up @@ -154,6 +156,7 @@ impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
block_sync_connected_peers: HashSet::new(),
block_sync_peer_senders: HashMap::new(),
timestamp_of_last_log_that_channel_is_full: HashMap::new(),
network_event_stream,
},
ServiceInterface {
messages_from_service: messages_from_authentication_service,
Expand Down Expand Up @@ -501,12 +504,10 @@ impl<N: RawNetwork, AD: Data, BSD: Data> Service<N, AD, BSD> {
}

pub async fn run(mut self) {
let mut events_from_network = self.network.event_stream();

let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL);
loop {
tokio::select! {
maybe_event = events_from_network.next_event() => match maybe_event {
maybe_event = self.network_event_stream.next_event() => match maybe_event {
Some(event) => if self.handle_network_event(event).is_err() {
error!(target: LOG_TARGET, "Cannot forward messages to user.");
return;
Expand Down Expand Up @@ -555,7 +556,7 @@ mod tests {
use super::{Error, SendError, Service};
use crate::network::{
gossip::{
mock::{MockEvent, MockRawNetwork, MockSenderError},
mock::{MockEvent, MockEventStream, MockRawNetwork, MockSenderError},
Network,
},
mock::MockData,
Expand All @@ -567,7 +568,7 @@ mod tests {
pub struct TestData {
pub network: MockRawNetwork,
gossip_network: Box<dyn Network<MockData, Error = Error, PeerId = MockPublicKey>>,
pub service: Service<MockRawNetwork, MockData, MockData>,
pub service: Service<MockRawNetwork, MockEventStream, MockData, MockData>,
// `TaskManager` can't be dropped for `SpawnTaskHandle` to work
_task_manager: TaskManager,
// If we drop the sync network, the underlying network service dies, stopping the whole
Expand All @@ -579,13 +580,16 @@ mod tests {
fn prepare() -> Self {
let task_manager = TaskManager::new(Handle::current(), None).unwrap();

// Event stream will never be taken, so we can drop the receiver
let (event_stream_oneshot_tx, _) = oneshot::channel();
let (event_stream_oneshot_tx, _event_stream_oneshot_rx) = oneshot::channel();

// Prepare service
let network = MockRawNetwork::new(event_stream_oneshot_tx);
let (service, gossip_network, other_network) =
Service::new(network.clone(), task_manager.spawn_handle().into(), None);
let (service, gossip_network, other_network) = Service::new(
network.clone(),
network.event_stream(),
task_manager.spawn_handle().into(),
None,
);
let gossip_network = Box::new(gossip_network);
let other_network = Box::new(other_network);

Expand Down Expand Up @@ -787,7 +791,6 @@ mod tests {

let expected = (message_2.encode(), peer_id, PROTOCOL);

println!("just before");
assert_eq!(
test_data
.network
Expand Down
4 changes: 3 additions & 1 deletion finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ pub use gossip::{
Error as GossipError, Network as GossipNetwork, Protocol, Service as GossipService,
};
use network_clique::{AddressingInformation, NetworkIdentity, PeerId};
pub use substrate::{ProtocolNaming, SubstrateNetwork};
pub use substrate::{
NetworkEventStream as SubstrateNetworkEventStream, ProtocolNaming, SubstrateNetwork,
};

/// A basic alias for properties we expect basic data to satisfy.
pub trait Data: Clone + Codec + Send + Sync + 'static {}
Expand Down
30 changes: 11 additions & 19 deletions finality-aleph/src/network/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ use log::{error, trace, warn};
use sc_network::{
multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, Multiaddr,
NetworkEventStream as _, NetworkNotification, NetworkPeers, NetworkService,
NotificationSenderT, PeerId, ProtocolName,
};
use sc_network_common::{
sync::{SyncEvent, SyncEventStream},
ExHashT,
NotificationSenderT, PeerId, ProtocolName, SyncEventStream,
};
use sc_network_common::{sync::SyncEvent, ExHashT};
use sc_network_sync::SyncingService;
use sp_runtime::traits::Block;
use tokio::select;
Expand Down Expand Up @@ -237,26 +234,21 @@ impl<B: Block, H: ExHashT> SubstrateNetwork<B, H> {
naming,
}
}
}

impl<B: Block, H: ExHashT> RawNetwork for SubstrateNetwork<B, H> {
type SenderError = SenderError;
type NetworkSender = SubstrateNetworkSender;
type PeerId = PeerId;
type EventStream = NetworkEventStream<B, H>;

fn event_stream(&self) -> Self::EventStream {
pub fn event_stream(&self) -> NetworkEventStream<B, H> {
NetworkEventStream {
stream: Box::pin(self.network.as_ref().event_stream("aleph-network")),
sync_stream: Box::pin(
self.sync_network
.as_ref()
.event_stream("aleph-syncing-network"),
),
stream: Box::pin(self.network.event_stream("aleph-network")),
sync_stream: Box::pin(self.sync_network.event_stream("aleph-syncing-network")),
naming: self.naming.clone(),
network: self.network.clone(),
}
}
}

impl<B: Block, H: ExHashT> RawNetwork for SubstrateNetwork<B, H> {
type SenderError = SenderError;
type NetworkSender = SubstrateNetworkSender;
type PeerId = PeerId;

fn sender(
&self,
Expand Down
8 changes: 4 additions & 4 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
address_cache::validator_address_cache_updater,
session::{ConnectionManager, ConnectionManagerConfig},
tcp::{new_tcp_network, KEY_TYPE},
GossipService, SubstrateNetwork,
GossipService,
},
party::{
impls::ChainStateImpl, manager::NodeSessionManagerImpl, ConsensusParty,
Expand Down Expand Up @@ -62,7 +62,7 @@ where
{
let AlephConfig {
network,
sync_network,
network_event_stream,
client,
chain_status,
mut import_queue_handle,
Expand All @@ -79,7 +79,6 @@ where
backup_saving_path,
external_addresses,
validator_port,
protocol_naming,
rate_limiter_config,
sync_oracle,
validator_address_cache,
Expand Down Expand Up @@ -122,7 +121,8 @@ where
});

let (gossip_network_service, authentication_network, block_sync_network) = GossipService::new(
SubstrateNetwork::new(network.clone(), sync_network.clone(), protocol_naming),
network,
network_event_stream,
spawn_handle.clone(),
registry.clone(),
);
Expand Down
3 changes: 2 additions & 1 deletion finality-aleph/src/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ async fn prepare_one_session_test_data() -> TestData {
let network = MockRawNetwork::new(event_stream_tx);
let validator_network = MockCliqueNetwork::new();

let (gossip_service, gossip_network, sync_network) = GossipService::<_, _, MockData>::new(
let (gossip_service, gossip_network, sync_network) = GossipService::<_, _, _, MockData>::new(
network.clone(),
network.event_stream(),
task_manager.spawn_handle().into(),
None,
);
Expand Down

0 comments on commit 685e33f

Please sign in to comment.