Skip to content

Commit

Permalink
test: seqlock for gas price service?
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Jan 2, 2025
1 parent ecf703c commit 8150e07
Show file tree
Hide file tree
Showing 15 changed files with 119 additions and 29 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 9 additions & 5 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ version = "0.0.0"
anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
criterion = { version = "0.5", features = [
"html_reports",
"async",
"async_tokio",
] }
criterion = { version = "0.5", features = ["html_reports", "async", "async_tokio", "async_futures"] }
ctrlc = "3.2.3"
ed25519-dalek = { version = "2.0", features = ["rand_core"] }
enum-iterator = { workspace = true }
Expand All @@ -24,6 +20,8 @@ fuel-core = { path = "../crates/fuel-core", default-features = false, features =
] }
fuel-core-chain-config = { workspace = true }
fuel-core-database = { path = "./../crates/database" }
fuel-core-gas-price-service = { path = "./../crates/services/gas_price_service" }
fuel-gas-price-algorithm = { path = "./../crates/fuel-gas-price-algorithm" }
fuel-core-services = { path = "./../crates/services" }
fuel-core-storage = { path = "./../crates/storage", features = ["smt"] }
fuel-core-sync = { path = "./../crates/services/sync", features = [
Expand All @@ -41,6 +39,7 @@ postcard = { workspace = true }
primitive-types = { workspace = true, default-features = false }
quanta = "0.12"
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_yaml = "0.9.13"
Expand All @@ -50,6 +49,7 @@ test-helpers = { path = "../tests/test-helpers" }
tikv-jemallocator = { workspace = true }
tokio = { workspace = true, features = ["full"] }


[[bench]]
harness = false
name = "import"
Expand All @@ -62,6 +62,10 @@ name = "state"
harness = false
name = "vm"

[[bench]]
harness = false
name = "shared_gas_price_algorithm"

[features]
default = ["fuel-core/rocksdb"]

Expand Down
78 changes: 78 additions & 0 deletions benches/benches/shared_gas_price_algorithm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use criterion::{
criterion_group,
criterion_main,
Criterion,
};
use fuel_core_gas_price_service::v1::algorithm::SharedV1Algorithm;
use fuel_gas_price_algorithm::v1::AlgorithmV1;

#[inline]
fn dummy_algorithm() -> AlgorithmV1 {
AlgorithmV1::default()
}

fn bench_shared_v1_algorithm(c: &mut Criterion) {
// bench initialization of SharedV1Algorithm
c.bench_function("SharedV1Algorithm::new_with_algorithm", |b| {
b.iter(|| {
let _ = SharedV1Algorithm::new_with_algorithm(dummy_algorithm());
})
});

// bench writes to SharedV1Algorithm
c.bench_function("SharedV1Algorithm::update", |b| {
let mut shared_v1_algorithm =
SharedV1Algorithm::new_with_algorithm(dummy_algorithm());

b.iter(|| {
shared_v1_algorithm.update(dummy_algorithm());
})
});

// bench reads from SharedV1Algorithm
c.bench_function("SharedV1Algorithm::next_gas_price", |b| {
let shared_v1_algorithm =
SharedV1Algorithm::new_with_algorithm(dummy_algorithm());

b.iter(|| {
let _ = shared_v1_algorithm.next_gas_price();
})
});

// bench concurrent reads and writes to SharedV1Algorithm
const READER_THREADS: usize = 4;
c.bench_function("SharedV1Algorithm::concurrent_rw", |b| {
let shared_v1_algorithm =
SharedV1Algorithm::new_with_algorithm(dummy_algorithm());
b.iter_custom(|iters| {
let read_lock = shared_v1_algorithm.clone();
let mut write_lock = shared_v1_algorithm.clone();
let start = std::time::Instant::now();

// Simulate parallel reads and writes
rayon::scope(|s| {
// Writer thread
s.spawn(|_| {
for _ in 0..iters {
write_lock.update(dummy_algorithm());
}
});

// Reader threads
for _ in 0..READER_THREADS {
let read_lock = read_lock.clone();
s.spawn(move |_| {
for _ in 0..(iters / READER_THREADS as u64) {
let _ = read_lock.next_gas_price();
}
});
}
});

start.elapsed()
});
});
}

criterion_group!(benches, bench_shared_v1_algorithm);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ pub trait P2pPort: Send + Sync {
#[async_trait::async_trait]
pub trait GasPriceEstimate: Send + Sync {
/// The worst case scenario for gas price at a given horizon
async fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64>;
fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64>;
}

/// Trait for getting VM memory.
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/schema/gas_price.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ impl EstimateGasPriceQuery {
let gas_price_provider = ctx.data_unchecked::<GasPriceProvider>();
let gas_price = gas_price_provider
.worst_case_gas_price(target_block.into())
.await
.ok_or(async_graphql::Error::new(format!(
"Failed to estimate gas price for block, algorithm not yet set: {target_block:?}"
)))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<A> GraphqlGasPriceEstimate for FuelGasPriceProvider<A>
where
A: GasPriceAlgorithm + Send + Sync,
{
async fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
Some(self.algorithm.worst_case_gas_price(height).await)
fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
Some(self.algorithm.worst_case_gas_price(height))
}
}
2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl worker::TxPool for TxPoolAdapter {

#[async_trait::async_trait]
impl GasPriceEstimate for StaticGasPrice {
async fn worst_case_gas_price(&self, _height: BlockHeight) -> Option<u64> {
fn worst_case_gas_price(&self, _height: BlockHeight) -> Option<u64> {
Some(self.gas_price)
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-gas-price-algorithm/src/v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum Error {
SkippedL2Block { expected: u32, got: u32 },
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Copy)]
pub struct AlgorithmV0 {
/// The gas price for to cover the execution of the next block
new_exec_price: u64,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-gas-price-algorithm/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum Error {

// TODO: separate exec gas price and DA gas price into newtypes for clarity
// https://github.com/FuelLabs/fuel-core/issues/2382
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Default, Copy)]
pub struct AlgorithmV1 {
/// The gas price for to cover the execution of the next block
new_exec_price: u64,
Expand Down
1 change: 0 additions & 1 deletion crates/services/gas_price_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ fuel-core-types = { workspace = true, features = ["std"] }
fuel-gas-price-algorithm = { workspace = true }
futures = { workspace = true }
num_enum = { workspace = true }
parking_lot = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
strum = { workspace = true, features = ["derive"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use fuel_core_services::seqlock::SeqLock;
use fuel_core_types::fuel_types::BlockHeight;
use std::sync::Arc;

pub trait GasPriceAlgorithm {
pub trait GasPriceAlgorithm: Copy {
fn next_gas_price(&self) -> u64;
fn worst_case_gas_price(&self, block_height: BlockHeight) -> u64;
}

#[derive(Debug, Default)]
pub struct SharedGasPriceAlgo<A>(Arc<parking_lot::RwLock<A>>);
pub struct SharedGasPriceAlgo<A>(Arc<SeqLock<A>>);

impl<A> Clone for SharedGasPriceAlgo<A> {
fn clone(&self) -> Self {
Expand All @@ -20,12 +21,13 @@ where
A: Send + Sync,
{
pub fn new_with_algorithm(algorithm: A) -> Self {
Self(Arc::new(parking_lot::RwLock::new(algorithm)))
Self(Arc::new(SeqLock::new(algorithm)))
}

pub async fn update(&mut self, new_algo: A) {
let mut write_lock = self.0.write();
*write_lock = new_algo;
pub fn update(&mut self, new_algo: A) {
self.0.write(|data| {
*data = new_algo;
});
}
}

Expand All @@ -37,7 +39,7 @@ where
self.0.read().next_gas_price()
}

pub async fn worst_case_gas_price(&self, block_height: BlockHeight) -> u64 {
pub fn worst_case_gas_price(&self, block_height: BlockHeight) -> u64 {
self.0.read().worst_case_gas_price(block_height)
}
}
2 changes: 1 addition & 1 deletion crates/services/gas_price_service/src/static_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl StaticAlgorithmUpdater {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Copy)]
pub struct StaticAlgorithm {
price: u64,
}
Expand Down
6 changes: 3 additions & 3 deletions crates/services/gas_price_service/src/v0/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ where
self.shared_algo.clone()
}

async fn update(&mut self, new_algorithm: AlgorithmV0) {
self.shared_algo.update(new_algorithm).await;
fn update(&mut self, new_algorithm: AlgorithmV0) {
self.shared_algo.update(new_algorithm);
}

fn validate_block_gas_capacity(
Expand Down Expand Up @@ -115,7 +115,7 @@ where
}
}

self.update(self.algorithm_updater.algorithm()).await;
self.update(self.algorithm_updater.algorithm());
Ok(())
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/services/gas_price_service/src/v1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ where
&self.storage_tx_provider
}

async fn update(&mut self, new_algorithm: AlgorithmV1) {
self.shared_algo.update(new_algorithm).await;
fn update(&mut self, new_algorithm: AlgorithmV1) {
self.shared_algo.update(new_algorithm);
}

fn validate_block_gas_capacity(
Expand Down Expand Up @@ -193,7 +193,7 @@ where
.map_err(|err| anyhow!(err))?;
AtomicStorage::commit_transaction(storage_tx)?;
let new_algo = self.algorithm_updater.algorithm();
self.shared_algo.update(new_algo).await;
self.shared_algo.update(new_algo);
// Clear the buffer after committing changes
self.da_block_costs_buffer.clear();
Ok(())
Expand All @@ -210,7 +210,7 @@ where
tx.set_metadata(&metadata).map_err(|err| anyhow!(err))?;
AtomicStorage::commit_transaction(tx)?;
let new_algo = self.algorithm_updater.algorithm();
self.shared_algo.update(new_algo).await;
self.shared_algo.update(new_algo);
}
BlockInfo::Block {
height,
Expand Down
6 changes: 6 additions & 0 deletions crates/services/src/seqlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub struct SeqLock<T> {

unsafe impl<T: Send> Sync for SeqLock<T> {}

impl<T: Default> Default for SeqLock<T> {
fn default() -> Self {
Self::new(T::default())
}
}

impl<T> SeqLock<T> {
/// Create a new SeqLock with the given data
pub fn new(data: T) -> Self {
Expand Down

0 comments on commit 8150e07

Please sign in to comment.