Skip to content

Commit

Permalink
Rename downloaders (#1108)
Browse files Browse the repository at this point in the history
Co-authored-by: Georgios Konstantopoulos <[email protected]>
  • Loading branch information
leruaa and gakonst authored Feb 1, 2023
1 parent ae771d2 commit 0149bde
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 71 deletions.
4 changes: 2 additions & 2 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl Command {
) -> reth_downloaders::headers::task::TaskDownloader {
let headers_conf = &config.stages.headers;
headers::task::TaskDownloader::spawn(
headers::linear::LinearDownloadBuilder::default()
headers::reverse_headers::ReverseHeadersDownloaderBuilder::default()
.request_limit(headers_conf.downloader_batch_size)
.stream_batch_size(headers_conf.commit_threshold as usize)
.build(consensus.clone(), fetch_client.clone()),
Expand All @@ -253,7 +253,7 @@ impl Command {
) -> reth_downloaders::bodies::task::TaskDownloader {
let bodies_conf = &config.stages.bodies;
bodies::task::TaskDownloader::spawn(
bodies::concurrent::ConcurrentDownloaderBuilder::default()
bodies::bodies::BodiesDownloaderBuilder::default()
.with_stream_batch_size(bodies_conf.downloader_stream_batch_size)
.with_request_limit(bodies_conf.downloader_request_limit)
.with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses)
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
NetworkOpts,
};
use reth_consensus::beacon::BeaconConsensus;
use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;

use reth_net_nat::NatResolver;
use reth_primitives::ChainSpec;
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Command {
let fetch_client = Arc::new(network.fetch_client().await?);

let mut stage = BodyStage {
downloader: ConcurrentDownloaderBuilder::default()
downloader: BodiesDownloaderBuilder::default()
.with_stream_batch_size(num_blocks as usize)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_responses(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub const BODIES_DOWNLOADER_SCOPE: &str = "downloaders.bodies";
/// All blocks in a batch are fetched at the same time.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ConcurrentDownloader<B: BodiesClient, DB> {
pub struct BodiesDownloader<B: BodiesClient, DB> {
/// The bodies client
client: Arc<B>,
/// The consensus client
Expand Down Expand Up @@ -73,7 +73,7 @@ pub struct ConcurrentDownloader<B: BodiesClient, DB> {
metrics: DownloaderMetrics,
}

impl<B, DB> ConcurrentDownloader<B, DB>
impl<B, DB> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand Down Expand Up @@ -241,7 +241,7 @@ where
}
}

impl<B, DB> BodyDownloader for ConcurrentDownloader<B, DB>
impl<B, DB> BodyDownloader for BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand Down Expand Up @@ -331,7 +331,7 @@ where
}
}

impl<B, DB> Stream for ConcurrentDownloader<B, DB>
impl<B, DB> Stream for BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand Down Expand Up @@ -452,8 +452,8 @@ impl Ord for OrderedBodiesResponse {
}
}

/// Builder for [ConcurrentDownloader].
pub struct ConcurrentDownloaderBuilder {
/// Builder for [BodiesDownloader].
pub struct BodiesDownloaderBuilder {
/// The batch size of non-empty blocks per one request
request_limit: u64,
/// The maximum number of block bodies returned at once from the stream
Expand All @@ -464,7 +464,7 @@ pub struct ConcurrentDownloaderBuilder {
concurrent_requests_range: RangeInclusive<usize>,
}

impl Default for ConcurrentDownloaderBuilder {
impl Default for BodiesDownloaderBuilder {
fn default() -> Self {
Self {
request_limit: 200,
Expand All @@ -475,7 +475,7 @@ impl Default for ConcurrentDownloaderBuilder {
}
}

impl ConcurrentDownloaderBuilder {
impl BodiesDownloaderBuilder {
/// Set request batch size on the downloader.
pub fn with_request_limit(mut self, request_limit: u64) -> Self {
self.request_limit = request_limit;
Expand Down Expand Up @@ -509,7 +509,7 @@ impl ConcurrentDownloaderBuilder {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
db: Arc<DB>,
) -> ConcurrentDownloader<B, DB>
) -> BodiesDownloader<B, DB>
where
B: BodiesClient + 'static,
DB: Database,
Expand All @@ -522,7 +522,7 @@ impl ConcurrentDownloaderBuilder {
} = self;
let metrics = DownloaderMetrics::new(BODIES_DOWNLOADER_SCOPE);
let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
ConcurrentDownloader {
BodiesDownloader {
client,
consensus,
db,
Expand Down Expand Up @@ -566,7 +566,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let mut downloader = ConcurrentDownloaderBuilder::default().build(
let mut downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
Expand Down Expand Up @@ -595,7 +595,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let mut downloader = ConcurrentDownloaderBuilder::default()
let mut downloader = BodiesDownloaderBuilder::default()
.with_stream_batch_size(stream_batch_size)
.with_request_limit(request_limit)
.build(client.clone(), Arc::new(TestConsensus::default()), db);
Expand Down Expand Up @@ -631,9 +631,11 @@ mod tests {
insert_headers(&db, &headers);

let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
let mut downloader = ConcurrentDownloaderBuilder::default()
.with_stream_batch_size(100)
.build(client.clone(), Arc::new(TestConsensus::default()), db);
let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
);

// Set and download the first range
downloader.set_download_range(0..100).expect("failed to set download range");
Expand Down
3 changes: 2 additions & 1 deletion crates/net/downloaders/src/bodies/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// A naive concurrent downloader.
pub mod concurrent;
#[allow(clippy::module_inception)]
pub mod bodies;

/// TODO:
pub mod task;
Expand Down
10 changes: 5 additions & 5 deletions crates/net/downloaders/src/bodies/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ impl TaskDownloader {
///
/// ```
/// use std::sync::Arc;
/// use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder;
/// use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
/// use reth_downloaders::bodies::task::TaskDownloader;
/// use reth_interfaces::consensus::Consensus;
/// use reth_interfaces::p2p::bodies::client::BodiesClient;
/// use reth_db::database::Database;
/// fn t<B: BodiesClient + 'static, DB: Database + 'static>(client: Arc<B>, consensus:Arc<dyn Consensus>, db: Arc<DB>) {
/// let downloader = ConcurrentDownloaderBuilder::default().build(
/// let downloader = BodiesDownloaderBuilder::default().build(
/// client,
/// consensus,
/// db
Expand Down Expand Up @@ -133,7 +133,7 @@ mod tests {
use super::*;
use crate::{
bodies::{
concurrent::ConcurrentDownloaderBuilder,
bodies::BodiesDownloaderBuilder,
test_utils::{insert_headers, zip_blocks},
},
test_utils::{generate_bodies, TestBodiesClient},
Expand All @@ -155,7 +155,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let downloader = ConcurrentDownloaderBuilder::default().build(
let downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
Expand Down Expand Up @@ -184,7 +184,7 @@ mod tests {
let client = Arc::new(
TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
);
let downloader = ConcurrentDownloaderBuilder::default().build(
let downloader = BodiesDownloaderBuilder::default().build(
client.clone(),
Arc::new(TestConsensus::default()),
db,
Expand Down
2 changes: 1 addition & 1 deletion crates/net/downloaders/src/headers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// A Linear downloader implementation.
pub mod linear;
pub mod reverse_headers;

/// A downloader implementation that spawns a downloader to a task
pub mod task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers";
/// the batches of headers that this downloader yields will start at the chain tip and move towards
/// the local head: falling block numbers.
#[must_use = "Stream does nothing unless polled"]
pub struct LinearDownloader<H: HeadersClient> {
pub struct ReverseHeadersDownloader<H: HeadersClient> {
/// Consensus client used to validate headers
consensus: Arc<dyn Consensus>,
/// Client used to download headers.
Expand Down Expand Up @@ -86,15 +86,15 @@ pub struct LinearDownloader<H: HeadersClient> {
metrics: DownloaderMetrics,
}

// === impl LinearDownloader ===
// === impl ReverseHeadersDownloader ===

impl<H> LinearDownloader<H>
impl<H> ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
/// Convenience method to create a [LinearDownloadBuilder] without importing it
pub fn builder() -> LinearDownloadBuilder {
LinearDownloadBuilder::default()
/// Convenience method to create a [ReverseHeadersDownloaderBuilder] without importing it
pub fn builder() -> ReverseHeadersDownloaderBuilder {
ReverseHeadersDownloaderBuilder::default()
}

/// Returns the block number the local node is at.
Expand Down Expand Up @@ -501,7 +501,7 @@ where
}
}

impl<H> HeaderDownloader for LinearDownloader<H>
impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
Expand Down Expand Up @@ -577,7 +577,7 @@ where
}
}

impl<H> Stream for LinearDownloader<H>
impl<H> Stream for ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
Expand Down Expand Up @@ -793,10 +793,10 @@ impl SyncTargetBlock {
}
}

/// The builder for [LinearDownloader] with
/// The builder for [ReverseHeadersDownloader] with
/// some default settings
#[derive(Debug)]
pub struct LinearDownloadBuilder {
pub struct ReverseHeadersDownloaderBuilder {
/// The batch size per one request
request_limit: u64,
/// Batch size for headers
Expand All @@ -809,7 +809,7 @@ pub struct LinearDownloadBuilder {
max_buffered_responses: usize,
}

impl Default for LinearDownloadBuilder {
impl Default for ReverseHeadersDownloaderBuilder {
fn default() -> Self {
Self {
request_limit: 1_000,
Expand All @@ -821,7 +821,7 @@ impl Default for LinearDownloadBuilder {
}
}

impl LinearDownloadBuilder {
impl ReverseHeadersDownloaderBuilder {
/// Set the request batch size.
///
/// This determines the `limit` for a [GetHeaders](reth_eth_wire::GetBlockHeaders) requests, the
Expand All @@ -833,17 +833,18 @@ impl LinearDownloadBuilder {

/// Set the stream batch size
///
/// This determines the number of headers the [LinearDownloader] will yield on `Stream::next`.
/// This will be the amount of headers the headers stage will commit at a time.
/// This determines the number of headers the [ReverseHeadersDownloader] will yield on
/// `Stream::next`. This will be the amount of headers the headers stage will commit at a
/// time.
pub fn stream_batch_size(mut self, size: usize) -> Self {
self.stream_batch_size = size;
self
}

/// Set the min amount of concurrent requests.
///
/// If there's capacity the [LinearDownloader] will keep at least this many requests active at a
/// time.
/// If there's capacity the [ReverseHeadersDownloader] will keep at least this many requests
/// active at a time.
pub fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
self.min_concurrent_requests = min_concurrent_requests;
self
Expand All @@ -861,16 +862,20 @@ impl LinearDownloadBuilder {
///
/// This essentially determines how much memory the downloader can use for buffering responses
/// that arrive out of order. The total number of buffered headers is `request_limit *
/// max_buffered_responses`. If the [LinearDownloader]'s buffered responses exceeds this
/// max_buffered_responses`. If the [ReverseHeadersDownloader]'s buffered responses exceeds this
/// threshold it waits until there's capacity again before sending new requests.
pub fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
self.max_buffered_responses = max_buffered_responses;
self
}

/// Build [LinearDownloader] with provided consensus
/// Build [ReverseHeadersDownloader] with provided consensus
/// and header client implementations
pub fn build<H>(self, consensus: Arc<dyn Consensus>, client: Arc<H>) -> LinearDownloader<H>
pub fn build<H>(
self,
consensus: Arc<dyn Consensus>,
client: Arc<H>,
) -> ReverseHeadersDownloader<H>
where
H: HeadersClient + 'static,
{
Expand All @@ -881,7 +886,7 @@ impl LinearDownloadBuilder {
max_concurrent_requests,
max_buffered_responses,
} = self;
LinearDownloader {
ReverseHeadersDownloader {
consensus,
client,
local_head: None,
Expand Down Expand Up @@ -940,7 +945,7 @@ mod tests {

let genesis = SealedHeader::default();

let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(genesis);
downloader.update_sync_target(SyncTarget::Tip(H256::random()));
Expand Down Expand Up @@ -968,7 +973,7 @@ mod tests {

let header = SealedHeader::default();

let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(header.clone());
downloader.update_sync_target(SyncTarget::Tip(H256::random()));
Expand Down Expand Up @@ -1008,7 +1013,7 @@ mod tests {

let batch_size = 99;
let start = 1000;
let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.request_limit(batch_size)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(genesis);
Expand Down Expand Up @@ -1057,7 +1062,7 @@ mod tests {
let p1 = child_header(&p2);
let p0 = child_header(&p1);

let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(3)
.request_limit(3)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
Expand Down Expand Up @@ -1089,7 +1094,7 @@ mod tests {
let p0 = child_header(&p1);

let client = Arc::new(TestHeadersClient::default());
let mut downloader = LinearDownloadBuilder::default()
let mut downloader = ReverseHeadersDownloaderBuilder::default()
.stream_batch_size(1)
.request_limit(1)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client));
Expand Down
Loading

0 comments on commit 0149bde

Please sign in to comment.