Skip to content

Commit

Permalink
Harden restatectl reconfigure and avoid sending full LogletParams
Browse files Browse the repository at this point in the history
Summary:
- Arguments provided to this command (in case of replicated provider) act as "hints" for
the node to consider during replication. It is still possible that the node decides to
use more nodes than, or different nodes from, the provided nodeset
- `sequencer` can also be a PlainNodeId
- Most arguments are optional if the previous loglet is a replicated loglet; otherwise the user needs
  to provide most arguments
  • Loading branch information
muhamadazmy committed Feb 14, 2025
1 parent 350d242 commit 50a597a
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 69 deletions.
9 changes: 8 additions & 1 deletion crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,19 @@ message CreatePartitionSnapshotRequest { uint32 partition_id = 1; }
message CreatePartitionSnapshotResponse { string snapshot_id = 1; }

message ChainExtension {
  // Field 5 previously carried the serialized `params` string; it was removed in
  // favor of structured fields (replication/sequencer/nodeset) and must stay
  // reserved so the tag is never reused with a different meaning.
  reserved 5;
  // segment_index will be automatically selected (to the index of last segment)
  // if not set.
  optional uint32 segment_index = 2;
  // check `ProviderKind` for possible values.
  string provider = 4;

  // replication_property is required if provider=="replicated"
  optional restate.cluster.ReplicationProperty replication = 6;
  // optional if provider=="replicated", otherwise ignored
  optional restate.common.NodeId sequencer = 7;
  // optional if provider=="replicated", otherwise ignored
  repeated restate.common.NodeId nodeset = 8;
}

message SealAndExtendChainRequest {
Expand Down
20 changes: 18 additions & 2 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::cluster::ClusterConfiguration;
use restate_types::storage::{StorageCodec, StorageEncode};
use restate_types::{Version, Versioned};
use restate_types::{PlainNodeId, Version, Versioned};

use crate::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
use crate::cluster_controller::protobuf::{
Expand Down Expand Up @@ -199,7 +199,23 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
.provider
.parse()
.map_err(|_| Status::invalid_argument("Provider type is not supported"))?,
params: ext.params.into(),

nodeset: if !ext.nodeset.is_empty() {
Some(
ext.nodeset
.iter()
.map(|node_id| PlainNodeId::new(node_id.id))
.collect(),
)
} else {
None
},
sequencer: ext.sequencer.map(Into::into),
replication: ext
.replication
.map(|p| p.try_into())
.transpose()
.map_err(|_| Status::invalid_argument("Invalid replication property"))?,
}),
None => None,
};
Expand Down
20 changes: 9 additions & 11 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, StorageState};
use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};
use restate_types::replication::{NodeSetSelector, NodeSetSelectorOptions};
use restate_types::replication::{NodeSet, NodeSetSelector, NodeSetSelectorOptions};
use restate_types::retries::{RetryIter, RetryPolicy};
use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned};

Expand Down Expand Up @@ -374,7 +374,7 @@ pub fn build_new_replicated_loglet_configuration(
loglet_id: LogletId,
nodes_config: &NodesConfiguration,
observed_cluster_state: &ObservedClusterState,
previous_params: Option<&ReplicatedLogletParams>,
preferred_nodes: Option<&NodeSet>,
preferred_sequencer: Option<NodeId>,
) -> Option<ReplicatedLogletParams> {
use restate_types::replication::{NodeSetSelector, NodeSetSelectorOptions};
Expand All @@ -384,8 +384,6 @@ pub fn build_new_replicated_loglet_configuration(

let replication = replicated_loglet_config.replication_property.clone();

let preferred_nodes = previous_params.map(|p| &p.nodeset);

let &sequencer = preferred_sequencer
.and_then(|node_id| {
// map to a known alive node
Expand Down Expand Up @@ -599,7 +597,7 @@ impl LogletConfiguration {
next_loglet_id,
&Metadata::with_current(|m| m.nodes_config_ref()),
observed_cluster_state,
previous_params,
previous_params.map(|params| &params.nodeset),
preferred_sequencer,
)
.map(LogletConfiguration::Replicated)
Expand Down Expand Up @@ -1638,7 +1636,7 @@ pub mod tests {
seq_n0.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&seq_n0),
Some(&seq_n0.nodeset),
Some(seq_n0.sequencer.into()),
)
.unwrap();
Expand Down Expand Up @@ -1669,7 +1667,7 @@ pub mod tests {
seq_n0.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&seq_n0),
Some(&seq_n0.nodeset),
Some(seq_n0.sequencer.into()),
)
.unwrap();
Expand Down Expand Up @@ -1730,7 +1728,7 @@ pub mod tests {
initial.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&initial),
Some(&initial.nodeset),
Some(initial.sequencer.into()),
)
.unwrap();
Expand All @@ -1755,7 +1753,7 @@ pub mod tests {
config.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&config),
Some(&config.nodeset),
Some(config.sequencer.into()),
)
.unwrap();
Expand All @@ -1773,7 +1771,7 @@ pub mod tests {
config.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&config),
Some(&config.nodeset),
Some(NodeId::Generational(config.sequencer)),
)
.unwrap();
Expand Down Expand Up @@ -1802,7 +1800,7 @@ pub mod tests {
config.loglet_id,
&nodes.nodes_config,
&nodes.observed_state,
Some(&config),
Some(&config.nodeset),
Some(config.sequencer.into()),
)
.unwrap();
Expand Down
71 changes: 56 additions & 15 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use codederror::CodedError;
use futures::never::Never;
use restate_types::replication::{NodeSet, ReplicationProperty};
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
Expand All @@ -24,7 +25,8 @@ use tracing::{debug, info, trace, warn};

