Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Support the subscription of every imported block #13372

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ pub type TransactionForSB<B, Block> = <B as StateBackend<HashFor<Block>>>::Trans
/// Extracts the transaction for the given backend.
pub type TransactionFor<B, Block> = TransactionForSB<StateBackendFor<B, Block>, Block>;

/// Describes which block import notification stream should be notified.
#[derive(Debug, Clone, Copy)]
pub enum ImportNotificationAction {
/// Notify only when the node has synced to the tip or there is a re-org.
RecentBlock,
/// Notify for every single block no matter what the sync state is.
EveryBlock,
/// Both block import notifications above should be fired.
Both,
/// No block import notification should be fired.
None,
}

/// Import operation summary.
///
/// Contains information about the block that just got imported,
Expand All @@ -67,6 +80,8 @@ pub struct ImportSummary<Block: BlockT> {
///
/// If `None`, there was no re-org while importing.
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
/// What notify action to take for this import.
pub import_notification_action: ImportNotificationAction,
}

/// Finalization operation summary.
Expand Down
10 changes: 8 additions & 2 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ pub trait BlockOf {

/// A source of blockchain events.
pub trait BlockchainEvents<Block: BlockT> {
/// Get block import event stream. Not guaranteed to be fired for every
/// imported block.
/// Get block import event stream.
///
/// Not guaranteed to be fired for every imported block, only fired when the node
/// has synced to the tip or there is a re-org. Use `every_import_notification_stream()`
/// if you want a notification of every imported block regardless.
fn import_notification_stream(&self) -> ImportNotifications<Block>;

/// Get a stream of every imported block.
fn every_import_notification_stream(&self) -> ImportNotifications<Block>;
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved

/// Get a stream of finality notifications. Not guaranteed to be fired for every
/// finalized block.
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;
Expand Down
4 changes: 4 additions & 0 deletions client/merkle-mountain-range/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ impl BlockchainEvents<Block> for MockClient {
unimplemented!()
}

fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
unimplemented!()
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
self.client.lock().finality_notification_stream()
}
Expand Down
91 changes: 70 additions & 21 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
Expand Down Expand Up @@ -106,6 +106,7 @@ where
executor: E,
storage_notifications: StorageNotifications<Block>,
import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
every_import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
// Collects auxiliary operations to be performed atomically together with
// block import operations.
Expand Down Expand Up @@ -304,19 +305,22 @@ where
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
});

let (import_notification, storage_changes) = match notify_imported {
Some(mut summary) => {
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
)
},
None => (None, None),
};
let (import_notification, storage_changes, import_notification_action) =
match notify_imported {
Some(mut summary) => {
let import_notification_action = summary.import_notification_action;
let storage_changes = summary.storage_changes.take();
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
import_notification_action,
)
},
None => (None, None, ImportNotificationAction::None),
};

if let Some(ref notification) = finality_notification {
for action in self.finality_actions.lock().iter_mut() {
Expand Down Expand Up @@ -353,7 +357,7 @@ where
}

self.notify_finalized(finality_notification)?;
self.notify_imported(import_notification, storage_changes)?;
self.notify_imported(import_notification, import_notification_action, storage_changes)?;

Ok(r)
};
Expand Down Expand Up @@ -451,6 +455,7 @@ where
executor,
storage_notifications: StorageNotifications::new(prometheus_registry),
import_notification_sinks: Default::default(),
every_import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
import_actions: Default::default(),
finality_actions: Default::default(),
Expand Down Expand Up @@ -769,11 +774,15 @@ where

operation.op.insert_aux(aux)?;

// We only notify when we are already synced to the tip of the chain
let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty();

// Notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
let should_notify_recent_block = make_notifications || tree_route.is_some();

if should_notify_every_block || should_notify_recent_block {
let header = import_headers.into_post();
if finalized {
if finalized && should_notify_recent_block {
let mut summary = match operation.notify_finalized.take() {
Some(mut summary) => {
summary.header = header.clone();
Expand Down Expand Up @@ -810,13 +819,24 @@ where
operation.notify_finalized = Some(summary);
}

let import_notification_action = if should_notify_every_block {
if should_notify_recent_block {
ImportNotificationAction::Both
} else {
ImportNotificationAction::EveryBlock
}
} else {
ImportNotificationAction::RecentBlock
};

operation.notify_imported = Some(ImportSummary {
hash,
origin,
header,
is_new_best,
storage_changes,
tree_route,
import_notification_action,
})
}

Expand Down Expand Up @@ -1012,6 +1032,7 @@ where
fn notify_imported(
&self,
notification: Option<BlockImportNotification<Block>>,
import_notification_action: ImportNotificationAction,
storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
) -> sp_blockchain::Result<()> {
let notification = match notification {
Expand All @@ -1024,6 +1045,9 @@ where
// temporary leak of closed/discarded notification sinks (e.g.
// from consensus code).
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());

self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());

return Ok(())
},
};
Expand All @@ -1037,9 +1061,28 @@ where
);
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
}

self.import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
match import_notification_action {
ImportNotificationAction::Both => {
self.import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

self.every_import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
},
ImportNotificationAction::RecentBlock => {
self.import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
},
ImportNotificationAction::EveryBlock => {
self.every_import_notification_sinks
.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
},
ImportNotificationAction::None => {},
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
liuchengxu marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(())
}
Expand Down Expand Up @@ -1944,6 +1987,12 @@ where
stream
}

fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000);
self.every_import_notification_sinks.lock().push(sink);
stream
}

fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
self.finality_notification_sinks.lock().push(sink);
Expand Down