Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): subscribe to notifications with head using ExExContext #11500

Merged
merged 7 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion crates/exex/exex/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;

use reth_exex_types::ExExHead;
use reth_node_api::{FullNodeComponents, NodeTypes, NodeTypesWithEngine};
use reth_node_core::node_config::NodeConfig;
use reth_primitives::Head;
Expand Down Expand Up @@ -32,7 +33,7 @@ pub struct ExExContext<Node: FullNodeComponents> {
/// considered delivered by the node.
pub notifications: ExExNotifications<Node::Provider, Node::Executor>,

/// node components
/// Node components
pub components: Node,
}

Expand Down Expand Up @@ -92,4 +93,16 @@ impl<Node: FullNodeComponents> ExExContext<Node> {
pub fn task_executor(&self) -> &TaskExecutor {
self.components.task_executor()
}

/// Sets notifications stream to [`crate::ExExNotificationsWithoutHead`], a stream of
/// notifications without a head.
pub fn set_notifications_without_head(&mut self) {
self.notifications.set_without_head();
}

/// Sets notifications stream to [`crate::ExExNotificationsWithHead`], a stream of notifications
/// with the provided head.
pub fn set_notifications_with_head(&mut self, head: ExExHead) {
self.notifications.set_with_head(head);
}
}
119 changes: 78 additions & 41 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,16 @@ impl Clone for ExExManagerHandle {
mod tests {
use super::*;
use alloy_primitives::B256;
use eyre::OptionExt;
use futures::{FutureExt, StreamExt};
use futures::StreamExt;
use rand::Rng;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
use reth_testing_utils::generators::{self, random_block};
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
Chain, TransactionVariant,
};
use reth_testing_utils::generators;

fn empty_finalized_header_stream() -> ForkChoiceStream<SealedHeader> {
let (tx, rx) = watch::channel(None);
Expand Down Expand Up @@ -975,11 +979,20 @@ mod tests {

#[tokio::test]
async fn exex_handle_new() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();

let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);

// Check initial state
assert_eq!(exex_handle.id, "test_exex");
Expand Down Expand Up @@ -1008,7 +1021,7 @@ mod tests {
// Send a notification and ensure it's received correctly
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap();
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending => panic!("Notification send is pending"),
Expand All @@ -1021,11 +1034,20 @@ mod tests {

#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();

let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);

// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(BlockNumHash::new(15, B256::random()));
Expand All @@ -1046,11 +1068,7 @@ mod tests {
poll_fn(|cx| {
// The notification should be skipped, so nothing should be sent.
// Check that the receiver channel is indeed empty
assert_eq!(
notifications.poll_next_unpin(cx),
Poll::Pending,
"Receiver channel should be empty"
);
assert!(notifications.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
Expand All @@ -1066,11 +1084,20 @@ mod tests {

#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();

let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);

let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
Expand All @@ -1086,7 +1113,7 @@ mod tests {
// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap();
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
Expand All @@ -1100,11 +1127,20 @@ mod tests {

#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let provider_factory = create_test_provider_factory();
init_genesis(&provider_factory).unwrap();
let provider = BlockchainProvider2::new(provider_factory).unwrap();

let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
let (mut exex_handle, _, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider,
EthExecutorProvider::mainnet(),
wal.handle(),
);

let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };

Expand All @@ -1117,7 +1153,7 @@ mod tests {
// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notifications.next().await.unwrap();
let received_notification = notifications.next().await.unwrap().unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
Expand All @@ -1135,30 +1171,34 @@ mod tests {

let mut rng = generators::rng();

let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory).unwrap();
let genesis_block = provider_factory
.sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
.unwrap()
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
let provider = BlockchainProvider2::new(provider_factory).unwrap();

let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let provider_factory = create_test_provider_factory();

let block = random_block(&mut rng, 0, Default::default())
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
let provider_rw = provider_factory.provider_rw()?;
provider_rw.insert_block(block.clone())?;
provider_rw.commit()?;
let (exex_handle, events_tx, mut notifications) = ExExHandle::new(
"test_exex".to_string(),
Head::default(),
provider.clone(),
EthExecutorProvider::mainnet(),
wal.handle(),
);

let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
};

let (finalized_headers_tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);

let (exex_handle, events_tx, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

let mut exex_manager = std::pin::pin!(ExExManager::new(
provider_factory,
provider,
vec![exex_handle],
1,
wal,
Expand All @@ -1170,16 +1210,13 @@ mod tests {
exex_manager.handle().send(notification.clone())?;

assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
assert_eq!(
notifications.next().poll_unpin(&mut cx),
Poll::Ready(Some(notification.clone()))
);
assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone());
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::<eyre::Result<Vec<_>>>()?,
[notification.clone()]
);

finalized_headers_tx.send(Some(block.header.clone()))?;
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
Expand All @@ -1192,7 +1229,7 @@ mod tests {
.send(ExExEvent::FinishedHeight((rng.gen::<u64>(), rng.gen::<B256>()).into()))
.unwrap();

finalized_headers_tx.send(Some(block.header.clone()))?;
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
Expand All @@ -1202,9 +1239,9 @@ mod tests {
);

// Send a `FinishedHeight` event with a canonical block
events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap();

finalized_headers_tx.send(Some(block.header.clone()))?;
finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
Expand Down
Loading
Loading