Skip to content

Commit

Permalink
Remove BlockImport enum with an Option
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Mar 14, 2023
1 parent 5183e1f commit e4df22a
Showing 1 changed file with 19 additions and 30 deletions.
49 changes: 19 additions & 30 deletions domains/client/domain-executor/src/domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,6 @@ pub(crate) async fn handle_slot_notifications<Block, PBlock, PClient, BundlerFn>
}
}

enum BlockImport<BlockImportNotification> {
/// Block import notification from the native substrate client.
///
/// Fired after the block import pipeline is finished.
Client(BlockImportNotification),
/// Block import notification from `sc-consensus-subspace`.
///
/// As a placeholder to reserve a slot in the buffer of block import throttling.
Subspace,
}

pub(crate) async fn handle_block_import_notifications<
Block,
PBlock,
Expand Down Expand Up @@ -103,8 +92,11 @@ pub(crate) async fn handle_block_import_notifications<
}
}

// Pause the primary block import once this channel is full.
let (mut multi_block_import_sender, mut multi_block_import_receiver) =
// The primary chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2`
// blocks, for there are two notifications per block sent to this buffer (one will be actually
// consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue
// the primary block import in case the primary chain runs much faster than the domain.).
let (mut block_info_sender, mut block_info_receiver) =
mpsc::channel(block_import_throttling_buffer_size as usize);

let mut client_block_import = primary_chain_client.every_import_notification_stream();
Expand Down Expand Up @@ -135,7 +127,7 @@ pub(crate) async fn handle_block_import_notifications<
parent_hash: *header.parent_hash(),
number: *header.number(),
};
let _ = multi_block_import_sender.feed(BlockImport::Client(block_info)).await;
let _ = block_info_sender.feed(Some(block_info)).await;
}
maybe_subspace_block_import = block_imports.next() => {
let (_block_number, mut block_import_acknowledgement_sender) =
Expand All @@ -146,26 +138,23 @@ pub(crate) async fn handle_block_import_notifications<
break;
}
};
let _ = multi_block_import_sender.feed(BlockImport::Subspace).await;
// Pause the primary block import when the sink is full.
let _ = block_info_sender.feed(None).await;
let _ = block_import_acknowledgement_sender.send(()).await;
}
Some(block_import_notification) = multi_block_import_receiver.next() => {
match block_import_notification {
BlockImport::Client(block_info) => {
if let Err(error) = block_imported::<Block, PBlock, _>(
&processor,
&mut active_leaves,
block_info,
).await {
tracing::error!(?error, "Failed to process primary block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
}
Some(maybe_block_info) = block_info_receiver.next() => {
if let Some(block_info) = maybe_block_info {
if let Err(error) = block_imported::<Block, PBlock, _>(
&processor,
&mut active_leaves,
block_info,
).await {
tracing::error!(?error, "Failed to process primary block");
// Bring down the service as bundles processor is an essential task.
// TODO: more graceful shutdown.
break;
}
BlockImport::Subspace => {}
}

}
}
}
Expand Down

0 comments on commit e4df22a

Please sign in to comment.