Skip to content

Commit

Permalink
Make restatectl cluster provision and cluster config set more flexible
Browse files Browse the repository at this point in the history
This commit makes the cluster provision and cluster config set command more
flexible. It now allows to specify only a partial set of values and it will
take the remaining values from the current cluster configuration/default
values.

Additionally, this commit introduces the new type NodeSetSize to enforce the
upper limit of 128 across the configuration and API calls.

This fixes #2604 and #2561.
  • Loading branch information
tillrohrmann committed Feb 4, 2025
1 parent e5e3c78 commit 024bf2b
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 146 deletions.
16 changes: 0 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ pub mod tests {

use restate_types::locality::NodeLocation;
use restate_types::logs::metadata::{
LogsConfiguration, ProviderConfiguration, ReplicatedLogletConfig,
LogsConfiguration, NodeSetSize, ProviderConfiguration, ReplicatedLogletConfig,
};
use restate_types::logs::{LogId, LogletId};
use restate_types::nodes_config::{
Expand Down Expand Up @@ -1539,7 +1539,7 @@ pub mod tests {
fn logs_configuration(replication_factor: u8) -> LogsConfiguration {
LogsConfiguration {
default_provider: ProviderConfiguration::Replicated(ReplicatedLogletConfig {
target_nodeset_size: 0,
target_nodeset_size: NodeSetSize::ZERO,
replication_property: ReplicationProperty::new(
NonZeroU8::new(replication_factor).expect("must be non zero"),
),
Expand Down
8 changes: 7 additions & 1 deletion crates/core/protobuf/node_ctl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ message ProvisionClusterRequest {
// it's limited to the provided replication property
optional restate.cluster.ReplicationProperty partition_replication = 3;
// if unset then the configured cluster default log provider will be used
optional restate.cluster.BifrostProvider log_provider = 4;
optional string log_provider = 4;
// only used if provider = "replicated"
// if unset then the configured cluster default log replication will be used
optional restate.cluster.ReplicationProperty log_replication = 5;
// only used if provider = "replicated"
// if unset then the configured cluster default target nodeset size will be used
optional uint32 target_nodeset_size = 6;
}

message ProvisionClusterResponse {
Expand Down
15 changes: 13 additions & 2 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ impl StartedNode {
&self,
num_partitions: Option<NonZeroU16>,
partition_replication: PartitionReplication,
log_provider: Option<ProviderConfiguration>,
provider_configuration: Option<ProviderConfiguration>,
) -> anyhow::Result<bool> {
let channel = create_tonic_channel(
self.node_address().clone(),
Expand All @@ -809,7 +809,18 @@ impl StartedNode {
dry_run: false,
num_partitions: num_partitions.map(|num| u32::from(num.get())),
partition_replication: partition_replication.into(),
log_provider: log_provider.map(|log_provider| log_provider.into()),
log_provider: provider_configuration
.as_ref()
.map(|config| config.kind().to_string()),
log_replication: provider_configuration
.as_ref()
.and_then(|config| config.replication().cloned())
.map(Into::into),
target_nodeset_size: provider_configuration.as_ref().and_then(|config| {
config
.target_nodeset_size()
.map(|nodeset_size| nodeset_size.as_u32())
}),
};

let retry_policy = RetryPolicy::exponential(
Expand Down
17 changes: 0 additions & 17 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ workspace-hack = { version = "0.1", path = "../../workspace-hack" }
restate-admin = { workspace = true }
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingress-http = { workspace = true }
restate-log-server = { workspace = true }
restate-metadata-server = { workspace = true }
Expand All @@ -36,52 +35,36 @@ restate-types = { workspace = true, features = ["clap"] }
restate-worker = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
codederror = { workspace = true }
datafusion = { workspace = true }
derive_builder = { workspace = true }
enum-map = { workspace = true }
enumset = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
humantime = { workspace = true }
hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-tracing-context = { workspace = true }
metrics-util = { workspace = true }
prost-dto = { workspace = true }
prost-types = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
semver = { version = "1.0", features = ["serde"] }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemalloc_pprof = "0.6.0"

[dev-dependencies]
restate-admin = { workspace = true, features = ["memory-loglet"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-test-util = { workspace = true }

googletest = { workspace = true }
tempfile = { workspace = true }
Expand Down
32 changes: 27 additions & 5 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use restate_core::{task_center, Metadata, MetadataKind, TargetVersion};
use restate_metadata_server::grpc::metadata_server_svc_client::MetadataServerSvcClient;
use restate_types::config::Configuration;
use restate_types::health::Health;
use restate_types::logs::metadata::ProviderConfiguration;
use restate_types::logs::metadata::{NodeSetSize, ProviderConfiguration};
use restate_types::nodes_config::Role;
use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration;
use restate_types::protobuf::node::Message;
use restate_types::replication::ReplicationProperty;
use restate_types::storage::StorageCodec;
use restate_types::Version;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -82,15 +83,36 @@ impl NodeCtlSvcHandler {
.transpose()?
.unwrap_or(config.common.bootstrap_num_partitions);
let partition_replication = request.partition_replication.try_into()?;
let bifrost_provider = request

let log_provider = request
.log_provider
.map(ProviderConfiguration::try_from)
.unwrap_or_else(|| Ok(ProviderConfiguration::from_configuration(config)))?;
.map(|log_provider| log_provider.parse())
.transpose()?
.unwrap_or(config.bifrost.default_provider);
let target_nodeset_size = request
.target_nodeset_size
.map(NodeSetSize::try_from)
.transpose()?
.unwrap_or(config.bifrost.replicated_loglet.default_nodeset_size);
let log_replication = request
.log_replication
.map(ReplicationProperty::try_from)
.transpose()?
.unwrap_or_else(|| {
config
.bifrost
.replicated_loglet
.default_replication_property
.clone()
});

let provider_configuration =
ProviderConfiguration::from((log_provider, log_replication, target_nodeset_size));

Ok(ClusterConfiguration {
num_partitions,
partition_replication,
bifrost_provider,
bifrost_provider: provider_configuration,
})
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use serde_with::{serde_as, DeserializeAs};
use restate_serde_util::{ByteCount, NonZeroByteCount};
use tracing::warn;

use crate::logs::metadata::ProviderKind;
use crate::logs::metadata::{NodeSetSize, ProviderKind};
use crate::replication::ReplicationProperty;
use crate::retries::RetryPolicy;

Expand Down Expand Up @@ -296,10 +296,10 @@ pub struct ReplicatedLogletOptions {
/// nodesets for logs. Setting this to 0 will let the system choose a reasonable value based on
/// the effective replication_property at the time of logs reconfiguration.
// hide the configuration option from serialization if it is the default
#[serde(default, skip_serializing_if = "u16_is_zero")]
#[serde(default, skip_serializing_if = "nodeset_size_is_zero")]
// hide the configuration option by excluding it from the Json schema
#[cfg_attr(feature = "schemars", schemars(skip))]
pub default_nodeset_size: u16,
pub default_nodeset_size: NodeSetSize,
}

/// Helper struct to support deserializing the ReplicationProperty from a [`NonZeroU8`].
Expand All @@ -314,8 +314,8 @@ impl<'de> DeserializeAs<'de, ReplicationProperty> for ReplicationPropertyFromNon
}
}

fn u16_is_zero(i: &u16) -> bool {
*i == 0
fn nodeset_size_is_zero(i: &NodeSetSize) -> bool {
*i == NodeSetSize::ZERO
}

fn default_replication_property() -> ReplicationProperty {
Expand Down Expand Up @@ -347,7 +347,7 @@ impl Default for ReplicatedLogletOptions {
readahead_records: NonZeroUsize::new(100).unwrap(),
readahead_trigger_ratio: 0.5,
default_replication_property: default_replication_property(),
default_nodeset_size: 0,
default_nodeset_size: NodeSetSize::ZERO,
}
}
}
Loading

0 comments on commit 024bf2b

Please sign in to comment.