Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Single buffer block sync #1318

Merged
merged 67 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
26906b6
Initial benchmark skeleton
bvrooman Aug 3, 2023
9ace98a
Update
bvrooman Aug 3, 2023
8baf652
Simplify
bvrooman Aug 4, 2023
5f3fb57
benches import lib
bvrooman Aug 4, 2023
3b349ef
Update import.rs
bvrooman Aug 4, 2023
1982867
Update
bvrooman Aug 4, 2023
18453ba
test v3
bvrooman Aug 5, 2023
4d37567
Version tests
bvrooman Aug 7, 2023
a35e94a
v4
bvrooman Aug 11, 2023
7587ce2
Update import.rs
bvrooman Aug 11, 2023
a5a3b82
Update import.rs
bvrooman Aug 11, 2023
4970937
Merge branch 'master' into bvrooman/test/sync-refactor-benchmarks-v2
bvrooman Aug 22, 2023
a9a0acb
Merge in progress
bvrooman Aug 22, 2023
55180f7
Merge branch 'master' into bvrooman/test/sync-refactor-benchmarks-v2
Aug 22, 2023
e4ac401
Merge branch 'bvrooman/test/sync-refactor-benchmarks-v2' of https://g…
bvrooman Aug 22, 2023
353faf2
Remove deprecated import version
bvrooman Aug 22, 2023
3dfebe1
Rearrange
bvrooman Aug 22, 2023
ce9bd5f
Fix whitespace
bvrooman Aug 23, 2023
1ae5b9f
Remove unused version tests
bvrooman Aug 23, 2023
5a99b6e
Minor refactor
bvrooman Aug 23, 2023
19b973a
Update import.rs
bvrooman Aug 23, 2023
86a5d01
Separate test helpers
bvrooman Aug 23, 2023
7657b0b
Update import and tests
bvrooman Aug 23, 2023
3ef35e3
Add more benches
bvrooman Aug 23, 2023
4713f16
Merge branch 'master' into bvrooman/test/sync-refactor-benchmarks-v2
Aug 23, 2023
a15b008
Update Cargo.toml
bvrooman Aug 23, 2023
1e0770b
Merge branch 'bvrooman/test/sync-refactor-benchmarks-v2' of https://g…
bvrooman Aug 23, 2023
09b0054
Update CHANGELOG.md
bvrooman Aug 23, 2023
ec8c01d
remove call to blackbox
bvrooman Aug 23, 2023
49a645b
Update placeholder comments
bvrooman Aug 23, 2023
30739e7
Update import.rs
bvrooman Aug 23, 2023
85d7425
Update CHANGELOG.md
Aug 23, 2023
6520896
Flatten stream of stream of headers
bvrooman Aug 23, 2023
f98c9ce
Revert
bvrooman Aug 23, 2023
76ca407
Revert
bvrooman Aug 23, 2023
94598de
Merge branch 'bvrooman/test/sync-refactor-benchmarks-v2' into bvrooma…
bvrooman Aug 23, 2023
b89db7d
Update CHANGELOG.md
bvrooman Aug 23, 2023
2f5a3f8
Merge branch 'bvrooman/test/sync-refactor-benchmarks-v2' into bvrooma…
bvrooman Aug 23, 2023
fb6ce66
Typo
bvrooman Aug 23, 2023
672012f
Merge branch 'bvrooman/test/sync-refactor-benchmarks-v2' into bvrooma…
bvrooman Aug 23, 2023
eb32da9
Fix uses
bvrooman Aug 23, 2023
9533cc6
Minor refactor
bvrooman Aug 23, 2023
0136cc8
Minor refactor
bvrooman Aug 23, 2023
769bf6e
Update tests
bvrooman Aug 23, 2023
51f74d2
Fix tests
bvrooman Aug 23, 2023
f2a37b4
Update CHANGELOG.md
bvrooman Aug 24, 2023
76361cb
Merge branch 'master' into bvrooman/chore/single-buffer-block-sync
Aug 24, 2023
0cbcaa1
Rename get_block_for_header to get_sealed_blocks
bvrooman Aug 24, 2023
778077a
Merge branch 'bvrooman/chore/single-buffer-block-sync' of https://git…
bvrooman Aug 24, 2023
11357e6
Update the benchmark
xgreenx Aug 24, 2023
6838932
Remove max_header_batch_requests
bvrooman Aug 24, 2023
8473229
Merge branch 'master' into bvrooman/chore/single-buffer-block-sync
Aug 24, 2023
8fdd12d
Update import.rs
bvrooman Aug 24, 2023
4eb5b51
Merge branch 'bvrooman/chore/single-buffer-block-sync' of https://git…
bvrooman Aug 24, 2023
574e993
Update naming and CLI arg
bvrooman Aug 24, 2023
2429ee0
Replace max_get_txns with block_stream_size in configs
bvrooman Aug 24, 2023
e355042
Update benchmarks
bvrooman Aug 24, 2023
1f9c36e
Update CHANGELOG.md
bvrooman Aug 25, 2023
425bb8c
Remove Nones from header stream
bvrooman Aug 25, 2023
839725a
Graceful shutdown using mpsc blocking
bvrooman Aug 25, 2023
ab86da7
Clean up
bvrooman Aug 25, 2023
5d9966e
Update naming
bvrooman Aug 25, 2023
6616d86
Remove unnecessary `move`
bvrooman Aug 25, 2023
4c327d9
Simplify
bvrooman Aug 25, 2023
1af52db
Clippy
bvrooman Aug 25, 2023
a046d6d
Update import.rs
bvrooman Aug 26, 2023
f137f62
Update crates/services/sync/src/import.rs
Aug 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ Description of the upcoming release here.

