Skip to content

Commit

Permalink
feat: end to end entry point routing
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Mar 28, 2024
1 parent 89a3684 commit dc9bd27
Show file tree
Hide file tree
Showing 26 changed files with 1,126 additions and 451 deletions.
34 changes: 25 additions & 9 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ impl BuilderArgs {
.context("should have a node HTTP URL")?;
let submit_url = self.submit_url.clone().unwrap_or_else(|| rpc_url.clone());

// TODO these should be scoped by entry point
let mempool_configs = match &common.mempool_config_path {
Some(path) => {
get_json_config::<HashMap<H256, MempoolConfig>>(path, &common.aws_region).await?
Expand All @@ -200,14 +199,31 @@ impl BuilderArgs {
};

Ok(BuilderTaskArgs {
// TODO: support multiple entry points
entry_points: vec![EntryPointBuilderSettings {
address: chain_spec.entry_point_address,
version: EntryPointVersion::V0_6,
num_bundle_builders: common.num_builders,
bundle_builder_index_offset: self.builder_index_offset,
mempool_configs,
}],
// TODO determine better builder config settings
entry_points: vec![
EntryPointBuilderSettings {
address: chain_spec.entry_point_address_v0_6,
version: EntryPointVersion::V0_6,
num_bundle_builders: common.num_builders / 2,
bundle_builder_index_offset: self.builder_index_offset,
mempool_configs: mempool_configs
.iter()
.filter(|(_, v)| v.entry_point() == chain_spec.entry_point_address_v0_6)
.map(|(k, v)| (*k, v.clone()))
.collect(),
},
EntryPointBuilderSettings {
address: chain_spec.entry_point_address_v0_7,
version: EntryPointVersion::V0_7,
num_bundle_builders: common.num_builders / 2,
bundle_builder_index_offset: self.builder_index_offset,
mempool_configs: mempool_configs
.iter()
.filter(|(_, v)| v.entry_point() == chain_spec.entry_point_address_v0_7)
.map(|(k, v)| (*k, v.clone()))
.collect(),
},
],
chain_spec,
unsafe_mode: common.unsafe_mode,
rpc_url,
Expand Down
40 changes: 32 additions & 8 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{collections::HashMap, net::SocketAddr, time::Duration};

use anyhow::Context;
use clap::Args;
use ethers::types::H256;
use ethers::types::{Address, H256};
use rundler_pool::{LocalPoolBuilder, PoolConfig, PoolTask, PoolTaskArgs};
use rundler_sim::MempoolConfig;
use rundler_task::spawn_tasks_with_shutdown;
Expand Down Expand Up @@ -182,20 +182,21 @@ impl PoolArgs {

let chain_id = chain_spec.id;
// TODO(danc): multiple pool configs
let pool_config = PoolConfig {
entry_point: chain_spec.entry_point_address,
entry_point_version: EntryPointVersion::V0_6,
let pool_config_base = PoolConfig {
// update per entry point
entry_point: Address::default(),
entry_point_version: EntryPointVersion::Unspecified,
num_shards: 0,
mempool_channel_configs: HashMap::new(),
// Base config
chain_id,
// Currently use the same shard count as the number of builders
num_shards: common.num_builders,
same_sender_mempool_count: self.same_sender_mempool_count,
min_replacement_fee_increase_percentage: self.min_replacement_fee_increase_percentage,
max_size_of_pool_bytes: self.max_size_in_bytes,
blocklist: blocklist.clone(),
allowlist: allowlist.clone(),
precheck_settings: common.try_into()?,
sim_settings: common.into(),
mempool_channel_configs: mempool_channel_configs.clone(),
throttled_entity_mempool_count: self.throttled_entity_mempool_count,
throttled_entity_live_blocks: self.throttled_entity_live_blocks,
paymaster_tracking_enabled: self.paymaster_tracking_enabled,
Expand All @@ -204,6 +205,29 @@ impl PoolArgs {
drop_min_num_blocks: self.drop_min_num_blocks,
};

let pool_config_v0_6 = PoolConfig {
entry_point: chain_spec.entry_point_address_v0_6,
entry_point_version: EntryPointVersion::V0_6,
num_shards: common.num_builders / 2,
mempool_channel_configs: mempool_channel_configs
.iter()
.filter(|(_, v)| v.entry_point() == chain_spec.entry_point_address_v0_6)
.map(|(k, v)| (*k, v.clone()))
.collect(),
..pool_config_base.clone()
};
let pool_config_v0_7 = PoolConfig {
entry_point: chain_spec.entry_point_address_v0_7,
entry_point_version: EntryPointVersion::V0_7,
num_shards: common.num_builders / 2,
mempool_channel_configs: mempool_channel_configs
.iter()
.filter(|(_, v)| v.entry_point() == chain_spec.entry_point_address_v0_7)
.map(|(k, v)| (*k, v.clone()))
.collect(),
..pool_config_base.clone()
};

Ok(PoolTaskArgs {
chain_spec,
unsafe_mode: common.unsafe_mode,
Expand All @@ -212,7 +236,7 @@ impl PoolArgs {
.clone()
.context("pool requires node_http arg")?,
http_poll_interval: Duration::from_millis(common.eth_poll_interval_millis),
pool_configs: vec![pool_config],
pool_configs: vec![pool_config_v0_6, pool_config_v0_7],
remote_address,
chain_update_channel_capacity: self.chain_update_channel_capacity.unwrap_or(1024),
})
Expand Down
169 changes: 119 additions & 50 deletions crates/builder/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ use ethers::{
use ethers_signers::Signer;
use futures::future;
use futures_util::TryFutureExt;
use rundler_provider::{EntryPointProvider, EthersEntryPointV0_6, Provider};
use rundler_provider::{EntryPointProvider, EthersEntryPointV0_6, EthersEntryPointV0_7, Provider};
use rundler_sim::{
simulation::v0_6::{
SimulateValidationTracerImpl as SimulateValidationTracerImplV0_6,
Simulator as SimulatorV0_6, UnsafeSimulator as UnsafeSimulatorV0_6,
simulation::{
v0_6::{
SimulateValidationTracerImpl as SimulateValidationTracerImplV0_6,
Simulator as SimulatorV0_6,
},
UnsafeSimulator,
},
MempoolConfig, PriorityFeeMode, SimulationSettings, Simulator,
};
use rundler_task::Task;
use rundler_types::{
chain::ChainSpec, pool::Pool, v0_6, EntryPointVersion, UserOperation, UserOperationVariant,
chain::ChainSpec, pool::Pool, v0_6, v0_7, EntryPointVersion, UserOperation,
UserOperationVariant,
};
use rundler_utils::{emit::WithEntryPoint, handle};
use rusoto_core::Region;
Expand Down Expand Up @@ -143,45 +147,36 @@ where
rundler_provider::new_provider(&self.args.rpc_url, Some(self.args.eth_poll_interval))?;

let ep_v0_6 = EthersEntryPointV0_6::new(
self.args.chain_spec.entry_point_address,
self.args.chain_spec.entry_point_address_v0_6,
Arc::clone(&provider),
);
let ep_v0_7 = EthersEntryPointV0_7::new(
self.args.chain_spec.entry_point_address_v0_7,
Arc::clone(&provider),
);

let mut sender_handles = vec![];
let mut bundle_sender_actions = vec![];

for ep in &self.args.entry_points {
// TODO entry point v0.7: needs 0.7 EP and simulator
if ep.version != EntryPointVersion::V0_6 {
bail!("Unsupported entry point version: {:?}", ep.version);
}

info!("Mempool config for ep v0.6: {:?}", ep.mempool_configs);

for i in 0..ep.num_bundle_builders {
let (spawn_guard, bundle_sender_action) = if self.args.unsafe_mode {
self.create_bundle_builder(
i + ep.bundle_builder_index_offset,
Arc::clone(&provider),
ep_v0_6.clone(),
self.create_unsafe_simulator_v0_6(Arc::clone(&provider), ep_v0_6.clone()),
)
.await?
} else {
self.create_bundle_builder(
i + ep.bundle_builder_index_offset,
Arc::clone(&provider),
ep_v0_6.clone(),
self.create_simulator_v0_6(
Arc::clone(&provider),
ep_v0_6.clone(),
ep.mempool_configs.clone(),
),
)
.await?
};
sender_handles.push(spawn_guard);
bundle_sender_actions.push(bundle_sender_action);
match ep.version {
EntryPointVersion::V0_6 => {
let (handles, actions) = self
.create_builders_v0_6(ep, Arc::clone(&provider), ep_v0_6.clone())
.await?;
sender_handles.extend(handles);
bundle_sender_actions.extend(actions);
}
EntryPointVersion::V0_7 => {
let (handles, actions) = self
.create_builders_v0_7(ep, Arc::clone(&provider), ep_v0_7.clone())
.await?;
sender_handles.extend(handles);
bundle_sender_actions.extend(actions);
}
EntryPointVersion::Unspecified => {
panic!("Unspecified entry point version")
}
}
}

Expand All @@ -195,7 +190,7 @@ where
let builder_handle = self.builder_builder.get_handle();
let builder_runnder_handle = self.builder_builder.run(
bundle_sender_actions,
vec![self.args.chain_spec.entry_point_address],
vec![self.args.chain_spec.entry_point_address_v0_6],
shutdown_token.clone(),
);

Expand Down Expand Up @@ -255,6 +250,92 @@ where
Box::new(self)
}

async fn create_builders_v0_6<C, E>(
&self,
ep: &EntryPointBuilderSettings,
provider: Arc<EthersProvider<C>>,
ep_v0_6: E,
) -> anyhow::Result<(
Vec<JoinHandle<anyhow::Result<()>>>,
Vec<mpsc::Sender<BundleSenderAction>>,
)>
where
C: JsonRpcClient + 'static,
E: EntryPointProvider<v0_6::UserOperation> + Clone,
{
info!("Mempool config for ep v0.6: {:?}", ep.mempool_configs);
let mut sender_handles = vec![];
let mut bundle_sender_actions = vec![];
for i in 0..ep.num_bundle_builders {
let (spawn_guard, bundle_sender_action) = if self.args.unsafe_mode {
self.create_bundle_builder(
i + ep.bundle_builder_index_offset,
Arc::clone(&provider),
ep_v0_6.clone(),
UnsafeSimulator::new(
Arc::clone(&provider),
ep_v0_6.clone(),
self.args.sim_settings,
),
)
.await?
} else {
self.create_bundle_builder(
i + ep.bundle_builder_index_offset,
Arc::clone(&provider),
ep_v0_6.clone(),
self.create_simulator_v0_6(
Arc::clone(&provider),
ep_v0_6.clone(),
ep.mempool_configs.clone(),
),
)
.await?
};
sender_handles.push(spawn_guard);
bundle_sender_actions.push(bundle_sender_action);
}
Ok((sender_handles, bundle_sender_actions))
}

async fn create_builders_v0_7<C, E>(
&self,
ep: &EntryPointBuilderSettings,
provider: Arc<EthersProvider<C>>,
ep_v0_7: E,
) -> anyhow::Result<(
Vec<JoinHandle<anyhow::Result<()>>>,
Vec<mpsc::Sender<BundleSenderAction>>,
)>
where
C: JsonRpcClient + 'static,
E: EntryPointProvider<v0_7::UserOperation> + Clone,
{
info!("Mempool config for ep v0.7: {:?}", ep.mempool_configs);
let mut sender_handles = vec![];
let mut bundle_sender_actions = vec![];
for i in 0..ep.num_bundle_builders {
let (spawn_guard, bundle_sender_action) = if self.args.unsafe_mode {
self.create_bundle_builder(
i + ep.bundle_builder_index_offset,
Arc::clone(&provider),
ep_v0_7.clone(),
UnsafeSimulator::new(
Arc::clone(&provider),
ep_v0_7.clone(),
self.args.sim_settings,
),
)
.await?
} else {
panic!("V0.7 safe simulation not implemented")
};
sender_handles.push(spawn_guard);
bundle_sender_actions.push(bundle_sender_action);
}
Ok((sender_handles, bundle_sender_actions))
}

async fn create_bundle_builder<UO, E, S, C>(
&self,
index: u64,
Expand Down Expand Up @@ -395,16 +476,4 @@ where
mempool_configs,
)
}

fn create_unsafe_simulator_v0_6<C, E>(
&self,
provider: Arc<C>,
ep: E,
) -> UnsafeSimulatorV0_6<C, E>
where
C: Provider,
E: EntryPointProvider<v0_6::UserOperation> + Clone,
{
UnsafeSimulatorV0_6::new(Arc::clone(&provider), ep, self.args.sim_settings)
}
}
Loading

0 comments on commit dc9bd27

Please sign in to comment.