Skip to content

Commit

Permalink
Harden restatectl reconfigure and avoid sending full LogletParams
Browse files Browse the repository at this point in the history
Summary:
- Arguments provided to this command (in the case of the replicated provider) act as "hints" for
the node to consider during replication. It is still possible that the node decides to
use more nodes than, or different nodes from, the provided nodeset
- `sequencer` can also be a PlainNodeId
- Most arguments are optional if the previous loglet is a replicated loglet; otherwise the user needs
  to provide most arguments
  • Loading branch information
muhamadazmy committed Feb 19, 2025
1 parent 739b130 commit 0a0d51a
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 76 deletions.
10 changes: 10 additions & 0 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,17 @@ message ChainExtension {
optional uint32 segment_index = 2;
// check `ProviderKind` for possible values.
string provider = 4;

// [deprecated] We return an error if this
// field is set.
string params = 5;

// replication_property is required if provider=="replicated"
optional restate.cluster.ReplicationProperty replication = 6;
// optional if provider=="replicated", otherwise ignored
optional restate.common.NodeId sequencer = 7;
// optional if provider=="replicated", otherwise ignored
repeated restate.common.NodeId nodeset = 8;
}

message SealAndExtendChainRequest {
Expand Down
46 changes: 36 additions & 10 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use restate_types::net::partition_processor_manager::Snapshot;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::cluster::ClusterConfiguration;
use restate_types::storage::{StorageCodec, StorageEncode};
use restate_types::{Version, Versioned};
use restate_types::{PlainNodeId, Version, Versioned};

use crate::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
use crate::cluster_controller::protobuf::{
Expand Down Expand Up @@ -199,15 +199,41 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
let request = request.into_inner();

let extension = match request.extension {
Some(ext) => Some(ChainExtension {
segment_index_to_seal: ext.segment_index.map(SegmentIndex::from),

provider_kind: ext
.provider
.parse()
.map_err(|_| Status::invalid_argument("Provider type is not supported"))?,
params: ext.params.into(),
}),
Some(ext) => {
if !ext.params.is_empty() {
// `params` is no longer supported. It's better to fail loudly
// than to act unexpectedly on an invalid request version
return Err(Status::invalid_argument(
"Detected a deprecated argument. Please upgrade to the latest version of the command-line tool to ensure compatibility.",
));
}

Some(ChainExtension {
segment_index_to_seal: ext.segment_index.map(SegmentIndex::from),

provider_kind: ext
.provider
.parse()
.map_err(|_| Status::invalid_argument("Provider type is not supported"))?,

nodeset: if !ext.nodeset.is_empty() {
Some(
ext.nodeset
.iter()
.map(|node_id| PlainNodeId::new(node_id.id))
.collect(),
)
} else {
None
},
sequencer: ext.sequencer.map(Into::into),
replication: ext
.replication
.map(|p| p.try_into())
.transpose()
.map_err(|_| Status::invalid_argument("Invalid replication property"))?,
})
}
None => None,
};

Expand Down
20 changes: 9 additions & 11 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, StorageState};
use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};
use restate_types::replication::{NodeSetSelector, NodeSetSelectorOptions};
use restate_types::replication::{NodeSet, NodeSetSelector, NodeSetSelectorOptions};
use restate_types::retries::{RetryIter, RetryPolicy};
use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned};

Expand Down Expand Up @@ -374,7 +374,7 @@ pub fn build_new_replicated_loglet_configuration(
loglet_id: LogletId,
nodes_config: &NodesConfiguration,
observed_cluster_state: &ObservedClusterState,
previous_params: Option<&ReplicatedLogletParams>,
preferred_nodes: Option<&NodeSet>,
preferred_sequencer: Option<NodeId>,
) -> Option<ReplicatedLogletParams> {
use restate_types::replication::{NodeSetSelector, NodeSetSelectorOptions};
Expand All @@ -384,8 +384,6 @@ pub fn build_new_replicated_loglet_configuration(

let replication = replicated_loglet_config.replication_property.clone();

let preferred_nodes = previous_params.map(|p| &p.nodeset);

let &sequencer = preferred_sequencer
.and_then(|node_id| {
// map to a known alive node
Expand Down Expand Up @@ -599,7 +597,7 @@ impl LogletConfiguration {
next_loglet_id,
&Metadata::with_current(|m| m.nodes_config_ref()),
observed_cluster_state,
previous_params,
previous_params.map(|params| &params.nodeset),
preferred_sequencer,
)
.map(LogletConfiguration::Replicated)
Expand Down Expand Up @@ -1638,7 +1636,7 @@ pub mod tests {
seq_n0.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&seq_n0),
Some(&seq_n0.nodeset),
Some(seq_n0.sequencer.into()),
)
.unwrap();
Expand Down Expand Up @@ -1669,7 +1667,7 @@ pub mod tests {
seq_n0.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&seq_n0),
Some(&seq_n0.nodeset),
Some(seq_n0.sequencer.into()),
)
.unwrap();
Expand Down Expand Up @@ -1730,7 +1728,7 @@ pub mod tests {
initial.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&initial),
Some(&initial.nodeset),
Some(initial.sequencer.into()),
)
.unwrap();
Expand All @@ -1755,7 +1753,7 @@ pub mod tests {
config.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&config),
Some(&config.nodeset),
Some(config.sequencer.into()),
)
.unwrap();
Expand All @@ -1773,7 +1771,7 @@ pub mod tests {
config.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&config),
Some(&config.nodeset),
Some(NodeId::Generational(config.sequencer)),
)
.unwrap();
Expand Down Expand Up @@ -1802,7 +1800,7 @@ pub mod tests {
config.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&config),
Some(&config.nodeset),
Some(config.sequencer.into()),
)
.unwrap();
Expand Down
70 changes: 55 additions & 15 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use codederror::CodedError;
use futures::never::Never;
use restate_types::replication::{NodeSet, ReplicationProperty};
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
Expand All @@ -24,7 +25,8 @@ use tracing::{debug, info, trace, warn};

