diff --git a/domains/client/domain-executor/src/domain_worker.rs b/domains/client/domain-executor/src/domain_worker.rs index 6ea9b4b961..56497840c0 100644 --- a/domains/client/domain-executor/src/domain_worker.rs +++ b/domains/client/domain-executor/src/domain_worker.rs @@ -46,17 +46,6 @@ pub(crate) async fn handle_slot_notifications } } -enum BlockImport { - /// Block import notification from the native substrate client. - /// - /// Fired after the block import pipeline is finished. - Client(BlockImportNotification), - /// Block import notification from `sc-consensus-subspace`. - /// - /// As a placeholder to reserve a slot in the buffer of block import throttling. - Subspace, -} - pub(crate) async fn handle_block_import_notifications< Block, PBlock, @@ -103,8 +92,11 @@ pub(crate) async fn handle_block_import_notifications< } } - // Pause the primary block import once this channel is full. - let (mut multi_block_import_sender, mut multi_block_import_receiver) = + // The primary chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2` + // blocks, for there are two notifications per block sent to this buffer (one will be actually + // consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue + // the primary block import in case the primary chain runs much faster than the domain.). + let (mut block_info_sender, mut block_info_receiver) = mpsc::channel(block_import_throttling_buffer_size as usize); let mut client_block_import = primary_chain_client.every_import_notification_stream(); @@ -135,7 +127,7 @@ pub(crate) async fn handle_block_import_notifications< parent_hash: *header.parent_hash(), number: *header.number(), }; - let _ = multi_block_import_sender.feed(BlockImport::Client(block_info)).await; + let _ = block_info_sender.feed(Some(block_info)).await; } maybe_subspace_block_import = block_imports.next() => { let (_block_number, mut block_import_acknowledgement_sender) = @@ -146,26 +138,23 @@ pub(crate) async fn handle_block_import_notifications< break; } }; - let _ = multi_block_import_sender.feed(BlockImport::Subspace).await; + // Pause the primary block import when the sink is full. + let _ = block_info_sender.feed(None).await; let _ = block_import_acknowledgement_sender.send(()).await; } - Some(block_import_notification) = multi_block_import_receiver.next() => { - match block_import_notification { - BlockImport::Client(block_info) => { - if let Err(error) = block_imported::( - &processor, - &mut active_leaves, - block_info, - ).await { - tracing::error!(?error, "Failed to process primary block"); - // Bring down the service as bundles processor is an essential task. - // TODO: more graceful shutdown. - break; - } + Some(maybe_block_info) = block_info_receiver.next() => { + if let Some(block_info) = maybe_block_info { + if let Err(error) = block_imported::( + &processor, + &mut active_leaves, + block_info, + ).await { + tracing::error!(?error, "Failed to process primary block"); + // Bring down the service as bundles processor is an essential task. + // TODO: more graceful shutdown. + break; } - BlockImport::Subspace => {} } - } } }