[Bifrost][Auto-Recovery] Appender can reconfigure loglet if sealed
In certain scenarios, if the appender is known to be running with "permission" to extend the chain, it will run out of patience and attempt to perform the reconfiguration itself. This works well, but it might compete with a cluster controller that thinks a different placement should take place. For that reason, the auto-recovery mechanism only kicks in after 3 seconds (configurable); once that threshold passes, the appender will attempt to reconfigure the loglet if it is still sealed.

At the moment, this will get the system out of trouble in certain scenarios.

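For illustration, here is a minimal standalone sketch of the new decision rule, assuming the simplified types below; `should_auto_extend` is a hypothetical helper condensing the check inside `on_sealed_loglet`, not part of the codebase. The appender only seals and extends on its own once the auto-recovery interval has elapsed and its `ErrorRecoveryStrategy` is at least `ExtendChainAllowed`.

```rust
use std::time::Duration;

// Simplified stand-ins mirroring the variants added in bifrost.rs; the derived
// `Ord` follows declaration order, which is why the enum must stay ordered.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
enum ErrorRecoveryStrategy {
    Wait = 1,
    ExtendChainAllowed,
    ExtendChainPreferred,
}

// Hypothetical helper condensing the check performed inside `on_sealed_loglet`.
fn should_auto_extend(
    elapsed_since_seal: Duration,
    auto_recovery_interval: Duration,
    strategy: ErrorRecoveryStrategy,
) -> bool {
    elapsed_since_seal > auto_recovery_interval
        && strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
}

fn main() {
    let threshold = Duration::from_secs(3); // default auto-recovery-interval
    // Before the threshold we stay in the holding pattern and let the cluster controller act.
    assert!(!should_auto_extend(
        Duration::from_secs(1),
        threshold,
        ErrorRecoveryStrategy::ExtendChainPreferred
    ));
    // Past the threshold, an appender that is at least `ExtendChainAllowed` reconfigures itself.
    assert!(should_auto_extend(
        Duration::from_secs(4),
        threshold,
        ErrorRecoveryStrategy::ExtendChainAllowed
    ));
    // `Wait` never auto-extends, regardless of how long it has been waiting.
    assert!(!should_auto_extend(
        Duration::from_secs(60),
        threshold,
        ErrorRecoveryStrategy::Wait
    ));
}
```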
AhmedSoliman committed Jan 30, 2025
1 parent 3817910 commit dd19e79
Showing 6 changed files with 143 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


7 changes: 4 additions & 3 deletions crates/bifrost/Cargo.toml
@@ -9,7 +9,7 @@ publish = false

[features]
default = []
replicated-loglet = []
replicated-loglet = ["auto-extend"]
memory-loglet = ["restate-types/memory-loglet"]
test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"]
# enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon.
@@ -19,10 +19,11 @@ auto-extend = []
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

restate-core = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-futures-util = { workspace = true }
restate-metadata-server = { workspace = true }
restate-rocksdb = { workspace = true }
restate-test-util = { workspace = true, optional = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
147 changes: 119 additions & 28 deletions crates/bifrost/src/appender.rs
@@ -9,30 +9,28 @@
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument, trace, warn};

use restate_core::Metadata;
use restate_core::{Metadata, TargetVersion};
use restate_futures_util::overdue::OverdueLoggingExt;
use restate_types::config::Configuration;
use restate_types::live::Live;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, Lsn, Record};
use restate_types::retries::RetryIter;
use restate_types::storage::StorageEncode;

use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy};
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::{Error, InputRecord, Result};
use crate::{BifrostAdmin, Error, InputRecord, Result};

#[derive(Clone, derive_more::Debug)]
pub struct Appender {
log_id: LogId,
#[debug(skip)]
pub(super) config: Live<Configuration>,
// todo: asoli remove
#[allow(unused)]
error_recovery_strategy: ErrorRecoveryStrategy,
loglet_cache: Option<LogletWrapper>,
#[debug(skip)]
@@ -113,25 +111,32 @@ impl Appender
match loglet.append_batch(batch.clone()).await {
Ok(lsn) => return Ok(lsn),
Err(AppendError::Sealed) => {
info!(
debug!(
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Append batch will be retried (loglet is being sealed), waiting for tail to be determined"
"Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
);
let new_loglet = Self::wait_next_unsealed_loglet(
let new_loglet = Self::on_sealed_loglet(
self.log_id,
&self.bifrost_inner,
loglet.segment_index(),
&mut retry_iter,
self.error_recovery_strategy,
)
.await?;
debug!(
log_id = %self.log_id,
segment_index = %loglet.segment_index(),
"Log reconfiguration has been completed, appender will resume now"
);

self.loglet_cache = Some(new_loglet);
}
Err(AppendError::Other(err)) if err.retryable() => {
if let Some(retry_dur) = retry_iter.next() {
info!(
%err,
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch. Since underlying error is retryable, will retry in {:?}",
@@ -141,6 +146,7 @@ impl Appender
} else {
warn!(
%err,
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch and exhausted all attempts to retry",
@@ -154,37 +160,122 @@ impl Appender
}
}

    #[instrument(level = "error", err, skip(retry_iter, bifrost_inner))]
    async fn wait_next_unsealed_loglet(
    #[instrument(level = "error", err, skip(bifrost_inner))]
    async fn on_sealed_loglet(
log_id: LogId,
bifrost_inner: &Arc<BifrostInner>,
sealed_segment: SegmentIndex,
retry_iter: &mut RetryIter<'_>,
error_recovery_strategy: ErrorRecoveryStrategy,
) -> Result<LogletWrapper> {
let mut retry_iter = Configuration::pinned()
.bifrost
.append_retry_policy()
.into_iter();

let auto_recovery_threshold: Duration = Configuration::pinned()
.bifrost
.auto_recovery_interval
.into();
let start = Instant::now();
for sleep_dur in retry_iter.by_ref() {

// Let's give metadata manager an opportunity to sync up to the latest log chain,
// we might be operating with an old view, but we'll only do this style of sync once.
// However, in the retry loop below, we can add a background sync request to metadata
// manager once it supports rate limiting sync requests (to avoid overloading metadata
// store)
let metadata = Metadata::current();
let _ = metadata
.sync(restate_core::MetadataKind::Logs, TargetVersion::Latest)
.log_slow_after(
auto_recovery_threshold,
tracing::Level::INFO,
"Syncing the log chain from metadata store",
)
.await;
loop {
bifrost_inner.fail_if_shutting_down()?;
tokio::time::sleep(sleep_dur).await;
let loglet = bifrost_inner.writeable_loglet(log_id).await?;
let tone_escalated = start.elapsed() > auto_recovery_threshold;
// Do we think that the last tail loglet is different and unsealed?
if loglet.tail_lsn.is_none() && loglet.segment_index() > sealed_segment {
let total_dur = start.elapsed();
debug!(
"Found an unsealed segment {} after {:?}",
loglet.segment_index(),
total_dur
);
if tone_escalated {
info!(
open_segment = %loglet.segment_index(),
"New segment detected after {:?}",
total_dur
);
} else {
debug!(
open_segment = %loglet.segment_index(),
"New segment detected after {:?}",
total_dur
);
}
return Ok(loglet);
} else {
let log_version = Metadata::with_current(|m| m.logs_version());
debug!(
log_version = %log_version,
"Still waiting for sealing to complete. Elapsed={:?}",
}

// Okay, tail segment is still sealed
let log_metadata_version = Metadata::with_current(|m| m.logs_version());
if start.elapsed() > auto_recovery_threshold
&& error_recovery_strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
{
// taking the matter into our own hands
let admin = BifrostAdmin::new(bifrost_inner);
info!(
%sealed_segment,
"[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. We waited for {:?} before triggering automatic recovery",
start.elapsed(),
);
if let Err(err) = admin
.seal_and_auto_extend_chain(log_id, Some(sealed_segment))
.log_slow_after(
auto_recovery_threshold / 2,
tracing::Level::INFO,
"Extending the chain with new configuration",
)
.with_overdue(auto_recovery_threshold, tracing::Level::WARN)
.await
{
// we couldn't reconfigure. Let the loop handle retries as normal
info!(
%err,
%log_metadata_version,
"Could not reconfigure the log, perhaps something else beat us to it? We'll check",
);
} else {
info!(
log_metadata_version = %metadata.logs_version(),
"[Auto Recovery] Reconfiguration complete",
);
// reconfiguration successful. Metadata is updated at this point
// Do not fall-through to the backoff sleep
continue;
}
} else {
// Holding pattern
if tone_escalated {
info!(
%log_metadata_version,
"In holding pattern, still waiting for log reconfiguration to complete. Elapsed={:?}",
start.elapsed(),
);
} else {
debug!(
%log_metadata_version,
"In holding pattern, waiting for log reconfiguration to complete. Elapsed={:?}",
start.elapsed(),
);
}
}
let sleep_dur = retry_iter
.next()
.expect("append retries should be infinite");
// backoff. This is at the bottom to avoid unnecessary sleeps in the happy path
trace!("Will retry the append after {sleep_dur:?}");
            // todo: add async metadata sync request to _influence_ metadata manager to check if it needs
// to look for newer log chain version.
tokio::time::sleep(sleep_dur).await;
}

Err(Error::LogSealed(log_id))
}
}
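
The retry loop in `on_sealed_loglet` keeps the backoff sleep at the bottom so the happy path never waits, and escalates its log level (and may trigger auto-recovery) once the auto-recovery threshold has passed. Below is a condensed, std-only sketch of that loop shape; the function name, hard-coded durations, and synchronous sleep are illustrative stand-ins for the real async code and its configured retry policy.

```rust
use std::time::{Duration, Instant};

// Condensed, std-only sketch of the loop shape in `on_sealed_loglet`.
// `wait_for_new_segment` and the hard-coded durations are illustrative; the
// real code is async, uses the configured append retry policy, and logs via
// tracing rather than eprintln.
fn wait_for_new_segment<F: FnMut() -> bool>(mut chain_was_extended: F) {
    let auto_recovery_threshold = Duration::from_secs(3); // auto-recovery-interval default
    let max_backoff = Duration::from_secs(1); // append_retry_max_interval default
    let mut backoff = Duration::from_millis(10); // append_retry_min_interval default
    let start = Instant::now();
    loop {
        if chain_was_extended() {
            // A new unsealed segment is visible; the appender resumes immediately.
            return;
        }
        if start.elapsed() > auto_recovery_threshold {
            // Past the threshold the real loop escalates from debug! to info!
            // (and, if allowed, attempts seal_and_auto_extend_chain itself).
            eprintln!(
                "still waiting for log reconfiguration, elapsed={:?}",
                start.elapsed()
            );
        }
        // Backoff sits at the bottom so the happy path never sleeps.
        std::thread::sleep(backoff);
        backoff = (backoff * 2).min(max_backoff);
    }
}

fn main() {
    let mut checks = 0;
    wait_for_new_segment(|| {
        checks += 1;
        checks >= 3 // pretend the chain gets extended by the third check
    });
    assert_eq!(checks, 3);
}
```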
12 changes: 7 additions & 5 deletions crates/bifrost/src/bifrost.rs
@@ -29,15 +29,17 @@ use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};

/// The strategy to use when bifrost fails to append or when it observes
/// a sealed loglet while it's tailing a log.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
///
/// Please keep this enum ordered, i.e. anything > Allowed should still mean allowed.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum ErrorRecoveryStrategy {
/// Eagerly extend the chain by creating a new loglet and appending to it.
ExtendChainPreferred,
/// Do not extend the chain, wait indefinitely instead until the error disappears.
Wait = 1,
/// Extend the chain only when running out of patience; others might be better suited to reconfigure
/// the chain, but when desperate, we are allowed to seal and extend.
ExtendChainAllowed,
/// Do not extend the chain, wait indefinitely instead until the error disappears.
Wait,
/// Eagerly extend the chain by creating a new loglet and appending to it.
ExtendChainPreferred,
}

impl ErrorRecoveryStrategy {
4 changes: 2 additions & 2 deletions crates/log-server/src/loglet_worker.rs
@@ -177,7 +177,7 @@ impl<S: LogStore> LogletWorker<S> {
// todo: consider a draining shutdown if needed
// this might include sending notifications of shutdown to allow graceful
// handoff
debug!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
trace!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
return;
}
// GET_DIGEST
@@ -191,7 +191,7 @@ impl<S: LogStore> LogletWorker<S> {
Some(Ok(_)) = &mut in_flight_seal => {
sealing_in_progress = false;
self.loglet_state.get_local_tail_watch().notify_seal();
debug!(loglet_id = %self.loglet_id, "Loglet is sealed");
debug!(loglet_id = %self.loglet_id, "Loglet is now sealed on this log-server node");
in_flight_seal.set(None.into());
}
// The set of requests waiting for seal to complete
10 changes: 10 additions & 0 deletions crates/types/src/config/bifrost.rs
@@ -55,6 +55,15 @@ pub struct BifrostOptions {
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub seal_retry_interval: humantime::Duration,

/// # Auto recovery threshold
///
/// Time interval after which bifrost's auto-recovery mechanism will kick in. This
/// is triggered in scenarios where the control plane took too long to complete loglet
/// reconfigurations.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub auto_recovery_interval: humantime::Duration,

/// # Append retry minimum interval
///
/// Minimum retry duration used by the exponential backoff mechanism for bifrost appends.
@@ -103,6 +112,7 @@ impl Default for BifrostOptions {
),
append_retry_min_interval: Duration::from_millis(10).into(),
append_retry_max_interval: Duration::from_secs(1).into(),
auto_recovery_interval: Duration::from_secs(3).into(),
seal_retry_interval: Duration::from_secs(2).into(),
record_cache_memory_size: 20_000_000u64.into(), // 20MB
}
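
The new option is deserialized from its string form (the config field is a `humantime::Duration` via `DisplayFromStr`), so operators can specify values like `3s` or `250ms`. A small example of that parsing, not taken from the restate codebase, assuming the `humantime` crate is available:

```rust
// Not restate code: shows how a humantime-formatted value such as the new
// `auto-recovery-interval` option parses into a std Duration.
fn main() {
    let configured = "3s"; // matches the default in BifrostOptions::default()
    let interval = humantime::parse_duration(configured).expect("valid humantime duration");
    assert_eq!(interval, std::time::Duration::from_secs(3));
    println!("auto-recovery kicks in after {interval:?}");
}
```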