use restate_metadata_server::ReadModifyWriteError;
use restate_types::logs::metadata::{
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind,
ReplicatedLogletConfig, SegmentIndex,
};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
use restate_types::partition_table::{
Expand All @@ -51,7 +53,7 @@ use restate_types::logs::{LogId, LogletId, Lsn};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::{CreateSnapshotRequest, Snapshot};
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version};
use restate_types::{GenerationalNodeId, NodeId, Version};

use self::state::ClusterControllerState;
use super::cluster_state_refresher::ClusterStateRefresher;
Expand Down Expand Up @@ -156,7 +158,9 @@ pub struct ChainExtension {
/// Segment index to seal. Last if None
pub segment_index_to_seal: Option<SegmentIndex>,
pub provider_kind: ProviderKind,
pub params: LogletParams,
pub nodeset: Option<NodeSet>,
pub sequencer: Option<NodeId>,
pub replication: Option<ReplicationProperty>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -676,16 +680,13 @@ struct SealAndExtendTask {
}

impl SealAndExtendTask {
async fn run(mut self) -> anyhow::Result<SealedSegment> {
async fn run(self) -> anyhow::Result<SealedSegment> {
let last_segment_index = self
.extension
.as_ref()
.and_then(|ext| ext.segment_index_to_seal);

let (provider, params) = match self.extension.take() {
Some(extension) => (extension.provider_kind, extension.params),
None => self.next_segment().await?,
};
let (provider, params) = self.next_segment()?;

let sealed_segment = self
.bifrost
Expand All @@ -702,7 +703,7 @@ impl SealAndExtendTask {
Ok(sealed_segment)
}

async fn next_segment(&self) -> anyhow::Result<(ProviderKind, LogletParams)> {
fn next_segment(&self) -> anyhow::Result<(ProviderKind, LogletParams)> {
let logs = Metadata::with_current(|m| m.logs_ref());

let segment = logs
Expand All @@ -721,7 +722,49 @@ impl SealAndExtendTask {
None
};

let (provider, params) = match &logs.configuration().default_provider {
// override the provider configuration, if extension is set.
let provider_config = match &self.extension {
None => logs.configuration().default_provider.clone(),
Some(ext) => match ext.provider_kind {
#[cfg(any(test, feature = "memory-loglet"))]
ProviderKind::InMemory => ProviderConfiguration::InMemory,
ProviderKind::Local => ProviderConfiguration::Local,
ProviderKind::Replicated => {
ProviderConfiguration::Replicated(ReplicatedLogletConfig {
replication_property: ext
.replication
.clone()
.ok_or_else(|| anyhow::anyhow!("replication property is required"))?,
// use the provided nodeset size or 0
target_nodeset_size: ext
.nodeset
.as_ref()
.map(|n| n.len() as u16)
.unwrap_or_default()
.try_into()?,
})
}
},
};

let preferred_nodes = self
.extension
.as_ref()
.and_then(|ext| ext.nodeset.as_ref())
.or_else(|| previous_params.as_ref().map(|params| &params.nodeset));

let preferred_sequencer = self
.extension
.as_ref()
.and_then(|ext| ext.sequencer)
.or_else(|| {
Metadata::with_current(|m| {
PartitionTableNodeSetSelectorHints::from(m.partition_table_snapshot())
})
.preferred_sequencer(&self.log_id)
});

let (provider, params) = match &provider_config {
#[cfg(any(test, feature = "memory-loglet"))]
ProviderConfiguration::InMemory => (
ProviderKind::InMemory,
Expand All @@ -739,11 +782,8 @@ impl SealAndExtendTask {
next_loglet_id,
&Metadata::with_current(|m| m.nodes_config_ref()),
&self.observed_cluster_state,
previous_params.as_ref(),
Metadata::with_current(|m| {
PartitionTableNodeSetSelectorHints::from(m.partition_table_snapshot())
})
.preferred_sequencer(&self.log_id),
preferred_nodes,
preferred_sequencer,
)
.ok_or_else(|| anyhow::anyhow!("Insufficient writeable nodes in the nodeset"))?;

Expand Down
Loading

0 comments on commit 0a0d51a

Please sign in to comment.