### Changed

- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Modified block synchronization to use asynchronous task execution when retrieving block headers.
- [#1314](https://github.com/FuelLabs/fuel-core/pull/1314): Removed `types::ConsensusParameters` in favour of `fuel_tx:ConsensusParameters`.
- [#1302](https://github.com/FuelLabs/fuel-core/pull/1302): Removed the usage of flake and building of the bridge contract ABI.
It simplifies the maintenance and updating of the events, requiring only putting the event definition into the codebase of the relayer.
- [#1293](https://github.com/FuelLabs/fuel-core/issues/1293): Parallelized the `estimate_predicates` endpoint to utilize all available threads.
- [#1270](https://github.com/FuelLabs/fuel-core/pull/1270): Modify the way block headers are retrieved from peers to be done in batches.

#### Breaking
- [#1318](https://github.com/FuelLabs/fuel-core/pull/1318): Removed the `--sync-max-header-batch-requests` CLI argument, and renamed `--sync-max-get-txns` to `--sync-block-stream-buffer-size` to better represent the current behavior in the import.
- [#1279](https://github.com/FuelLabs/fuel-core/pull/1279): Added a new CLI flag to enable the Relayer service `--enable-relayer`, and disabled the Relayer service by default. When supplying the `--enable-relayer` flag, the `--relayer` argument becomes mandatory, and omitting it is an error. Similarly, providing a `--relayer` argument without the `--enable-relayer` flag is an error. Lastly, providing the `--keypair` or `--network` arguments will also produce an error if the `--enable-p2p` flag is not set.
- [#1262](https://github.com/FuelLabs/fuel-core/pull/1262): The `ConsensusParameters` aggregates all configuration data related to the consensus. It contains many fields that are segregated by the usage. The API of some functions was affected to use lesser types instead the whole `ConsensusParameters`. It is a huge breaking change requiring repetitively monotonically updating all places that use the `ConsensusParameters`. But during updating, consider that maybe you can use lesser types. Usage of them may simplify signatures of methods and make them more user-friendly and transparent.
- [#1290](https://github.com/FuelLabs/fuel-core/pull/1290): Standardize CLI args to use `-` instead of `_`
Expand Down
47 changes: 22 additions & 25 deletions benches/benches/import.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use criterion::{
criterion_group,
criterion_main,
measurement::WallTime,
BenchmarkGroup,
Criterion,
};
use fuel_core_benches::import::{
Expand All @@ -23,26 +21,28 @@ async fn execute_import(import: PressureImport, shutdown: &mut StateWatcher) {
import.import(shutdown).await.unwrap();
}

fn name(n: u32, durations: Durations, buffer_size: usize) -> String {
fn name(n: u32, durations: Durations, batch_size: u32, buffer_size: usize) -> String {
format!(
"import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {sz}",
"import {n} * {d_h}/{d_c}/{d_t}/{d_e} - {bas}/{bus}",
n = n,
d_h = durations.headers.as_millis(),
d_c = durations.consensus.as_millis(),
d_t = durations.transactions.as_millis(),
d_e = durations.executes.as_millis(),
sz = buffer_size
bas = batch_size,
bus = buffer_size
)
}

fn bench_imports(c: &mut Criterion) {
let bench_import = |group: &mut BenchmarkGroup<WallTime>,
let bench_import = |c: &mut Criterion,
n: u32,
durations: Durations,
batch_size: u32,
buffer_size: usize| {
let name = name(n, durations, buffer_size);
group.bench_function(name, move |b| {
let name = name(n, durations, batch_size, buffer_size);
let mut group = c.benchmark_group(format!("import {}", name));
group.bench_function("bench", move |b| {
let rt = Runtime::new().unwrap();
b.to_async(&rt).iter_custom(|iters| async move {
let mut elapsed_time = Duration::default();
Expand All @@ -56,7 +56,6 @@ fn bench_imports(c: &mut Criterion) {
durations,
batch_size,
buffer_size,
buffer_size,
);
import.notify_one();
let start = std::time::Instant::now();
Expand All @@ -68,33 +67,31 @@ fn bench_imports(c: &mut Criterion) {
});
};

let mut group = c.benchmark_group("import");

let n = 100;
let durations = Durations {
headers: Duration::from_millis(5),
consensus: Duration::from_millis(5),
transactions: Duration::from_millis(5),
executes: Duration::from_millis(10),
headers: Duration::from_millis(10),
consensus: Duration::from_millis(10),
transactions: Duration::from_millis(10),
executes: Duration::from_millis(5),
};

// Header batch size = 10, header/txn buffer size = 10
bench_import(&mut group, n, durations, 10, 10);
bench_import(c, n, durations, 10, 10);

// Header batch size = 20, header/txn buffer size = 10
bench_import(&mut group, n, durations, 20, 10);
// Header batch size = 10, header/txn buffer size = 25
bench_import(c, n, durations, 10, 25);

// Header batch size = 50, header/txn buffer size = 10
bench_import(&mut group, n, durations, 20, 10);
// Header batch size = 10, header/txn buffer size = 50
bench_import(c, n, durations, 10, 50);

// Header batch size = 10, header/txn buffer size = 20
bench_import(&mut group, n, durations, 10, 20);
// Header batch size = 25, header/txn buffer size = 10
bench_import(c, n, durations, 25, 10);

// Header batch size = 10, header/txn buffer size = 50
bench_import(&mut group, n, durations, 10, 50);
// Header batch size = 50, header/txn buffer size = 10
bench_import(c, n, durations, 50, 10);

// Header batch size = 50, header/txn buffer size = 50
bench_import(&mut group, n, durations, 10, 20);
bench_import(c, n, durations, 50, 50);
}

criterion_group!(benches, bench_imports);
Expand Down
6 changes: 2 additions & 4 deletions benches/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,16 @@ pub fn provision_import_test(
shared_state: SharedMutex<State>,
input: Durations,
header_batch_size: u32,
max_header_batch_requests: usize,
max_get_txns_requests: usize,
block_stream_buffer_size: usize,
) -> (
PressureImport,
Sender<fuel_core_services::State>,
StateWatcher,
) {
let shared_notify = Arc::new(Notify::new());
let params = Config {
max_header_batch_requests,
header_batch_size,
max_get_txns_requests,
block_stream_buffer_size,
};
let p2p = Arc::new(PressurePeerToPeer::new(
shared_count.clone(),
Expand Down
10 changes: 3 additions & 7 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,11 @@ pub struct P2PArgs {
#[derive(Debug, Clone, Args)]
pub struct SyncArgs {
/// The maximum number of get transaction requests to make in a single batch.
#[clap(long = "sync-max-get-txns", default_value = "10", env)]
pub max_get_txns_requests: usize,
#[clap(long = "sync-block-stream-buffer-size", default_value = "10", env)]
pub block_stream_buffer_size: usize,
/// The maximum number of headers to request in a single batch.
#[clap(long = "sync-header-batch-size", default_value = "10", env)]
pub header_batch_size: u32,
/// The maximum number of header batch requests to have active at one time.
#[clap(long = "sync-max-header-batch-requests", default_value = "10", env)]
pub max_header_batch_requests: usize,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -218,9 +215,8 @@ impl KeypairArg {
impl From<SyncArgs> for fuel_core::sync::Config {
fn from(value: SyncArgs) -> Self {
Self {
max_get_txns_requests: value.max_get_txns_requests,
block_stream_buffer_size: value.block_stream_buffer_size,
header_batch_size: value.header_batch_size,
max_header_batch_requests: value.max_header_batch_requests,
}
}
}
Expand Down
159 changes: 100 additions & 59 deletions crates/services/sync/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
//! importing blocks from the network into the local blockchain.

use std::{
future::Future,
iter,
ops::RangeInclusive,
sync::Arc,
};

use anyhow::anyhow;
use fuel_core_services::{
SharedMutex,
StateWatcher,
Expand All @@ -23,6 +26,7 @@ use fuel_core_types::{
};
use futures::{
stream::StreamExt,
FutureExt,
Stream,
};
use tokio::sync::Notify;
Expand Down Expand Up @@ -56,19 +60,16 @@ mod back_pressure_tests;
/// Parameters for the import task.
pub struct Config {
/// The maximum number of get transaction requests to make in a single batch.
pub max_get_txns_requests: usize,
pub block_stream_buffer_size: usize,
/// The maximum number of headers to request in a single batch.
pub header_batch_size: u32,
/// The maximum number of header batch requests to have active at one time.
pub max_header_batch_requests: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
max_get_txns_requests: 10,
block_stream_buffer_size: 10,
header_batch_size: 100,
max_header_batch_requests: 10,
}
}
}
Expand Down Expand Up @@ -174,18 +175,30 @@ where
..
} = &self;

get_headers_buffered(range.clone(), params, p2p.clone())
.map({
let p2p = p2p.clone();
let consensus_port = consensus.clone();
move |result| {
Self::get_block_for_header(result, p2p.clone(), consensus_port.clone())
}
.instrument(tracing::debug_span!("consensus_and_transactions"))
.in_current_span()
let shutdown_signal = shutdown.clone();
let (shutdown_guard, mut shutdown_guard_recv) =
tokio::sync::mpsc::channel::<()>(1);
let block_stream =
get_block_stream(range.clone(), params, p2p.clone(), consensus.clone());
let result = block_stream
.map(move |stream_block_batch| {
let shutdown_guard = shutdown_guard.clone();
let shutdown_signal = shutdown_signal.clone();
tokio::spawn(async move {
// Hold a shutdown sender for the lifetime of the spawned task
let _shutdown_guard = shutdown_guard.clone();
let mut shutdown_signal = shutdown_signal.clone();
tokio::select! {
// Stream a batch of blocks
blocks = stream_block_batch => blocks,
// If a shutdown signal is received during the stream, terminate early and
// return an empty response
_ = shutdown_signal.while_started() => Ok(None)
}
}).then(|task| async { task.map_err(|e| anyhow!(e))? })
})
// Request up to `max_get_txns_requests` transactions from the network.
.buffered(params.max_get_txns_requests)
// Request up to `block_stream_buffer_size` transactions from the network.
.buffered(params.block_stream_buffer_size)
// Continue the stream unless an error or none occurs.
// Note the error will be returned but the stream will close.
.into_scan_none_or_err()
Expand Down Expand Up @@ -230,66 +243,58 @@ where
}
})
.in_current_span()
.await
.await;

// Wait for any spawned tasks to shutdown
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
let _ = shutdown_guard_recv.recv().await;
result
}
}

async fn get_block_for_header(
bvrooman marked this conversation as resolved.
Show resolved Hide resolved
result: anyhow::Result<SourcePeer<SealedBlockHeader>>,
p2p: Arc<P>,
consensus_port: Arc<C>,
) -> anyhow::Result<Option<SealedBlock>> {
let header = match result {
Ok(h) => h,
Err(e) => return Err(e),
};
let SourcePeer {
peer_id,
data: header,
} = header;
let id = header.entity.id();
let block_id = SourcePeer { peer_id, data: id };

// Check the consensus is valid on this header.
if !consensus_port
.check_sealed_header(&header)
.trace_err("Failed to check consensus on header")?
{
tracing::warn!("Header {:?} failed consensus check", header);
return Ok(None)
fn get_block_stream<
P: PeerToPeerPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
>(
range: RangeInclusive<u32>,
params: &Config,
p2p: Arc<P>,
consensus: Arc<C>,
) -> impl Stream<Item = impl Future<Output = anyhow::Result<Option<SealedBlock>>>> {
get_header_stream(range, params, p2p.clone()).map({
let p2p = p2p.clone();
let consensus_port = consensus.clone();
move |batch| {
{
let p2p = p2p.clone();
let consensus_port = consensus_port.clone();
get_sealed_blocks(batch, p2p.clone(), consensus_port.clone())
}
.instrument(tracing::debug_span!("consensus_and_transactions"))
.in_current_span()
}

// Wait for the da to be at least the da height on the header.
consensus_port
.await_da_height(&header.entity.da_height)
.await?;

get_transactions_on_block(p2p.as_ref(), block_id, header).await
}
})
}

fn get_headers_buffered<P: PeerToPeerPort + Send + Sync + 'static>(
fn get_header_stream<P: PeerToPeerPort + Send + Sync + 'static>(
range: RangeInclusive<u32>,
params: &Config,
p2p: Arc<P>,
) -> impl Stream<Item = anyhow::Result<SourcePeer<SealedBlockHeader>>> {
let Config {
header_batch_size,
max_header_batch_requests,
..
header_batch_size, ..
} = params;
futures::stream::iter(range_chunks(range, *header_batch_size))
.map(move |range| {
let ranges = range_chunks(range, *header_batch_size);
let p2p_gen = iter::repeat_with(move || p2p.clone());
let iter = ranges.zip(p2p_gen);
futures::stream::iter(iter)
.then(move |(range, p2p)| async {
tracing::debug!(
"getting header range from {} to {} inclusive",
range.start(),
range.end()
);
let p2p = p2p.clone();
async move { get_headers_batch(range, p2p).await }
.instrument(tracing::debug_span!("get_headers_batch"))
.in_current_span()
get_headers_batch(range, p2p).await
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I thought it was a good idea to spawn a task here too, but after some benchmarking, it seems to have a super small effect.

Did I get right that we potentially have only params.max_get_txns_requests of get_headers_batch requests?

Copy link
Contributor Author

@bvrooman bvrooman Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this PR ends up removing max_header_batch_requests entirely.

I think we may want to try different benchmark configurations. Before batch headers, we saw some performance improvement of a few percentage points under certain circumstance with this approach. It would be worth trying different configurations of sizes and delays.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is probably better, since it's much harder to model this with multiple buffer parameters.

I suggest we change the name of max_get_txns_requests to block_stream_buffer_size or something more explicit, and directly related. I think get_transactions should be renamed get_block_stream too, since the return type is SealedBlock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good suggestions - the naming somehow got away from me here, and deserves some TLC.

})
.buffered(*max_header_batch_requests)
.flatten()
.into_scan_none_or_err()
.scan_none_or_err()
Expand All @@ -306,6 +311,42 @@ fn range_chunks(
})
}

async fn get_sealed_blocks<
P: PeerToPeerPort + Send + Sync + 'static,
C: ConsensusPort + Send + Sync + 'static,
>(
result: anyhow::Result<SourcePeer<SealedBlockHeader>>,
p2p: Arc<P>,
consensus_port: Arc<C>,
) -> anyhow::Result<Option<SealedBlock>> {
let header = match result {
Ok(h) => h,
Err(e) => return Err(e),
};
let SourcePeer {
peer_id,
data: header,
} = header;
let id = header.entity.id();
let block_id = SourcePeer { peer_id, data: id };

// Check the consensus is valid on this header.
if !consensus_port
.check_sealed_header(&header)
.trace_err("Failed to check consensus on header")?
{
tracing::warn!("Header {:?} failed consensus check", header);
return Ok(None)
}

// Wait for the da to be at least the da height on the header.
consensus_port
.await_da_height(&header.entity.da_height)
.await?;

get_transactions_on_block(p2p.as_ref(), block_id, header).await
}

/// Waits for a notify or shutdown signal.
/// Returns true if the notify signal was received.
async fn wait_for_notify_or_shutdown(
Expand Down
Loading