Skip to content

Commit

Permalink
PartitionProcessorManager as long-living service
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Apr 29, 2024
1 parent 8bcdd4b commit 70d9b11
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 172 deletions.
2 changes: 1 addition & 1 deletion crates/benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub fn spawn_restate(config: Configuration) -> TaskCenter {
RocksDbManager::init(Constant::new(config.common))

Check warning on line 102 in crates/benchmarks/src/lib.rs

View workflow job for this annotation

GitHub Actions / RustFmt Check

Diff in /home/runner/work/restate/restate/crates/benchmarks/src/lib.rs
});
tc.spawn(TaskKind::TestRunner, "benchmark", None, async move {
let node = Node::new(updateable_config).expect("Restate node must build");
let node = Node::create(updateable_config).await.expect("Restate node must build");
cloned_tc.run_in_scope("startup", None, node.start()).await
})
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod metric_definitions;
pub mod network;
mod task_center;
mod task_center_types;
pub mod worker_api;

pub use metadata::{
spawn_metadata_manager, Metadata, MetadataKind, MetadataManager, MetadataWriter, SyncError,
Expand Down
13 changes: 13 additions & 0 deletions crates/core/src/worker_api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod partition_processor_manager;

pub use partition_processor_manager::*;
37 changes: 37 additions & 0 deletions crates/core/src/worker_api/partition_processor_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::identifiers::PartitionId;
use tokio::sync::{mpsc, oneshot};

use crate::ShutdownError;

#[derive(Debug)]
pub enum ProcessorsManagerCommand {
GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
}

#[derive(Debug, Clone)]
pub struct ProcessorsManagerHandle(mpsc::Sender<ProcessorsManagerCommand>);

impl ProcessorsManagerHandle {
pub fn new(sender: mpsc::Sender<ProcessorsManagerCommand>) -> Self {
Self(sender)
}

pub async fn get_live_partitions(&self) -> Result<Vec<PartitionId>, ShutdownError> {
let (tx, rx) = oneshot::channel();
self.0
.send(ProcessorsManagerCommand::GetLivePartitions(tx))
.await
.unwrap();
rx.await.map_err(|_| ShutdownError)
}
}
26 changes: 15 additions & 11 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct Node {
}

impl Node {
pub fn new(updateable_config: UpdateableConfiguration) -> Result<Self, BuildError> {
pub async fn create(updateable_config: UpdateableConfiguration) -> Result<Self, BuildError> {
let config = updateable_config.pinned();
// ensure we have cluster admin role if bootstrapping.
if config.common.allow_bootstrap {
Expand Down Expand Up @@ -140,7 +140,7 @@ impl Node {
metadata_manager.register_in_message_router(&mut router_builder);
let metadata = metadata_manager.metadata();
let updating_schema_information = metadata.schema_updateable();
let bifrost = BifrostService::new(metadata);
let bifrost = BifrostService::new(metadata.clone());

let admin_role = if config.has_role(Role::Admin) {
Some(AdminRole::new(
Expand All @@ -153,14 +153,18 @@ impl Node {
};

let worker_role = if config.has_role(Role::Worker) {
Some(WorkerRole::new(
updateable_config.clone(),
&mut router_builder,
networking.clone(),
bifrost.handle(),
metadata_store_client,
updating_schema_information,
)?)
Some(
WorkerRole::create(
metadata,
updateable_config.clone(),
&mut router_builder,
networking.clone(),
bifrost.handle(),
metadata_store_client,
updating_schema_information,
)
.await?,
)
} else {
None
};
Expand Down Expand Up @@ -321,7 +325,7 @@ impl Node {
TaskKind::SystemBoot,
"worker-init",
None,
worker_role.start(bifrost),
worker_role.start(),
)?;
}

Expand Down
66 changes: 12 additions & 54 deletions crates/node/src/roles/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,19 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::time::Duration;

use codederror::CodedError;
use tracing::info;

use restate_bifrost::Bifrost;
use restate_core::network::MessageRouterBuilder;
use restate_core::{cancellation_watcher, metadata, task_center};
use restate_core::{cancellation_watcher, metadata, task_center, Metadata};
use restate_core::{ShutdownError, TaskKind};
use restate_grpc_util::create_grpc_channel_from_advertised_address;
use restate_metadata_store::MetadataStoreClient;
use restate_network::Networking;
use restate_node_protocol::metadata::MetadataKind;
use restate_node_services::cluster_ctrl::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_node_services::cluster_ctrl::AttachmentRequest;
use restate_schema::UpdateableSchema;
use restate_schema_api::subscription::SubscriptionResolver;
use restate_storage_query_datafusion::context::QueryContext;
use restate_types::config::UpdateableConfiguration;
use restate_types::net::AdvertisedAddress;
use restate_types::retries::RetryPolicy;
use restate_types::Version;
use restate_worker::SubscriptionController;
use restate_worker::{SubscriptionControllerHandle, Worker};
Expand All @@ -41,12 +33,6 @@ pub enum WorkerRoleError {
#[code]
restate_worker::Error,
),
#[error("invalid cluster controller address: {0}")]
#[code(unknown)]
InvalidClusterControllerAddress(http::Error),
#[error("failed to attach to cluster at '{0}': {1}")]
#[code(unknown)]
Attachment(AdvertisedAddress, tonic::Status),
#[error(transparent)]
#[code(unknown)]
Shutdown(#[from] ShutdownError),
Expand Down Expand Up @@ -77,22 +63,25 @@ pub struct WorkerRole {
}

impl WorkerRole {
pub fn new(
pub async fn create(
metadata: Metadata,
updateable_config: UpdateableConfiguration,
router_builder: &mut MessageRouterBuilder,
networking: Networking,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
updating_schema_information: UpdateableSchema,
) -> Result<Self, WorkerRoleBuildError> {
let worker = Worker::from_options(
let worker = Worker::create(
updateable_config,
metadata,
networking,
bifrost,
router_builder,
updating_schema_information,
metadata_store_client,
)?;
)
.await?;

Ok(WorkerRole { worker })
}
Expand All @@ -105,54 +94,23 @@ impl WorkerRole {
Some(self.worker.subscription_controller_handle())
}

pub async fn start(self, bifrost: Bifrost) -> anyhow::Result<()> {
let admin_address = metadata()
.nodes_config()
.get_admin_node()
.expect("at least one admin node")
.address
.clone();

pub async fn start(self) -> anyhow::Result<()> {
let tc = task_center();
// todo: only run subscriptions on node 0 once being distributed
task_center().spawn_child(
tc.spawn_child(
TaskKind::MetadataBackgroundSync,
"subscription_controller",
None,
Self::watch_subscriptions(self.worker.subscription_controller_handle()),
)?;

task_center().spawn_child(TaskKind::RoleRunner, "worker-service", None, async {
Self::attach_node(admin_address).await?;
self.worker.run(bifrost).await
tc.spawn_child(TaskKind::RoleRunner, "worker-service", None, async {
self.worker.run().await
})?;

Ok(())
}

async fn attach_node(admin_address: AdvertisedAddress) -> Result<(), WorkerRoleError> {
info!("Worker attaching to admin at '{admin_address}'");

let channel = create_grpc_channel_from_advertised_address(admin_address.clone())
.map_err(WorkerRoleError::InvalidClusterControllerAddress)?;

let cc_client = ClusterCtrlSvcClient::new(channel);

let _response = RetryPolicy::exponential(Duration::from_millis(50), 2.0, Some(10), None)
.retry(|| async {
cc_client
.clone()
.attach_node(AttachmentRequest {
node_id: Some(metadata().my_node_id().into()),
})
.await
})
.await
.map_err(|err| WorkerRoleError::Attachment(admin_address, err))?
.into_inner();

Ok(())
}

async fn watch_subscriptions<SC>(subscription_controller: SC) -> anyhow::Result<()>
where
SC: SubscriptionController + Clone + Send + Sync,
Expand Down
36 changes: 11 additions & 25 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};

use restate_invoker_api::StatusHandle;
use restate_partition_store::PartitionStore;
use restate_partition_store::{PartitionStore, PartitionStoreManager};
use restate_schema_api::deployment::DeploymentResolver;
use restate_schema_api::service::ServiceMetadataResolver;
use restate_types::config::QueryEngineOptions;
Expand Down Expand Up @@ -77,15 +77,10 @@ pub struct QueryContext {
datafusion_context: SessionContext,
}

impl Default for QueryContext {
fn default() -> Self {
QueryContext::new(None, None, None)
}
}

impl QueryContext {
pub fn from_options(
pub async fn create(
options: &QueryEngineOptions,
_partition_store_manager: PartitionStoreManager,
rocksdb: PartitionStore,
status: impl StatusHandle + Send + Sync + Debug + Clone + 'static,
schemas: impl DeploymentResolver
Expand All @@ -97,7 +92,7 @@ impl QueryContext {
+ 'static,
) -> Result<QueryContext, BuildError> {
let ctx = QueryContext::new(
options.memory_size(),
options.memory_size.get(),
options.tmp_dir.clone(),
options.query_parallelism(),
);
Expand All @@ -111,33 +106,24 @@ impl QueryContext {
crate::service::register_self(&ctx, schemas)?;
crate::idempotency::register_self(&ctx, rocksdb)?;

// todo: Fix me
// we need this now because we can't make new async.
// i'm ashamed!
let ctx = futures::executor::block_on(async move {
let ctx = ctx;
ctx.datafusion_context
.sql(SYS_INVOCATION_VIEW)
.await
.map(|_| ctx)
})?;
let ctx = ctx
.datafusion_context
.sql(SYS_INVOCATION_VIEW)
.await
.map(|_| ctx)?;

Ok(ctx)
}

fn new(
memory_limit: Option<usize>,
memory_limit: usize,
temp_folder: Option<String>,
default_parallelism: Option<usize>,
) -> Self {
//
// build the runtime
//
let mut runtime_config = RuntimeConfig::default();
runtime_config = runtime_config.with_memory_limit(4 * 1024 * 1024 * 1024, 1.0);
if let Some(limit) = memory_limit {
runtime_config = runtime_config.with_memory_limit(limit, 1.0);
}
let mut runtime_config = RuntimeConfig::default().with_memory_limit(memory_limit, 1.0);
if let Some(folder) = temp_folder {
runtime_config = runtime_config.with_temp_file_path(folder);
}
Expand Down
11 changes: 9 additions & 2 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,15 @@ impl MockQueryEngine {

Self(
rocksdb.clone(),
QueryContext::from_options(&QueryEngineOptions::default(), rocksdb, status, schemas)
.unwrap(),
QueryContext::create(
&QueryEngineOptions::default(),
manager,
rocksdb,
status,
schemas,
)
.await
.unwrap(),
)
}

Expand Down
12 changes: 4 additions & 8 deletions crates/types/src/config/query_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ pub struct QueryEngineOptions {
/// # Memory size limit
///
/// The total memory in bytes that can be used to preform sql queries
#[cfg_attr(feature = "schemars", schemars(with = "Option<NonZeroByteCount>"))]
#[serde_as(as = "Option<NonZeroByteCount>")]
pub memory_size: Option<NonZeroUsize>,
#[cfg_attr(feature = "schemars", schemars(with = "NonZeroByteCount"))]
#[serde_as(as = "NonZeroByteCount")]
pub memory_size: NonZeroUsize,

/// # Temp folder to use for spill
///
Expand All @@ -51,15 +51,11 @@ impl QueryEngineOptions {
pub fn query_parallelism(&self) -> Option<usize> {
self.query_parallelism.map(Into::into)
}

pub fn memory_size(&self) -> Option<usize> {
self.memory_size.map(Into::into)
}
}
impl Default for QueryEngineOptions {
fn default() -> Self {
Self {
memory_size: None,
memory_size: NonZeroUsize::new(4_000_000_000).unwrap(), // 4GB
tmp_dir: None,
query_parallelism: None,
pgsql_bind_address: "0.0.0.0:9071".parse().unwrap(),
Expand Down
Loading

0 comments on commit 70d9b11

Please sign in to comment.