Skip to content

Commit

Permalink
Merge pull request #1217 from subspace/switch-to-client-every-block-i…
Browse files Browse the repository at this point in the history
…mport-notification

Use the block import notification fired after the completion of block import on the domain side
  • Loading branch information
liuchengxu authored Mar 14, 2023
2 parents b07f236 + 66e1089 commit f63997f
Show file tree
Hide file tree
Showing 19 changed files with 101 additions and 125 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/sc-consensus-subspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ where
{
/// Block number
pub block_number: NumberFor<Block>,
/// Fork choice
pub fork_choice: ForkChoiceStrategy,
/// Sender for pausing the block import when executor is not fast enough to process
/// the primary block.
pub block_import_acknowledgement_sender: mpsc::Sender<()>,
Expand Down Expand Up @@ -1168,7 +1166,6 @@ where
self.imported_block_notification_sender
.notify(move || ImportedBlockNotification {
block_number,
fork_choice,
block_import_acknowledgement_sender,
});

Expand Down
1 change: 0 additions & 1 deletion crates/subspace-fraud-proof/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ domain-test-service = { version = "0.1.0", path = "../../domains/test/service" }
futures = "0.3.26"
pallet-balances = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" }
sc-cli = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232", default-features = false }
sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" }
sc-service = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232", default-features = false }
sp-domain-digests = { version = "0.1.0", path = "../../domains/primitives/digests" }
sp-keyring = { version = "7.0.0", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" }
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-fraud-proof/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use domain_test_service::run_primary_chain_validator_node;
use domain_test_service::runtime::Header;
use domain_test_service::Keyring::{Alice, Bob, Charlie, Dave, Ferdie};
use sc_client_api::{HeaderBackend, StorageProof};
use sc_consensus::ForkChoiceStrategy;
use sc_service::{BasePath, Role};
use sp_api::ProvideRuntimeApi;
use sp_domain_digests::AsPredigest;
Expand Down Expand Up @@ -118,7 +117,6 @@ async fn execution_proof_creation_and_verification_should_work() {
let primary_info = (
ferdie.client.info().best_hash,
ferdie.client.info().best_number,
ForkChoiceStrategy::LongestChain,
);
alice.executor.clone().process_bundles(primary_info).await;

Expand Down
1 change: 0 additions & 1 deletion crates/subspace-node/src/bin/subspace-node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ fn main() -> Result<(), Error> {
.then(|imported_block_notification| async move {
(
imported_block_notification.block_number,
imported_block_notification.fork_choice,
imported_block_notification.block_import_acknowledgement_sender,
)
})
Expand Down
26 changes: 7 additions & 19 deletions domains/client/domain-executor/src/core_bundle_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::xdm_verifier::verify_xdm_with_system_domain_client;
use crate::TransactionFor;
use domain_runtime_primitives::{AccountId, DomainCoreApi};
use sc_client_api::{AuxStore, BlockBackend, StateBackendFor};
use sc_consensus::{BlockImport, ForkChoiceStrategy};
use sc_consensus::BlockImport;
use sp_api::{NumberFor, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_core::traits::CodeExecutor;
Expand Down Expand Up @@ -119,11 +119,11 @@ where
// TODO: Handle the returned error properly, ref to https://github.com/subspace/subspace/pull/695#discussion_r926721185
pub(crate) async fn process_bundles(
self,
primary_info: (PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
primary_info: (PBlock::Hash, NumberFor<PBlock>),
) -> Result<(), sp_blockchain::Error> {
tracing::debug!(?primary_info, "Processing imported primary block");

let (primary_hash, primary_number, fork_choice) = primary_info;
let (primary_hash, primary_number) = primary_info;

let maybe_pending_primary_blocks = self
.domain_block_processor
Expand All @@ -142,20 +142,9 @@ where

let mut domain_parent = initial_parent;

for (i, primary_info) in primary_imports.iter().enumerate() {
// Use the origin fork_choice for the target primary block,
// the intermediate ones use `Custom(false)`.
let fork_choice = if i == primary_imports.len() - 1 {
fork_choice
} else {
ForkChoiceStrategy::Custom(false)
};

for primary_info in primary_imports {
domain_parent = self
.process_bundles_at(
(primary_info.hash, primary_info.number, fork_choice),
domain_parent,
)
.process_bundles_at((primary_info.hash, primary_info.number), domain_parent)
.await?;
}
}
Expand All @@ -165,10 +154,10 @@ where

async fn process_bundles_at(
&self,
primary_info: (PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
primary_info: (PBlock::Hash, NumberFor<PBlock>),
parent_info: (Block::Hash, NumberFor<Block>),
) -> Result<(Block::Hash, NumberFor<Block>), sp_blockchain::Error> {
let (primary_hash, primary_number, fork_choice) = primary_info;
let (primary_hash, primary_number) = primary_info;
let (parent_hash, parent_number) = parent_info;

tracing::debug!(
Expand All @@ -190,7 +179,6 @@ where
(parent_hash, parent_number),
extrinsics,
maybe_new_runtime,
fork_choice,
Default::default(),
)
.await?;
Expand Down
10 changes: 5 additions & 5 deletions domains/client/domain-executor/src/core_domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::TransactionFor;
use domain_runtime_primitives::{AccountId, DomainCoreApi};
use futures::channel::mpsc;
use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt};
use sc_client_api::{AuxStore, BlockBackend, ProofProvider, StateBackendFor};
use sc_consensus::{BlockImport, ForkChoiceStrategy};
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents, ProofProvider, StateBackendFor};
use sc_consensus::BlockImport;
use sp_api::{BlockT, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
Expand Down Expand Up @@ -107,11 +107,12 @@ pub(super) async fn start_worker<
+ HeaderMetadata<PBlock, Error = sp_blockchain::Error>
+ BlockBackend<PBlock>
+ ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
+ 'static,
PClient::Api: ExecutorApi<PBlock, Block::Hash>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block> + 'static,
Backend: sc_client_api::Backend<Block> + 'static,
IBNS: Stream<Item = (NumberFor<PBlock>, ForkChoiceStrategy, mpsc::Sender<()>)> + Send + 'static,
IBNS: Stream<Item = (NumberFor<PBlock>, mpsc::Sender<()>)> + Send + 'static,
NSNS: Stream<Item = (Slot, Blake2b256Hash)> + Send + 'static,
TransactionFor<Backend, Block>: sp_trie::HashDBT<HashFor<Block>, sp_trie::DBValue>,
E: CodeExecutor,
Expand Down Expand Up @@ -140,8 +141,7 @@ pub(super) async fn start_worker<
hash,
parent_hash: _,
number,
fork_choice,
}| (hash, number, fork_choice),
}| (hash, number),
)
.collect(),
Box::pin(imported_block_notification_stream),
Expand Down
8 changes: 3 additions & 5 deletions domains/client/domain-executor/src/core_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crate::{active_leaves, EssentialExecutorParams, TransactionFor};
use domain_runtime_primitives::{AccountId, DomainCoreApi};
use futures::channel::mpsc;
use futures::{FutureExt, Stream};
use sc_client_api::{AuxStore, BlockBackend, ProofProvider, StateBackendFor};
use sc_consensus::ForkChoiceStrategy;
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents, ProofProvider, StateBackendFor};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::SelectChain;
Expand Down Expand Up @@ -78,6 +77,7 @@ where
+ HeaderMetadata<PBlock, Error = sp_blockchain::Error>
+ BlockBackend<PBlock>
+ ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -108,9 +108,7 @@ where
where
SE: SpawnEssentialNamed,
SC: SelectChain<PBlock>,
IBNS: Stream<Item = (NumberFor<PBlock>, ForkChoiceStrategy, mpsc::Sender<()>)>
+ Send
+ 'static,
IBNS: Stream<Item = (NumberFor<PBlock>, mpsc::Sender<()>)> + Send + 'static,
NSNS: Stream<Item = (Slot, Blake2b256Hash)> + Send + 'static,
{
let active_leaves =
Expand Down
7 changes: 6 additions & 1 deletion domains/client/domain-executor/src/domain_block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ where
(parent_hash, parent_number): (Block::Hash, NumberFor<Block>),
extrinsics: Vec<Block::Extrinsic>,
maybe_new_runtime: Option<Cow<'static, [u8]>>,
fork_choice: ForkChoiceStrategy,
digests: Digest,
) -> Result<DomainBlockResult<Block, PBlock>, sp_blockchain::Error> {
let primary_number = to_number_primitive(primary_number);
Expand All @@ -378,6 +377,12 @@ where
))));
}

