Misc improvements to cluster_controller/logs_controller
Some of the debug messages will likely be changed to trace before the release cut-off date.
AhmedSoliman committed Jan 29, 2025
1 parent 013a1d1 commit 4761e69
Showing 5 changed files with 119 additions and 48 deletions.
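The commit note mentions that several of the new debug! statements may later be downgraded to trace!. A minimal, self-contained sketch (not part of this commit) of what that downgrade means once a tracing EnvFilter is installed; the global "debug" directive is only an example, a real deployment would typically use a per-crate directive:

use tracing::{debug, trace};
use tracing_subscriber::EnvFilter;

fn main() {
    // A global `debug` filter: debug! records are emitted, trace! records are dropped.
    // Downgrading a statement from debug! to trace! therefore silences it at this
    // verbosity and requires an explicit `trace` directive to see it again.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("debug"))
        .init();

    debug!("Refreshing cluster state"); // visible at the `debug` level
    trace!("Refreshing cluster state"); // silenced until the filter allows `trace`
}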
@@ -104,6 +104,8 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
let refresh = async move {
// todo: potentially downgrade to trace()...
debug!("Refreshing cluster state");
let last_state = Arc::clone(&cluster_state_tx.borrow());
let metadata = Metadata::current();
// make sure we have a partition table that is equal to or newer than the last refresh
@@ -217,6 +219,8 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
};

// publish the new state
// todo: potentially downgrade to trace!
debug!("New cluster state is acquired, publishing");
cluster_state_tx.send(Arc::new(state))?;
Ok(())
};
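For context on the two log lines added above: the refresher publishes each freshly acquired snapshot through the tokio watch channel it holds (cluster_state_tx), so readers always observe the latest Arc<ClusterState>. A minimal sketch of that publish/borrow pattern, with ClusterState reduced to an empty placeholder; only the tokio watch API is taken as given, everything else is illustrative:

use std::sync::Arc;
use tokio::sync::watch;

#[derive(Debug, Default)]
struct ClusterState;

fn main() {
    // The sender side corresponds to cluster_state_tx in the refresher.
    let (tx, rx) = watch::channel(Arc::new(ClusterState));

    // Equivalent of `cluster_state_tx.send(Arc::new(state))?` after a refresh:
    // the channel keeps only the latest value, so slow readers never queue snapshots.
    tx.send(Arc::new(ClusterState)).expect("receiver still alive");

    // Readers cheaply clone the Arc out of the channel, mirroring
    // `Arc::clone(&cluster_state_tx.borrow())` above.
    let latest: Arc<ClusterState> = Arc::clone(&rx.borrow());
    println!("{latest:?}");
}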
62 changes: 41 additions & 21 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -19,13 +19,14 @@ use rand::prelude::IteratorRandom;
use rand::thread_rng;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, info, trace, trace_span, Instrument};
use tracing::{debug, error, info, trace_span, Instrument};

use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{Precondition, WriteError};
use restate_core::{
Metadata, MetadataKind, MetadataWriter, ShutdownError, TargetVersion, TaskCenterFutureExt,
};
use restate_futures_util::overdue::OverdueLoggingExt;
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
use restate_types::live::Pinned;
@@ -141,6 +142,12 @@ impl LogState {
segment_index: segment_index_to_seal,
seal_lsn,
}
} else {
debug!(
"Ignoring sealing because out state is at segment {} while the seal is for segment {}. Current state is LogState::Available, we are not transitioning to LogState::Sealed",
segment_index,
segment_index_to_seal,
);
}
}
LogState::Sealing {
@@ -156,6 +163,12 @@ impl LogState {
segment_index: segment_index_to_seal,
seal_lsn,
}
} else {
debug!(
"Ignoring sealing because out state is at segment {} while the seal is for segment {}. Current state is LogState::Sealing",
segment_index,
segment_index_to_seal,
);
}
}
LogState::Sealed { .. } => {}
@@ -1077,7 +1090,7 @@ impl LogsController {
for (log_id, chain) in logs.iter() {
let tail_segment = chain.tail();

let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await {
let writeable_loglet = match bifrost.admin().writeable_loglet(*log_id).await {
Ok(loglet) => loglet,
Err(BifrostError::Shutdown(_)) => break,
Err(err) => {
@@ -1086,25 +1099,26 @@
}
};

if writable_loglet.segment_index() != tail_segment.index() {
if writeable_loglet.segment_index() != tail_segment.index() {
// the writable segment in bifrost is probably ahead of our snapshot;
// a newer metadata update will likely fix this,
// so for now we just ignore this segment
trace!(%log_id, segment_index=%tail_segment.index(), "Segment is not tail segment, skip finding tail");
debug!(%log_id, segment_index=%tail_segment.index(), "Segment is not tail segment, skip finding tail");
continue;
}

let found_tail = match writable_loglet.find_tail().await {
debug!(%log_id, segment_index=%writeable_loglet.segment_index(), "Attempting to find tail for loglet");
let found_tail = match writeable_loglet.find_tail().await {
Ok(tail) => tail,
Err(err) => {
debug!(error=%err, %log_id, segment_index=%tail_segment.index(), "Failed to find tail for loglet");
debug!(error=%err, %log_id, segment_index=%writeable_loglet.segment_index(), "Failed to find tail for loglet");
continue;
}
};

// send message
let update = LogTailUpdate {
segment_index: writable_loglet.segment_index(),
segment_index: writeable_loglet.segment_index(),
tail: found_tail,
};

@@ -1118,6 +1132,12 @@

self.async_operations.spawn(
find_tail
.log_slow_after(
Duration::from_secs(3),
tracing::Level::INFO,
"Determining the tail status for all logs",
)
.with_overdue(Duration::from_secs(30), tracing::Level::WARN)
.instrument(trace_span!("scheduled-find-tail"))
.in_current_tc(),
);
@@ -1204,7 +1224,7 @@ impl LogsController {
{
return match err {
WriteError::FailedPrecondition(err) => {
debug!(
info!(
%err,
"Detected a concurrent modification of the log chain. Fetching latest"
);
@@ -1241,26 +1261,26 @@

self.async_operations.spawn(
async move {
let start = tokio::time::Instant::now();
if let Some(debounce) = &mut debounce {
let delay = debounce.next().unwrap_or(FALLBACK_MAX_RETRY_DELAY);
debug!(?delay, %log_id, %segment_index, "Wait before attempting to seal log");
debug!(?delay, %log_id, %segment_index, "Wait before attempting to seal loglet");
tokio::time::sleep(delay).await;
}

match bifrost.admin().seal(log_id, segment_index).await {
Ok(sealed_segment) => {
if sealed_segment.tail.is_sealed() {
Event::SealSucceeded {
log_id,
segment_index,
seal_lsn: sealed_segment.tail.offset(),
}
} else {
Event::SealFailed {
log_id,
segment_index,
debounce,
}
debug!(
%log_id,
%segment_index,
"Loglet has been sealed in {:?}, stable tail is {}",
start.elapsed(),
sealed_segment.tail.offset(),
);
Event::SealSucceeded {
log_id,
segment_index,
seal_lsn: sealed_segment.tail.offset(),
}
}
Err(err) => {
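The scheduled find-tail task above is wrapped with combinators from restate_futures_util::overdue, newly imported at the top of this file. A minimal sketch of how that wrapping reads in isolation; the combinator names, argument order, and thresholds are taken verbatim from the diff, while the stand-in future and the assumption that the message parameter accepts a plain string literal are illustrative:

use std::time::Duration;

use restate_futures_util::overdue::OverdueLoggingExt;
use tracing::Level;

// Stand-in for the real find-tail future built by the controller.
async fn find_tail_for_all_logs() { /* ... */ }

async fn scheduled_find_tail() {
    find_tail_for_all_logs()
        // Log at INFO once the future has been pending for 3 seconds...
        .log_slow_after(
            Duration::from_secs(3),
            Level::INFO,
            "Determining the tail status for all logs",
        )
        // ...and escalate to WARN if it is still pending after 30 seconds.
        .with_overdue(Duration::from_secs(30), Level::WARN)
        .await;
}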
53 changes: 40 additions & 13 deletions crates/admin/src/cluster_controller/scheduler.rs
@@ -10,17 +10,19 @@

use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use rand::seq::IteratorRandom;
use tracing::debug;
use tracing::{debug, info};

use restate_core::metadata_store::{Precondition, ReadError, ReadWriteError, WriteError};
use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
use restate_core::{
cancellation_watcher, Metadata, MetadataKind, MetadataWriter, ShutdownError, SyncError,
TargetVersion, TaskCenter, TaskHandle, TaskKind,
};
use restate_futures_util::overdue::OverdueLoggingExt;
use restate_types::cluster::cluster_state::RunMode;
use restate_types::identifiers::PartitionId;
use restate_types::locality::LocationScope;
@@ -133,7 +135,6 @@ impl<T: TransportConnect> Scheduler<T> {
// we need to wait until both partitions and logs are created
return Ok(());
}

let version = partition_table.version();

// todo(azmy): avoid cloning the partition table every time by keeping
@@ -156,7 +157,10 @@ impl<T: TransportConnect> Scheduler<T> {
});

if let Some(partition_table) = builder.build_if_modified() {
debug!("Updated partition table placement: {partition_table:?}");
debug!(
"Will attempt to write partition table {} to metadata store",
partition_table.version()
);
self.try_update_partition_table(version, partition_table)
.await?;

@@ -179,27 +183,50 @@
&partition_table,
Precondition::MatchesVersion(version),
)
.log_slow_after(
Duration::from_secs(1),
tracing::Level::DEBUG,
format!("Updating partition table to version {version}"),
)
.with_overdue(Duration::from_secs(3), tracing::Level::INFO)
.await
{
Ok(_) => {}
Err(WriteError::FailedPrecondition(msg)) => {
debug!("Partition table update failed due to: {msg}");
// There is no need to wait for the partition table
// to synchronize. The update_partition_placement will
// get called again anyway once the partition table is updated.
self.sync_partition_table()?;
Ok(_) => {
debug!(
"Partition table {} has been written to metadata store",
partition_table.version()
);
}
Err(WriteError::FailedPrecondition(err)) => {
info!(
err,
"Write partition table to metadata store was rejected due to version conflict, \
this is benign unless it's happening repeatedly. In such case, we might be in \
a tight race with another admin node"
);
// There is no need to wait for the partition table to synchronize.
// The update_partition_placement will get called again anyway once
// the partition table is updated.
self.sync_partition_table(partition_table.version().next())?;
}
Err(err) => return Err(err.into()),
}

let new_version = partition_table.version();
self.metadata_writer
.update(Arc::new(partition_table))
.log_slow_after(
Duration::from_millis(100),
tracing::Level::DEBUG,
format!("Updating partition table in metadata manager to {new_version}."),
)
.with_overdue(Duration::from_secs(2), tracing::Level::INFO)
.await?;
Ok(())
}

/// Synchronize partition table asynchronously
fn sync_partition_table(&mut self) -> Result<(), Error> {
fn sync_partition_table(&mut self, next_version: Version) -> Result<(), Error> {
if self
.inflight_sync_task
.as_ref()
@@ -211,11 +238,11 @@ impl<T: TransportConnect> Scheduler<T> {
let task = TaskCenter::spawn_unmanaged(
TaskKind::Disposable,
"scheduler-sync-partition-table",
async {
async move {
let cancelled = cancellation_watcher();
let metadata = Metadata::current();
tokio::select! {
result = metadata.sync(MetadataKind::PartitionTable, TargetVersion::Latest) => {
result = metadata.sync(MetadataKind::PartitionTable, TargetVersion::Version(next_version)) => {
if let Err(err) = result {
debug!("Failed to sync partition table metadata: {err}");
}
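The scheduler change above keeps the write guarded by Precondition::MatchesVersion and, on conflict, now syncs to the specific next version rather than to the latest. The underlying pattern is a compare-and-set keyed on the metadata version. A self-contained sketch of that pattern with hypothetical names (Version, VersionedStore, put_if_version, and Conflict are invented for illustration and are not the restate_core::metadata_store API):

#[derive(Clone, Copy, PartialEq, Debug)]
struct Version(u64);

impl Version {
    fn next(self) -> Version {
        Version(self.0 + 1)
    }
}

#[derive(Debug)]
struct Conflict {
    current: Version,
}

struct VersionedStore {
    version: Version,
    value: String,
}

impl VersionedStore {
    /// Compare-and-set keyed on the version the writer believes is current,
    /// mirroring `Precondition::MatchesVersion(version)` in the diff.
    fn put_if_version(&mut self, expected: Version, value: String) -> Result<(), Conflict> {
        if self.version != expected {
            return Err(Conflict { current: self.version });
        }
        self.value = value;
        self.version = self.version.next();
        Ok(())
    }
}

fn main() {
    let mut store = VersionedStore { version: Version(3), value: "placement-v3".into() };

    // Another admin node already bumped the version to 3, so a write that still
    // expects version 2 is rejected; benign unless it keeps happening.
    match store.put_if_version(Version(2), "placement-v4".into()) {
        Ok(()) => println!("partition table written"),
        Err(Conflict { current }) => {
            // Sync to at least the conflicting version before recomputing placement,
            // analogous to sync(..., TargetVersion::Version(next_version)) above.
            println!("version conflict, syncing to {current:?}");
        }
    }
}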
36 changes: 27 additions & 9 deletions crates/admin/src/cluster_controller/service.rs
@@ -319,15 +319,17 @@ impl<T: TransportConnect> Service<T> {
},
Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
self.observed_cluster_state.update(&cluster_state);
// todo: potentially downgrade to trace
debug!("Observed cluster state updated");
// todo quarantine this cluster controller if errors re-occur too often so that
// another cluster controller can take over
if let Err(err) = state.update(&self).await {
warn!("Failed to update cluster state. This can impair the overall cluster operations: {}", err);
if let Err(err) = state.update(&self) {
warn!(%err, "Failed to update cluster state. This can impair the overall cluster operations");
continue;
}

if let Err(err) = state.on_observed_cluster_state(&self.observed_cluster_state).await {
warn!("Failed to handle observed cluster state. This can impair the overall cluster operations: {}", err);
warn!(%err, "Failed to handle observed cluster state. This can impair the overall cluster operations");
}
}
Some(cmd) = self.command_rx.recv() => {
@@ -344,13 +346,19 @@
let leader_event = match result {
Ok(leader_event) => leader_event,
Err(err) => {
warn!("Failed to run cluster controller operations. This can impair the overall cluster operations: {}", err);
warn!(
%err,
"Failed to run cluster controller operations. This can impair the overall cluster operations"
);
continue;
}
};

if let Err(err) = state.on_leader_event(&self.observed_cluster_state, leader_event).await {
warn!("Failed to handle leader event. This can impair the overall cluster operations: {}", err);
warn!(
%err,
"Failed to handle leader event. This can impair the overall cluster operations"
);
}
}
}
@@ -550,10 +558,20 @@ impl<T: TransportConnect> Service<T> {
default_provider,
response_tx,
} => {
let result = self
.update_cluster_configuration(replication_strategy, default_provider)
.await;
let _ = response_tx.send(result);
match tokio::time::timeout(
Duration::from_secs(2),
self.update_cluster_configuration(replication_strategy, default_provider),
)
.await
{
Ok(result) => {
let _ = response_tx.send(result);
}
Err(_timeout) => {
let _ =
response_tx.send(Err(anyhow!("Timeout on writing to metadata store")));
}
}
}
ClusterControllerCommand::SealAndExtendChain {
log_id,
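The command handler above now bounds the metadata-store write with a two-second deadline instead of awaiting it indefinitely. A minimal sketch of the tokio::time::timeout pattern it uses; the stand-in operation and the anyhow error mirror the diff but are otherwise illustrative:

use std::time::Duration;

use anyhow::anyhow;
use tokio::time::timeout;

// Stand-in for update_cluster_configuration(...).
async fn update_cluster_configuration() -> anyhow::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() {
    // `timeout` resolves to Err(Elapsed) if the inner future is still pending
    // after the deadline; the inner result is otherwise passed through untouched.
    let result = match timeout(Duration::from_secs(2), update_cluster_configuration()).await {
        Ok(inner) => inner,
        Err(_elapsed) => Err(anyhow!("Timeout on writing to metadata store")),
    };
    println!("{result:?}");
}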
12 changes: 7 additions & 5 deletions crates/admin/src/cluster_controller/service/state.rs
@@ -47,7 +47,7 @@ impl<T> ClusterControllerState<T>
where
T: TransportConnect,
{
pub async fn update(&mut self, service: &Service<T>) -> anyhow::Result<()> {
pub fn update(&mut self, service: &Service<T>) -> anyhow::Result<()> {
let maybe_leader = {
let nodes_config = Metadata::with_current(|m| m.nodes_config_ref());
nodes_config
Expand All @@ -64,7 +64,6 @@ where

// A Cluster Controller is a leader if the node holds the smallest PlainNodeID
// If no other node was found to take leadership, we assume leadership

let is_leader = match maybe_leader {
None => true,
Some(leader) => leader == my_node_id(),
@@ -77,10 +76,13 @@
}
(true, ClusterControllerState::Follower) => {
info!("Cluster controller switching to leader mode");
*self = ClusterControllerState::Leader(Leader::from_service(service).await?);
*self = ClusterControllerState::Leader(Leader::from_service(service)?);
}
(false, ClusterControllerState::Leader(_)) => {
info!("Cluster controller switching to follower mode");
info!(
"Cluster controller switching to follower mode, I think the leader is {}",
maybe_leader.expect("a leader must be identified"),
);
*self = ClusterControllerState::Follower;
}
};
@@ -159,7 +161,7 @@ impl<T> Leader<T>
where
T: TransportConnect,
{
async fn from_service(service: &Service<T>) -> anyhow::Result<Leader<T>> {
fn from_service(service: &Service<T>) -> anyhow::Result<Leader<T>> {
let configuration = service.configuration.pinned();

let scheduler = Scheduler::new(service.metadata_writer.clone(), service.networking.clone());
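The update function above decides leadership with the rule spelled out in its comment: the node holding the smallest plain node id leads, and if no candidate is identified at all, the local node assumes leadership. A self-contained sketch of that rule; PlainNodeId here is a stand-in type, not the restate_types definition:

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct PlainNodeId(u32);

fn is_leader(my_id: PlainNodeId, candidates: &[PlainNodeId]) -> bool {
    match candidates.iter().min() {
        None => true, // nobody else can lead, so we assume leadership
        Some(&leader) => leader == my_id,
    }
}

fn main() {
    let me = PlainNodeId(2);
    assert!(is_leader(me, &[PlainNodeId(2), PlainNodeId(5)]));
    assert!(!is_leader(me, &[PlainNodeId(1), PlainNodeId(2)]));
    assert!(is_leader(me, &[]));
}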
