fix(pruning): Fix DB pruner responsiveness during shutdown #2058

Merged: 5 commits, May 29, 2024
50 changes: 40 additions & 10 deletions core/lib/db_connection/src/connection.rs
@@ -1,6 +1,6 @@
use std::{
collections::HashMap,
fmt,
fmt, io,
panic::Location,
sync::{
atomic::{AtomicUsize, Ordering},
@@ -203,18 +203,48 @@ impl<'a, DB: DbMarker> Connection<'a, DB> {
matches!(self.inner, ConnectionInner::Transaction { .. })
}

/// Commits a transactional connection (one which was created by calling [`Self::start_transaction()`]).
/// If this connection is not transactional, returns an error.
pub async fn commit(self) -> DalResult<()> {
if let ConnectionInner::Transaction {
transaction: postgres,
tags,
} = self.inner
{
postgres
match self.inner {
ConnectionInner::Transaction {
transaction: postgres,
tags,
} => postgres
.commit()
.await
.map_err(|err| DalConnectionError::commit_transaction(err, tags.cloned()).into())
} else {
panic!("Connection::commit can only be invoked after calling Connection::begin_transaction");
.map_err(|err| DalConnectionError::commit_transaction(err, tags.cloned()).into()),
ConnectionInner::Pooled(conn) => {
let err = io::Error::new(
io::ErrorKind::Other,
"`Connection::commit()` can only be invoked after calling `Connection::begin_transaction()`",
);
Err(DalConnectionError::commit_transaction(sqlx::Error::Io(err), conn.tags).into())
}
}
}

/// Rolls back a transactional connection (one which was created by calling [`Self::start_transaction()`]).
/// If this connection is not transactional, returns an error.
pub async fn rollback(self) -> DalResult<()> {
match self.inner {
ConnectionInner::Transaction {
transaction: postgres,
tags,
} => postgres
.rollback()
.await
.map_err(|err| DalConnectionError::rollback_transaction(err, tags.cloned()).into()),
ConnectionInner::Pooled(conn) => {
let err = io::Error::new(
io::ErrorKind::Other,
"`Connection::rollback()` can only be invoked after calling `Connection::begin_transaction()`",
);
Err(
DalConnectionError::rollback_transaction(sqlx::Error::Io(err), conn.tags)
.into(),
)
}
}
}

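With these changes, calling `commit()` or `rollback()` on a pooled (non-transactional) connection surfaces as a `DalResult` error instead of a panic, and callers can roll a transaction back explicitly. A minimal caller sketch, not part of the diff (the function name and the `keep` flag are illustrative; the pool and DAL calls are the ones that appear elsewhere in this PR):

use zksync_dal::{ConnectionPool, Core, CoreDal};

async fn transactional_read(pool: &ConnectionPool<Core>, keep: bool) -> anyhow::Result<()> {
    let mut storage = pool.connection_tagged("example").await?;
    let mut transaction = storage.start_transaction().await?;
    // Any DAL call works here; `get_pruning_info()` is the one used by the pruner below.
    let _info = transaction.pruning_dal().get_pruning_info().await?;
    if keep {
        transaction.commit().await?;
    } else {
        // New in this PR: an explicit rollback instead of relying on the implicit one
        // that happens when the transaction is dropped.
        transaction.rollback().await?;
    }
    Ok(())
}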
13 changes: 13 additions & 0 deletions core/lib/db_connection/src/error.rs
@@ -100,6 +100,7 @@ enum ConnectionAction {
AcquireConnection,
StartTransaction,
CommitTransaction,
RollbackTransaction,
}

impl ConnectionAction {
@@ -108,6 +109,7 @@ impl ConnectionAction {
Self::AcquireConnection => "acquiring DB connection",
Self::StartTransaction => "starting DB transaction",
Self::CommitTransaction => "committing DB transaction",
Self::RollbackTransaction => "rolling back DB transaction",
}
}
}
@@ -165,6 +167,17 @@ impl DalConnectionError {
connection_tags,
}
}

pub(crate) fn rollback_transaction(
inner: sqlx::Error,
connection_tags: Option<ConnectionTags>,
) -> Self {
Self {
inner,
action: ConnectionAction::RollbackTransaction,
connection_tags,
}
}
}

/// Extension trait to create `sqlx::Result`s, similar to `anyhow::Context`.
73 changes: 51 additions & 22 deletions core/node/db_pruner/src/lib.rs
@@ -1,9 +1,8 @@
//! Postgres pruning component.

use std::{fmt, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use anyhow::Context as _;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use zksync_dal::{pruning_dal::PruningInfo, Connection, ConnectionPool, Core, CoreDal};
@@ -14,7 +13,7 @@ use self::{
metrics::{MetricPruneType, METRICS},
prune_conditions::{
ConsistencyCheckerProcessedBatch, L1BatchExistsCondition, L1BatchOlderThanPruneCondition,
NextL1BatchHasMetadataCondition, NextL1BatchWasExecutedCondition,
NextL1BatchHasMetadataCondition, NextL1BatchWasExecutedCondition, PruneCondition,
},
};

@@ -59,6 +58,17 @@ impl From<PruningInfo> for DbPrunerHealth {
}
}

/// Outcome of a single pruning iteration.
#[derive(Debug)]
enum PruningIterationOutcome {
/// Nothing to prune.
NoOp,
/// Iteration resulted in pruning.
Pruned,
/// Pruning was interrupted because of a stop signal.
Interrupted,
}

/// Postgres database pruning component.
#[derive(Debug)]
pub struct DbPruner {
@@ -68,12 +78,6 @@ pub struct DbPruner {
prune_conditions: Vec<Arc<dyn PruneCondition>>,
}

/// Interface to be used for health checks.
#[async_trait]
trait PruneCondition: fmt::Debug + fmt::Display + Send + Sync + 'static {
async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool>;
}

impl DbPruner {
pub fn new(config: DbPrunerConfig, connection_pool: ConnectionPool<Core>) -> Self {
let mut conditions: Vec<Arc<dyn PruneCondition>> = vec![
@@ -207,7 +211,11 @@ impl DbPruner {
Ok(true)
}

async fn hard_prune(&self, storage: &mut Connection<'_, Core>) -> anyhow::Result<()> {
async fn hard_prune(
&self,
storage: &mut Connection<'_, Core>,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<PruningIterationOutcome> {
let latency = METRICS.pruning_chunk_duration[&MetricPruneType::Hard].start();
let mut transaction = storage.start_transaction().await?;

@@ -221,10 +229,21 @@
format!("bogus pruning info {current_pruning_info:?}: trying to hard-prune data, but there is no soft-pruned L2 block")
})?;

let stats = transaction
.pruning_dal()
.hard_prune_batches_range(last_soft_pruned_l1_batch, last_soft_pruned_l2_block)
.await?;
let mut dal = transaction.pruning_dal();
let stats = tokio::select! {
result = dal.hard_prune_batches_range(
last_soft_pruned_l1_batch,
last_soft_pruned_l2_block,
) => result?,

_ = stop_receiver.changed() => {
// `hard_prune_batches_range()` can take a long time. If the node is shutting down, it is better to roll the transaction back explicitly here
// than to wait for the node to force-exit after a timeout, which would sever the DB connection and lead to an implicit rollback.
tracing::info!("Hard pruning interrupted; rolling back pruning transaction");
transaction.rollback().await?;
return Ok(PruningIterationOutcome::Interrupted);
}
};
METRICS.observe_hard_pruning(stats);
transaction.commit().await?;

@@ -236,10 +255,13 @@
current_pruning_info.last_hard_pruned_l1_batch = Some(last_soft_pruned_l1_batch);
current_pruning_info.last_hard_pruned_l2_block = Some(last_soft_pruned_l2_block);
self.update_health(current_pruning_info);
Ok(())
Ok(PruningIterationOutcome::Pruned)
}

async fn run_single_iteration(&self) -> anyhow::Result<bool> {
async fn run_single_iteration(
&self,
stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<PruningIterationOutcome> {
let mut storage = self.connection_pool.connection_tagged("db_pruner").await?;
let current_pruning_info = storage.pruning_dal().get_pruning_info().await?;
self.update_health(current_pruning_info);
@@ -250,15 +272,20 @@
{
let pruning_done = self.soft_prune(&mut storage).await?;
if !pruning_done {
return Ok(false);
return Ok(PruningIterationOutcome::NoOp);
}
}
drop(storage); // Don't hold a connection across a timeout

tokio::time::sleep(self.config.removal_delay).await;
if tokio::time::timeout(self.config.removal_delay, stop_receiver.changed())
.await
.is_ok()
{
return Ok(PruningIterationOutcome::Interrupted);
}

let mut storage = self.connection_pool.connection_tagged("db_pruner").await?;
self.hard_prune(&mut storage).await?;
Ok(true)
self.hard_prune(&mut storage, stop_receiver).await
}

pub async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
@@ -277,7 +304,7 @@
tracing::warn!("Error updating DB pruning metrics: {err:?}");
}

let should_sleep = match self.run_single_iteration().await {
let should_sleep = match self.run_single_iteration(&mut stop_receiver).await {
Err(err) => {
// As this component is not really mission-critical, all errors are generally ignored
tracing::warn!(
@@ -290,7 +317,9 @@
self.health_updater.update(health);
true
}
Ok(pruning_done) => !pruning_done,
Ok(PruningIterationOutcome::Interrupted) => break,
Ok(PruningIterationOutcome::Pruned) => false,
Ok(PruningIterationOutcome::NoOp) => true,
};

if should_sleep
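The improved shutdown responsiveness comes from two patterns in this file: the removal delay is now an interruptible wait (`tokio::time::timeout` over `stop_receiver.changed()`), and the long-running hard prune is raced against the stop signal with `tokio::select!`, rolling the transaction back when interrupted. A self-contained sketch of the same two patterns outside the pruner (names and durations are made up for illustration; the long sleeps stand in for the real DB work and removal delay):

use std::time::Duration;

use tokio::sync::watch;

async fn interruptible_step(mut stop_receiver: watch::Receiver<bool>) {
    // Interruptible sleep: `timeout` returns `Ok(_)` if the stop signal fires before the delay elapses.
    if tokio::time::timeout(Duration::from_secs(60), stop_receiver.changed())
        .await
        .is_ok()
    {
        println!("interrupted during the removal delay");
        return;
    }

    // Interruptible work: race the long-running step against the stop signal.
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(60)) => {
            // Stands in for `hard_prune_batches_range()` followed by `commit()`.
            println!("work finished");
        }
        _ = stop_receiver.changed() => {
            // Stands in for the explicit `transaction.rollback()` added in this PR.
            println!("interrupted; cleaning up explicitly");
        }
    }
}

#[tokio::main]
async fn main() {
    let (stop_sender, stop_receiver) = watch::channel(false);
    // Simulate a shutdown request shortly after startup.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        stop_sender.send(true).ok();
    });
    interruptible_step(stop_receiver).await;
}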
5 changes: 4 additions & 1 deletion core/node/db_pruner/src/prune_conditions.rs
@@ -5,7 +5,10 @@ use chrono::Utc;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::L1BatchNumber;

use crate::PruneCondition;
#[async_trait]
pub(crate) trait PruneCondition: fmt::Debug + fmt::Display + Send + Sync + 'static {
async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool>;
}

#[derive(Debug)]
pub(super) struct L1BatchOlderThanPruneCondition {
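The `PruneCondition` trait now lives in `prune_conditions` as a `pub(crate)` item rather than being private to `lib.rs`. A hypothetical implementation sketch, assuming the same `async_trait` pattern the existing conditions use (the struct and its threshold are invented for illustration):

use std::fmt;

use async_trait::async_trait;
use zksync_types::L1BatchNumber;

#[derive(Debug)]
struct MinBatchNumberCondition {
    minimum: L1BatchNumber,
}

impl fmt::Display for MinBatchNumberCondition {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "L1 batch number is at least {:?}", self.minimum)
    }
}

#[async_trait]
impl PruneCondition for MinBatchNumberCondition {
    async fn is_batch_prunable(&self, l1_batch_number: L1BatchNumber) -> anyhow::Result<bool> {
        // No DB access needed for this toy condition; the real conditions query Postgres.
        Ok(l1_batch_number >= self.minimum)
    }
}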