Skip to content

Commit

Permalink
feat: continuous download (#1744)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Mar 17, 2023
1 parent 241ec32 commit 1711d80
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 83 deletions.
103 changes: 78 additions & 25 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use reth_network::{
error::NetworkError, FetchClient, NetworkConfig, NetworkHandle, NetworkManager,
};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockHashOrNumber, ChainSpec, Head, H256};
use reth_primitives::{BlockHashOrNumber, ChainSpec, Head, SealedHeader, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_engine_api::{EngineApi, EngineApiHandle};
use reth_staged_sync::{
Expand All @@ -50,7 +50,7 @@ use reth_staged_sync::{
};
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
stages::{ExecutionStage, HeaderStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
};
use reth_tasks::TaskExecutor;
use std::{
Expand Down Expand Up @@ -106,6 +106,12 @@ pub struct Command {
#[clap(flatten)]
network: NetworkArgs,

/// Prompt the downloader to download blocks one at a time.
///
/// NOTE: This is for testing purposes only.
#[arg(long = "debug.continuous", help_heading = "Debug")]
continuous: bool,

/// Set the chain tip manually for testing purposes.
///
/// NOTE: This is a temporary flag
Expand Down Expand Up @@ -173,6 +179,10 @@ impl Command {
.await?;
info!(target: "reth::cli", "Started RPC server");

if self.continuous {
info!(target: "reth::cli", "Continuous sync mode enabled");
}

let engine_api_handle =
self.init_engine_api(Arc::clone(&db), forkchoice_state_tx, &ctx.task_executor);
info!(target: "reth::cli", "Engine API handler initialized");
Expand Down Expand Up @@ -259,6 +269,7 @@ impl Command {
network.clone(),
consensus,
max_block,
self.continuous,
)
.await?;

Expand Down Expand Up @@ -391,17 +402,38 @@ impl Command {
fetch_client: FetchClient,
tip: H256,
) -> Result<u64, reth_interfaces::Error> {
if let Some(number) = db.view(|tx| tx.get::<tables::HeaderNumbers>(tip))?? {
info!(target: "reth::cli", ?tip, number, "Successfully looked up tip block number in the database");
return Ok(number)
Ok(self.fetch_tip(db, fetch_client, BlockHashOrNumber::Hash(tip)).await?.number)
}

/// Attempt to look up the block with the given number and return the header.
///
/// NOTE: The download is attempted with infinite retries.
async fn fetch_tip(
&self,
db: Arc<Env<WriteMap>>,
fetch_client: FetchClient,
tip: BlockHashOrNumber,
) -> Result<SealedHeader, reth_interfaces::Error> {
let tip_num = match tip {
BlockHashOrNumber::Hash(hash) => {
info!(target: "reth::cli", ?hash, "Fetching tip block from the network.");
db.view(|tx| tx.get::<tables::HeaderNumbers>(hash))??.unwrap()
}
BlockHashOrNumber::Number(number) => number,
};

// try to look up the header in the database
if let Some(header) = db.view(|tx| tx.get::<tables::Headers>(tip_num))?? {
info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database");
return Ok(header.seal_slow())
}

info!(target: "reth::cli", ?tip, "Fetching tip block number from the network.");
info!(target: "reth::cli", ?tip, "Fetching tip block from the network.");
loop {
match get_single_header(fetch_client.clone(), BlockHashOrNumber::Hash(tip)).await {
match get_single_header(fetch_client.clone(), tip).await {
Ok(tip_header) => {
info!(target: "reth::cli", ?tip, number = tip_header.number, "Successfully fetched tip block number");
return Ok(tip_header.number)
info!(target: "reth::cli", ?tip, "Successfully fetched tip");
return Ok(tip_header)
}
Err(error) => {
error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying...");
Expand Down Expand Up @@ -429,6 +461,7 @@ impl Command {
.build(ShareableDatabase::new(db, self.chain.clone()))
}

#[allow(clippy::too_many_arguments)]
async fn build_pipeline<H, B, U>(
&self,
config: &Config,
Expand All @@ -437,6 +470,7 @@ impl Command {
updater: U,
consensus: &Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
) -> eyre::Result<Pipeline<Env<WriteMap>, U>>
where
H: HeaderDownloader + 'static,
Expand All @@ -453,24 +487,43 @@ impl Command {
}

let factory = reth_executor::Factory::new(self.chain.clone());

let default_stages = if continuous {
let continuous_headers =
HeaderStage::new(header_downloader, consensus.clone()).continuous();
let online_builder = OnlineStages::builder_with_headers(
continuous_headers,
consensus.clone(),
body_downloader,
);
DefaultStages::<H, B, U, reth_executor::Factory>::add_offline_stages(
online_builder,
updater.clone(),
factory.clone(),
)
} else {
DefaultStages::new(
consensus.clone(),
header_downloader,
body_downloader,
updater.clone(),
factory.clone(),
)
.builder()
};

let pipeline = builder
.with_sync_state_updater(updater.clone())
.with_sync_state_updater(updater)
.add_stages(
DefaultStages::new(
consensus.clone(),
header_downloader,
body_downloader,
updater,
factory.clone(),
)
.set(
TotalDifficultyStage::new(consensus.clone())
.with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)),
default_stages
.set(
TotalDifficultyStage::new(consensus.clone())
.with_commit_threshold(stage_conf.total_difficulty.commit_threshold),
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage::new(factory, stage_conf.execution.commit_threshold)),
)
.build();

Expand Down
8 changes: 8 additions & 0 deletions crates/interfaces/src/p2p/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ pub enum DownloadError {
/// The hash of the expected tip
expected: H256,
},
/// Received a tip with an invalid tip number
#[error("Received invalid tip number: {received:?}. Expected {expected:?}.")]
InvalidTipNumber {
/// The block number of the received tip
received: u64,
/// The block number of the expected tip
expected: u64,
},
/// Received a response to a request with unexpected start block
#[error("Headers response starts at unexpected block: {received:?}. Expected {expected:?}.")]
HeadersResponseStartBlockMismatch {
Expand Down
11 changes: 7 additions & 4 deletions crates/interfaces/src/p2p/headers/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
p2p::error::{DownloadError, DownloadResult},
};
use futures::Stream;
use reth_primitives::{SealedHeader, H256};
use reth_primitives::{BlockHashOrNumber, SealedHeader, H256};

/// A downloader capable of fetching and yielding block headers.
///
Expand Down Expand Up @@ -48,6 +48,8 @@ pub enum SyncTarget {
/// The benefit of this variant is, that this already provides the block number of the highest
/// missing block.
Gap(SealedHeader),
/// This represents a tip by block number
TipNum(u64),
}

// === impl SyncTarget ===
Expand All @@ -57,10 +59,11 @@ impl SyncTarget {
///
/// This returns the hash if the target is [SyncTarget::Tip] or the `parent_hash` of the given
/// header in [SyncTarget::Gap]
pub fn tip(&self) -> H256 {
pub fn tip(&self) -> BlockHashOrNumber {
match self {
SyncTarget::Tip(tip) => *tip,
SyncTarget::Gap(gap) => gap.parent_hash,
SyncTarget::Tip(tip) => (*tip).into(),
SyncTarget::Gap(gap) => gap.parent_hash.into(),
SyncTarget::TipNum(num) => (*num).into(),
}
}
}
Expand Down
Loading

0 comments on commit 1711d80

Please sign in to comment.