Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the block import notification fired after the completion of block import on the domain side #1217

Merged
merged 5 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions crates/sc-consensus-subspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ where
{
/// Block number
pub block_number: NumberFor<Block>,
/// Fork choice
pub fork_choice: ForkChoiceStrategy,
/// Sender for pausing the block import when executor is not fast enough to process
/// the primary block.
pub block_import_acknowledgement_sender: mpsc::Sender<()>,
Expand Down Expand Up @@ -1168,7 +1166,6 @@ where
self.imported_block_notification_sender
.notify(move || ImportedBlockNotification {
block_number,
fork_choice,
block_import_acknowledgement_sender,
});

Expand Down
1 change: 0 additions & 1 deletion crates/subspace-fraud-proof/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ domain-test-service = { version = "0.1.0", path = "../../domains/test/service" }
futures = "0.3.26"
pallet-balances = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" }
sc-cli = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232", default-features = false }
sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" }
sc-service = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232", default-features = false }
sp-domain-digests = { version = "0.1.0", path = "../../domains/primitives/digests" }
sp-keyring = { version = "7.0.0", git = "https://github.com/subspace/substrate", rev = "456cfad45a178617f6886ec400c312f2fea59232" }
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-fraud-proof/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use domain_test_service::run_primary_chain_validator_node;
use domain_test_service::runtime::Header;
use domain_test_service::Keyring::{Alice, Bob, Charlie, Dave, Ferdie};
use sc_client_api::{HeaderBackend, StorageProof};
use sc_consensus::ForkChoiceStrategy;
use sc_service::{BasePath, Role};
use sp_api::ProvideRuntimeApi;
use sp_domain_digests::AsPredigest;
Expand Down Expand Up @@ -117,7 +116,6 @@ async fn execution_proof_creation_and_verification_should_work() {
let primary_info = (
ferdie.client.info().best_hash,
ferdie.client.info().best_number,
ForkChoiceStrategy::LongestChain,
);
alice.executor.clone().process_bundles(primary_info).await;

Expand Down
1 change: 0 additions & 1 deletion crates/subspace-node/src/bin/subspace-node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ fn main() -> Result<(), Error> {
.then(|imported_block_notification| async move {
(
imported_block_notification.block_number,
imported_block_notification.fork_choice,
imported_block_notification.block_import_acknowledgement_sender,
)
})
Expand Down
26 changes: 7 additions & 19 deletions domains/client/domain-executor/src/core_bundle_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::xdm_verifier::verify_xdm_with_system_domain_client;
use crate::TransactionFor;
use domain_runtime_primitives::{AccountId, DomainCoreApi};
use sc_client_api::{AuxStore, BlockBackend, StateBackendFor};
use sc_consensus::{BlockImport, ForkChoiceStrategy};
use sc_consensus::BlockImport;
use sp_api::{NumberFor, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_core::traits::CodeExecutor;
Expand Down Expand Up @@ -119,11 +119,11 @@ where
// TODO: Handle the returned error properly, ref to https://github.com/subspace/subspace/pull/695#discussion_r926721185
pub(crate) async fn process_bundles(
self,
primary_info: (PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
primary_info: (PBlock::Hash, NumberFor<PBlock>),
) -> Result<(), sp_blockchain::Error> {
tracing::debug!(?primary_info, "Processing imported primary block");

let (primary_hash, primary_number, fork_choice) = primary_info;
let (primary_hash, primary_number) = primary_info;

let maybe_pending_primary_blocks = self
.domain_block_processor
Expand All @@ -142,20 +142,9 @@ where

let mut domain_parent = initial_parent;

for (i, primary_info) in primary_imports.iter().enumerate() {
// Use the origin fork_choice for the target primary block,
// the intermediate ones use `Custom(false)`.
let fork_choice = if i == primary_imports.len() - 1 {
fork_choice
} else {
ForkChoiceStrategy::Custom(false)
};

for primary_info in primary_imports {
domain_parent = self
.process_bundles_at(
(primary_info.hash, primary_info.number, fork_choice),
domain_parent,
)
.process_bundles_at((primary_info.hash, primary_info.number), domain_parent)
.await?;
}
}
Expand All @@ -165,10 +154,10 @@ where

async fn process_bundles_at(
&self,
primary_info: (PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
primary_info: (PBlock::Hash, NumberFor<PBlock>),
parent_info: (Block::Hash, NumberFor<Block>),
) -> Result<(Block::Hash, NumberFor<Block>), sp_blockchain::Error> {
let (primary_hash, primary_number, fork_choice) = primary_info;
let (primary_hash, primary_number) = primary_info;
let (parent_hash, parent_number) = parent_info;

tracing::debug!(
Expand All @@ -190,7 +179,6 @@ where
(parent_hash, parent_number),
extrinsics,
maybe_new_runtime,
fork_choice,
Default::default(),
)
.await?;
Expand Down
10 changes: 5 additions & 5 deletions domains/client/domain-executor/src/core_domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::TransactionFor;
use domain_runtime_primitives::{AccountId, DomainCoreApi};
use futures::channel::mpsc;
use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt};
use sc_client_api::{AuxStore, BlockBackend, ProofProvider, StateBackendFor};
use sc_consensus::{BlockImport, ForkChoiceStrategy};
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents, ProofProvider, StateBackendFor};
use sc_consensus::BlockImport;
use sp_api::{BlockT, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
Expand Down Expand Up @@ -107,11 +107,12 @@ pub(super) async fn start_worker<
+ HeaderMetadata<PBlock, Error = sp_blockchain::Error>
+ BlockBackend<PBlock>
+ ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
+ 'static,
PClient::Api: ExecutorApi<PBlock, Block::Hash>,
TransactionPool: sc_transaction_pool_api::TransactionPool<Block = Block> + 'static,
Backend: sc_client_api::Backend<Block> + 'static,
IBNS: Stream<Item = (NumberFor<PBlock>, ForkChoiceStrategy, mpsc::Sender<()>)> + Send + 'static,
IBNS: Stream<Item = (NumberFor<PBlock>, mpsc::Sender<()>)> + Send + 'static,
NSNS: Stream<Item = (Slot, Blake2b256Hash)> + Send + 'static,
TransactionFor<Backend, Block>: sp_trie::HashDBT<HashFor<Block>, sp_trie::DBValue>,
E: CodeExecutor,
Expand Down Expand Up @@ -140,8 +141,7 @@ pub(super) async fn start_worker<
hash,
parent_hash: _,
number,
fork_choice,
}| (hash, number, fork_choice),
}| (hash, number),
)
.collect(),
Box::pin(imported_block_notification_stream),
Expand Down
8 changes: 3 additions & 5 deletions domains/client/domain-executor/src/core_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crate::{active_leaves, EssentialExecutorParams, TransactionFor};
use domain_runtime_primitives::{AccountId, DomainCoreApi};
use futures::channel::mpsc;
use futures::{FutureExt, Stream};
use sc_client_api::{AuxStore, BlockBackend, ProofProvider, StateBackendFor};
use sc_consensus::ForkChoiceStrategy;
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents, ProofProvider, StateBackendFor};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::SelectChain;
Expand Down Expand Up @@ -78,6 +77,7 @@ where
+ HeaderMetadata<PBlock, Error = sp_blockchain::Error>
+ BlockBackend<PBlock>
+ ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>
+ Send
+ Sync
+ 'static,
Expand Down Expand Up @@ -108,9 +108,7 @@ where
where
SE: SpawnEssentialNamed,
SC: SelectChain<PBlock>,
IBNS: Stream<Item = (NumberFor<PBlock>, ForkChoiceStrategy, mpsc::Sender<()>)>
+ Send
+ 'static,
IBNS: Stream<Item = (NumberFor<PBlock>, mpsc::Sender<()>)> + Send + 'static,
NSNS: Stream<Item = (Slot, Blake2b256Hash)> + Send + 'static,
{
let active_leaves =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ where
(parent_hash, parent_number): (Block::Hash, NumberFor<Block>),
extrinsics: Vec<Block::Extrinsic>,
maybe_new_runtime: Option<Cow<'static, [u8]>>,
fork_choice: ForkChoiceStrategy,
digests: Digest,
) -> Result<DomainBlockResult<Block, PBlock>, sp_blockchain::Error> {
let primary_number = to_number_primitive(primary_number);
Expand All @@ -378,6 +377,12 @@ where
))));
}

