From e665ffdd785d23c4abc23aed2035a5b231664aba Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Thu, 27 Jun 2024 16:36:55 +0100 Subject: [PATCH] MetadataBuilder to untangle shared metadata construction Address a chicken-and-an-egg problem at construction time of the runtime environment. MetadataBuilder helps by allowing the shared metadata object to be constructed early while encapsulating metadata's construction implementation details. --- crates/bifrost/benches/util.rs | 13 ++++-- crates/core/src/lib.rs | 3 +- crates/core/src/metadata/manager.rs | 66 +++++++++++++++------------ crates/core/src/metadata/mod.rs | 31 ++++++++++--- crates/core/src/network/networking.rs | 16 +++++-- crates/core/src/test_env.rs | 14 ++++-- crates/node/src/lib.rs | 16 ++++--- crates/types/src/nodes_config.rs | 4 ++ tools/bifrost-benchpress/src/main.rs | 13 ++++-- 9 files changed, 119 insertions(+), 57 deletions(-) diff --git a/crates/bifrost/benches/util.rs b/crates/bifrost/benches/util.rs index d6920c55ad..5657eabf8c 100644 --- a/crates/bifrost/benches/util.rs +++ b/crates/bifrost/benches/util.rs @@ -9,7 +9,8 @@ // by the Apache License, Version 2.0. use restate_core::{ - spawn_metadata_manager, MetadataManager, MockNetworkSender, TaskCenter, TaskCenterBuilder, + spawn_metadata_manager, MetadataBuilder, MetadataManager, MockNetworkSender, TaskCenter, + TaskCenterBuilder, }; use restate_metadata_store::{MetadataStoreClient, Precondition}; use restate_rocksdb::RocksDbManager; @@ -32,10 +33,14 @@ pub async fn spawn_environment( let network_sender = MockNetworkSender::default(); let metadata_store_client = MetadataStoreClient::new_in_memory(); - let metadata_manager = - MetadataManager::build(network_sender.clone(), metadata_store_client.clone()); + let metadata_builder = MetadataBuilder::default(); + let metadata = metadata_builder.to_metadata(); + let metadata_manager = MetadataManager::new( + metadata_builder, + network_sender.clone(), + metadata_store_client.clone(), + ); - let metadata = metadata_manager.metadata(); let metadata_writer = metadata_manager.writer(); tc.try_set_global_metadata(metadata.clone()); diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index b5d1d360ac..76130c419a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -17,7 +17,8 @@ mod task_center_types; pub mod worker_api; pub use metadata::{ - spawn_metadata_manager, Metadata, MetadataKind, MetadataManager, MetadataWriter, SyncError, + spawn_metadata_manager, Metadata, MetadataBuilder, MetadataKind, MetadataManager, + MetadataWriter, SyncError, }; pub use task_center::*; pub use task_center_types::*; diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index f371bb3d48..a3bbda5e9d 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -35,7 +35,8 @@ use crate::metadata_store::{MetadataStoreClient, ReadError}; use crate::network::{MessageHandler, MessageRouterBuilder, NetworkSender}; use crate::task_center; -use super::{Metadata, MetadataContainer, MetadataInner, MetadataKind, MetadataWriter}; +use super::MetadataBuilder; +use super::{Metadata, MetadataContainer, MetadataKind, MetadataWriter}; pub(super) type CommandSender = mpsc::UnboundedSender; pub(super) type CommandReceiver = mpsc::UnboundedReceiver; @@ -197,8 +198,7 @@ where /// - NodesConfiguration /// - Partition table pub struct MetadataManager { - self_sender: CommandSender, - inner: Arc, + metadata: Metadata, inbound: CommandReceiver, networking: N, metadata_store_client: MetadataStoreClient, @@ -208,12 +208,14 @@ impl MetadataManager where N: NetworkSender + 'static + Clone, { - pub fn build(networking: N, metadata_store_client: MetadataStoreClient) -> Self { - let (self_sender, inbound) = mpsc::unbounded_channel(); + pub fn new( + metadata_builder: MetadataBuilder, + networking: N, + metadata_store_client: MetadataStoreClient, + ) -> Self { Self { - inner: Arc::new(MetadataInner::default()), - inbound, - self_sender, + metadata: metadata_builder.metadata, + inbound: metadata_builder.receiver, networking, metadata_store_client, } @@ -221,17 +223,17 @@ where pub fn register_in_message_router(&self, sr_builder: &mut MessageRouterBuilder) { sr_builder.add_message_handler(MetadataMessageHandler { - sender: self.self_sender.clone(), + sender: self.metadata.sender.clone(), networking: self.networking.clone(), }); } - pub fn metadata(&self) -> Metadata { - Metadata::new(self.inner.clone(), self.self_sender.clone()) + pub fn metadata(&self) -> &Metadata { + &self.metadata } pub fn writer(&self) -> MetadataWriter { - MetadataWriter::new(self.self_sender.clone(), self.inner.clone()) + MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone()) } /// Start and wait for shutdown signal. @@ -330,26 +332,27 @@ where } fn update_nodes_configuration(&mut self, config: NodesConfiguration) { - let maybe_new_version = Self::update_option_internal(&self.inner.nodes_config, config); + let maybe_new_version = + Self::update_option_internal(&self.metadata.inner.nodes_config, config); self.notify_watches(maybe_new_version, MetadataKind::NodesConfiguration); } fn update_partition_table(&mut self, partition_table: FixedPartitionTable) { let maybe_new_version = - Self::update_option_internal(&self.inner.partition_table, partition_table); + Self::update_option_internal(&self.metadata.inner.partition_table, partition_table); self.notify_watches(maybe_new_version, MetadataKind::PartitionTable); } fn update_logs(&mut self, logs: Logs) { - let maybe_new_version = Self::update_option_internal(&self.inner.logs, logs); + let maybe_new_version = Self::update_option_internal(&self.metadata.inner.logs, logs); self.notify_watches(maybe_new_version, MetadataKind::Logs); } fn update_schema(&mut self, schema: Schema) { - let maybe_new_version = Self::update_internal(&self.inner.schema, schema); + let maybe_new_version = Self::update_internal(&self.metadata.inner.schema, schema); self.notify_watches(maybe_new_version, MetadataKind::Schema); } @@ -399,14 +402,16 @@ where fn notify_watches(&mut self, maybe_new_version: Version, kind: MetadataKind) { // notify watches. - self.inner.write_watches[kind].sender.send_if_modified(|v| { - if maybe_new_version > *v { - *v = maybe_new_version; - true - } else { - false - } - }); + self.metadata.inner.write_watches[kind] + .sender + .send_if_modified(|v| { + if maybe_new_version > *v { + *v = maybe_new_version; + true + } else { + false + } + }); } } @@ -460,9 +465,11 @@ mod tests { tc.block_on("test", None, async move { let network_sender = MockNetworkSender::default(); let metadata_store_client = MetadataStoreClient::new_in_memory(); - let metadata_manager = MetadataManager::build(network_sender, metadata_store_client); + let metadata_builder = MetadataBuilder::default(); + let metadata = metadata_builder.to_metadata(); + let metadata_manager = + MetadataManager::new(metadata_builder, network_sender, metadata_store_client); let metadata_writer = metadata_manager.writer(); - let metadata = metadata_manager.metadata(); assert_eq!(Version::INVALID, config_version(&metadata)); @@ -541,9 +548,12 @@ mod tests { tc.block_on("test", None, async move { let network_sender = MockNetworkSender::default(); let metadata_store_client = MetadataStoreClient::new_in_memory(); - let metadata_manager = MetadataManager::build(network_sender, metadata_store_client); + + let metadata_builder = MetadataBuilder::default(); + let metadata = metadata_builder.to_metadata(); + let metadata_manager = + MetadataManager::new(metadata_builder, network_sender, metadata_store_client); let metadata_writer = metadata_manager.writer(); - let metadata = metadata_manager.metadata(); assert_eq!(Version::INVALID, config_version(&metadata)); diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index 7603690afa..032403f7eb 100644 --- a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -19,7 +19,7 @@ use std::sync::{Arc, OnceLock}; use arc_swap::{ArcSwap, ArcSwapOption}; use enum_map::EnumMap; -use tokio::sync::{oneshot, watch}; +use tokio::sync::{mpsc, oneshot, watch}; use restate_types::logs::metadata::Logs; use restate_types::net::metadata::MetadataContainer; @@ -41,8 +41,31 @@ pub enum SyncError { Shutdown(#[from] ShutdownError), } -/// The kind of versioned metadata that can be synchronized across nodes. +pub struct MetadataBuilder { + receiver: manager::CommandReceiver, + metadata: Metadata, +} + +impl MetadataBuilder { + pub fn to_metadata(&self) -> Metadata { + self.metadata.clone() + } +} + +impl Default for MetadataBuilder { + fn default() -> Self { + let (sender, receiver) = mpsc::unbounded_channel(); + Self { + receiver, + metadata: Metadata { + inner: Default::default(), + sender, + }, + } + } +} +/// The kind of versioned metadata that can be synchronized across nodes. #[derive(Clone)] pub struct Metadata { sender: manager::CommandSender, @@ -50,10 +73,6 @@ pub struct Metadata { } impl Metadata { - fn new(inner: Arc, sender: manager::CommandSender) -> Self { - Self { inner, sender } - } - /// Panics if nodes configuration is not loaded yet. #[track_caller] pub fn nodes_config(&self) -> Arc { diff --git a/crates/core/src/network/networking.rs b/crates/core/src/network/networking.rs index 3f7a6fe8f5..b5444c721e 100644 --- a/crates/core/src/network/networking.rs +++ b/crates/core/src/network/networking.rs @@ -18,19 +18,27 @@ use restate_types::NodeId; use super::{ConnectionManager, ConnectionSender}; use super::{NetworkError, NetworkSender}; -use crate::metadata; +use crate::Metadata; const DEFAULT_MAX_CONNECT_ATTEMPTS: u32 = 10; // todo: make this configurable const SEND_RETRY_BASE_DURATION: Duration = Duration::from_millis(250); /// Access to node-to-node networking infrastructure; -#[derive(Clone, Default)] +#[derive(Clone)] pub struct Networking { connections: ConnectionManager, + metadata: Metadata, } impl Networking { + pub fn new(metadata: Metadata) -> Self { + Self { + connections: Default::default(), + metadata, + } + } + pub fn connection_manager(&self) -> ConnectionManager { self.connections.clone() } @@ -43,7 +51,7 @@ impl Networking { let node = match node.as_generational() { Some(node) => node, None => { - metadata() + self.metadata .nodes_config() .find_node_by_id(node)? .current_generation @@ -68,7 +76,7 @@ impl NetworkSender for Networking { // to ensure we get the latest if it has been updated since last attempt. let to = match to.as_generational() { Some(to) => to, - None => match metadata().nodes_config().find_node_by_id(to) { + None => match self.metadata.nodes_config().find_node_by_id(to) { Ok(node) => node.current_generation, Err(e) => return Err(NetworkError::UnknownNode(e)), }, diff --git a/crates/core/src/test_env.rs b/crates/core/src/test_env.rs index 9ad43047ed..f351903c7c 100644 --- a/crates/core/src/test_env.rs +++ b/crates/core/src/test_env.rs @@ -34,7 +34,9 @@ use crate::network::{ Handler, MessageHandler, MessageRouter, MessageRouterBuilder, NetworkError, NetworkSender, ProtocolError, }; -use crate::{cancellation_watcher, metadata, spawn_metadata_manager, ShutdownError, TaskId}; +use crate::{ + cancellation_watcher, metadata, spawn_metadata_manager, MetadataBuilder, ShutdownError, TaskId, +}; use crate::{Metadata, MetadataManager, MetadataWriter}; use crate::{TaskCenter, TaskCenterBuilder}; @@ -170,9 +172,13 @@ where let my_node_id = GenerationalNodeId::new(1, 1); let metadata_store_client = MetadataStoreClient::new_in_memory(); - let metadata_manager = - MetadataManager::build(network_sender.clone(), metadata_store_client.clone()); - let metadata = metadata_manager.metadata(); + let metadata_builder = MetadataBuilder::default(); + let metadata = metadata_builder.to_metadata(); + let metadata_manager = MetadataManager::new( + metadata_builder, + network_sender.clone(), + metadata_store_client.clone(), + ); let metadata_writer = metadata_manager.writer(); let router_builder = MessageRouterBuilder::default(); let nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index ce5f8b3fce..0f5c7a517d 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -24,7 +24,7 @@ use restate_bifrost::BifrostService; use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError}; use restate_core::network::MessageRouterBuilder; use restate_core::network::Networking; -use restate_core::{spawn_metadata_manager, MetadataKind, MetadataManager}; +use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager}; use restate_core::{task_center, TaskKind}; use restate_metadata_store::local::LocalMetadataStoreService; use restate_metadata_store::MetadataStoreClient; @@ -144,11 +144,15 @@ impl Node { ); let mut router_builder = MessageRouterBuilder::default(); - let networking = Networking::default(); - let metadata_manager = - MetadataManager::build(networking.clone(), metadata_store_client.clone()); + let metadata_builder = MetadataBuilder::default(); + let metadata = metadata_builder.to_metadata(); + let networking = Networking::new(metadata_builder.to_metadata()); + let metadata_manager = MetadataManager::new( + metadata_builder, + networking.clone(), + metadata_store_client.clone(), + ); metadata_manager.register_in_message_router(&mut router_builder); - let metadata = metadata_manager.metadata(); let updating_schema_information = metadata.schema_updateable(); let bifrost = BifrostService::new(metadata.clone()); @@ -255,7 +259,7 @@ impl Node { ); let metadata_writer = self.metadata_manager.writer(); - let metadata = self.metadata_manager.metadata(); + let metadata = self.metadata_manager.metadata().clone(); let is_set = tc.try_set_global_metadata(metadata.clone()); debug_assert!(is_set, "Global metadata was already set"); diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index 726788a51e..780459eecb 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -12,7 +12,9 @@ #![allow(dead_code)] use std::collections::HashMap; +use std::sync::Arc; +use arc_swap::ArcSwap; use enumset::{EnumSet, EnumSetType}; use serde_with::serde_as; @@ -20,6 +22,8 @@ use crate::net::AdvertisedAddress; use crate::{flexbuffers_storage_encode_decode, GenerationalNodeId, NodeId, PlainNodeId}; use crate::{Version, Versioned}; +pub type UpdateableNodesConfiguration = Arc>; + #[derive(Debug, thiserror::Error)] pub enum NodesConfigError { #[error("node {0} was not found in config")] diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs index 9f48dd03fe..6219fc06e4 100644 --- a/tools/bifrost-benchpress/src/main.rs +++ b/tools/bifrost-benchpress/src/main.rs @@ -19,7 +19,8 @@ use bifrost_benchpress::{append_latency, write_to_read, Arguments, Command}; use metrics_exporter_prometheus::PrometheusBuilder; use restate_bifrost::{Bifrost, BifrostService}; use restate_core::{ - spawn_metadata_manager, MetadataManager, MockNetworkSender, TaskCenter, TaskCenterBuilder, + spawn_metadata_manager, MetadataBuilder, MetadataManager, MockNetworkSender, TaskCenter, + TaskCenterBuilder, }; use restate_errors::fmt::RestateCode; use restate_metadata_store::{MetadataStoreClient, Precondition}; @@ -138,10 +139,14 @@ fn spawn_environment(config: Configuration, num_logs: u64) -> (TaskCenter, Bifro let bifrost = tc.block_on("spawn", None, async move { let network_sender = MockNetworkSender::default(); let metadata_store_client = MetadataStoreClient::new_in_memory(); - let metadata_manager = - MetadataManager::build(network_sender.clone(), metadata_store_client.clone()); + let metadata_builder = MetadataBuilder::default(); + let metadata = metadata_builder.to_metadata(); + let metadata_manager = MetadataManager::new( + metadata_builder, + network_sender.clone(), + metadata_store_client.clone(), + ); - let metadata = metadata_manager.metadata(); let metadata_writer = metadata_manager.writer(); task_center.try_set_global_metadata(metadata.clone());