diff --git a/Cargo.lock b/Cargo.lock
index 21b7eedbe..ea2983686 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6350,6 +6350,7 @@ dependencies = [
  "prost",
  "rand",
  "restate-core",
+ "restate-futures-util",
  "restate-log-server",
  "restate-metadata-server",
  "restate-rocksdb",
diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index cf78d04d1..a1a38766d 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -9,7 +9,7 @@ publish = false
 
 [features]
 default = []
-replicated-loglet = []
+replicated-loglet = ["auto-extend"]
 memory-loglet = ["restate-types/memory-loglet"]
 test-util = ["memory-loglet", "dep:googletest", "dep:restate-test-util"]
 # enables bifrost to auto seal and extend. This is a transitional feature that will be removed soon.
@@ -19,10 +19,11 @@ auto-extend = []
 workspace-hack = { version = "0.1", path = "../../workspace-hack" }
 
 restate-core = { workspace = true }
-restate-rocksdb = { workspace = true }
-restate-types = { workspace = true }
+restate-futures-util = { workspace = true }
 restate-metadata-server = { workspace = true }
+restate-rocksdb = { workspace = true }
 restate-test-util = { workspace = true, optional = true }
+restate-types = { workspace = true }
 
 anyhow = { workspace = true }
 async-trait = { workspace = true }
diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs
index 0d4ec6d36..33d8d61c4 100644
--- a/crates/bifrost/src/appender.rs
+++ b/crates/bifrost/src/appender.rs
@@ -9,30 +9,28 @@
 // by the Apache License, Version 2.0.
 
 use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
-use tracing::{debug, info, instrument, warn};
+use tracing::{debug, info, instrument, trace, warn};
 
-use restate_core::Metadata;
+use restate_core::{Metadata, TargetVersion};
+use restate_futures_util::overdue::OverdueLoggingExt;
 use restate_types::config::Configuration;
 use restate_types::live::Live;
 use restate_types::logs::metadata::SegmentIndex;
 use restate_types::logs::{LogId, Lsn, Record};
-use restate_types::retries::RetryIter;
 use restate_types::storage::StorageEncode;
 
 use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy};
 use crate::loglet::AppendError;
 use crate::loglet_wrapper::LogletWrapper;
-use crate::{Error, InputRecord, Result};
+use crate::{BifrostAdmin, Error, InputRecord, Result};
 
 #[derive(Clone, derive_more::Debug)]
 pub struct Appender {
     log_id: LogId,
     #[debug(skip)]
     pub(super) config: Live<Configuration>,
-    // todo: asoli remove
-    #[allow(unused)]
     error_recovery_strategy: ErrorRecoveryStrategy,
     loglet_cache: Option<LogletWrapper>,
     #[debug(skip)]
@@ -113,18 +111,24 @@ impl Appender {
             match loglet.append_batch(batch.clone()).await {
                 Ok(lsn) => return Ok(lsn),
                 Err(AppendError::Sealed) => {
-                    info!(
+                    debug!(
+                        log_id = %self.log_id,
                         attempt = attempt,
                         segment_index = %loglet.segment_index(),
-                        "Append batch will be retried (loglet is being sealed), waiting for tail to be determined"
+                        "Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
                     );
-                    let new_loglet = Self::wait_next_unsealed_loglet(
+                    let new_loglet = Self::on_sealed_loglet(
                         self.log_id,
                         &self.bifrost_inner,
                         loglet.segment_index(),
-                        &mut retry_iter,
+                        self.error_recovery_strategy,
                     )
                     .await?;
+                    debug!(
+                        log_id = %self.log_id,
+                        segment_index = %loglet.segment_index(),
+                        "Log reconfiguration has been completed, appender will resume now"
+                    );
 
                     self.loglet_cache = Some(new_loglet);
                 }
@@ -132,6 +136,7 @@ impl Appender {
                     if let Some(retry_dur) = retry_iter.next() {
                         info!(
                             %err,
+                            log_id = %self.log_id,
                             attempt = attempt,
                             segment_index = %loglet.segment_index(),
                             "Failed to append this batch. Since underlying error is retryable, will retry in {:?}",
@@ -141,6 +146,7 @@ impl Appender {
                     } else {
                         warn!(
                             %err,
+                            log_id = %self.log_id,
                             attempt = attempt,
                             segment_index = %loglet.segment_index(),
                             "Failed to append this batch and exhausted all attempts to retry",
@@ -154,37 +160,122 @@ impl Appender {
         }
     }
 
-    #[instrument(level = "error", err, skip(retry_iter, bifrost_inner))]
-    async fn wait_next_unsealed_loglet(
+    #[instrument(level = "error", err, skip(bifrost_inner))]
+    async fn on_sealed_loglet(
         log_id: LogId,
         bifrost_inner: &Arc<BifrostInner>,
         sealed_segment: SegmentIndex,
-        retry_iter: &mut RetryIter<'_>,
+        error_recovery_strategy: ErrorRecoveryStrategy,
     ) -> Result<LogletWrapper> {
+        let mut retry_iter = Configuration::pinned()
+            .bifrost
+            .append_retry_policy()
+            .into_iter();
+
+        let auto_recovery_threshold: Duration = Configuration::pinned()
+            .bifrost
+            .auto_recovery_interval
+            .into();
         let start = Instant::now();
-        for sleep_dur in retry_iter.by_ref() {
+
+        // Let's give metadata manager an opportunity to sync up to the latest log chain,
+        // we might be operating with an old view, but we'll only do this style of sync once.
+        // However, in the retry loop below, we can add a background sync request to metadata
+        // manager once it supports rate limiting sync requests (to avoid overloading metadata
+        // store)
+        let metadata = Metadata::current();
+        let _ = metadata
+            .sync(restate_core::MetadataKind::Logs, TargetVersion::Latest)
+            .log_slow_after(
+                auto_recovery_threshold,
+                tracing::Level::INFO,
+                "Syncing the log chain from metadata store",
+            )
+            .await;
+        loop {
             bifrost_inner.fail_if_shutting_down()?;
-            tokio::time::sleep(sleep_dur).await;
             let loglet = bifrost_inner.writeable_loglet(log_id).await?;
+            let tone_escalated = start.elapsed() > auto_recovery_threshold;
             // Do we think that the last tail loglet is different and unsealed?
             if loglet.tail_lsn.is_none() && loglet.segment_index() > sealed_segment {
                 let total_dur = start.elapsed();
-                debug!(
-                    "Found an unsealed segment {} after {:?}",
-                    loglet.segment_index(),
-                    total_dur
-                );
+                if tone_escalated {
+                    info!(
+                        open_segment = %loglet.segment_index(),
+                        "New segment detected after {:?}",
+                        total_dur
+                    );
+                } else {
+                    debug!(
+                        open_segment = %loglet.segment_index(),
+                        "New segment detected after {:?}",
+                        total_dur
+                    );
+                }
                 return Ok(loglet);
-            } else {
-                let log_version = Metadata::with_current(|m| m.logs_version());
-                debug!(
-                    log_version = %log_version,
-                    "Still waiting for sealing to complete. Elapsed={:?}",
+            }
+
+            // Okay, tail segment is still sealed
+            let log_metadata_version = Metadata::with_current(|m| m.logs_version());
+            if start.elapsed() > auto_recovery_threshold
+                && error_recovery_strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
+            {
+                // taking the matter into our own hands
+                let admin = BifrostAdmin::new(bifrost_inner);
+                info!(
+                    %sealed_segment,
+                    "[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. We waited for {:?} before triggering automatic recovery",
                     start.elapsed(),
                 );
+                if let Err(err) = admin
+                    .seal_and_auto_extend_chain(log_id, Some(sealed_segment))
+                    .log_slow_after(
+                        auto_recovery_threshold / 2,
+                        tracing::Level::INFO,
+                        "Extending the chain with new configuration",
+                    )
+                    .with_overdue(auto_recovery_threshold, tracing::Level::WARN)
+                    .await
+                {
+                    // we couldn't reconfigure. Let the loop handle retries as normal
+                    info!(
+                        %err,
+                        %log_metadata_version,
+                        "Could not reconfigure the log, perhaps something else beat us to it? We'll check",
+                    );
+                } else {
+                    info!(
+                        log_metadata_version = %metadata.logs_version(),
+                        "[Auto Recovery] Reconfiguration complete",
+                    );
+                    // reconfiguration successful. Metadata is updated at this point
+                    // Do not fall-through to the backoff sleep
+                    continue;
+                }
+            } else {
+                // Holding pattern
+                if tone_escalated {
+                    info!(
+                        %log_metadata_version,
+                        "In holding pattern, still waiting for log reconfiguration to complete. Elapsed={:?}",
+                        start.elapsed(),
+                    );
+                } else {
+                    debug!(
+                        %log_metadata_version,
+                        "In holding pattern, waiting for log reconfiguration to complete. Elapsed={:?}",
+                        start.elapsed(),
+                    );
+                }
             }
+            let sleep_dur = retry_iter
+                .next()
+                .expect("append retries should be infinite");
+            // backoff. This is at the bottom to avoid unnecessary sleeps in the happy path
+            trace!("Will retry the append after {sleep_dur:?}");
+            // todo: add async metadata sync request to _influence_ metadata manager to check if it
+            // needs to look for newer log chain version.
+            tokio::time::sleep(sleep_dur).await;
         }
-
-        Err(Error::LogSealed(log_id))
     }
 }
diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index 187408878..854b514ae 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -29,15 +29,17 @@ use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result};
 
 /// The strategy to use when bifrost fails to append or when it observes
 /// a sealed loglet while it's tailing a log.
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+///
+/// Please keep this enum ordered, i.e. anything > Allowed should still mean allowed.
+#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
 pub enum ErrorRecoveryStrategy {
-    /// Eagerly extend the chain by creating a new loglet and appending to it.
-    ExtendChainPreferred,
+    /// Do not extend the chain, wait indefinitely instead until the error disappears.
+    Wait = 1,
     /// Extend the chain only when running out of patience; others might be better suited to reconfigure
     /// the chain, but when desperate, we are allowed to seal and extend.
     ExtendChainAllowed,
-    /// Do not extend the chain, wait indefinitely instead until the error disappears.
-    Wait,
+    /// Eagerly extend the chain by creating a new loglet and appending to it.
+    ExtendChainPreferred,
 }
 
 impl ErrorRecoveryStrategy {
diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs
index 40b324d99..c58669e8e 100644
--- a/crates/log-server/src/loglet_worker.rs
+++ b/crates/log-server/src/loglet_worker.rs
@@ -177,7 +177,7 @@ impl LogletWorker {
                     // todo: consider a draining shutdown if needed
                     // this might include sending notifications of shutdown to allow graceful
                     // handoff
-                    debug!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
+                    trace!(loglet_id = %self.loglet_id, "Loglet writer shutting down");
                     return;
                 }
                 // GET_DIGEST
@@ -191,7 +191,7 @@ impl LogletWorker {
                 Some(Ok(_)) = &mut in_flight_seal => {
                     sealing_in_progress = false;
                     self.loglet_state.get_local_tail_watch().notify_seal();
-                    debug!(loglet_id = %self.loglet_id, "Loglet is sealed");
+                    debug!(loglet_id = %self.loglet_id, "Loglet is now sealed on this log-server node");
                     in_flight_seal.set(None.into());
                 }
                 // The set of requests waiting for seal to complete
diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs
index 8b6f99971..3089b1426 100644
--- a/crates/types/src/config/bifrost.rs
+++ b/crates/types/src/config/bifrost.rs
@@ -55,6 +55,15 @@ pub struct BifrostOptions {
     #[cfg_attr(feature = "schemars", schemars(with = "String"))]
     pub seal_retry_interval: humantime::Duration,
 
+    /// # Auto recovery threshold
+    ///
+    /// Time interval after which bifrost's auto-recovery mechanism will kick in. This
+    /// is triggered in scenarios where the control plane took too long to complete loglet
+    /// reconfigurations.
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
+    pub auto_recovery_interval: humantime::Duration,
+
     /// # Append retry minimum interval
     ///
     /// Minimum retry duration used by the exponential backoff mechanism for bifrost appends.
@@ -103,6 +112,7 @@ impl Default for BifrostOptions {
             ),
             append_retry_min_interval: Duration::from_millis(10).into(),
             append_retry_max_interval: Duration::from_secs(1).into(),
+            auto_recovery_interval: Duration::from_secs(3).into(),
             seal_retry_interval: Duration::from_secs(2).into(),
             record_cache_memory_size: 20_000_000u64.into(), // 20MB
         }
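
For reference, the recovery gate introduced in `on_sealed_loglet` reduces to a single comparison: auto recovery fires only once `auto_recovery_interval` (default 3s) has elapsed and the caller's `ErrorRecoveryStrategy` is at least `ExtendChainAllowed`, which is why the enum is now explicitly ordered. Below is a standalone sketch of that check; `should_auto_extend` is a hypothetical helper for illustration only, not part of the crate.

use std::time::Duration;

// Same ordering as the new enum definition: Wait < ExtendChainAllowed < ExtendChainPreferred.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
enum ErrorRecoveryStrategy {
    Wait = 1,
    ExtendChainAllowed,
    ExtendChainPreferred,
}

// Mirrors `start.elapsed() > auto_recovery_threshold
//     && error_recovery_strategy >= ErrorRecoveryStrategy::ExtendChainAllowed`.
fn should_auto_extend(elapsed: Duration, threshold: Duration, strategy: ErrorRecoveryStrategy) -> bool {
    elapsed > threshold && strategy >= ErrorRecoveryStrategy::ExtendChainAllowed
}

fn main() {
    let threshold = Duration::from_secs(3); // default `auto_recovery_interval`
    // Past the threshold, both "allowed" variants trigger seal-and-auto-extend.
    assert!(should_auto_extend(Duration::from_secs(5), threshold, ErrorRecoveryStrategy::ExtendChainAllowed));
    assert!(should_auto_extend(Duration::from_secs(5), threshold, ErrorRecoveryStrategy::ExtendChainPreferred));
    // `Wait` never escalates; the appender stays in the holding pattern.
    assert!(!should_auto_extend(Duration::from_secs(5), threshold, ErrorRecoveryStrategy::Wait));
    // Before the threshold elapses, every strategy keeps waiting for the control plane.
    assert!(!should_auto_extend(Duration::from_secs(1), threshold, ErrorRecoveryStrategy::ExtendChainPreferred));
}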