From b3ed108e073bd18f9fc94a0f7ff670e3e5f0b6e8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 4 Oct 2024 20:17:58 +0300 Subject: [PATCH 1/7] feat(exex): subscribe to notifications with head using `ExExContext` --- crates/exex/exex/src/context.rs | 11 +- crates/exex/exex/src/manager.rs | 128 ++++++++++++------- crates/exex/exex/src/notifications.rs | 171 +++++++++++++++----------- crates/exex/test-utils/src/lib.rs | 2 +- 4 files changed, 195 insertions(+), 117 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index bdb2d4c27b99..bc9c58e9234a 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use reth_exex_types::ExExHead; use reth_node_api::{FullNodeComponents, NodeTypes, NodeTypesWithEngine}; use reth_node_core::node_config::NodeConfig; use reth_primitives::Head; @@ -32,7 +33,7 @@ pub struct ExExContext { /// considered delivered by the node. pub notifications: ExExNotifications, - /// node components + /// Node components pub components: Node, } @@ -92,4 +93,12 @@ impl ExExContext { pub fn task_executor(&self) -> &TaskExecutor { self.components.task_executor() } + + pub fn with_notifications_without_head(self) -> Self { + Self { notifications: self.notifications.without_head(), ..self } + } + + pub fn with_notifications_with_head(self, head: ExExHead) -> Self { + Self { notifications: self.notifications.with_head(head), ..self } + } } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index e8e24c09db02..af7fff29fe45 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -81,8 +81,13 @@ impl ExExHandle { ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = - ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle); + let notifications = ExExNotifications::new_without_head( + node_head, + provider, + executor, + notification_rx, + wal_handle, + ); ( Self { @@ -610,12 +615,16 @@ impl Clone for ExExManagerHandle { mod tests { use super::*; use alloy_primitives::B256; - use eyre::OptionExt; - use futures::{FutureExt, StreamExt}; + use futures::StreamExt; use rand::Rng; + use reth_db_common::init::init_genesis; + use reth_evm_ethereum::execute::EthExecutorProvider; use reth_primitives::SealedBlockWithSenders; - use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain}; - use reth_testing_utils::generators::{self, random_block}; + use reth_provider::{ + providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader, + Chain, TransactionVariant, + }; + use reth_testing_utils::generators; fn empty_finalized_header_stream() -> ForkChoiceStream { let (tx, rx) = watch::channel(None); @@ -975,11 +984,20 @@ mod tests { #[tokio::test] async fn exex_handle_new() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -1008,7 +1026,7 @@ mod tests { // Send a notification and ensure it's received correctly match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap().unwrap(); assert_eq!(received_notification, notification); } Poll::Pending => panic!("Notification send is pending"), @@ -1021,11 +1039,20 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random())); @@ -1046,11 +1073,7 @@ mod tests { poll_fn(|cx| { // The notification should be skipped, so nothing should be sent. // Check that the receiver channel is indeed empty - assert_eq!( - notifications.poll_next_unpin(cx), - Poll::Pending, - "Receiver channel should be empty" - ); + assert!(notifications.poll_next_unpin(cx).is_pending()); Poll::Ready(()) }) .await; @@ -1066,11 +1089,20 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -1086,7 +1118,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap().unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -1100,11 +1132,20 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { + let provider_factory = create_test_provider_factory(); + init_genesis(&provider_factory).unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); + let (mut exex_handle, _, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider, + EthExecutorProvider::mainnet(), + wal.handle(), + ); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -1117,7 +1158,7 @@ mod tests { // Send the notification match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { - let received_notification = notifications.next().await.unwrap(); + let received_notification = notifications.next().await.unwrap().unwrap(); assert_eq!(received_notification, notification); } Poll::Pending | Poll::Ready(Err(_)) => { @@ -1135,30 +1176,34 @@ mod tests { let mut rng = generators::rng(); + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory).unwrap(); + let genesis_block = provider_factory + .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash) + .unwrap() + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + let provider = BlockchainProvider2::new(provider_factory).unwrap(); + let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); - let provider_factory = create_test_provider_factory(); - - let block = random_block(&mut rng, 0, Default::default()) - .seal_with_senders() - .ok_or_eyre("failed to recover senders")?; - let provider_rw = provider_factory.provider_rw()?; - provider_rw.insert_block(block.clone())?; - provider_rw.commit()?; + let (exex_handle, events_tx, mut notifications) = ExExHandle::new( + "test_exex".to_string(), + Head::default(), + provider.clone(), + EthExecutorProvider::mainnet(), + wal.handle(), + ); let notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)), + new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)), }; let (finalized_headers_tx, rx) = watch::channel(None); let finalized_header_stream = ForkChoiceStream::new(rx); - let (exex_handle, events_tx, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); - let mut exex_manager = std::pin::pin!(ExExManager::new( - provider_factory, + provider, vec![exex_handle], 1, wal, @@ -1170,16 +1215,13 @@ mod tests { exex_manager.handle().send(notification.clone())?; assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); - assert_eq!( - notifications.next().poll_unpin(&mut cx), - Poll::Ready(Some(notification.clone())) - ); + assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone()); assert_eq!( exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); - finalized_headers_tx.send(Some(block.header.clone()))?; + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event assert_eq!( @@ -1192,7 +1234,7 @@ mod tests { .send(ExExEvent::FinishedHeight((rng.gen::(), rng.gen::()).into())) .unwrap(); - finalized_headers_tx.send(Some(block.header.clone()))?; + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // non-canonical block @@ -1202,9 +1244,9 @@ mod tests { ); // Send a `FinishedHeight` event with a canonical block - events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap(); + events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap(); - finalized_headers_tx.send(Some(block.header.clone()))?; + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL is finalized assert!(exex_manager.wal.iter_notifications()?.next().is_none()); diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 116dac95422b..78d13bad72a8 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -13,8 +13,89 @@ use std::{ }; use tokio::sync::mpsc::Receiver; +/// A stream of [`ExExNotification`]s. +#[derive(Debug)] +pub enum ExExNotifications { + /// Emits notifications for all blocks. + WithoutHead(ExExNotificationsWithoutHead), + /// Only emits notifications for blocks that are committed or reverted after the given head. + WithHead(ExExNotificationsWithHead), +} + +impl ExExNotifications { + /// Creates a new [`ExExNotifications::WithoutHead`]. + pub const fn new_without_head( + node_head: Head, + provider: P, + executor: E, + notifications: Receiver, + wal_handle: WalHandle, + ) -> Self { + Self::WithoutHead(ExExNotificationsWithoutHead::new( + node_head, + provider, + executor, + notifications, + wal_handle, + )) + } + + /// Converts the [`ExExNotifications`] into [`ExExNotifications::WithoutHead`]. + /// + /// It's a no-op if the variant is already [`ExExNotifications::WithoutHead`]. + pub(super) fn without_head(self) -> Self { + Self::WithoutHead(match self { + Self::WithoutHead(notifications) => notifications, + Self::WithHead(notifications) => ExExNotificationsWithoutHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + ), + }) + } + + /// Converts the [`ExExNotifications`] into [`ExExNotifications::WithHead`]. + /// + /// It's a no-op if the variant is already [`ExExNotifications::WithHead`]. + pub(super) fn with_head(self, exex_head: ExExHead) -> Self { + Self::WithHead(match self { + Self::WithoutHead(notifications) => notifications.with_head(exex_head), + Self::WithHead(notifications) => ExExNotificationsWithHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + exex_head, + ), + }) + } +} + +impl Stream for ExExNotifications +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ + type Item = eyre::Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.get_mut() { + Self::WithoutHead(notifications) => { + notifications.poll_next_unpin(cx).map(|result| result.map(Ok)) + } + Self::WithHead(notifications) => notifications.poll_next_unpin(cx), + } + } +} + /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. -pub struct ExExNotifications { +pub struct ExExNotificationsWithoutHead { node_head: Head, provider: P, executor: E, @@ -22,7 +103,7 @@ pub struct ExExNotifications { wal_handle: WalHandle, } -impl Debug for ExExNotifications { +impl Debug for ExExNotificationsWithoutHead { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ExExNotifications") .field("provider", &self.provider) @@ -32,9 +113,9 @@ impl Debug for ExExNotifications { } } -impl ExExNotifications { +impl ExExNotificationsWithoutHead { /// Creates a new instance of [`ExExNotifications`]. - pub const fn new( + const fn new( node_head: Head, provider: P, executor: E, @@ -44,62 +125,6 @@ impl ExExNotifications { Self { node_head, provider, executor, notifications, wal_handle } } - /// Receives the next value for this receiver. - /// - /// This method returns `None` if the channel has been closed and there are - /// no remaining messages in the channel's buffer. This indicates that no - /// further values can ever be received from this `Receiver`. The channel is - /// closed when all senders have been dropped, or when [`Receiver::close`] is called. - /// - /// # Cancel safety - /// - /// This method is cancel safe. If `recv` is used as the event in a - /// [`tokio::select!`] statement and some other branch - /// completes first, it is guaranteed that no messages were received on this - /// channel. - /// - /// For full documentation, see [`Receiver::recv`]. - #[deprecated(note = "use `ExExNotifications::next` and its `Stream` implementation instead")] - pub async fn recv(&mut self) -> Option { - self.notifications.recv().await - } - - /// Polls to receive the next message on this channel. - /// - /// This method returns: - /// - /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a - /// spurious failure happens. - /// * `Poll::Ready(Some(message))` if a message is available. - /// * `Poll::Ready(None)` if the channel has been closed and all messages sent before it was - /// closed have been received. - /// - /// When the method returns `Poll::Pending`, the `Waker` in the provided - /// `Context` is scheduled to receive a wakeup when a message is sent on any - /// receiver, or when the channel is closed. Note that on multiple calls to - /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` - /// passed to the most recent call is scheduled to receive a wakeup. - /// - /// If this method returns `Poll::Pending` due to a spurious failure, then - /// the `Waker` will be notified when the situation causing the spurious - /// failure has been resolved. Note that receiving such a wakeup does not - /// guarantee that the next call will succeed — it could fail with another - /// spurious failure. - /// - /// For full documentation, see [`Receiver::poll_recv`]. - #[deprecated( - note = "use `ExExNotifications::poll_next` and its `Stream` implementation instead" - )] - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.notifications.poll_recv(cx) - } -} - -impl ExExNotifications -where - P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, -{ /// Subscribe to notifications with the given head. This head is the ExEx's /// latest view of the host chain. /// @@ -107,7 +132,7 @@ where /// example, if `head.number == 10`, then the first notification will be /// with `block.number == 11`. A `head.number` of 10 indicates that the ExEx /// has processed up to block 10, and is ready to process block 11. - pub fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { + fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new( self.node_head, self.provider, @@ -119,7 +144,7 @@ where } } -impl Stream for ExExNotifications { +impl Stream for ExExNotificationsWithoutHead { type Item = ExExNotification; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -147,13 +172,9 @@ pub struct ExExNotificationsWithHead { backfill_job: Option>, } -impl ExExNotificationsWithHead -where - P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider + Clone + Unpin + 'static, -{ +impl ExExNotificationsWithHead { /// Creates a new [`ExExNotificationsWithHead`]. - pub const fn new( + const fn new( node_head: Head, provider: P, executor: E, @@ -173,7 +194,13 @@ where backfill_job: None, } } +} +impl ExExNotificationsWithHead +where + P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, + E: BlockExecutorProvider + Clone + Unpin + 'static, +{ /// Checks if the ExEx head is on the canonical chain. /// /// If the head block is not found in the database or it's ahead of the node head, it means @@ -367,7 +394,7 @@ mod tests { notifications_tx.send(notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), @@ -438,7 +465,7 @@ mod tests { notifications_tx.send(notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), @@ -528,7 +555,7 @@ mod tests { notifications_tx.send(new_notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), @@ -609,7 +636,7 @@ mod tests { notifications_tx.send(new_notification.clone()).await?; - let mut notifications = ExExNotifications::new( + let mut notifications = ExExNotificationsWithoutHead::new( node_head, provider, EthExecutorProvider::mainnet(), diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index f4561a6b55f2..18745e989e91 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -317,7 +317,7 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); - let notifications = ExExNotifications::new( + let notifications = ExExNotifications::new_without_head( head, components.provider.clone(), components.components.executor.clone(), From bb4bcf42de4e425f2bba89bc9c5852eae53b7a9e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 7 Oct 2024 10:12:37 +0100 Subject: [PATCH 2/7] use option --- crates/exex/exex/src/context.rs | 17 ++++ crates/exex/exex/src/notifications.rs | 110 +++++++++++++++++++++----- 2 files changed, 106 insertions(+), 21 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index bc9c58e9234a..4b7107c58eba 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -94,11 +94,28 @@ impl ExExContext { self.components.task_executor() } + /// Replaces notifications stream with a stream of notifications without a head. + /// + /// Consumes the type and returns a new instance of [`ExExContext`]. pub fn with_notifications_without_head(self) -> Self { Self { notifications: self.notifications.without_head(), ..self } } + /// Replaces notifications stream with a stream of notifications with a head using the + /// provided head. + /// + /// Consumes the type and returns a new instance of [`ExExContext`]. pub fn with_notifications_with_head(self, head: ExExHead) -> Self { Self { notifications: self.notifications.with_head(head), ..self } } + + /// Sets notifications stream to a stream of notifications without a head. + pub fn set_notifications_without_head(&mut self) { + self.notifications.set_without_head(); + } + + /// Sets notifications stream to a stream of notifications with a head using the provided head. + pub fn set_notifications_with_head(&mut self, head: ExExHead) { + self.notifications.set_with_head(head); + } } diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 78d13bad72a8..bc7c2868f379 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -15,15 +15,16 @@ use tokio::sync::mpsc::Receiver; /// A stream of [`ExExNotification`]s. #[derive(Debug)] -pub enum ExExNotifications { - /// Emits notifications for all blocks. +pub struct ExExNotifications(Option>); + +#[derive(Debug)] +enum ExExNotificationsInner { WithoutHead(ExExNotificationsWithoutHead), - /// Only emits notifications for blocks that are committed or reverted after the given head. WithHead(ExExNotificationsWithHead), } impl ExExNotifications { - /// Creates a new [`ExExNotifications::WithoutHead`]. + /// Creates a new stream of [`ExExNotifications`] without a head. pub const fn new_without_head( node_head: Head, provider: P, @@ -31,38 +32,91 @@ impl ExExNotifications { notifications: Receiver, wal_handle: WalHandle, ) -> Self { - Self::WithoutHead(ExExNotificationsWithoutHead::new( + Self(Some(ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new( node_head, provider, executor, notifications, wal_handle, - )) + )))) + } + + /// Returns the inner value of the [`ExExNotifications`]. + /// + /// # Panics + /// + /// Panics if the inner value is [`None`]. + fn inner(self) -> ExExNotificationsInner { + self.0.unwrap() + } + + /// Returns a mutable reference to the inner value of the [`ExExNotifications`]. + /// + /// # Panics + /// + /// Panics if the inner value is [`None`]. + fn inner_mut(&mut self) -> &mut ExExNotificationsInner { + self.0.as_mut().unwrap() + } + + /// Replaces the inner value of the [`ExExNotifications`]. + /// + /// # Panics + /// + /// Panics if the inner value is [`None`]. + fn replace_inner( + &mut self, + f: impl FnOnce(ExExNotificationsInner) -> ExExNotificationsInner, + ) { + self.0 = Some(f(self.0.take().unwrap())); } - /// Converts the [`ExExNotifications`] into [`ExExNotifications::WithoutHead`]. + /// Converts the [`ExExNotifications`] into a stream of [`ExExNotification`]s without a head. /// - /// It's a no-op if the variant is already [`ExExNotifications::WithoutHead`]. + /// It's a no-op if the stream has already been configured without a head. pub(super) fn without_head(self) -> Self { - Self::WithoutHead(match self { - Self::WithoutHead(notifications) => notifications, - Self::WithHead(notifications) => ExExNotificationsWithoutHead::new( + Self(Some(ExExNotificationsInner::WithoutHead(match self.inner() { + ExExNotificationsInner::WithoutHead(notifications) => notifications, + ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new( notifications.node_head, notifications.provider, notifications.executor, notifications.notifications, notifications.wal_handle, ), - }) + }))) + } + + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s without a head. + /// + /// It's a no-op if the stream has already been configured without a head. + pub(super) fn set_without_head(&mut self) { + self.replace_inner(|inner| { + ExExNotificationsInner::WithoutHead(match inner { + ExExNotificationsInner::WithoutHead(notifications) => notifications, + ExExNotificationsInner::WithHead(notifications) => { + ExExNotificationsWithoutHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + ) + } + }) + }); } - /// Converts the [`ExExNotifications`] into [`ExExNotifications::WithHead`]. + /// Converts the [`ExExNotifications`] into a stream of [`ExExNotification`]s with the provided + /// head. /// - /// It's a no-op if the variant is already [`ExExNotifications::WithHead`]. + /// It's a no-op if the stream has already been configured with a head. pub(super) fn with_head(self, exex_head: ExExHead) -> Self { - Self::WithHead(match self { - Self::WithoutHead(notifications) => notifications.with_head(exex_head), - Self::WithHead(notifications) => ExExNotificationsWithHead::new( + Self(Some(ExExNotificationsInner::WithHead(match self.inner() { + ExExNotificationsInner::WithoutHead(notifications) => { + notifications.with_head(exex_head) + } + ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new( notifications.node_head, notifications.provider, notifications.executor, @@ -70,7 +124,21 @@ impl ExExNotifications { notifications.wal_handle, exex_head, ), - }) + }))) + } + + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s with the provided head. + /// + /// It's a no-op if the stream has already been configured with a head. + pub(super) fn set_with_head(&mut self, exex_head: ExExHead) { + self.replace_inner(|inner| { + ExExNotificationsInner::WithHead(match inner { + ExExNotificationsInner::WithoutHead(notifications) => { + notifications.with_head(exex_head) + } + ExExNotificationsInner::WithHead(notifications) => notifications, + }) + }); } } @@ -85,11 +153,11 @@ where self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - match self.get_mut() { - Self::WithoutHead(notifications) => { + match self.get_mut().inner_mut() { + ExExNotificationsInner::WithoutHead(notifications) => { notifications.poll_next_unpin(cx).map(|result| result.map(Ok)) } - Self::WithHead(notifications) => notifications.poll_next_unpin(cx), + ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx), } } } From 7086fcfd11c873ae0375da2c81e029627d7b7c2b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 7 Oct 2024 12:25:58 +0100 Subject: [PATCH 3/7] replace with enum --- crates/exex/exex/src/context.rs | 21 +---- crates/exex/exex/src/manager.rs | 9 +- crates/exex/exex/src/notifications.rs | 123 +++++++------------------- crates/exex/test-utils/src/lib.rs | 2 +- 4 files changed, 40 insertions(+), 115 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 4b7107c58eba..9af12e260a72 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -94,27 +94,14 @@ impl ExExContext { self.components.task_executor() } - /// Replaces notifications stream with a stream of notifications without a head. - /// - /// Consumes the type and returns a new instance of [`ExExContext`]. - pub fn with_notifications_without_head(self) -> Self { - Self { notifications: self.notifications.without_head(), ..self } - } - - /// Replaces notifications stream with a stream of notifications with a head using the - /// provided head. - /// - /// Consumes the type and returns a new instance of [`ExExContext`]. - pub fn with_notifications_with_head(self, head: ExExHead) -> Self { - Self { notifications: self.notifications.with_head(head), ..self } - } - - /// Sets notifications stream to a stream of notifications without a head. + /// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of + /// notifications without a head. pub fn set_notifications_without_head(&mut self) { self.notifications.set_without_head(); } - /// Sets notifications stream to a stream of notifications with a head using the provided head. + /// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications + /// with the provided head. pub fn set_notifications_with_head(&mut self, head: ExExHead) { self.notifications.set_with_head(head); } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index af7fff29fe45..31dc822222b6 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -81,13 +81,8 @@ impl ExExHandle { ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotifications::new_without_head( - node_head, - provider, - executor, - notification_rx, - wal_handle, - ); + let notifications = + ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle); ( Self { diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index bc7c2868f379..8d13df822152 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -15,67 +15,49 @@ use tokio::sync::mpsc::Receiver; /// A stream of [`ExExNotification`]s. #[derive(Debug)] -pub struct ExExNotifications(Option>); +pub struct ExExNotifications { + inner: ExExNotificationsInner, +} #[derive(Debug)] enum ExExNotificationsInner { + /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and + /// [`ExExNotificationsInner::WithHead`]. + Invalid, + /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. WithoutHead(ExExNotificationsWithoutHead), + /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that + /// are committed or reverted after the given head. WithHead(ExExNotificationsWithHead), } impl ExExNotifications { /// Creates a new stream of [`ExExNotifications`] without a head. - pub const fn new_without_head( + pub const fn new( node_head: Head, provider: P, executor: E, notifications: Receiver, wal_handle: WalHandle, ) -> Self { - Self(Some(ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new( - node_head, - provider, - executor, - notifications, - wal_handle, - )))) - } - - /// Returns the inner value of the [`ExExNotifications`]. - /// - /// # Panics - /// - /// Panics if the inner value is [`None`]. - fn inner(self) -> ExExNotificationsInner { - self.0.unwrap() - } - - /// Returns a mutable reference to the inner value of the [`ExExNotifications`]. - /// - /// # Panics - /// - /// Panics if the inner value is [`None`]. - fn inner_mut(&mut self) -> &mut ExExNotificationsInner { - self.0.as_mut().unwrap() - } - - /// Replaces the inner value of the [`ExExNotifications`]. - /// - /// # Panics - /// - /// Panics if the inner value is [`None`]. - fn replace_inner( - &mut self, - f: impl FnOnce(ExExNotificationsInner) -> ExExNotificationsInner, - ) { - self.0 = Some(f(self.0.take().unwrap())); + Self { + inner: ExExNotificationsInner::WithoutHead(ExExNotificationsWithoutHead::new( + node_head, + provider, + executor, + notifications, + wal_handle, + )), + } } - /// Converts the [`ExExNotifications`] into a stream of [`ExExNotification`]s without a head. + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s without a head. /// /// It's a no-op if the stream has already been configured without a head. - pub(super) fn without_head(self) -> Self { - Self(Some(ExExNotificationsInner::WithoutHead(match self.inner() { + pub(super) fn set_without_head(&mut self) { + let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); + self.inner = ExExNotificationsInner::WithoutHead(match current { + ExExNotificationsInner::Invalid => unreachable!(), ExExNotificationsInner::WithoutHead(notifications) => notifications, ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new( notifications.node_head, @@ -84,60 +66,20 @@ impl ExExNotifications { notifications.notifications, notifications.wal_handle, ), - }))) - } - - /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s without a head. - /// - /// It's a no-op if the stream has already been configured without a head. - pub(super) fn set_without_head(&mut self) { - self.replace_inner(|inner| { - ExExNotificationsInner::WithoutHead(match inner { - ExExNotificationsInner::WithoutHead(notifications) => notifications, - ExExNotificationsInner::WithHead(notifications) => { - ExExNotificationsWithoutHead::new( - notifications.node_head, - notifications.provider, - notifications.executor, - notifications.notifications, - notifications.wal_handle, - ) - } - }) }); } - /// Converts the [`ExExNotifications`] into a stream of [`ExExNotification`]s with the provided - /// head. + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s with the provided head. /// /// It's a no-op if the stream has already been configured with a head. - pub(super) fn with_head(self, exex_head: ExExHead) -> Self { - Self(Some(ExExNotificationsInner::WithHead(match self.inner() { + pub(super) fn set_with_head(&mut self, exex_head: ExExHead) { + let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); + self.inner = ExExNotificationsInner::WithHead(match current { + ExExNotificationsInner::Invalid => unreachable!(), ExExNotificationsInner::WithoutHead(notifications) => { notifications.with_head(exex_head) } - ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new( - notifications.node_head, - notifications.provider, - notifications.executor, - notifications.notifications, - notifications.wal_handle, - exex_head, - ), - }))) - } - - /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s with the provided head. - /// - /// It's a no-op if the stream has already been configured with a head. - pub(super) fn set_with_head(&mut self, exex_head: ExExHead) { - self.replace_inner(|inner| { - ExExNotificationsInner::WithHead(match inner { - ExExNotificationsInner::WithoutHead(notifications) => { - notifications.with_head(exex_head) - } - ExExNotificationsInner::WithHead(notifications) => notifications, - }) + ExExNotificationsInner::WithHead(notifications) => notifications, }); } } @@ -153,7 +95,8 @@ where self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - match self.get_mut().inner_mut() { + match &mut self.get_mut().inner { + ExExNotificationsInner::Invalid => unreachable!(), ExExNotificationsInner::WithoutHead(notifications) => { notifications.poll_next_unpin(cx).map(|result| result.map(Ok)) } @@ -182,7 +125,7 @@ impl Debug for ExExNotificationsWithoutHead { } impl ExExNotificationsWithoutHead { - /// Creates a new instance of [`ExExNotifications`]. + /// Creates a new instance of [`ExExNotificationsWithoutHead`]. const fn new( node_head: Head, provider: P, diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 18745e989e91..f4561a6b55f2 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -317,7 +317,7 @@ pub async fn test_exex_context_with_chain_spec( let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel(); let (notifications_tx, notifications_rx) = tokio::sync::mpsc::channel(1); - let notifications = ExExNotifications::new_without_head( + let notifications = ExExNotifications::new( head, components.provider.clone(), components.components.executor.clone(), From 94333b0963829a077184db6874f554e50fa29691 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 7 Oct 2024 12:35:41 +0100 Subject: [PATCH 4/7] return methods on notifications --- crates/exex/exex/src/notifications.rs | 52 +++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 8d13df822152..1aff6dd034ff 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -54,7 +54,9 @@ impl ExExNotifications { /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s without a head. /// /// It's a no-op if the stream has already been configured without a head. - pub(super) fn set_without_head(&mut self) { + /// + /// See the documentation of [`ExExNotificationsWithoutHead`] for more details. + pub fn set_without_head(&mut self) { let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); self.inner = ExExNotificationsInner::WithoutHead(match current { ExExNotificationsInner::Invalid => unreachable!(), @@ -69,10 +71,30 @@ impl ExExNotifications { }); } + /// Returns a new [`ExExNotifications`] without a head. + /// + /// See the documentation of [`ExExNotificationsWithoutHead`] for more details. + pub fn without_head(mut self) -> Self { + self.inner = ExExNotificationsInner::WithoutHead(match self.inner { + ExExNotificationsInner::Invalid => unreachable!(), + ExExNotificationsInner::WithoutHead(notifications) => notifications, + ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + ), + }); + self + } + /// Sets [`ExExNotifications`] to a stream of [`ExExNotification`]s with the provided head. /// /// It's a no-op if the stream has already been configured with a head. - pub(super) fn set_with_head(&mut self, exex_head: ExExHead) { + /// + /// See the documentation of [`ExExNotificationsWithHead`] for more details. + pub fn set_with_head(&mut self, exex_head: ExExHead) { let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); self.inner = ExExNotificationsInner::WithHead(match current { ExExNotificationsInner::Invalid => unreachable!(), @@ -82,6 +104,32 @@ impl ExExNotifications { ExExNotificationsInner::WithHead(notifications) => notifications, }); } + + /// Returns a new [`ExExNotifications`] with the provided head. + /// + /// See the documentation of [`ExExNotificationsWithHead`] for more details. + pub fn with_head(mut self, exex_head: ExExHead) -> Self { + self.inner = ExExNotificationsInner::WithHead(match self.inner { + ExExNotificationsInner::Invalid => unreachable!(), + ExExNotificationsInner::WithoutHead(notifications) => ExExNotificationsWithHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + exex_head, + ), + ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new( + notifications.node_head, + notifications.provider, + notifications.executor, + notifications.notifications, + notifications.wal_handle, + exex_head, + ), + }); + self + } } impl Stream for ExExNotifications From 8be28b8ce5a602b53945684def286a7e666a9cd8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 7 Oct 2024 12:55:50 +0100 Subject: [PATCH 5/7] improve docs --- crates/exex/exex/src/notifications.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 1aff6dd034ff..d0cb560baed7 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -184,13 +184,7 @@ impl ExExNotificationsWithoutHead { Self { node_head, provider, executor, notifications, wal_handle } } - /// Subscribe to notifications with the given head. This head is the ExEx's - /// latest view of the host chain. - /// - /// Notifications will be sent starting from the head, not inclusive. For - /// example, if `head.number == 10`, then the first notification will be - /// with `block.number == 11`. A `head.number` of 10 indicates that the ExEx - /// has processed up to block 10, and is ready to process block 11. + /// Subscribe to notifications with the given head. fn with_head(self, head: ExExHead) -> ExExNotificationsWithHead { ExExNotificationsWithHead::new( self.node_head, @@ -212,7 +206,13 @@ impl Stream for ExExNotificationsWithoutHead { } /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that are -/// committed or reverted after the given head. +/// committed or reverted after the given head. The head is the ExEx's latest view of the host +/// chain. +/// +/// Notifications will be sent starting from the head, not inclusive. For example, if +/// `exex_head.number == 10`, then the first notification will be with `block.number == 11`. An +/// `exex_head.number` of 10 indicates that the ExEx has processed up to block 10, and is ready to +/// process block 11. #[derive(Debug)] pub struct ExExNotificationsWithHead { node_head: Head, From b3fbb6287104be89251cf4c07ff22f281f429211 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 7 Oct 2024 15:57:18 +0100 Subject: [PATCH 6/7] clarify why with_head exists --- crates/exex/exex/src/notifications.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index d0cb560baed7..fe92f38d1cfe 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -13,7 +13,9 @@ use std::{ }; use tokio::sync::mpsc::Receiver; -/// A stream of [`ExExNotification`]s. +/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the +/// stream is configured with a head via [`ExExNotifications::set_with_head`] or +/// [`ExExNotifications::with_head`], it will run backfill jobs to catch up to the node head. #[derive(Debug)] pub struct ExExNotifications { inner: ExExNotificationsInner, From 4ff33ef4b8766866270ff69bd27f78c71d32757c Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 7 Oct 2024 15:59:35 +0100 Subject: [PATCH 7/7] nits after review --- crates/exex/exex/src/notifications.rs | 49 ++++++++------------------- 1 file changed, 14 insertions(+), 35 deletions(-) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index fe92f38d1cfe..6efdb1775cf1 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -23,14 +23,14 @@ pub struct ExExNotifications { #[derive(Debug)] enum ExExNotificationsInner { - /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and - /// [`ExExNotificationsInner::WithHead`]. - Invalid, /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. WithoutHead(ExExNotificationsWithoutHead), /// A stream of [`ExExNotification`]s. The stream will only emit notifications for blocks that /// are committed or reverted after the given head. WithHead(ExExNotificationsWithHead), + /// Internal state used when transitioning between [`ExExNotificationsInner::WithoutHead`] and + /// [`ExExNotificationsInner::WithHead`]. + Invalid, } impl ExExNotifications { @@ -61,7 +61,6 @@ impl ExExNotifications { pub fn set_without_head(&mut self) { let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); self.inner = ExExNotificationsInner::WithoutHead(match current { - ExExNotificationsInner::Invalid => unreachable!(), ExExNotificationsInner::WithoutHead(notifications) => notifications, ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new( notifications.node_head, @@ -70,6 +69,7 @@ impl ExExNotifications { notifications.notifications, notifications.wal_handle, ), + ExExNotificationsInner::Invalid => unreachable!(), }); } @@ -77,17 +77,7 @@ impl ExExNotifications { /// /// See the documentation of [`ExExNotificationsWithoutHead`] for more details. pub fn without_head(mut self) -> Self { - self.inner = ExExNotificationsInner::WithoutHead(match self.inner { - ExExNotificationsInner::Invalid => unreachable!(), - ExExNotificationsInner::WithoutHead(notifications) => notifications, - ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithoutHead::new( - notifications.node_head, - notifications.provider, - notifications.executor, - notifications.notifications, - notifications.wal_handle, - ), - }); + self.set_without_head(); self } @@ -99,28 +89,9 @@ impl ExExNotifications { pub fn set_with_head(&mut self, exex_head: ExExHead) { let current = std::mem::replace(&mut self.inner, ExExNotificationsInner::Invalid); self.inner = ExExNotificationsInner::WithHead(match current { - ExExNotificationsInner::Invalid => unreachable!(), ExExNotificationsInner::WithoutHead(notifications) => { notifications.with_head(exex_head) } - ExExNotificationsInner::WithHead(notifications) => notifications, - }); - } - - /// Returns a new [`ExExNotifications`] with the provided head. - /// - /// See the documentation of [`ExExNotificationsWithHead`] for more details. - pub fn with_head(mut self, exex_head: ExExHead) -> Self { - self.inner = ExExNotificationsInner::WithHead(match self.inner { - ExExNotificationsInner::Invalid => unreachable!(), - ExExNotificationsInner::WithoutHead(notifications) => ExExNotificationsWithHead::new( - notifications.node_head, - notifications.provider, - notifications.executor, - notifications.notifications, - notifications.wal_handle, - exex_head, - ), ExExNotificationsInner::WithHead(notifications) => ExExNotificationsWithHead::new( notifications.node_head, notifications.provider, @@ -129,7 +100,15 @@ impl ExExNotifications { notifications.wal_handle, exex_head, ), + ExExNotificationsInner::Invalid => unreachable!(), }); + } + + /// Returns a new [`ExExNotifications`] with the provided head. + /// + /// See the documentation of [`ExExNotificationsWithHead`] for more details. + pub fn with_head(mut self, exex_head: ExExHead) -> Self { + self.set_with_head(exex_head); self } } @@ -146,11 +125,11 @@ where cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { match &mut self.get_mut().inner { - ExExNotificationsInner::Invalid => unreachable!(), ExExNotificationsInner::WithoutHead(notifications) => { notifications.poll_next_unpin(cx).map(|result| result.map(Ok)) } ExExNotificationsInner::WithHead(notifications) => notifications.poll_next_unpin(cx), + ExExNotificationsInner::Invalid => unreachable!(), } } }