feat(sync): set max block if debug.tip provided #1522

Merged (12 commits) on Feb 23, 2023
1 change: 1 addition & 0 deletions bin/reth/src/lib.rs
@@ -20,3 +20,4 @@ pub mod runner;
pub mod stage;
pub mod test_eth_chain;
pub mod test_vectors;
pub mod utils;
63 changes: 55 additions & 8 deletions bin/reth/src/node/mod.rs
@@ -6,6 +6,7 @@ use crate::{
dirs::{ConfigPath, DbPath, PlatformPath},
prometheus_exporter,
runner::CliContext,
utils::get_single_header,
};
use clap::{crate_version, Parser};
use eyre::Context;
@@ -31,10 +32,10 @@ use reth_interfaces::{
sync::SyncStateUpdater,
};
use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
error::NetworkError, FetchClient, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, Head, H256};
use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, Head, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_engine_api::{EngineApi, EngineApiHandle};
use reth_staged_sync::{
@@ -205,9 +206,18 @@ impl Command {
task_executor: &TaskExecutor,
) -> eyre::Result<(Pipeline<Env<WriteMap>, impl SyncStateUpdater>, impl Stream<Item = NodeEvent>)>
{
// building network downloaders using the fetch client
let fetch_client = Arc::new(network.fetch_client().await?);
let fetch_client = network.fetch_client().await?;
let max_block = if let Some(block) = self.max_block {
Some(block)
} else if let Some(tip) = self.tip {
Some(self.fetch_or_lookup_tip(db.clone(), fetch_client.clone(), tip).await?)
} else {
None
};

// TODO: remove Arc requirement from downloader builders.
// building network downloaders using the fetch client
let fetch_client = Arc::new(fetch_client);
let header_downloader = ReverseHeadersDownloaderBuilder::from(config.stages.headers)
.build(fetch_client.clone(), consensus.clone())
.into_task_with(task_executor);
@@ -217,7 +227,14 @@
.into_task_with(task_executor);

let mut pipeline = self
.build_pipeline(config, header_downloader, body_downloader, network.clone(), consensus)
.build_pipeline(
config,
header_downloader,
body_downloader,
network.clone(),
consensus,
max_block,
)
.await?;

let events = stream_select(
@@ -316,7 +333,7 @@ impl Command {
Ok(handle)
}

fn fetch_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::db::Error> {
fn lookup_head(&self, db: Arc<Env<WriteMap>>) -> Result<Head, reth_interfaces::db::Error> {
db.view(|tx| {
let head = FINISH.get_progress(tx)?.unwrap_or_default();
let header = tx
@@ -339,13 +356,42 @@
.map_err(Into::into)
}

/// Attempt to look up the block number for the tip hash in the database.
/// If it doesn't exist, download the header and return the block number.
///
/// NOTE: The download is attempted with infinite retries.
async fn fetch_or_lookup_tip(
&self,
db: Arc<Env<WriteMap>>,
fetch_client: FetchClient,
tip: H256,
) -> Result<u64, reth_interfaces::Error> {
if let Some(number) = db.view(|tx| tx.get::<tables::HeaderNumbers>(tip))?? {
debug!(target: "reth::cli", ?tip, number, "Successfully looked up tip in the database");
return Ok(number)
}

debug!(target: "reth::cli", ?tip, "Fetching tip header from the network.");
loop {
match get_single_header(fetch_client.clone(), BlockHashOrNumber::Hash(tip)).await {
Ok(tip_header) => {
debug!(target: "reth::cli", ?tip, number = tip_header.number, "Successfully fetched tip");
return Ok(tip_header.number)
}
Err(error) => {
error!(target: "reth::cli", %error, "Failed to fetch the tip. Retrying...");
}
}
}
Comment on lines +374 to +385

Collaborator: Why do we need to fetch this in the first place?

rkrasiuk (Member, Author): To be able to instantiate the pipeline with the max_block parameter if debug.tip was provided. Otherwise, the pipeline doesn't have a terminating condition.

Collaborator: Ah okay, understood, thanks for clarifying this in the docs. Does this mean that even if we continue after a restart in, for example, the execution stage, we still need to wait for the first peer?

rkrasiuk (Member, Author, Feb 23, 2023): No, because the tip was already stored in the db. First, I attempt the db lookup; only if the tip is not there do I fetch it.

rkrasiuk (Member, Author): I should probably change the fn name from fetch_or_lookup_tip to lookup_or_fetch_tip to reflect the order of operations.
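
To make the order of operations concrete, here is a minimal, self-contained sketch of the resolution logic being discussed, with stand-in closures instead of the real reth database and FetchClient APIs (hypothetical, for illustration only):

```rust
// Hypothetical stand-in for the max_block selection in node/mod.rs and
// fetch_or_lookup_tip above: an explicit max_block wins, otherwise the tip
// hash is resolved locally first, and only then fetched from peers with
// infinite retries. The closures are illustrative, not the reth APIs.
fn resolve_max_block(
    explicit_max_block: Option<u64>,
    tip: Option<[u8; 32]>,
    lookup_local: impl Fn(&[u8; 32]) -> Option<u64>,
    mut fetch_remote: impl FnMut(&[u8; 32]) -> Result<u64, String>,
) -> Option<u64> {
    if let Some(block) = explicit_max_block {
        // An explicitly configured max block always takes precedence.
        return Some(block);
    }
    // No tip provided: the pipeline runs without a terminating condition.
    let tip = tip?;
    if let Some(number) = lookup_local(&tip) {
        // Tip already known locally, e.g. after a restart: no peers needed.
        return Some(number);
    }
    // Tip unknown locally: retry the network fetch until a peer answers.
    loop {
        if let Ok(number) = fetch_remote(&tip) {
            return Some(number);
        }
    }
}

fn main() {
    let tip = Some([0u8; 32]);
    let max = resolve_max_block(None, tip, |_| Some(17_000_000), |_| Err("no peers".to_string()));
    assert_eq!(max, Some(17_000_000));
    println!("resolved max block: {max:?}");
}
```

The local lookup coming first is what lets a restarted node skip waiting for a peer once the tip header has already been persisted.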

}

fn load_network_config(
&self,
config: &Config,
db: Arc<Env<WriteMap>>,
executor: TaskExecutor,
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
let head = self.fetch_head(Arc::clone(&db)).expect("the head block is missing");
let head = self.lookup_head(Arc::clone(&db)).expect("the head block is missing");

self.network
.network_config(config, self.chain.clone())
@@ -361,6 +407,7 @@
body_downloader: B,
updater: U,
consensus: &Arc<dyn Consensus>,
max_block: Option<u64>,
) -> eyre::Result<Pipeline<Env<WriteMap>, U>>
where
H: HeaderDownloader + 'static,
@@ -371,7 +418,7 @@

let mut builder = Pipeline::builder();

if let Some(max_block) = self.max_block {
if let Some(max_block) = max_block {
debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
builder = builder.with_max_block(max_block)
}
55 changes: 5 additions & 50 deletions bin/reth/src/p2p/mod.rs
@@ -2,17 +2,14 @@
use crate::{
args::DiscoveryArgs,
dirs::{ConfigPath, PlatformPath},
utils::get_single_header,
};
use backon::{ConstantBuilder, Retryable};
use clap::{Parser, Subcommand};
use reth_db::mdbx::{Env, EnvKind, WriteMap};
use reth_discv4::NatResolver;
use reth_interfaces::p2p::{
bodies::client::BodiesClient,
headers::client::{HeadersClient, HeadersRequest},
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord, SealedHeader};
use reth_interfaces::p2p::bodies::client::BodiesClient;
use reth_primitives::{BlockHashOrNumber, ChainSpec, NodeRecord};
use reth_provider::ShareableDatabase;
use reth_staged_sync::{
utils::{chainspec::chain_spec_value_parser, hash_or_num_value_parser},
@@ -117,7 +114,7 @@ impl Command {

match self.command {
Subcommands::Header { id } => {
let header = (move || self.get_single_header(fetch_client.clone(), id))
let header = (move || get_single_header(fetch_client.clone(), id))
.retry(&backoff)
.notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
.await?;
@@ -130,10 +127,7 @@
println!("Block number provided. Downloading header first...");
let client = fetch_client.clone();
let header = (move || {
self.get_single_header(
client.clone(),
BlockHashOrNumber::Number(number),
)
get_single_header(client.clone(), BlockHashOrNumber::Number(number))
})
.retry(&backoff)
.notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
@@ -162,43 +156,4 @@

Ok(())
}

/// Get a single header from network
pub async fn get_single_header(
&self,
client: FetchClient,
id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
let request = HeadersRequest {
direction: reth_primitives::HeadersDirection::Rising,
limit: 1,
start: id,
};

let (_, response) = client.get_headers(request).await?.split();

if response.len() != 1 {
eyre::bail!(
"Invalid number of headers received. Expected: 1. Received: {}",
response.len()
)
}

let header = response.into_iter().next().unwrap().seal_slow();

let valid = match id {
BlockHashOrNumber::Hash(hash) => header.hash() == hash,
BlockHashOrNumber::Number(number) => header.number == number,
};

if !valid {
eyre::bail!(
"Received invalid header. Received: {:?}. Expected: {:?}",
header.num_hash(),
id
);
}

Ok(header)
}
}
43 changes: 43 additions & 0 deletions bin/reth/src/utils.rs
@@ -0,0 +1,43 @@
//! Common CLI utility functions.

use reth_interfaces::p2p::{
download::DownloadClient,
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, SealedHeader};

/// Get a single header from network
pub async fn get_single_header(
client: FetchClient,
id: BlockHashOrNumber,
) -> eyre::Result<SealedHeader> {
let request = HeadersRequest { direction: HeadersDirection::Rising, limit: 1, start: id };

let (peer_id, response) =
client.get_headers_with_priority(request, Priority::High).await?.split();

if response.len() != 1 {
client.report_bad_message(peer_id);
eyre::bail!("Invalid number of headers received. Expected: 1. Received: {}", response.len())
}

let header = response.into_iter().next().unwrap().seal_slow();

let valid = match id {
BlockHashOrNumber::Hash(hash) => header.hash() == hash,
BlockHashOrNumber::Number(number) => header.number == number,
};

if !valid {
client.report_bad_message(peer_id);
eyre::bail!(
"Received invalid header. Received: {:?}. Expected: {:?}",
header.num_hash(),
id
);
}

Ok(header)
}
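
As a hedged usage sketch (not part of this PR's diff), a caller could combine the new helper with the same backon retry pattern the p2p command uses above; the constant-backoff configuration below is an assumed default, not something specified in this change:

```rust
// Hypothetical caller inside the bin/reth crate; mirrors the retry pattern
// from bin/reth/src/p2p/mod.rs. The backoff settings are illustrative.
use backon::{ConstantBuilder, Retryable};
use reth_network::FetchClient;
use reth_primitives::{BlockHashOrNumber, SealedHeader};

use crate::utils::get_single_header;

async fn header_by_number(
    fetch_client: FetchClient,
    number: u64,
) -> eyre::Result<SealedHeader> {
    // Retry with a constant backoff; parameters here are assumed defaults.
    let backoff = ConstantBuilder::default();
    (move || get_single_header(fetch_client.clone(), BlockHashOrNumber::Number(number)))
        .retry(&backoff)
        .notify(|err, _| println!("Error requesting header: {err}. Retrying..."))
        .await
}
```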
7 changes: 5 additions & 2 deletions crates/stages/src/pipeline/ctrl.rs
@@ -15,11 +15,14 @@ pub(crate) enum ControlFlow {
/// The progress of the last stage
progress: BlockNumber,
},
NoProgress,
NoProgress {
/// The current stage progress.
stage_progress: Option<BlockNumber>,
},
}

impl ControlFlow {
pub(crate) fn should_continue(&self) -> bool {
matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress)
matches!(self, ControlFlow::Continue { .. } | ControlFlow::NoProgress { .. })
}
}
27 changes: 17 additions & 10 deletions crates/stages/src/pipeline/mod.rs
@@ -145,6 +145,13 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
.zip(self.max_block)
.map_or(false, |(progress, target)| progress >= target)
{
trace!(
target: "sync::pipeline",
?next_action,
minimum_progress = ?self.progress.minimum_progress,
max_block = ?self.max_block,
"Terminating pipeline."
);
return Ok(())
}
}
@@ -168,18 +175,18 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
updater.update_sync_state(state);
}

trace!(
target: "sync::pipeline",
stage = %stage_id,
"Executing stage"
);
trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
let next = self
.execute_stage_to_completion(db, previous_stage, stage_index)
.instrument(info_span!("execute", stage = %stage_id))
.await?;

match next {
ControlFlow::NoProgress => {} // noop
ControlFlow::NoProgress { stage_progress } => {
if let Some(progress) = stage_progress {
self.progress.update(progress);
}
}
ControlFlow::Continue { progress } => self.progress.update(progress),
ControlFlow::Unwind { target, bad_block } => {
// reset the sync state
@@ -277,7 +284,7 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
self.listeners.notify(PipelineEvent::Skipped { stage_id });

// We reached the maximum block, so we skip the stage
return Ok(ControlFlow::NoProgress)
return Ok(ControlFlow::NoProgress { stage_progress: prev_progress })
}

self.listeners
@@ -308,7 +315,7 @@ impl<DB: Database, U: SyncStateUpdater> Pipeline<DB, U> {
return Ok(if made_progress {
ControlFlow::Continue { progress: stage_progress }
} else {
ControlFlow::NoProgress
ControlFlow::NoProgress { stage_progress: Some(stage_progress) }
})
}
}
@@ -405,7 +412,7 @@ impl PipelineProgress {
fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress,
None => ControlFlow::NoProgress { stage_progress: None },
}
}
}
@@ -458,7 +465,7 @@ mod tests {
fn progress_ctrl_flow() {
let mut progress = PipelineProgress::default();

assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress);
assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { stage_progress: None });

progress.update(1);
assert_eq!(progress.next_ctrl(), ControlFlow::Continue { progress: 1 });