Skip to content

Commit

Permalink
updateable nodes configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Jun 27, 2024
1 parent 471b4ae commit 125bea5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 21 deletions.
3 changes: 1 addition & 2 deletions crates/core/src/metadata/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,7 @@ where
}

fn update_nodes_configuration(&mut self, config: NodesConfiguration) {
let maybe_new_version =
Self::update_option_internal(&self.metadata.inner.nodes_config, config);
let maybe_new_version = Self::update_internal(&self.metadata.inner.nodes_config, config);

self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration);
}
Expand Down
32 changes: 23 additions & 9 deletions crates/core/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use crate::metadata_store::ReadError;
use crate::network::NetworkSender;
use crate::{ShutdownError, TaskCenter, TaskId, TaskKind};

#[derive(Clone, derive_more::From)]
pub struct UpdateableNodesConfiguration(Arc<ArcSwap<NodesConfiguration>>);

#[derive(Debug, thiserror::Error)]
pub enum SyncError {
#[error("failed syncing with metadata store: {0}")]
Expand Down Expand Up @@ -74,9 +77,12 @@ pub struct Metadata {

impl Metadata {
/// Panics if nodes configuration is not loaded yet.
#[track_caller]
pub fn nodes_config(&self) -> Arc<NodesConfiguration> {
self.inner.nodes_config.load_full().unwrap()
self.inner.nodes_config.load_full()
}

pub fn updateable_nodes_config(&self) -> UpdateableNodesConfiguration {
UpdateableNodesConfiguration::from(self.inner.nodes_config.clone())
}

#[track_caller]
Expand All @@ -86,11 +92,7 @@ impl Metadata {

/// Returns Version::INVALID if nodes configuration has not been loaded yet.
pub fn nodes_config_version(&self) -> Version {
let c = self.inner.nodes_config.load();
match c.as_deref() {
Some(c) => c.version(),
None => Version::INVALID,
}
self.inner.nodes_config.load().version()
}

pub fn partition_table(&self) -> Option<Arc<FixedPartitionTable>> {
Expand Down Expand Up @@ -178,16 +180,28 @@ impl Metadata {
}
}

#[derive(Default)]
struct MetadataInner {
my_node_id: OnceLock<GenerationalNodeId>,
nodes_config: ArcSwapOption<NodesConfiguration>,
nodes_config: Arc<ArcSwap<NodesConfiguration>>,
partition_table: ArcSwapOption<FixedPartitionTable>,
logs: ArcSwapOption<Logs>,
schema: Arc<ArcSwap<Schema>>,
write_watches: EnumMap<MetadataKind, VersionWatch>,
}

impl Default for MetadataInner {

Check failure on line 192 in crates/core/src/metadata/mod.rs

View workflow job for this annotation

GitHub Actions / Build and test (ubuntu-22.04)

this `impl` can be derived
fn default() -> Self {
Self {
my_node_id: Default::default(),
nodes_config: Default::default(),
partition_table: Default::default(),
logs: Default::default(),
schema: Default::default(),
write_watches: Default::default(),
}
}
}

/// Can send updates to metadata manager. This should be accessible by the rpc handler layer to
/// handle incoming metadata updates from the network, or to handle updates coming from metadata
/// service if it's running on this node. MetadataManager ensures that writes are monotonic
Expand Down
15 changes: 11 additions & 4 deletions crates/types/src/nodes_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
#![allow(dead_code)]

use std::collections::HashMap;
use std::sync::Arc;

use arc_swap::ArcSwap;
use enumset::{EnumSet, EnumSetType};
use serde_with::serde_as;

use crate::net::AdvertisedAddress;
use crate::{flexbuffers_storage_encode_decode, GenerationalNodeId, NodeId, PlainNodeId};
use crate::{Version, Versioned};

pub type UpdateableNodesConfiguration = Arc<ArcSwap<NodesConfiguration>>;

#[derive(Debug, thiserror::Error)]
pub enum NodesConfigError {
#[error("node {0} was not found in config")]
Expand Down Expand Up @@ -65,6 +61,17 @@ pub struct NodesConfiguration {
name_lookup: HashMap<String, PlainNodeId>,
}

impl Default for NodesConfiguration {
fn default() -> Self {
Self {
version: Version::INVALID,
cluster_name: "Unspecified".to_owned(),
nodes: Default::default(),
name_lookup: Default::default(),
}
}
}

#[derive(
Debug, Clone, Eq, PartialEq, strum_macros::EnumIs, serde::Serialize, serde::Deserialize,
)]
Expand Down
11 changes: 5 additions & 6 deletions crates/types/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ impl Version {
}
}

impl Default for Version {
fn default() -> Self {
Self::MIN
}
}

impl From<crate::protobuf::common::Version> for Version {
fn from(version: crate::protobuf::common::Version) -> Self {
crate::Version::from(version.value)
Expand All @@ -60,6 +54,11 @@ impl From<Version> for crate::protobuf::common::Version {
pub trait Versioned {
/// Returns the version of the versioned value
fn version(&self) -> Version;

/// Is this a valid version?
fn valid(&self) -> bool {
self.version() >= Version::MIN
}
}

impl<T: Versioned> Versioned for &T {
Expand Down

0 comments on commit 125bea5

Please sign in to comment.