use restate_metadata_server::ReadModifyWriteError;
use restate_types::logs::metadata::{
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind,
ReplicatedLogletConfig, SegmentIndex,
};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
use restate_types::partition_table::{
Expand All @@ -51,7 +53,7 @@ use restate_types::logs::{LogId, LogletId, Lsn};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version};
use restate_types::{GenerationalNodeId, NodeId, Version};

use self::state::ClusterControllerState;
use super::cluster_state_refresher::ClusterStateRefresher;
Expand Down Expand Up @@ -156,7 +158,9 @@ pub struct ChainExtension {
/// Segment index to seal. Last if None
pub segment_index_to_seal: Option<SegmentIndex>,
pub provider_kind: ProviderKind,
pub params: LogletParams,
pub nodeset: Option<NodeSet>,
pub sequencer: Option<NodeId>,
pub replication: Option<ReplicationProperty>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -676,16 +680,13 @@ struct SealAndExtendTask {
}

impl SealAndExtendTask {
async fn run(mut self) -> anyhow::Result<SealedSegment> {
async fn run(self) -> anyhow::Result<SealedSegment> {
let last_segment_index = self
.extension
.as_ref()
.and_then(|ext| ext.segment_index_to_seal);

let (provider, params) = match self.extension.take() {
Some(extension) => (extension.provider_kind, extension.params),
None => self.next_segment().await?,
};
let (provider, params) = self.next_segment()?;

let sealed_segment = self
.bifrost
Expand All @@ -702,7 +703,7 @@ impl SealAndExtendTask {
Ok(sealed_segment)
}

async fn next_segment(&self) -> anyhow::Result<(ProviderKind, LogletParams)> {
fn next_segment(&self) -> anyhow::Result<(ProviderKind, LogletParams)> {
let logs = Metadata::with_current(|m| m.logs_ref());

let segment = logs
Expand All @@ -721,7 +722,50 @@ impl SealAndExtendTask {
None
};

let (provider, params) = match &logs.configuration().default_provider {
// override the provider configuration, if extension is set.
let provider_config = match &self.extension {
None => logs.configuration().default_provider.clone(),
Some(ext) => match ext.provider_kind {
#[cfg(any(test, feature = "memory-loglet"))]
ProviderKind::InMemory => ProviderConfiguration::InMemory,
ProviderKind::Local => ProviderConfiguration::Local,
ProviderKind::Replicated => {
ProviderConfiguration::Replicated(ReplicatedLogletConfig {
replication_property: ext
.replication
.clone()
.ok_or_else(|| anyhow::anyhow!("replication property is required"))?,
// use the provided nodeset size or 0
target_nodeset_size: ext
.nodeset
.as_ref()
.map(|n| n.len() as u16)
.unwrap_or_default()
.try_into()?,
})
}
},
};

let preferred_nodes = self
.extension
.as_ref()
.and_then(|ext| ext.nodeset.as_ref())
.or_else(|| previous_params.as_ref().map(|params| &params.nodeset));

let preferred_sequencer = self
.extension
.as_ref()
.and_then(|ext| ext.sequencer)
.or_else(|| {
Metadata::with_current(|m| {
PartitionTableNodeSetSelectorHints::from(m.partition_table_snapshot())
})
.preferred_sequencer(&self.log_id)
});

// let provider_kind = self.extension.unwrap(log.con)
let (provider, params) = match &provider_config {
#[cfg(any(test, feature = "memory-loglet"))]
ProviderConfiguration::InMemory => (
ProviderKind::InMemory,
Expand All @@ -739,11 +783,8 @@ impl SealAndExtendTask {
next_loglet_id,
&Metadata::with_current(|m| m.nodes_config_ref()),
&self.observed_cluster_state,
previous_params.as_ref(),
Metadata::with_current(|m| {
PartitionTableNodeSetSelectorHints::from(m.partition_table_snapshot())
})
.preferred_sequencer(&self.log_id),
preferred_nodes,
preferred_sequencer,
)
.ok_or_else(|| anyhow::anyhow!("Insufficient writeable nodes in the nodeset"))?;

Expand Down
Loading

0 comments on commit 50a597a

Please sign in to comment.