// Although the domain block intuitively ought to use the same fork choice
// from the corresponding primary block, it's fine to forcibly always use
// the longest chain for simplicity as we manually build all the domain
// branches by literally following the primary chain branches anyway.
let fork_choice = ForkChoiceStrategy::LongestChain;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me: do we finalize on execution chain the same blocks that are finalized on the primary chain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we don't follow the finalize event from primary chain right now. We briefly talked about this before, but never made it on the list :P, tracked in #1246 now.


let (header_hash, header_number, state_root) = self
.build_and_import_block(
parent_hash,
Expand Down
90 changes: 56 additions & 34 deletions domains/client/domain-executor/src/domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use crate::utils::{to_number_primitive, BlockInfo, ExecutorSlotInfo};
use codec::{Decode, Encode};
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};
use sc_client_api::BlockBackend;
use sc_consensus::ForkChoiceStrategy;
use sc_client_api::{BlockBackend, BlockchainEvents};
use sp_api::{ApiError, BlockT, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_domains::{ExecutorApi, SignedOpaqueBundle};
Expand Down Expand Up @@ -57,31 +56,34 @@ pub(crate) async fn handle_block_import_notifications<
primary_chain_client: &PClient,
best_domain_number: NumberFor<Block>,
processor: ProcessorFn,
mut leaves: Vec<(PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy)>,
mut leaves: Vec<(PBlock::Hash, NumberFor<PBlock>)>,
mut block_imports: BlockImports,
block_import_throttling_buffer_size: u32,
) where
Block: BlockT,
PBlock: BlockT,
PClient: HeaderBackend<PBlock> + BlockBackend<PBlock> + ProvideRuntimeApi<PBlock>,
PClient: HeaderBackend<PBlock>
+ BlockBackend<PBlock>
+ ProvideRuntimeApi<PBlock>
+ BlockchainEvents<PBlock>,
PClient::Api: ExecutorApi<PBlock, Block::Hash>,
ProcessorFn: Fn(
(PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
(PBlock::Hash, NumberFor<PBlock>),
) -> Pin<Box<dyn Future<Output = Result<(), sp_blockchain::Error>> + Send>>
+ Send
+ Sync,
BlockImports: Stream<Item = (NumberFor<PBlock>, ForkChoiceStrategy, mpsc::Sender<()>)> + Unpin,
BlockImports: Stream<Item = (NumberFor<PBlock>, mpsc::Sender<()>)> + Unpin,
{
let mut active_leaves = HashMap::with_capacity(leaves.len());

let best_domain_number = to_number_primitive(best_domain_number);

// Notify about active leaves on startup before starting the loop
for (hash, number, fork_choice) in std::mem::take(&mut leaves) {
for (hash, number) in std::mem::take(&mut leaves) {
let _ = active_leaves.insert(hash, number);
// Skip the blocks that have been processed by the execution chain.
if number > best_domain_number.into() {
if let Err(error) = processor((hash, number, fork_choice)).await {
if let Err(error) = processor((hash, number)).await {
tracing::error!(?error, "Failed to process primary block on startup");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
Expand All @@ -90,48 +92,68 @@ pub(crate) async fn handle_block_import_notifications<
}
}

// Pause the primary block import once this channel is full.
// The primary chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2`
// blocks, for there are two notifications per block sent to this buffer (one will be actually
// consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue
// the primary block import in case the primary chain runs much faster than the domain.).
let (mut block_info_sender, mut block_info_receiver) =
mpsc::channel(block_import_throttling_buffer_size as usize);

let mut client_block_import = primary_chain_client.every_import_notification_stream();

loop {
tokio::select! {
maybe_block_import = block_imports.next() => {
let (block_number, fork_choice, mut block_import_acknowledgement_sender) = match maybe_block_import {
maybe_client_block_import = client_block_import.next() => {
let notification = match maybe_client_block_import {
Some(block_import) => block_import,
None => {
// Can be None on graceful shutdown.
break;
}
};
// TODO: `.expect()` on `Option` is fine here, but not for `Error`
let header = primary_chain_client
.header(
primary_chain_client.hash(block_number)
.expect("Header of imported block must exist; qed")
.expect("Header of imported block must exist; qed")
)
.expect("Header of imported block must exist; qed")
.expect("Header of imported block must exist; qed");
let header = match primary_chain_client.header(notification.hash) {
Ok(Some(header)) => header,
res => {
tracing::error!(
result = ?res,
header = ?notification.header,
"Imported primary block header not found",
);
return;
}
};
let block_info = BlockInfo {
hash: header.hash(),
parent_hash: *header.parent_hash(),
number: *header.number(),
fork_choice
};
let _ = block_info_sender.feed(block_info).await;
let _ = block_info_sender.feed(Some(block_info)).await;
}
maybe_subspace_block_import = block_imports.next() => {
let (_block_number, mut block_import_acknowledgement_sender) =
match maybe_subspace_block_import {
Some(block_import) => block_import,
None => {
// Can be None on graceful shutdown.
break;
}
};
// Pause the primary block import when the sink is full.
let _ = block_info_sender.feed(None).await;
let _ = block_import_acknowledgement_sender.send(()).await;
}
Some(block_info) = block_info_receiver.next() => {
if let Err(error) = block_imported::<Block, PBlock, _>(
&processor,
&mut active_leaves,
block_info,
).await {
tracing::error!(?error, "Failed to process primary block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
Some(maybe_block_info) = block_info_receiver.next() => {
if let Some(block_info) = maybe_block_info {
if let Err(error) = block_imported::<Block, PBlock, _>(
&processor,
&mut active_leaves,
block_info,
).await {
tracing::error!(?error, "Failed to process primary block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
}
}
}
}
Expand Down Expand Up @@ -193,7 +215,7 @@ where
Block: BlockT,
PBlock: BlockT,
ProcessorFn: Fn(
(PBlock::Hash, NumberFor<PBlock>, ForkChoiceStrategy),
(PBlock::Hash, NumberFor<PBlock>),
) -> Pin<Box<dyn Future<Output = Result<(), sp_blockchain::Error>> + Send>>
+ Send
+ Sync,
Expand All @@ -210,7 +232,7 @@ where
debug_assert_eq!(block_info.number.saturating_sub(One::one()), number);
}

processor((block_info.hash, block_info.number, block_info.fork_choice)).await?;
processor((block_info.hash, block_info.number)).await?;

Ok(())
}
Loading