From 70d9b11060278f5631bf981f725e80e39c0e6585 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Mon, 29 Apr 2024 16:54:08 +0100
Subject: [PATCH] PartitionProcessorManager as long-living service

---
 crates/benchmarks/src/lib.rs                   |  2 +-
 crates/core/src/lib.rs                         |  1 +
 crates/core/src/worker_api/mod.rs              | 13 +++
 .../worker_api/partition_processor_manager.rs  | 37 ++++++++
 crates/node/src/lib.rs                         | 26 +++---
 crates/node/src/roles/worker.rs                | 66 +++----------
 .../storage-query-datafusion/src/context.rs    | 36 +++-----
 crates/storage-query-datafusion/src/mocks.rs   | 11 ++-
 crates/types/src/config/query_engine.rs        | 12 +--
 crates/worker/src/lib.rs                       | 92 +++++++------------
 .../worker/src/partition_processor_manager.rs  | 84 +++++++++++++++--
 server/src/main.rs                             |  2 +-
 12 files changed, 210 insertions(+), 172 deletions(-)
 create mode 100644 crates/core/src/worker_api/mod.rs
 create mode 100644 crates/core/src/worker_api/partition_processor_manager.rs

diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs
index 8b28c878d9..476208e82c 100644
--- a/crates/benchmarks/src/lib.rs
+++ b/crates/benchmarks/src/lib.rs
@@ -102,7 +102,7 @@ pub fn spawn_restate(config: Configuration) -> TaskCenter {
         RocksDbManager::init(Constant::new(config.common))
     });
     tc.spawn(TaskKind::TestRunner, "benchmark", None, async move {
-        let node = Node::new(updateable_config).expect("Restate node must build");
+        let node = Node::create(updateable_config).await.expect("Restate node must build");
         cloned_tc.run_in_scope("startup", None, node.start()).await
     })
     .unwrap();
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 42eb688d62..b5d1d360ac 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -14,6 +14,7 @@ mod metric_definitions;
 pub mod network;
 mod task_center;
 mod task_center_types;
