[Bifrost][Auto-Recovery] Appender can reconfigure loglet if sealed #2587

Merged · 2 commits · Jan 30, 2025
Changes from all commits
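Before the per-file diffs, here is a condensed, self-contained sketch of the decision the new `Appender::on_sealed_loglet` loop (added in `crates/bifrost/src/appender.rs` below) makes on each retry iteration. The real code is async and talks to the metadata manager and `BifrostAdmin`; the `Action` enum, `next_action`, and the 15-second threshold here are simplified stand-ins introduced purely for illustration.

```rust
use std::time::Duration;

enum Action {
    /// A newer, unsealed tail segment appeared: resume appending to it.
    ResumeAppend,
    /// Still sealed, past the auto-recovery threshold, and the strategy allows it:
    /// seal and extend the chain ourselves via `BifrostAdmin::seal_and_auto_extend_chain`.
    AutoExtendChain,
    /// Still sealed: stay in the holding pattern and back off before retrying.
    KeepWaiting,
}

fn next_action(
    tail_is_newer_unsealed_segment: bool,
    elapsed: Duration,
    auto_recovery_threshold: Duration,
    extend_chain_allowed: bool, // i.e. strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
) -> Action {
    if tail_is_newer_unsealed_segment {
        Action::ResumeAppend
    } else if elapsed > auto_recovery_threshold && extend_chain_allowed {
        Action::AutoExtendChain
    } else {
        Action::KeepWaiting
    }
}

fn main() {
    // Arbitrary example threshold; the real value comes from `bifrost.auto_recovery_interval`.
    let threshold = Duration::from_secs(15);
    // Sealed for longer than the threshold and allowed to extend: take over recovery.
    assert!(matches!(
        next_action(false, Duration::from_secs(20), threshold, true),
        Action::AutoExtendChain
    ));
    // Sealed but still within the threshold: hold and back off.
    assert!(matches!(
        next_action(false, Duration::from_secs(1), threshold, true),
        Action::KeepWaiting
    ));
    // A new unsealed segment showed up: the appender resumes immediately.
    assert!(matches!(
        next_action(true, Duration::from_secs(1), threshold, true),
        Action::ResumeAppend
    ));
}
```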
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

25 changes: 4 additions & 21 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -37,7 +37,7 @@ use restate_types::logs::metadata::{
};
use restate_types::logs::{LogId, LogletId, Lsn, TailState};
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role, StorageState};
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, StorageState};
use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};
use restate_types::replication::{NodeSetSelector, NodeSetSelectorOptions};
@@ -353,24 +353,7 @@ fn try_provisioning(
}
}

fn logserver_candidate_filter(_node_id: PlainNodeId, config: &NodeConfig) -> bool {
// Important note: we check if the server has role=log-server when storage_state is
// provisioning because all nodes get provisioning storage by default; we only care about
// log-servers, so we avoid adding other nodes to the nodeset. In the case of read-write, we
// don't check the role so we don't accidentally treat those nodes as non-logservers even if
// the role was removed by mistake (although some protection should be added for this)
match config.log_server_config.storage_state {
StorageState::ReadWrite => true,
StorageState::Provisioning if config.has_role(Role::LogServer) => true,
// explicit match to make it clear that we are excluding nodes with the following states,
// any new states added will force the compiler to fail
StorageState::Provisioning
| StorageState::Disabled
| StorageState::ReadOnly
| StorageState::DataLoss => false,
}
}

#[cfg(feature = "replicated-loglet")]
fn logserver_writeable_node_filter(
observed_cluster_state: &ObservedClusterState,
) -> impl Fn(PlainNodeId, &NodeConfig) -> bool + '_ {
@@ -420,7 +403,7 @@ pub fn build_new_replicated_loglet_configuration(
let selection = NodeSetSelector::select(
nodes_config,
&replication,
logserver_candidate_filter,
restate_bifrost::providers::replicated_loglet::logserver_candidate_filter,
logserver_writeable_node_filter(observed_cluster_state),
opts,
);
@@ -521,7 +504,7 @@ impl LogletConfiguration {
let Ok(selection) = NodeSetSelector::select(
nodes_config,
&params.replication,
logserver_candidate_filter,
restate_bifrost::providers::replicated_loglet::logserver_candidate_filter,
logserver_writeable_node_filter(observed_cluster_state),
opts,
) else {
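The candidate filter removed above now lives in the bifrost crate and is referenced via the re-export shown at the bottom of this page. As a quick reference, here is a self-contained sketch of its semantics; `Role`, `StorageState`, and `NodeConfig` below are trimmed local stand-ins for the real `restate_types` definitions, not the actual types.

```rust
#[derive(Clone, Copy, PartialEq)]
enum Role {
    LogServer,
    Worker,
}

#[derive(Clone, Copy)]
enum StorageState {
    Provisioning,
    Disabled,
    ReadOnly,
    ReadWrite,
    DataLoss,
}

struct NodeConfig {
    roles: Vec<Role>,
    storage_state: StorageState,
}

impl NodeConfig {
    fn has_role(&self, role: Role) -> bool {
        self.roles.contains(&role)
    }
}

/// Mirrors `restate_bifrost::providers::replicated_loglet::logserver_candidate_filter`:
/// read-write nodes always qualify; provisioning nodes qualify only if they actually run
/// the log-server role; every other state is excluded, and the exhaustive match forces a
/// decision whenever a new storage state is added.
fn logserver_candidate_filter(config: &NodeConfig) -> bool {
    match config.storage_state {
        StorageState::ReadWrite => true,
        StorageState::Provisioning if config.has_role(Role::LogServer) => true,
        StorageState::Provisioning
        | StorageState::Disabled
        | StorageState::ReadOnly
        | StorageState::DataLoss => false,
    }
}

fn main() {
    let provisioning_log_server = NodeConfig {
        roles: vec![Role::LogServer],
        storage_state: StorageState::Provisioning,
    };
    let provisioning_worker = NodeConfig {
        roles: vec![Role::Worker],
        storage_state: StorageState::Provisioning,
    };
    assert!(logserver_candidate_filter(&provisioning_log_server));
    assert!(!logserver_candidate_filter(&provisioning_worker));
}
```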
7 changes: 4 additions & 3 deletions crates/bifrost/Cargo.toml
@@ -9,7 +9,7 @@ publish = false

[features]
default = []
replicated-loglet = []
replicated-loglet = ["auto-extend"]
memory-loglet = ["restate-types/memory-loglet"]
test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"]
# enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon.
@@ -19,10 +19,11 @@ auto-extend = []
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

restate-core = { workspace = true }
restate-rocksdb = { workspace = true }
restate-types = { workspace = true }
restate-futures-util = { workspace = true }
restate-metadata-server = { workspace = true }
restate-rocksdb = { workspace = true }
restate-test-util = { workspace = true, optional = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
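With this change, enabling `replicated-loglet` also turns on the transitional `auto-extend` feature. As a generic illustration only (not code from this crate, which may gate the path differently), a Cargo feature like this is typically consumed at compile time, so the auto-recovery path can be stripped entirely once the feature is removed:

```rust
// Hypothetical helper, for illustration only.
fn auto_extend_enabled() -> bool {
    cfg!(feature = "auto-extend")
}

fn main() {
    if auto_extend_enabled() {
        println!("bifrost may seal and auto-extend the chain on its own");
    } else {
        println!("bifrost waits for the logs controller to reconfigure the log");
    }
}
```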
147 changes: 119 additions & 28 deletions crates/bifrost/src/appender.rs
@@ -9,30 +9,28 @@
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument, trace, warn};

use restate_core::Metadata;
use restate_core::{Metadata, TargetVersion};
use restate_futures_util::overdue::OverdueLoggingExt;
use restate_types::config::Configuration;
use restate_types::live::Live;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, Lsn, Record};
use restate_types::retries::RetryIter;
use restate_types::storage::StorageEncode;

use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy};
use crate::loglet::AppendError;
use crate::loglet_wrapper::LogletWrapper;
use crate::{Error, InputRecord, Result};
use crate::{BifrostAdmin, Error, InputRecord, Result};

#[derive(Clone, derive_more::Debug)]
pub struct Appender {
log_id: LogId,
#[debug(skip)]
pub(super) config: Live<Configuration>,
// todo: asoli remove
#[allow(unused)]
error_recovery_strategy: ErrorRecoveryStrategy,
loglet_cache: Option<LogletWrapper>,
#[debug(skip)]
@@ -113,25 +111,32 @@ impl Appender {
match loglet.append_batch(batch.clone()).await {
Ok(lsn) => return Ok(lsn),
Err(AppendError::Sealed) => {
info!(
debug!(
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Append batch will be retried (loglet is being sealed), waiting for tail to be determined"
"Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
);
let new_loglet = Self::wait_next_unsealed_loglet(
let new_loglet = Self::on_sealed_loglet(
self.log_id,
&self.bifrost_inner,
loglet.segment_index(),
&mut retry_iter,
self.error_recovery_strategy,
)
.await?;
debug!(
log_id = %self.log_id,
segment_index = %loglet.segment_index(),
"Log reconfiguration has been completed, appender will resume now"
);

self.loglet_cache = Some(new_loglet);
}
Err(AppendError::Other(err)) if err.retryable() => {
if let Some(retry_dur) = retry_iter.next() {
info!(
%err,
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch. Since underlying error is retryable, will retry in {:?}",
@@ -141,6 +146,7 @@ impl Appender {
} else {
warn!(
%err,
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch and exhausted all attempts to retry",
@@ -154,37 +160,122 @@ impl Appender {
}
}

#[instrument(level = "error", err, skip(retry_iter, bifrost_inner))]
async fn wait_next_unsealed_loglet(
#[instrument(level = "error", err, skip(bifrost_inner))]
async fn on_sealed_loglet(
log_id: LogId,
bifrost_inner: &Arc<BifrostInner>,
sealed_segment: SegmentIndex,
retry_iter: &mut RetryIter<'_>,
error_recovery_strategy: ErrorRecoveryStrategy,
) -> Result<LogletWrapper> {
let mut retry_iter = Configuration::pinned()
.bifrost
.append_retry_policy()
.into_iter();

let auto_recovery_threshold: Duration = Configuration::pinned()
.bifrost
.auto_recovery_interval
.into();
let start = Instant::now();
for sleep_dur in retry_iter.by_ref() {

// Let's give the metadata manager an opportunity to sync up to the latest log chain;
// we might be operating with an old view, but we'll only do this style of sync once.
// However, in the retry loop below, we can add a background sync request to the metadata
// manager once it supports rate-limiting sync requests (to avoid overloading the metadata
// store)
let metadata = Metadata::current();
let _ = metadata
.sync(restate_core::MetadataKind::Logs, TargetVersion::Latest)
.log_slow_after(
auto_recovery_threshold,
tracing::Level::INFO,
"Syncing the log chain from metadata store",
)
.await;
loop {
bifrost_inner.fail_if_shutting_down()?;
tokio::time::sleep(sleep_dur).await;
let loglet = bifrost_inner.writeable_loglet(log_id).await?;
let tone_escalated = start.elapsed() > auto_recovery_threshold;
// Do we think that the last tail loglet is different and unsealed?
if loglet.tail_lsn.is_none() && loglet.segment_index() > sealed_segment {
let total_dur = start.elapsed();
debug!(
"Found an unsealed segment {} after {:?}",
loglet.segment_index(),
total_dur
);
if tone_escalated {
info!(
open_segment = %loglet.segment_index(),
"New segment detected after {:?}",
total_dur
);
} else {
debug!(
open_segment = %loglet.segment_index(),
"New segment detected after {:?}",
total_dur
);
}
return Ok(loglet);
} else {
let log_version = Metadata::with_current(|m| m.logs_version());
debug!(
log_version = %log_version,
"Still waiting for sealing to complete. Elapsed={:?}",
}

// Okay, tail segment is still sealed
let log_metadata_version = Metadata::with_current(|m| m.logs_version());
if start.elapsed() > auto_recovery_threshold
&& error_recovery_strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
{
// taking the matter into our own hands
let admin = BifrostAdmin::new(bifrost_inner);
info!(
%sealed_segment,
"[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. We waited for {:?} before triggering automatic recovery",
start.elapsed(),
);
if let Err(err) = admin
.seal_and_auto_extend_chain(log_id, Some(sealed_segment))
.log_slow_after(
auto_recovery_threshold / 2,
tracing::Level::INFO,
"Extending the chain with new configuration",
)
.with_overdue(auto_recovery_threshold, tracing::Level::WARN)
.await
{
// we couldn't reconfigure. Let the loop handle retries as normal
info!(
%err,
%log_metadata_version,
"Could not reconfigure the log, perhaps something else beat us to it? We'll check",
);
} else {
info!(
log_metadata_version = %metadata.logs_version(),
"[Auto Recovery] Reconfiguration complete",
);
// reconfiguration successful. Metadata is updated at this point
// Do not fall-through to the backoff sleep
continue;
}
} else {
// Holding pattern
if tone_escalated {
info!(
%log_metadata_version,
"In holding pattern, still waiting for log reconfiguration to complete. Elapsed={:?}",
start.elapsed(),
);
} else {
debug!(
%log_metadata_version,
"In holding pattern, waiting for log reconfiguration to complete. Elapsed={:?}",
start.elapsed(),
);
}
}
let sleep_dur = retry_iter
.next()
.expect("append retries should be infinite");
// backoff. This is at the bottom to avoid unnecessary sleeps in the happy path
trace!("Will retry the append after {sleep_dur:?}");
// todo: add an async metadata sync request to _influence_ the metadata manager to check if it
// needs to look for a newer log chain version.
tokio::time::sleep(sleep_dur).await;
}

Err(Error::LogSealed(log_id))
}
}
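The retry loop above unconditionally calls `.next().expect("append retries should be infinite")` on the backoff iterator. A generic illustration (not restate's `RetryIter` API) of why that is safe when the configured append retry policy has no attempt limit:

```rust
use std::time::Duration;

// An unbounded exponential-backoff iterator never returns `None`, so calling
// `.expect(...)` on it cannot panic.
struct UnboundedBackoff {
    current: Duration,
    max: Duration,
}

impl Iterator for UnboundedBackoff {
    type Item = Duration;

    fn next(&mut self) -> Option<Duration> {
        let delay = self.current;
        // Double the delay up to a cap, then keep yielding the cap forever.
        self.current = (self.current * 2).min(self.max);
        Some(delay) // never `None`: retries are infinite by construction
    }
}

fn main() {
    let mut backoff = UnboundedBackoff {
        current: Duration::from_millis(10),
        max: Duration::from_secs(1),
    };
    for _ in 0..5 {
        println!("next retry in {:?}", backoff.next().expect("unbounded"));
    }
}
```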
12 changes: 7 additions & 5 deletions crates/bifrost/src/bifrost.rs
@@ -29,15 +29,17 @@ use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};

/// The strategy to use when bifrost fails to append or when it observes
/// a sealed loglet while it's tailing a log.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
///
/// Please keep this enum ordered, i.e. anything > Allowed should still mean allowed.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub enum ErrorRecoveryStrategy {
/// Eagerly extend the chain by creating a new loglet and appending to it.
ExtendChainPreferred,
/// Do not extend the chain, wait indefinitely instead until the error disappears.
Wait = 1,
/// Extend the chain only when running out of patience; others might be better suited to reconfigure
/// the chain, but when desperate, we are allowed to seal and extend.
ExtendChainAllowed,
/// Do not extend the chain, wait indefinitely instead until the error disappears.
Wait,
/// Eagerly extend the chain by creating a new loglet and appending to it.
ExtendChainPreferred,
}

impl ErrorRecoveryStrategy {
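To make the "please keep this enum ordered" note concrete, here is a minimal sketch (re-declaring the enum locally, purely for illustration) of the derived ordering that the appender relies on when it checks `error_recovery_strategy >= ErrorRecoveryStrategy::ExtendChainAllowed`:

```rust
// Local copy of the enum from the diff above. The derived `Ord` follows
// declaration order, and `Wait = 1` pins the first discriminant.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
enum ErrorRecoveryStrategy {
    Wait = 1,
    ExtendChainAllowed,
    ExtendChainPreferred,
}

fn main() {
    assert!(ErrorRecoveryStrategy::Wait < ErrorRecoveryStrategy::ExtendChainAllowed);
    assert!(ErrorRecoveryStrategy::ExtendChainAllowed < ErrorRecoveryStrategy::ExtendChainPreferred);
    // This is the gate the appender uses before attempting auto-recovery.
    assert!(ErrorRecoveryStrategy::ExtendChainPreferred >= ErrorRecoveryStrategy::ExtendChainAllowed);
}
```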
1 change: 1 addition & 0 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
@@ -23,4 +23,5 @@ mod tasks;
#[cfg(any(test, feature = "test-util"))]
pub mod test_util;

pub use provider::logserver_candidate_filter;
pub use provider::Factory;