[Bifrost] Design improvements for find_tail
This PR introduces a few changes that result in significant improvements to failover time and to the performance of common operations like `find_tail()`.
The result is a failover time in the hundreds of milliseconds in the happy path and on the order of a couple of seconds in the unhappy path. `find_tail()` is also now significantly cheaper to run when the sequencer is running, which enables parallelizing `find_tail()` runs in the logs controller (and running them more frequently as well). The latter change will land in a separate PR.
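
To make the caller-side effect concrete, below is a minimal, self-contained sketch of the pattern this PR changes: a `find_tail()` call that no longer needs to be wrapped in an external timeout. `MockLoglet`, `TailState`, and the offsets are hypothetical stand-ins invented for the example; only the shape of the call mirrors the real change in `grpc_svc_handler.rs` in the diff below.

```rust
// Hypothetical stand-ins; the real types live in restate_bifrost / restate_types.
use std::time::Duration;

struct TailState {
    offset: u64,
}

struct MockLoglet;

impl MockLoglet {
    // Stand-in for the loglet's find_tail(); assumed fast when a sequencer is running.
    async fn find_tail(&self) -> Result<TailState, String> {
        Ok(TailState { offset: 42 })
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let writable_loglet = MockLoglet;

    // Before this PR: callers guarded find_tail() with an external 2s timeout.
    let _tail = tokio::time::timeout(Duration::from_secs(2), writable_loglet.find_tail())
        .await
        .map_err(|_elapsed| "timed out finding tail".to_string())??;

    // After this PR: find_tail() is cheap enough (when the sequencer is running)
    // to await directly, so the timeout wrapper is removed.
    let tail = writable_loglet.find_tail().await?;
    println!("tail offset = {}", tail.offset);
    Ok(())
}
```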
AhmedSoliman committed Jan 31, 2025
1 parent 3b7fd88 commit 638c13b
Showing 22 changed files with 469 additions and 166 deletions.
@@ -104,8 +104,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
let refresh = async move {
- // todo: potentially downgrade to trace()...
- debug!("Refreshing cluster state");
+ trace!("Refreshing cluster state");
let last_state = Arc::clone(&cluster_state_tx.borrow());
let metadata = Metadata::current();
// make sure we have a partition table that equals or newer than last refresh
@@ -219,8 +218,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
};

// publish the new state
- // todo: potentially downgrade to trace!
- debug!("New cluster state is acquired, publishing");
+ trace!("New cluster state is acquired, publishing");
cluster_state_tx.send(Arc::new(state))?;
Ok(())
};
8 changes: 3 additions & 5 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
@@ -8,10 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

- use std::time::Duration;
-
use bytes::{Bytes, BytesMut};
- use restate_types::protobuf::cluster::ClusterConfiguration;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

@@ -22,6 +19,7 @@ use restate_types::logs::metadata::{Logs, SegmentIndex};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
use restate_types::nodes_config::NodesConfiguration;
+ use restate_types::protobuf::cluster::ClusterConfiguration;
use restate_types::storage::{StorageCodec, StorageEncode};
use restate_types::{Version, Versioned};

@@ -269,9 +267,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
err => Status::internal(err.to_string()),
})?;

- let tail_state = tokio::time::timeout(Duration::from_secs(2), writable_loglet.find_tail())
+ let tail_state = writable_loglet
+ .find_tail()
.await
- .map_err(|_elapsed| Status::deadline_exceeded("Timedout finding tail"))?
.map_err(|err| Status::internal(err.to_string()))?;

let response = FindTailResponse {
12 changes: 5 additions & 7 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -19,7 +19,7 @@ use rand::prelude::IteratorRandom;
use rand::thread_rng;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
- use tracing::{debug, error, info, trace_span, Instrument};
+ use tracing::{debug, error, info, trace, trace_span, Instrument};

use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{Precondition, WriteError};
@@ -766,7 +766,8 @@ impl LogsControllerInner {
)?;

if let Some(logs) = builder.build_if_modified() {
debug!("Proposing new logs configuration: {logs:?}");
trace!(?logs, "Proposing new log chain version {}", logs.version());
debug!("Proposing new log chain version {}", logs.version());
self.logs_write_in_progress = Some(logs.version());
let logs = Arc::new(logs);
effects.push(Effect::WriteLogs {
@@ -1207,16 +1208,13 @@ impl LogsController {
{
return match err {
WriteError::FailedPrecondition(err) => {
- info!(
- %err,
- "Detected a concurrent modification of the log chain. Fetching latest"
- );
+ info!(%err, "Detected a concurrent modification to the log chain");
// Perhaps we already have a newer version, if not, fetch latest.
let _ = Metadata::current().sync(MetadataKind::Logs, TargetVersion::Version(previous_version.next())).await;
Event::NewLogs
}
err => {
info!(%err, "Failed writing new log chain to metadata store, will retry");
info!(%err, "Failed writing the new log chain {} to metadata store, will retry", logs.version());
Event::WriteLogsFailed {
logs,
previous_version,
4 changes: 2 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
@@ -21,7 +21,7 @@ use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tonic::codec::CompressionEncoding;
- use tracing::{debug, info, warn};
+ use tracing::{debug, info, trace, warn};

use restate_metadata_server::ReadModifyWriteError;
use restate_types::logs::metadata::{
@@ -320,7 +320,7 @@ impl<T: TransportConnect> Service<T> {
Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
self.observed_cluster_state.update(&cluster_state);
// todo: potentially downgrade to trace
debug!("Observed cluster state updated");
trace!("Observed cluster state updated");
// todo quarantine this cluster controller if errors re-occur too often so that
// another cluster controller can take over
if let Err(err) = state.update(&self) {
@@ -79,6 +79,18 @@ impl RemoteLogServerManager {
Self { servers }
}

+ pub fn try_get_tail_offset(&self, id: PlainNodeId) -> Option<TailOffsetWatch> {
+ let server = self.servers.get(&id).expect("node is in nodeset");
+
+ if let Ok(guard) = server.try_lock() {
+ if let Some(current) = guard.deref() {
+ return Some(current.local_tail().clone());
+ }
+ }
+
+ None
+ }

/// Gets a log-server instance. On first time it will initialize a new connection
/// to log server. It will make sure all following get call holds the same
/// connection.
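For readers skimming the diff: the new `try_get_tail_offset` above is deliberately non-blocking, returning a tail watch only if the per-server lock can be taken immediately and a connection has already been initialized. Below is a self-contained sketch of that pattern, with hypothetical stand-in types (`TailWatch`, `ServerState`, plain `u32` node ids) in place of the real `TailOffsetWatch` and remote log-server state.

```rust
// Self-contained sketch of the non-blocking lookup pattern used by
// try_get_tail_offset. All types here are stand-ins for illustration only.
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Debug)]
struct TailWatch(u64);

struct ServerState {
    local_tail: TailWatch,
}

struct Servers {
    servers: HashMap<u32, Mutex<Option<ServerState>>>,
}

impl Servers {
    /// Returns the cached tail watch without blocking: `None` if the lock is
    /// currently held or the connection has not been initialized yet.
    fn try_get_tail_offset(&self, id: u32) -> Option<TailWatch> {
        let server = self.servers.get(&id).expect("node is in nodeset");
        if let Ok(guard) = server.try_lock() {
            if let Some(current) = guard.as_ref() {
                return Some(current.local_tail.clone());
            }
        }
        None
    }
}

fn main() {
    let mut servers = HashMap::new();
    servers.insert(1, Mutex::new(Some(ServerState { local_tail: TailWatch(10) })));
    servers.insert(2, Mutex::new(None)); // no connection established yet
    let mgr = Servers { servers };

    assert_eq!(mgr.try_get_tail_offset(1).map(|t| t.0), Some(10));
    assert!(mgr.try_get_tail_offset(2).is_none());
    println!("ok");
}
```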