+pub mod worker_api;
 
 pub use metadata::{
     spawn_metadata_manager, Metadata, MetadataKind, MetadataManager, MetadataWriter, SyncError,
diff --git a/crates/core/src/worker_api/mod.rs b/crates/core/src/worker_api/mod.rs
new file mode 100644
index 0000000000..26749fa603
--- /dev/null
+++ b/crates/core/src/worker_api/mod.rs
@@ -0,0 +1,13 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+mod partition_processor_manager;
+
+pub use partition_processor_manager::*;
diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs
new file mode 100644
index 0000000000..5486de0a4c
--- /dev/null
+++ b/crates/core/src/worker_api/partition_processor_manager.rs
@@ -0,0 +1,37 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use restate_types::identifiers::PartitionId;
+use tokio::sync::{mpsc, oneshot};
+
+use crate::ShutdownError;
+
+#[derive(Debug)]
+pub enum ProcessorsManagerCommand {
+    GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
+}
+
+#[derive(Debug, Clone)]
+pub struct ProcessorsManagerHandle(mpsc::Sender<ProcessorsManagerCommand>);
+
+impl ProcessorsManagerHandle {
+    pub fn new(sender: mpsc::Sender<ProcessorsManagerCommand>) -> Self {
+        Self(sender)
+    }
+
+    pub async fn get_live_partitions(&self) -> Result<Vec<PartitionId>, ShutdownError> {
+        let (tx, rx) = oneshot::channel();
+        self.0
+            .send(ProcessorsManagerCommand::GetLivePartitions(tx))
+            .await
+            .unwrap();
+        rx.await.map_err(|_| ShutdownError)
+    }
+}
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index ed4daf98bb..2c0dd1a115 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -100,7 +100,7 @@ pub struct Node {
 }
 
 impl Node {
-    pub fn new(updateable_config: UpdateableConfiguration) -> Result<Self, BuildError> {
+    pub async fn create(updateable_config: UpdateableConfiguration) -> Result<Self, BuildError> {
         let config = updateable_config.pinned();
         // ensure we have cluster admin role if bootstrapping.
         if config.common.allow_bootstrap {
@@ -140,7 +140,7 @@ impl Node {
         metadata_manager.register_in_message_router(&mut router_builder);
         let metadata = metadata_manager.metadata();
         let updating_schema_information = metadata.schema_updateable();
-        let bifrost = BifrostService::new(metadata);
+        let bifrost = BifrostService::new(metadata.clone());
 
         let admin_role = if config.has_role(Role::Admin) {
             Some(AdminRole::new(
@@ -153,14 +153,18 @@ impl Node {
         };
 
         let worker_role = if config.has_role(Role::Worker) {
-            Some(WorkerRole::new(
-                updateable_config.clone(),
-                &mut router_builder,
-                networking.clone(),
-                bifrost.handle(),
-                metadata_store_client,
-                updating_schema_information,
-            )?)
+            Some(
+                WorkerRole::create(
+                    metadata,
+                    updateable_config.clone(),
+                    &mut router_builder,
+                    networking.clone(),
+                    bifrost.handle(),
+                    metadata_store_client,
+                    updating_schema_information,
+                )
+                .await?,
+            )
         } else {
             None
         };
@@ -321,7 +325,7 @@ impl Node {
                 TaskKind::SystemBoot,
                 "worker-init",
                 None,
-                worker_role.start(bifrost),
+                worker_role.start(),
             )?;
         }
 
diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs
index 518cbcee62..d124ab32f4 100644
--- a/crates/node/src/roles/worker.rs
+++ b/crates/node/src/roles/worker.rs
@@ -8,27 +8,19 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::time::Duration;
-
 use codederror::CodedError;
-use tracing::info;
 
 use restate_bifrost::Bifrost;
 use restate_core::network::MessageRouterBuilder;
-use restate_core::{cancellation_watcher, metadata, task_center};
+use restate_core::{cancellation_watcher, metadata, task_center, Metadata};
 use restate_core::{ShutdownError, TaskKind};
-use restate_grpc_util::create_grpc_channel_from_advertised_address;
 use restate_metadata_store::MetadataStoreClient;
 use restate_network::Networking;
 use restate_node_protocol::metadata::MetadataKind;
-use restate_node_services::cluster_ctrl::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
-use restate_node_services::cluster_ctrl::AttachmentRequest;
 use restate_schema::UpdateableSchema;
 use restate_schema_api::subscription::SubscriptionResolver;
 use restate_storage_query_datafusion::context::QueryContext;
 use restate_types::config::UpdateableConfiguration;
-use restate_types::net::AdvertisedAddress;
-use restate_types::retries::RetryPolicy;
 use restate_types::Version;
 use restate_worker::SubscriptionController;
 use restate_worker::{SubscriptionControllerHandle, Worker};
@@ -41,12 +33,6 @@ pub enum WorkerRoleError {
         #[code]
         restate_worker::Error,
     ),
-    #[error("invalid cluster controller address: {0}")]
-    #[code(unknown)]
-    InvalidClusterControllerAddress(http::Error),
-    #[error("failed to attach to cluster at '{0}': {1}")]
-    #[code(unknown)]
-    Attachment(AdvertisedAddress, tonic::Status),
     #[error(transparent)]
     #[code(unknown)]
     Shutdown(#[from] ShutdownError),
@@ -77,7 +63,8 @@ pub struct WorkerRole {
 }
 
 impl WorkerRole {
-    pub fn new(
+    pub async fn create(
+        metadata: Metadata,
         updateable_config: UpdateableConfiguration,
         router_builder: &mut MessageRouterBuilder,
         networking: Networking,
@@ -85,14 +72,16 @@ impl WorkerRole {
         metadata_store_client: MetadataStoreClient,
         updating_schema_information: UpdateableSchema,
     ) -> Result<Self, WorkerRoleBuildError> {
-        let worker = Worker::from_options(
+        let worker = Worker::create(
             updateable_config,
+            metadata,
             networking,
             bifrost,
             router_builder,
             updating_schema_information,
             metadata_store_client,
-        )?;
+        )
+        .await?;
 
         Ok(WorkerRole { worker })
     }
@@ -105,54 +94,23 @@ impl WorkerRole {
         Some(self.worker.subscription_controller_handle())
     }
 
-    pub async fn start(self, bifrost: Bifrost) -> anyhow::Result<()> {
-        let admin_address = metadata()
-            .nodes_config()
-            .get_admin_node()
-            .expect("at least one admin node")
-            .address
-            .clone();
-
+    pub async fn start(self) -> anyhow::Result<()> {
+        let tc = task_center();
         // todo: only run subscriptions on node 0 once being distributed
-        task_center().spawn_child(
+        tc.spawn_child(
             TaskKind::MetadataBackgroundSync,
             "subscription_controller",
             None,
             Self::watch_subscriptions(self.worker.subscription_controller_handle()),
         )?;
 
-        task_center().spawn_child(TaskKind::RoleRunner, "worker-service", None, async {
-            Self::attach_node(admin_address).await?;
-            self.worker.run(bifrost).await
+        tc.spawn_child(TaskKind::RoleRunner, "worker-service", None, async {
+            self.worker.run().await
         })?;
 
         Ok(())
     }
 
-    async fn attach_node(admin_address: AdvertisedAddress) -> Result<(), WorkerRoleError> {
-        info!("Worker attaching to admin at '{admin_address}'");
-
-        let channel = create_grpc_channel_from_advertised_address(admin_address.clone())
-            .map_err(WorkerRoleError::InvalidClusterControllerAddress)?;
-
-        let cc_client = ClusterCtrlSvcClient::new(channel);
-
-        let _response = RetryPolicy::exponential(Duration::from_millis(50), 2.0, Some(10), None)
-            .retry(|| async {
-                cc_client
-                    .clone()
-                    .attach_node(AttachmentRequest {
-                        node_id: Some(metadata().my_node_id().into()),
-                    })
-                    .await
-            })
-            .await
-            .map_err(|err| WorkerRoleError::Attachment(admin_address, err))?
-            .into_inner();
-
-        Ok(())
-    }
-
     async fn watch_subscriptions<SC>(subscription_controller: SC) -> anyhow::Result<()>
     where
         SC: SubscriptionController + Clone + Send + Sync,
diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs
index 09fec57b36..09e30a6526 100644
--- a/crates/storage-query-datafusion/src/context.rs
+++ b/crates/storage-query-datafusion/src/context.rs
@@ -19,7 +19,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion::prelude::{SessionConfig, SessionContext};
 
 use restate_invoker_api::StatusHandle;
-use restate_partition_store::PartitionStore;
+use restate_partition_store::{PartitionStore, PartitionStoreManager};
 use restate_schema_api::deployment::DeploymentResolver;
 use restate_schema_api::service::ServiceMetadataResolver;
 use restate_types::config::QueryEngineOptions;
@@ -77,15 +77,10 @@ pub struct QueryContext {
     datafusion_context: SessionContext,
 }
 
-impl Default for QueryContext {
-    fn default() -> Self {
-        QueryContext::new(None, None, None)
-    }
-}
-
 impl QueryContext {
-    pub fn from_options(
+    pub async fn create(
         options: &QueryEngineOptions,
+        _partition_store_manager: PartitionStoreManager,
         rocksdb: PartitionStore,
         status: impl StatusHandle + Send + Sync + Debug + Clone + 'static,
         schemas: impl DeploymentResolver
@@ -97,7 +92,7 @@ impl QueryContext {
             + 'static,
     ) -> Result<QueryContext, BuildError> {
         let ctx = QueryContext::new(
-            options.memory_size(),
+            options.memory_size.get(),
             options.tmp_dir.clone(),
             options.query_parallelism(),
         );
@@ -111,33 +106,24 @@ impl QueryContext {
         crate::service::register_self(&ctx, schemas)?;
         crate::idempotency::register_self(&ctx, rocksdb)?;
 
-        // todo: Fix me
-        // we need this now because we can't make new async.
-        // i'm ashamed!
-        let ctx = futures::executor::block_on(async move {
-            let ctx = ctx;
-            ctx.datafusion_context
-                .sql(SYS_INVOCATION_VIEW)
-                .await
-                .map(|_| ctx)
-        })?;
+        let ctx = ctx
+            .datafusion_context
+            .sql(SYS_INVOCATION_VIEW)
+            .await
+            .map(|_| ctx)?;
 
         Ok(ctx)
     }
 
     fn new(
-        memory_limit: Option<usize>,
+        memory_limit: usize,
         temp_folder: Option<String>,
         default_parallelism: Option<usize>,
     ) -> Self {
         //
         // build the runtime
         //
-        let mut runtime_config = RuntimeConfig::default();
-        runtime_config = runtime_config.with_memory_limit(4 * 1024 * 1024 * 1024, 1.0);
-        if let Some(limit) = memory_limit {
-            runtime_config = runtime_config.with_memory_limit(limit, 1.0);
-        }
+        let mut runtime_config = RuntimeConfig::default().with_memory_limit(memory_limit, 1.0);
         if let Some(folder) = temp_folder {
             runtime_config = runtime_config.with_temp_file_path(folder);
         }
diff --git a/crates/storage-query-datafusion/src/mocks.rs b/crates/storage-query-datafusion/src/mocks.rs
index 9bf4943474..c34b9373d7 100644
--- a/crates/storage-query-datafusion/src/mocks.rs
+++ b/crates/storage-query-datafusion/src/mocks.rs
@@ -112,8 +112,15 @@ impl MockQueryEngine {
 
         Self(
             rocksdb.clone(),
-            QueryContext::from_options(&QueryEngineOptions::default(), rocksdb, status, schemas)
-                .unwrap(),
+            QueryContext::create(
+                &QueryEngineOptions::default(),
+                manager,
+                rocksdb,
+                status,
+                schemas,
+            )
+            .await
+            .unwrap(),
         )
     }
diff --git a/crates/types/src/config/query_engine.rs b/crates/types/src/config/query_engine.rs
index 62f65ae862..b7fbb59329 100644
--- a/crates/types/src/config/query_engine.rs
+++ b/crates/types/src/config/query_engine.rs
@@ -27,9 +27,9 @@ pub struct QueryEngineOptions {
     /// # Memory size limit
     ///
     /// The total memory in bytes that can be used to preform sql queries
-    #[cfg_attr(feature = "schemars", schemars(with = "Option<NonZeroByteCount>"))]
-    #[serde_as(as = "Option<NonZeroByteCount>")]
-    pub memory_size: Option<NonZeroUsize>,
+    #[cfg_attr(feature = "schemars", schemars(with = "NonZeroByteCount"))]
+    #[serde_as(as = "NonZeroByteCount")]
+    pub memory_size: NonZeroUsize,
 
     /// # Temp folder to use for spill
     ///
@@ -51,15 +51,11 @@ impl QueryEngineOptions {
     pub fn query_parallelism(&self) -> Option<usize> {
         self.query_parallelism.map(Into::into)
     }
-
-    pub fn memory_size(&self) -> Option<usize> {
-        self.memory_size.map(Into::into)
-    }
 }
 impl Default for QueryEngineOptions {
     fn default() -> Self {
         Self {
-            memory_size: None,
+            memory_size: NonZeroUsize::new(4_000_000_000).unwrap(), // 4GB
             tmp_dir: None,
             query_parallelism: None,
             pgsql_bind_address: "0.0.0.0:9071".parse().unwrap(),
diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index cd2e06f90c..45b18a34e9 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -29,7 +29,7 @@ pub use subscription_integration::SubscriptionControllerHandle;
 use codederror::CodedError;
 use restate_bifrost::Bifrost;
 use restate_core::network::MessageRouterBuilder;
-use restate_core::{metadata, task_center, TaskKind};
+use restate_core::{task_center, Metadata, TaskKind};
 use restate_ingress_dispatcher::IngressDispatcher;
 use restate_ingress_http::HyperServerIngress;
 use restate_ingress_kafka::Service as IngressKafkaService;
@@ -46,10 +46,7 @@ use restate_storage_query_postgres::service::PostgresQueryService;
 
 use crate::invoker_integration::EntryEnricher;
 use crate::partition::storage::invoker::InvokerStorageReader;
-use crate::partition_processor_manager::{
-    Action, PartitionProcessorManager, PartitionProcessorPlan, Role,
-};
-use restate_types::Version;
+use crate::partition_processor_manager::PartitionProcessorManager;
 
 type PartitionProcessor = partition::PartitionProcessor<
     ProtobufRawEntryCodec,
@@ -94,8 +91,6 @@ pub enum Error {
 
 pub struct Worker {
     updateable_config: UpdateableConfiguration,
-    networking: Networking,
-    metadata_store_client: MetadataStoreClient,
     storage_query_context: QueryContext,
     storage_query_postgres: PostgresQueryService,
     #[allow(clippy::type_complexity)]
@@ -107,38 +102,22 @@ pub struct Worker {
     external_client_ingress: ExternalClientIngress,
     ingress_kafka: IngressKafkaService,
     subscription_controller_handle: SubscriptionControllerHandle,
-    partition_store_manager: PartitionStoreManager,
+    partition_processor_manager: PartitionProcessorManager,
 }
 
 impl Worker {
-    pub fn from_options(
+    pub async fn create(
         updateable_config: UpdateableConfiguration,
+        metadata: Metadata,
         networking: Networking,
         bifrost: Bifrost,
         router_builder: &mut MessageRouterBuilder,
         schema_view: UpdateableSchema,
         metadata_store_client: MetadataStoreClient,
-    ) -> Result<Worker, BuildError> {
+    ) -> Result<Self, BuildError> {
         metric_definitions::describe_metrics();
-        Worker::new(
-            updateable_config,
-            networking,
-            bifrost,
-            router_builder,
-            schema_view,
-            metadata_store_client,
-        )
-    }
 
-    pub fn new(
-        updateable_config: UpdateableConfiguration,
-        networking: Networking,
-        bifrost: Bifrost,
-        router_builder: &mut MessageRouterBuilder,
-        schema_view: UpdateableSchema,
-        metadata_store_client: MetadataStoreClient,
-    ) -> Result<Self, BuildError> {
-        let ingress_dispatcher = IngressDispatcher::new(bifrost);
+        let ingress_dispatcher = IngressDispatcher::new(bifrost.clone());
         router_builder.add_message_handler(ingress_dispatcher.clone());
 
         let config = updateable_config.pinned();
@@ -157,11 +136,7 @@ impl Worker {
             ingress_kafka.create_command_sender(),
         );
 
-        // todo: Fix me
-        // a really ugly hack (I'm ashamed) until we can decouple opening database(s)
-        // from worker creation, or we make worker creation async. This is a stop gap
-        // to avoid unraveling the entire worker creation process to be async in this change.
-        let partition_store_manager = futures::executor::block_on(PartitionStoreManager::create(
+        let partition_store_manager = PartitionStoreManager::create(
             updateable_config
                 .clone()
                 .map_as_updateable_owned(|c| &c.worker.storage),
@@ -169,7 +144,7 @@ impl Worker {
                 .clone()
                 .map_as_updateable_owned(|c| &c.worker.storage.rocksdb),
             &[],
-        ))?;
+        ).await?;
 
         let legacy_storage = partition_store_manager.get_legacy_storage_REMOVE_ME();
 
@@ -180,12 +155,24 @@ impl Worker {
             schema_view.clone(),
         )?;
 
-        let storage_query_context = QueryContext::from_options(
+        let partition_processor_manager = PartitionProcessorManager::new(
+            updateable_config.clone(),
+            metadata.clone(),
+            metadata_store_client,
+            partition_store_manager.clone(),
+            networking,
+            bifrost,
+            invoker.handle(),
+        );
+
+        let storage_query_context = QueryContext::create(
             &config.admin.query_engine,
+            partition_store_manager,
             legacy_storage.clone(),
             invoker.status_reader(),
             schema_view.clone(),
-        )?;
+        ).await?;
+
         let storage_query_postgres = PostgresQueryService::from_options(
             &config.admin.query_engine,
             storage_query_context.clone(),
@@ -193,15 +180,13 @@ impl Worker {
 
         Ok(Self {
             updateable_config,
-            networking,
             storage_query_context,
             storage_query_postgres,
             invoker,
             external_client_ingress: ingress_http,
             ingress_kafka,
             subscription_controller_handle,
-            partition_store_manager,
-            metadata_store_client,
+            partition_processor_manager,
         })
     }
@@ -213,7 +198,7 @@ impl Worker {
         &self.storage_query_context
     }
 
-    pub async fn run(self, bifrost: Bifrost) -> anyhow::Result<()> {
+    pub async fn run(self) -> anyhow::Result<()> {
         let tc = task_center();
 
         // Ingress RPC server
@@ -244,8 +229,6 @@ impl Worker {
             ),
         )?;
 
-        let invoker_handle = self.invoker.handle();
-
         // Invoker service
         tc.spawn_child(
             TaskKind::SystemService,
@@ -258,25 +241,12 @@ impl Worker {
             ),
         )?;
 
-        let mut partition_processor_manager = PartitionProcessorManager::new(
-            self.updateable_config.clone(),
-            metadata().my_node_id(),
-            self.metadata_store_client,
-            self.partition_store_manager,
-            self.networking,
-            bifrost,
-            invoker_handle,
-        );
-
-        let partition_table = metadata().wait_for_partition_table(Version::MIN).await?;
-        let plan = PartitionProcessorPlan::new(
-            partition_table.version(),
-            partition_table
-                .partitioner()
-                .map(|(partition_id, _)| (partition_id, Action::Start(Role::Leader)))
-                .collect(),
-        );
-        partition_processor_manager.apply_plan(plan).await?;
+        tc.spawn_child(
+            TaskKind::SystemService,
+            "partition-processor-manager",
+            None,
+            self.partition_processor_manager.run(),
+        )?;
 
         Ok(())
     }
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 7d4d4db5e7..caa39558d0 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -12,7 +12,8 @@ use crate::partition::storage::invoker::InvokerStorageReader;
 use crate::PartitionProcessor;
 use anyhow::Context;
 use restate_bifrost::Bifrost;
-use restate_core::{metadata, task_center, ShutdownError, TaskId, TaskKind};
+use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle};
+use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskId, TaskKind};
 use restate_invoker_impl::InvokerHandle;
 use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError};
 use restate_network::Networking;
@@ -25,50 +26,115 @@ use restate_types::logs::{LogId, Payload};
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::{GenerationalNodeId, Version};
 use restate_wal_protocol::control::AnnounceLeader;
-use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
+use restate_wal_protocol::{Command as WalCommand, Destination, Envelope, Header, Source};
 use std::collections::HashMap;
 use std::ops::RangeInclusive;
-use tracing::debug;
+use tokio::sync::mpsc;
+use tracing::{debug, info};
 
 pub struct PartitionProcessorManager {
     updateable_config: UpdateableConfiguration,
-    node_id: GenerationalNodeId,
     running_partition_processors: HashMap<PartitionId, TaskId>,
+    metadata: Metadata,
     metadata_store_client: MetadataStoreClient,
     partition_store_manager: PartitionStoreManager,
     networking: Networking,
     bifrost: Bifrost,
     invoker_handle: InvokerHandle<InvokerStorageReader<PartitionStore>>,
+    rx: mpsc::Receiver<ProcessorsManagerCommand>,
+    _tx: mpsc::Sender<ProcessorsManagerCommand>,
 }
 
 impl PartitionProcessorManager {
     pub fn new(
         updateable_config: UpdateableConfiguration,
-        node_id: GenerationalNodeId,
+        metadata: Metadata,
         metadata_store_client: MetadataStoreClient,
         partition_store_manager: PartitionStoreManager,
         networking: Networking,
         bifrost: Bifrost,
         invoker_handle: InvokerHandle<InvokerStorageReader<PartitionStore>>,
     ) -> Self {
+        let (_tx, rx) = mpsc::channel(updateable_config.load().worker.internal_queue_length());
         Self {
            updateable_config,
             running_partition_processors: HashMap::default(),
-            node_id,
+            metadata,
             metadata_store_client,
             partition_store_manager,
             networking,
             bifrost,
             invoker_handle,
+            rx,
+            _tx,
         }
     }
 
+    pub fn _handle(&self) -> ProcessorsManagerHandle {
+        ProcessorsManagerHandle::new(self._tx.clone())
+    }
+
+    pub async fn run(mut self) -> anyhow::Result<()> {
+        self.attach_worker().await?;
+
+        // simulating a plan after initial attachment
+        let partition_table = self.metadata.wait_for_partition_table(Version::MIN).await?;
+        let plan = PartitionProcessorPlan::new(
+            partition_table.version(),
+            partition_table
+                .partitioner()
+                .map(|(partition_id, _)| (partition_id, Action::Start(Role::Leader)))
+                .collect(),
+        );
+        self.apply_plan(plan).await?;
+
+        let shutdown = cancellation_watcher();
+        tokio::pin!(shutdown);
+
+        loop {
+            tokio::select! {
+                Some(command) = self.rx.recv() => {
+                    self.handle_command(command).await;
+                    debug!("PartitionProcessorManager shutting down");
+                }
+                _ = &mut shutdown => {
+                    return Ok(());
+                }
+            }
+        }
+    }
+
+    async fn handle_command(&mut self, command: ProcessorsManagerCommand) {
+        use ProcessorsManagerCommand::*;
+        match command {
+            GetLivePartitions(sender) => {
+                let live_partitions = self.running_partition_processors.keys().cloned().collect();
+                let _ = sender.send(live_partitions);
+            }
+        }
+    }
+
+    async fn attach_worker(&mut self) -> anyhow::Result<()> {
+        let admin_address = self
+            .metadata
+            .nodes_config()
+            .get_admin_node()
+            .expect("at least one admin node")
+            .address
+            .clone();
+
+        info!("Worker attaching to admin at '{admin_address}'");
+        // todo: use Networking to attach to a cluster admin node.
+        Ok(())
+    }
+
     #[allow(clippy::map_entry)]
     pub async fn apply_plan(&mut self, plan: PartitionProcessorPlan) -> anyhow::Result<()> {
         let config = self.updateable_config.pinned();
         let options = &config.worker;
-        let partition_table = metadata()
+        let partition_table = self
+            .metadata
             .wait_for_partition_table(plan.min_required_partition_table_version)
             .await?;
 
@@ -115,7 +181,7 @@ impl PartitionProcessorManager {
         let networking = self.networking.clone();
         let mut bifrost = self.bifrost.clone();
         let metadata_store_client = self.metadata_store_client.clone();
-        let node_id = self.node_id;
+        let node_id = self.metadata.my_node_id();
 
         task_center().spawn_child(
             TaskKind::PartitionProcessor,
@@ -222,7 +288,7 @@ impl PartitionProcessorManager {
 
         let envelope = Envelope::new(
             header,
-            Command::AnnounceLeader(AnnounceLeader {
+            WalCommand::AnnounceLeader(AnnounceLeader {
                 node_id,
                 leader_epoch,
             }),
diff --git a/server/src/main.rs b/server/src/main.rs
index 37d8632d5a..1df812446c 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -219,7 +219,7 @@ fn main() {
                 .expect("Error when trying to wipe the configured storage path");
         }
 
-        let node = Node::new(Configuration::current().clone());
+        let node = Node::create(Configuration::current().clone()).await;
         if let Err(err) = node {
             handle_error(err);
         }