Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call flush during end of the FuelService #1456

Merged
merged 4 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ Description of the upcoming release here.

### Added
- [#1457](https://github.com/FuelLabs/fuel-core/pull/1457): Fixing incorrect measurement for fast(µs) opcodes.
- [#1449](https://github.com/FuelLabs/fuel-core/pull/1449): fix coin pagination in e2e test client
- [#1456](https://github.com/FuelLabs/fuel-core/pull/1456): Added flushing of the RocksDB during a graceful shutdown.
- [#1456](https://github.com/FuelLabs/fuel-core/pull/1456): Added more logs to track the service lifecycle.
- [#1449](https://github.com/FuelLabs/fuel-core/pull/1449): Fix coin pagination in e2e test client.
- [#1447](https://github.com/FuelLabs/fuel-core/pull/1447): Add timeout for continuous e2e tests
- [#1444](https://github.com/FuelLabs/fuel-core/pull/1444): Add "sanity" benchmarks for memory opcodes.
- [#1437](https://github.com/FuelLabs/fuel-core/pull/1437): Add some transaction throughput tests for basic transfers.
Expand Down
4 changes: 4 additions & 0 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ impl Database {
pub fn transaction(&self) -> DatabaseTransaction {
self.into()
}

pub fn flush(self) -> DatabaseResult<()> {
self.data.flush()
}
}

/// Mutable methods.
Expand Down
10 changes: 9 additions & 1 deletion crates/fuel-core/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl FuelService {
#[tracing::instrument(skip_all, fields(name = %config.name))]
pub fn new(database: Database, config: Config) -> anyhow::Result<Self> {
let config = config.make_config_consistent();
database.init(&config.chain_conf)?;
let task = Task::new(database, config)?;
let runner = ServiceRunner::new(task);
let shared = runner.shared.clone();
Expand All @@ -93,6 +92,11 @@ impl FuelService {
);
Database::default()
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\"",
config.database_path,
config.max_database_cache_size
);
Database::open(&config.database_path, config.max_database_cache_size)?
}
}
Expand Down Expand Up @@ -182,9 +186,12 @@ impl Task {
/// Private inner method for initializing the fuel service task
pub fn new(database: Database, config: Config) -> anyhow::Result<Task> {
// initialize state
tracing::info!("Initializing database");
database.init(&config.chain_conf)?;
genesis::maybe_initialize_state(&config, &database)?;

// initialize sub services
tracing::info!("Initializing sub services");
let (services, shared) = sub_services::init_sub_services(&config, &database)?;
Ok(Task { services, shared })
}
Expand Down Expand Up @@ -251,6 +258,7 @@ impl RunnableTask for Task {
);
}
}
self.shared.database.flush()?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: There are a few places that we're calling self.something.somethig_else....

It might be better to create a helper functions for each of these:
self.flush_database(), self.flush_view_layer(), etc

see: https://en.wikipedia.org/wiki/Law_of_Demeter

Ok(())
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/fuel-core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ pub enum WriteOperation {
Remove,
}

pub trait TransactableStorage: BatchOperations + Debug + Send + Sync {}
pub trait TransactableStorage: BatchOperations + Debug + Send + Sync {
fn flush(&self) -> DatabaseResult<()>;
}

pub mod in_memory;
#[cfg(feature = "rocksdb")]
Expand Down
9 changes: 8 additions & 1 deletion crates/fuel-core/src/state/in_memory/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,14 @@ impl KeyValueStore for MemoryStore {

impl BatchOperations for MemoryStore {}

impl TransactableStorage for MemoryStore {}
impl TransactableStorage for MemoryStore {
fn flush(&self) -> DatabaseResult<()> {
for lock in self.inner.iter() {
lock.lock().expect("poisoned").clear();
}
Ok(())
}
}

#[cfg(test)]
mod tests {
Expand Down
10 changes: 9 additions & 1 deletion crates/fuel-core/src/state/in_memory/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,15 @@ impl KeyValueStore for MemoryTransactionView {

impl BatchOperations for MemoryTransactionView {}

impl TransactableStorage for MemoryTransactionView {}
impl TransactableStorage for MemoryTransactionView {
fn flush(&self) -> DatabaseResult<()> {
for lock in self.changes.iter() {
lock.lock().expect("poisoned lock").clear();
}
self.view_layer.flush()?;
self.data_source.flush()
}
}

#[cfg(test)]
mod tests {
Expand Down
14 changes: 13 additions & 1 deletion crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ impl RocksDb {
opts.set_row_cache(&cache);
}

DB::repair(&opts, &path).map_err(|e| DatabaseError::Other(e.into()))?;

let db = match DB::open_cf_descriptors(&opts, &path, cf_descriptors) {
Err(_) => {
// setup cfs
Expand Down Expand Up @@ -390,7 +392,17 @@ impl BatchOperations for RocksDb {
}
}

impl TransactableStorage for RocksDb {}
impl TransactableStorage for RocksDb {
fn flush(&self) -> DatabaseResult<()> {
self.db
.flush_wal(true)
.map_err(|e| anyhow::anyhow!("Unable to flush WAL file: {}", e))?;
self.db
.flush()
.map_err(|e| anyhow::anyhow!("Unable to flush SST files: {}", e))?;
Ok(())
}
}

#[cfg(test)]
mod tests {
Expand Down
4 changes: 4 additions & 0 deletions crates/services/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ where
}
});

tracing::info!("The service {} is shut down", S::NAME);

if let State::StoppedWithError(err) = stopped_state {
std::panic::resume_unwind(Box::new(err));
}
Expand Down Expand Up @@ -325,6 +327,7 @@ async fn run<S>(
}

// We can panic here, because it is inside of the task.
tracing::info!("Starting {} service", S::NAME);
let mut task = service
.into_task(&state, params)
.await
Expand Down Expand Up @@ -375,6 +378,7 @@ async fn run<S>(
}
}

tracing::info!("Shutting down {} service", S::NAME);
let shutdown = std::panic::AssertUnwindSafe(task.shutdown());
match shutdown.catch_unwind().await {
Ok(Ok(_)) => {}
Expand Down
1 change: 0 additions & 1 deletion crates/services/sync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ where
}

async fn shutdown(self) -> anyhow::Result<()> {
tracing::info!("Sync task shutting down");
self.import_task_handle.stop_and_await().await?;
Ok(())
}
Expand Down