
Profile tx hash fetching #12842

Closed · mattsse opened this issue Nov 25, 2024 · 9 comments

Labels: C-enhancement New feature or request · C-perf A change motivated by improving speed, memory usage or disk footprint

Comments

@mattsse (Collaborator) commented Nov 25, 2024

Describe the feature

this section appears to eat up the entire poll duration of the service

// Tries to drain hashes pending fetch cache if the tx manager currently has
// capacity for this (fetch txns).
//
// Sends at most one request.
duration_metered_exec!(
    {
        if this.has_capacity_for_fetching_pending_hashes() {
            this.on_fetch_hashes_pending_fetch();
        }
    },
    poll_durations.acc_pending_fetch
);

I assume this is the bottleneck

/// Tries to request hashes pending fetch.
///
/// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of
/// the request by checking the transactions seen by the peer against the buffer.
pub fn on_fetch_pending_hashes(

To narrow down improvements, we want to profile this. For that, we'd need a test setup similar to this one:

pub fn broadcast_ingress_bench(c: &mut Criterion) {

with a Testnet and transactions, then profile (via samply) where we spend the most time, and ideally derive improvement suggestions from that.
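
A rough, hypothetical skeleton of what such a benchmark could look like (assuming criterion with the async_tokio feature enabled; the body is a placeholder, and the actual Testnet wiring from the snippets later in this thread would go inside):

use criterion::{criterion_group, criterion_main, Criterion};

pub fn tx_hash_fetch_bench(c: &mut Criterion) {
    let rt = tokio::runtime::Runtime::new().unwrap();

    c.bench_function("tx_hash_fetching", |b| {
        b.to_async(&rt).iter(|| async {
            // spin up a Testnet, gossip hashes between peers, and drive
            // TransactionsManager::poll through the hash-fetch path
        });
    });
}

criterion_group!(benches, tx_hash_fetch_bench);
criterion_main!(benches);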

TODO

  • mock txpool and gossip
  • profile TransactionsManager::poll hash fetching

see metrics
https://reth.paradigm.xyz/d/d47d679c-c3b8-40b6-852d-cbfaa2dcdb37/reth-transaction-pool?orgId=1&refresh=30s&viewPanel=200

Additional context

#12838


@mattsse mattsse added C-enhancement New feature or request S-needs-triage This issue needs to be labelled labels Nov 25, 2024
@mattsse mattsse self-assigned this Nov 25, 2024
@mattsse mattsse added C-perf A change motivated by improving speed, memory usage or disk footprint and removed S-needs-triage This issue needs to be labelled labels Nov 25, 2024
@hai-rise (Contributor)

We found this bad boy in DHAT last week too:
[DHAT allocation profile screenshot]

on_fetch_pending_hashes allocates 17 GB in a few minutes on a chain under load. We're not familiar with the code base, so this is still in our backlog at the moment 😅.

@mattsse (Collaborator, Author) commented Nov 25, 2024

yeah, this doesn't look right:

let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);

but I suspect there are a few things that are suboptimal.
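
For intuition, here is a minimal sketch (with stand-in types, not the actual TransactionFetcher API) of why a fresh with_capacity allocation on every call from a hot poll loop hurts, and how a retained scratch buffer would amortize it:

type TxHash = [u8; 32];

struct FetcherScratch {
    /// Scratch buffer kept alive across polls; `clear()` retains its capacity,
    /// so the allocation happens once instead of on every call.
    hashes_to_request: Vec<TxHash>,
}

impl FetcherScratch {
    fn on_fetch_pending_hashes(&mut self, pending: &[TxHash], capacity: usize) {
        // Reuse the buffer instead of `Vec::with_capacity(capacity)` per call.
        self.hashes_to_request.clear();
        self.hashes_to_request
            .extend(pending.iter().take(capacity).copied());
        // ... build and send at most one request from `self.hashes_to_request` ...
    }
}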

on it @hai-rise

@Elvis339 (Contributor)

Hello, can I take this issue?

@mattsse mattsse assigned Elvis339 and unassigned Elvis339 and mattsse Nov 25, 2024
@Elvis339 (Contributor)

Hello!

Yesterday, while working on this, I ran into a chicken-and-egg problem because I didn't understand the scope:

  • Should the benchmark focus mostly on the on_fetch_pending_hashes phase, or should it include the entire lifecycle? I guess it should, because that will help us test @mattsse's assumption.

After reading this: https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transaction-exchange
I figured this might be one of many possible benchmarking scenarios:

  • Use the Testnet struct to spin up multiple peers, each with a transaction pool.
  • Fill Peer A’s pool with TxSet1 and Peer B’s pool with TxSet2 (no overlap initially).
  • Simulate transaction gossip:
    • Peer A announces TxSet1 to Peer B via NewPooledTransactionHashes.
    • Peer B buffers these hashes and fetches the transactions (GetPooledTransactions).
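
A hedged sketch of the two wire messages in that flow, assuming reth's eth-wire types (the field names are my reading of eth/68 and may not match the crate exactly):

use alloy_primitives::B256;
use reth_eth_wire::{GetPooledTransactions, NewPooledTransactionHashes68};

fn announce_then_request(hash: B256, tx_type: u8, encoded_len: usize) {
    // Peer A -> Peer B: the eth/68 announcement carries type and size per hash.
    let announcement = NewPooledTransactionHashes68 {
        types: vec![tx_type],
        sizes: vec![encoded_len],
        hashes: vec![hash],
    };
    // Peer B -> an idle peer: request the full transaction for the unknown hash.
    let request = GetPooledTransactions(vec![hash]);
    let _ = (announcement, request);
}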

Let me know what you think; I would appreciate a nudge in the right direction regarding the scenario.

@mattsse (Collaborator, Author) commented Nov 26, 2024

> I guess it should

yeah, testing the entire TestNet <-> TestNet would be nice to have

this scenario sgtm.

because we want to exercise requesting hashes, you likely need to tune a few settings; for example, I believe setting the

/// Determines how new pending transactions are propagated to other peers in full.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationMode {
    /// Send full transactions to sqrt of current peers.
    #[default]
    Sqrt,
    /// Always send transactions in full.
    All,
    /// Send full transactions to a maximum number of peers
    Max(usize),
}

to Max(0) would be beneficial here, because this ensures that we only broadcast hashes and then enter the request-hashes logic.
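
A short sketch of that tuning (the import path is an assumption; the propagation_mode field matches the benchmark snippet later in this thread):

use reth_network::transactions::{TransactionPropagationMode, TransactionsManagerConfig};

/// Builds a config that only ever gossips hashes, so the receiving peer is
/// forced into the hash-fetch path rather than being handed full transactions.
fn hash_only_config() -> TransactionsManagerConfig {
    let mut config = TransactionsManagerConfig::default();
    config.propagation_mode = TransactionPropagationMode::Max(0);
    config
}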

to actually broadcast txs, you can take a look at this setup:

// ensure the sender has balance
let sender = tx.sender();
provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000)));

@mattsse (Collaborator, Author) commented Nov 26, 2024

this scenario is very similar to

async fn test_tx_gossip() {

but more advanced, with an emphasis on entering the tx fetch path; we likely need a Testnet with a few more peers

@Elvis339 (Contributor)

I am trying to simulate a transaction fetch scenario in a network with three peers, but have made no progress so far...

  1. Setup:

    • Peer A announces a transaction hash to Peer B.
    • Peer B doesn't recognize the hash and tries to fetch the full transaction from Peer C.
    • Peer C, which holds the full transaction data, serves the request.
  2. Goal:

    • Trigger the transaction fetch path where Peer B actively requests full transactions for unknown hashes using the TransactionFetcher.

Approaching this is tricky; I tried different scenarios, but I'm never entering the tx fetch path. I never get past this:

let budget_find_idle_fallback_peer = self

pub fn tx_fetch_bench(c: &mut Criterion) {
    let rt = TokioRuntime::new().unwrap();

    let mut group = c.benchmark_group("Transaction Fetch");
    group.sample_size(10);

    group.bench_function("fetch_transactions", |b| {
        b.to_async(&rt).iter_with_setup(
            || {
                tokio::task::block_in_place(|| {
                    tokio::runtime::Handle::current().block_on(async {
                        let provider = Arc::new(MockEthProvider::default());

                        // Create 3 peers for this scenario
                        let testnet = Testnet::create_with(3, provider.clone()).await;
                        let mut net = testnet.with_eth_pool();

                        let peers = net.peers_mut();

                        // Split peers into peer_a, peer_b, peer_c using split_at_mut to avoid multiple mutable borrows
                        let (first, rest) = peers.split_at_mut(1);
                        let peer_a = &mut first[0];
                        let (second, rest) = rest.split_at_mut(1);
                        let peer_b = &mut second[0];
                        let peer_c = &mut rest[0];

                        // Setup transaction managers for Peer A and Peer B
                        for peer in [peer_a, peer_b].iter_mut() {
                            if let Some(pool) = peer.pool().cloned() {
                                let (tx, rx) = unbounded_channel();
                                peer.network_mut().set_transactions(tx);

                                // Setup manager with hash-only propagation
                                let mut config = TransactionsManagerConfig::default();
                                config.propagation_mode = TransactionPropagationMode::Max(0);

                                let tx_manager = TransactionsManager::new(
                                    peer.handle(),
                                    pool.clone(),
                                    rx,
                                    config,
                                );

                                peer.transactions_manager = Some(tx_manager);
                                peer.pool = Some(pool);
                            }
                        }

                        // For Peer C, add the transaction to its pool
                        let peer_c_pool = peer_c.pool().expect("Peer C has pool");
                        let mut gen = TransactionGenerator::new(thread_rng());
                        let tx_c = gen.gen_eip1559_pooled();
                        let sender_c = tx_c.sender();
                        provider.add_account(sender_c, ExtendedAccount::new(0, U256::from(100_000_000)));
                        let hash_c = peer_c_pool.add_external_transaction(tx_c).await.unwrap();
                        println!("Added transaction {} to Peer C's pool", hash_c);

                        let handle = net.spawn();

                        // Get peer handles
                        let peers_handles = handle.peers();
                        let peer_a_handle = peers_handles.get(0).expect("Peer A exists").clone();
                        let peer_b_handle = peers_handles.get(1).expect("Peer B exists").clone();
                        let peer_c_handle = peers_handles.get(2).expect("Peer C exists").clone();

                        println!(
                            "Connecting Peer A {} to Peer B {}",
                            peer_a_handle.peer_id(),
                            peer_b_handle.peer_id(),
                        );

                        // Connect Peer A to Peer B
                        peer_a_handle
                            .network()
                            .add_peer(*peer_b_handle.peer_id(), peer_b_handle.local_addr());

                        println!(
                            "Connecting Peer B {} to Peer C {}",
                            peer_b_handle.peer_id(),
                            peer_c_handle.peer_id(),
                        );

                        // Connect Peer B to Peer C
                        peer_b_handle
                            .network()
                            .add_peer(*peer_c_handle.peer_id(), peer_c_handle.local_addr());

                        // Wait for connections to establish
                        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

                        // Add transaction to Peer A's pool
                        let peer_a_pool = peer_a_handle.pool().expect("Peer A has pool");
                        let tx_a = gen.gen_eip1559_pooled();
                        let sender_a = tx_a.sender();
                        provider.add_account(sender_a, ExtendedAccount::new(0, U256::from(100_000_000)));
                        let hash_a = peer_a_pool.add_external_transaction(tx_a).await.unwrap();
                        println!("Added transaction {} to Peer A's pool", hash_a);

                        // Set up Peer B's listener to receive fetched transactions
                        let peer_b_tx_listener = peer_b_handle
                            .pool()
                            .expect("Peer B has pool")
                            .pending_transactions_listener();

                        // Return handle, listener, and expected hash
                        (handle, peer_b_tx_listener, hash_a)
                    })
                })
            },
            |(handle, mut peer_b_tx_listener, expected_hash)| async move {
                loop {
                    tokio::select! {
                        Some(received_hash) = peer_b_tx_listener.recv() => {
                            println!("Peer B received transaction hash {}", received_hash);
                            assert_eq!(received_hash, expected_hash, "Hash mismatch");

                            // Fetcher should fetch the transaction from Peer C;
                            // for simplicity we just wait.
                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

                            break;
                        }
                        _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
                            panic!("Timed out waiting for fetch path");
                        }
                    }
                }

                // Terminate the Testnet after the benchmark iteration completes
                handle.terminate().await;
            },
        )
    });

    group.finish();
}

@mattsse (Collaborator, Author) commented Nov 27, 2024

this looks great!

could you please open this as a PR?

interesting, so this never triggers fetching?

perhaps installing tracing would help with debugging, along with converting this into a test so it's easier to run
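
For example (a sketch, assuming reth's test-tracing helper is available to the crate; it installs a subscriber so the net::tx debug logs become visible):

#[tokio::test(flavor = "multi_thread")]
async fn tx_fetch_path() {
    // Install a tracing subscriber so DEBUG logs from `net::tx` show up.
    reth_tracing::init_test_tracing();
    // ... run the three-peer Testnet scenario from the benchmark above ...
}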

@Elvis339 (Contributor)

Thank you @mattsse. I was able to trigger fetching from this test case: https://github.com/paradigmxyz/reth/blob/c10c3f4ccbb6f9804c437451ecbf8f023c35076e/crates/net/network/src/test_only.rs

which I added to the PR as a showcase only; I will delete it later.

I see logs like:

2024-11-28T16:17:27.905303Z DEBUG net::tx: requesting hashes that were stored pending fetch from peer peer_id="0x47c9…bf8d" hashes={0x8147aeca38c016228f04c28d53d9150961c19778dc515d112c72b7878adcbf11, 0xdde1f4db8205cb6be399da2daecc308c957c77a94b5c58b402eb18cd0e8004a2, 0x61fa8891c228623507a60cb888ea4e68ff53ced63799036ccaafff16b94c19c0, 0xbc8e793ded5d2cf370c502b99391ec7ab16868413680102fd4a5c7ef0aab24b9} conn_eth_version=Eth68

But I'm having massive issues with samply on my machine; I will boot up my old Linux machine and try CPU profiling there.
