Skip to content

Commit

Permalink
Some more MetadataStore to MetadataServer renamings
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Feb 5, 2025
1 parent de9c772 commit d37e6de
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 41 deletions.
13 changes: 7 additions & 6 deletions crates/metadata-server/src/grpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,23 @@ use crate::metric_definitions::{
STATUS_COMPLETED, STATUS_FAILED,
};
use crate::{
prepare_initial_nodes_configuration, MetadataStoreRequest, MetadataStoreSummary,
prepare_initial_nodes_configuration, MetadataServerSummary, MetadataStoreRequest,
ProvisionError, ProvisionRequest, ProvisionSender, RequestError, RequestSender, StatusWatch,
};
/// Grpc svc handler for the metadata store.

/// Grpc svc handler for the metadata server.
#[derive(Debug)]
pub struct MetadataStoreHandler {
pub struct MetadataServerHandler {
request_tx: RequestSender,
provision_tx: Option<ProvisionSender>,
status_watch: Option<StatusWatch>,
}

impl MetadataStoreHandler {
impl MetadataServerHandler {
pub fn new(
request_tx: RequestSender,
provision_tx: Option<ProvisionSender>,
status_watch: Option<watch::Receiver<MetadataStoreSummary>>,
status_watch: Option<watch::Receiver<MetadataServerSummary>>,
) -> Self {
Self {
request_tx,
Expand All @@ -61,7 +62,7 @@ impl MetadataStoreHandler {
}

#[async_trait]
impl MetadataServerSvc for MetadataStoreHandler {
impl MetadataServerSvc for MetadataServerHandler {
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
let start_time = Instant::now();

Expand Down
14 changes: 7 additions & 7 deletions crates/metadata-server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod pb_conversions {
use crate::grpc::{
GetResponse, GetVersionResponse, PreconditionKind, Ulid, WriteRequest, WriteRequestKind,
};
use crate::{grpc, MetadataStoreSummary};
use crate::{grpc, MetadataServerSummary};
use restate_core::metadata_store::{Precondition, VersionedValue};
use restate_types::Version;

Expand Down Expand Up @@ -131,18 +131,18 @@ pub mod pb_conversions {
}
}

impl From<MetadataStoreSummary> for grpc::StatusResponse {
fn from(value: MetadataStoreSummary) -> Self {
impl From<MetadataServerSummary> for grpc::StatusResponse {
fn from(value: MetadataServerSummary) -> Self {
match value {
MetadataStoreSummary::Starting => grpc::StatusResponse {
MetadataServerSummary::Starting => grpc::StatusResponse {
status: restate_types::protobuf::common::MetadataServerStatus::StartingUp
.into(),
configuration: None,
leader: None,
raft: None,
snapshot: None,
},
MetadataStoreSummary::Provisioning => grpc::StatusResponse {
MetadataServerSummary::Provisioning => grpc::StatusResponse {
status:
restate_types::protobuf::common::MetadataServerStatus::AwaitingProvisioning
.into(),
Expand All @@ -151,14 +151,14 @@ pub mod pb_conversions {
raft: None,
snapshot: None,
},
MetadataStoreSummary::Standby => grpc::StatusResponse {
MetadataServerSummary::Standby => grpc::StatusResponse {
status: restate_types::protobuf::common::MetadataServerStatus::Standby.into(),
configuration: None,
leader: None,
raft: None,
snapshot: None,
},
MetadataStoreSummary::Member {
MetadataServerSummary::Member {
configuration,
leader,
raft,
Expand Down
8 changes: 4 additions & 4 deletions crates/metadata-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub type RequestReceiver = mpsc::Receiver<MetadataStoreRequest>;
pub type ProvisionSender = mpsc::Sender<ProvisionRequest>;
pub type ProvisionReceiver = mpsc::Receiver<ProvisionRequest>;

type StatusWatch = watch::Receiver<MetadataStoreSummary>;
type StatusSender = watch::Sender<MetadataStoreSummary>;
type StatusWatch = watch::Receiver<MetadataServerSummary>;
type StatusSender = watch::Sender<MetadataServerSummary>;

pub const KNOWN_LEADER_KEY: &str = "x-restate-known-leader";

Expand Down Expand Up @@ -553,9 +553,9 @@ impl Display for MemberId {
}
}

/// Status summary of the metadata store.
/// Status summary of the metadata server.
#[derive(Clone, Debug, Default)]
enum MetadataStoreSummary {
enum MetadataServerSummary {
#[default]
Starting,
Provisioning,
Expand Down
4 changes: 2 additions & 2 deletions crates/metadata-server/src/local/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc::handler::MetadataServerHandler;
use crate::grpc::metadata_server_svc_server::MetadataServerSvcServer;
use crate::{
grpc, util, MetadataServer, MetadataStoreRequest, PreconditionViolation, RequestError,
Expand Down Expand Up @@ -87,7 +87,7 @@ impl LocalMetadataServer {
.expect("metadata store db is open");

server_builder.register_grpc_service(
MetadataServerSvcServer::new(MetadataStoreHandler::new(request_tx, None, None))
MetadataServerSvcServer::new(MetadataServerHandler::new(request_tx, None, None))
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
grpc::FILE_DESCRIPTOR_SET,
Expand Down
6 changes: 3 additions & 3 deletions crates/metadata-server/src/raft/network/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use tonic::{Request, Response, Status, Streaming};
pub const PEER_METADATA_KEY: &str = "x-restate-metadata-server-peer";

#[derive(Debug)]
pub struct MetadataStoreNetworkHandler<M> {
pub struct MetadataServerNetworkHandler<M> {
connection_manager: Arc<ArcSwapOption<ConnectionManager<M>>>,
join_cluster_handle: Option<JoinClusterHandle>,
}

impl<M> MetadataStoreNetworkHandler<M> {
impl<M> MetadataServerNetworkHandler<M> {
pub fn new(
connection_manager: Arc<ArcSwapOption<ConnectionManager<M>>>,
join_cluster_handle: Option<JoinClusterHandle>,
Expand All @@ -41,7 +41,7 @@ impl<M> MetadataStoreNetworkHandler<M> {
}

#[async_trait::async_trait]
impl<M> MetadataServerNetworkSvc for MetadataStoreNetworkHandler<M>
impl<M> MetadataServerNetworkSvc for MetadataServerNetworkHandler<M>
where
M: NetworkMessage + Send + 'static,
{
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-server/src/raft/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ mod networking;
pub use connection_manager::ConnectionManager;
pub use grpc_svc::metadata_server_network_svc_server::MetadataServerNetworkSvcServer;
pub use grpc_svc::FILE_DESCRIPTOR_SET;
pub use handler::MetadataStoreNetworkHandler;
pub use handler::MetadataServerNetworkHandler;
pub use networking::{NetworkMessage, Networking};
36 changes: 18 additions & 18 deletions crates/metadata-server/src/raft/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc::handler::MetadataServerHandler;
use crate::grpc::metadata_server_svc_server::MetadataServerSvcServer;
use crate::grpc::pb_conversions::ConversionError;
use crate::grpc::MetadataServerSnapshot;
Expand All @@ -21,14 +21,14 @@ use crate::metric_definitions::{
use crate::raft::kv_memory_storage::KvMemoryStorage;
use crate::raft::network::grpc_svc::metadata_server_network_svc_client::MetadataServerNetworkSvcClient;
use crate::raft::network::{
ConnectionManager, MetadataServerNetworkSvcServer, MetadataStoreNetworkHandler, Networking,
ConnectionManager, MetadataServerNetworkHandler, MetadataServerNetworkSvcServer, Networking,
};
use crate::raft::storage::RocksDbStorage;
use crate::raft::{network, storage, RaftConfiguration};
use crate::{
grpc, prepare_initial_nodes_configuration, InvalidConfiguration, JoinClusterError,
JoinClusterHandle, JoinClusterReceiver, JoinClusterRequest, JoinError, KnownLeader, MemberId,
MetadataServer, MetadataServerConfiguration, MetadataStoreRequest, MetadataStoreSummary,
MetadataServer, MetadataServerConfiguration, MetadataServerSummary, MetadataStoreRequest,
ProvisionError, ProvisionReceiver, RaftSummary, Request, RequestError, RequestKind,
RequestReceiver, SnapshotSummary, StatusSender, StorageId, WriteRequest,
};
Expand Down Expand Up @@ -167,7 +167,7 @@ impl RaftMetadataServer {
let (request_tx, request_rx) = mpsc::channel(2);
let (provision_tx, provision_rx) = mpsc::channel(1);
let (join_cluster_tx, join_cluster_rx) = mpsc::channel(1);
let (status_tx, status_rx) = watch::channel(MetadataStoreSummary::default());
let (status_tx, status_rx) = watch::channel(MetadataServerSummary::default());

let mut metadata_store_options =
Configuration::updateable().map(|configuration| &configuration.metadata_server);
Expand Down Expand Up @@ -195,7 +195,7 @@ impl RaftMetadataServer {
let connection_manager = Arc::default();

server_builder.register_grpc_service(
MetadataServerNetworkSvcServer::new(MetadataStoreNetworkHandler::new(
MetadataServerNetworkSvcServer::new(MetadataServerNetworkHandler::new(
Arc::clone(&connection_manager),
Some(JoinClusterHandle::new(join_cluster_tx)),
))
Expand All @@ -204,7 +204,7 @@ impl RaftMetadataServer {
network::FILE_DESCRIPTOR_SET,
);
server_builder.register_grpc_service(
MetadataServerSvcServer::new(MetadataStoreHandler::new(
MetadataServerSvcServer::new(MetadataServerHandler::new(
request_tx,
Some(provision_tx),
Some(status_rx),
Expand Down Expand Up @@ -233,7 +233,7 @@ impl RaftMetadataServer {

let result = tokio::select! {
_ = &mut shutdown => {
debug!("Shutting down RaftMetadataStore");
debug!("Shutting down RaftMetadataServer");
Ok(())
},
result = self.run_inner(&health_status) => {
Expand Down Expand Up @@ -278,7 +278,7 @@ impl RaftMetadataServer {
}

async fn await_provisioning(mut self) -> Result<Provisioned, Error> {
let _ = self.status_tx.send(MetadataStoreSummary::Provisioning);
let _ = self.status_tx.send(MetadataServerSummary::Provisioning);
let mut provision_rx = self.provision_rx.take().expect("must be present");

let result = if let Some(configuration) = self.storage.get_raft_configuration()? {
Expand Down Expand Up @@ -604,7 +604,7 @@ impl Member {
snapshot_summary,
};

member.validate_metadata_store_configuration();
member.validate_metadata_server_configuration();

Ok(member)
}
Expand Down Expand Up @@ -885,7 +885,7 @@ impl Member {
&mut self.kv_storage,
)?;

self.validate_metadata_store_configuration();
self.validate_metadata_server_configuration();

self.raw_node.mut_store().apply_snapshot(snapshot).await?;

Expand Down Expand Up @@ -1032,7 +1032,7 @@ impl Member {
);
self.configuration = new_configuration;

self.validate_metadata_store_configuration();
self.validate_metadata_server_configuration();

self.update_membership_in_nodes_configuration();

Expand All @@ -1046,7 +1046,7 @@ impl Member {
Ok(())
}

fn validate_metadata_store_configuration(&self) {
fn validate_metadata_server_configuration(&self) {
assert_eq!(
self.configuration.members.len(),
self.raw_node.raft.prs().conf().voters().ids().len(),
Expand All @@ -1057,7 +1057,7 @@ impl Member {
self.configuration
.members
.contains_key(&to_plain_node_id(voter)),
"voter '{}' in Raft configuration not found in MetadataStoreConfiguration",
"voter '{}' in Raft configuration not found in MetadataServerConfiguration",
voter
);
}
Expand Down Expand Up @@ -1223,7 +1223,7 @@ impl Member {
Some(to_plain_node_id(self.raw_node.raft.leader_id))
};

if let MetadataStoreSummary::Member {
if let MetadataServerSummary::Member {
leader,
configuration,
raft,
Expand All @@ -1239,7 +1239,7 @@ impl Member {
} else {
let raft = self.raft_summary();

*current_status = MetadataStoreSummary::Member {
*current_status = MetadataServerSummary::Member {
leader: current_leader,
configuration: self.configuration.clone(),
raft,
Expand All @@ -1251,8 +1251,8 @@ impl Member {
self.record_summary_metrics(&self.status_tx.borrow());
}

fn record_summary_metrics(&self, summary: &MetadataStoreSummary) {
let MetadataStoreSummary::Member {
fn record_summary_metrics(&self, summary: &MetadataServerSummary) {
let MetadataServerSummary::Member {
leader,
raft,
snapshot,
Expand Down Expand Up @@ -1382,7 +1382,7 @@ impl Standby {
status_tx,
} = self;

let _ = status_tx.send(MetadataStoreSummary::Standby);
let _ = status_tx.send(MetadataServerSummary::Standby);

// todo make configurable
let mut join_retry_policy = RetryPolicy::exponential(
Expand Down

0 comments on commit d37e6de

Please sign in to comment.