// Although the domain block intuitively ought to use the same fork choice
// from the corresponding primary block, it's fine to forcibly always use
// the longest chain for simplicity as we manually build all the domain
// branches by literally following the primary chain branches anyway.
let fork_choice = ForkChoiceStrategy::LongestChain;

let (header_hash, header_number, state_root) = self
.build_and_import_block(
parent_hash,
Expand Down
90 changes: 56 additions & 34 deletions domains/client/domain-executor/src/domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use crate::utils::{to_number_primitive, BlockInfo, ExecutorSlotInfo};
use codec::{Decode, Encode};
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};
use sc_client_api::BlockBackend;
use sc_consensus::ForkChoiceStrategy;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_api::{ApiError, BlockT, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_domains::{ExecutorApi, SignedOpaqueBundle};
Expand Down Expand Up @@ -57,31 +56,34 @@ pub(crate) async fn handle_block_import_notifications<
primary_chain_client: &PClient,
best_domain_number: NumberFor<Block>,
processor: ProcessorFn,
mut leaves: Vec<(PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy)>,
mut leaves: Vec<(PBlock::Hash, NumberFor<PBlock>)>,
mut block_imports: BlockImports,
block_import_throttling_buffer_size: u32,
) where
Block: BlockT,
PBlock: BlockT,
PClient: HeaderBackend<PBlock> + BlockBackend<PBlock> + ProvideRuntimeApi<PBlock>,
PClient: HeaderBackend<PBlock>
+ BlockBackend<PBlock>
+ ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>,
PClient::Api: ExecutorApi<PBlock, Block::Hash>,
ProcessorFn: Fn(
(PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
(PBlock::Hash, NumberFor<PBlock>),
) -> Pin<Box<dyn Future<Output = Result<(), sp_blockchain::Error>> + Send>>
+ Send
+ Sync,
BlockImports: Stream<Item = (NumberFor<PBlock>, ForkChoiceStrategy, mpsc::Sender<()>)> + Unpin,
BlockImports: Stream<Item = (NumberFor<PBlock>, mpsc::Sender<()>)> + Unpin,
{
let mut active_leaves = HashMap::with_capacity(leaves.len());

let best_domain_number = to_number_primitive(best_domain_number);

// Notify about active leaves on startup before starting the loop
for (hash, number, fork_choice) in std::mem::take(&mut leaves) {
for (hash, number) in std::mem::take(&mut leaves) {
let _ = active_leaves.insert(hash, number);
// Skip the blocks that have been processed by the execution chain.
if number > best_domain_number.into() {
if let Err(error) = processor((hash, number, fork_choice)).await {
if let Err(error) = processor((hash, number)).await {
tracing::error!(?error, "Failed to process primary block on startup");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
Expand All @@ -90,48 +92,68 @@ pub(crate) async fn handle_block_import_notifications<
}
}

// Pause the primary block import once this channel is full.
// The primary chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2`
// blocks, for there are two notifications per block sent to this buffer (one will be actually
// consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue
// the primary block import in case the primary chain runs much faster than the domain.).
let (mut block_info_sender, mut block_info_receiver) =
mpsc::channel(block_import_throttling_buffer_size as usize);

let mut client_block_import = primary_chain_client.every_import_notification_stream();

loop {
tokio::select! {
maybe_block_import = block_imports.next() => {
let (block_number, fork_choice, mut block_import_acknowledgement_sender) = match maybe_block_import {
maybe_client_block_import = client_block_import.next() => {
let notification = match maybe_client_block_import {
Some(block_import) => block_import,
None => {
// Can be None on graceful shutdown.
break;
}
};
// TODO: `.expect()` on `Option` is fine here, but not for `Error`
let header = primary_chain_client
.header(
primary_chain_client.hash(block_number)
.expect("Header of imported block must exist; qed")
.expect("Header of imported block must exist; qed")
)
.expect("Header of imported block must exist; qed")
.expect("Header of imported block must exist; qed");
let header = match primary_chain_client.header(notification.hash) {
Ok(Some(header)) => header,
res => {
tracing::error!(
result = ?res,
header = ?notification.header,
"Imported primary block header not found",
);
return;
}
};
let block_info = BlockInfo {
hash: header.hash(),
parent_hash: *header.parent_hash(),
number: *header.number(),
fork_choice
};
let _ = block_info_sender.feed(block_info).await;
let _ = block_info_sender.feed(Some(block_info)).await;
}
maybe_subspace_block_import = block_imports.next() => {
let (_block_number, mut block_import_acknowledgement_sender) =
match maybe_subspace_block_import {
Some(block_import) => block_import,
None => {
// Can be None on graceful shutdown.
break;
}
};
// Pause the primary block import when the sink is full.
let _ = block_info_sender.feed(None).await;
let _ = block_import_acknowledgement_sender.send(()).await;
}
Some(block_info) = block_info_receiver.next() => {
if let Err(error) = block_imported::<Block, PBlock, _>(
&processor,
&mut active_leaves,
block_info,
).await {
tracing::error!(?error, "Failed to process primary block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
Some(maybe_block_info) = block_info_receiver.next() => {
if let Some(block_info) = maybe_block_info {
if let Err(error) = block_imported::<Block, PBlock, _>(
&processor,
&mut active_leaves,
block_info,
).await {
tracing::error!(?error, "Failed to process primary block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
}
}
}
}
Expand Down Expand Up @@ -193,7 +215,7 @@ where
Block: BlockT,
PBlock: BlockT,
ProcessorFn: Fn(
(PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
(PBlock::Hash, NumberFor<PBlock>),
) -> Pin<Box<dyn Future<Output = Result<(), sp_blockchain::Error>> + Send>>
+ Send
+ Sync,
Expand All @@ -210,7 +232,7 @@ where
debug_assert_eq!(block_info.number.saturating_sub(One::one()), number);
}

processor((block_info.hash, block_info.number, block_info.fork_choice)).await?;
processor((block_info.hash, block_info.number)).await?;

Ok(())
}
Loading

0 comments on commit f63997f

Please sign in to comment.