Skip to content

Commit

Permalink
Invoker concurrency quota (#548)
Browse files Browse the repository at this point in the history
* Implement invoker quota
* Moved the actual invoker state machine logic back in `Service`. The goal of this commit is to make more manageable testing the state machine logic.
* Move the handle/input command definitions in a separate file, and simplify the structure.
* Add an interface to mock the logic to run the InvocationTask.
* Removed Codec from Invoker
* Add quota tests
* ism_manager -> invocation_state_machine_manager
  • Loading branch information
slinkydeveloper authored Jul 3, 2023
1 parent a92dd30 commit fc35ad5
Show file tree
Hide file tree
Showing 10 changed files with 1,401 additions and 1,071 deletions.
14 changes: 10 additions & 4 deletions src/invoker/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::Stream;
use std::path::PathBuf;
use std::time::Duration;

use restate_common::journal::raw::{PlainRawEntry, RawEntryCodec};
use restate_common::journal::raw::PlainRawEntry;
use restate_common::retry_policy::RetryPolicy;
use restate_hyper_util::proxy_connector::Proxy;
use restate_service_metadata::ServiceEndpointRegistry;
Expand Down Expand Up @@ -97,6 +97,11 @@ pub struct Options {
)]
tmp_dir: PathBuf,

/// # Concurrency limit
///
/// Number of concurrent invocations that can be processed by the invoker.
concurrency_limit: Option<usize>,

#[cfg_attr(feature = "options_schema", schemars(skip))]
disable_eager_state: bool,
}
Expand All @@ -111,6 +116,7 @@ impl Default for Options {
message_size_limit: None,
proxy_uri: None,
tmp_dir: Options::default_tmp_dir(),
concurrency_limit: None,
disable_eager_state: false,
}
}
Expand Down Expand Up @@ -142,15 +148,14 @@ impl Options {
restate_fs_util::generate_temp_dir_name("invoker")
}

pub fn build<C, JR, JS, SR, EE, SER>(
pub fn build<JR, JS, SR, EE, SER>(
self,
journal_reader: JR,
state_reader: SR,
entry_enricher: EE,
service_endpoint_registry: SER,
) -> Service<C, JR, SR, EE, SER>
) -> Service<JR, SR, EE, SER>
where
C: RawEntryCodec,
JR: JournalReader<JournalStream = JS> + Clone + Send + Sync + 'static,
JS: Stream<Item = PlainRawEntry> + Unpin + Send + 'static,
EE: EntryEnricher,
Expand All @@ -166,6 +171,7 @@ impl Options {
self.message_size_limit,
self.proxy_uri,
self.tmp_dir,
self.concurrency_limit,
journal_reader,
state_reader,
entry_enricher,
Expand Down
196 changes: 196 additions & 0 deletions src/invoker/src/service/input_command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use crate::{
Effect, InvocationStatusReport, InvokeInputJournal, ServiceHandle, ServiceNotRunning,
StatusHandle,
};
use futures::future::BoxFuture;
use futures::FutureExt;
use restate_common::journal::Completion;
use restate_common::types::{EntryIndex, PartitionLeaderEpoch, ServiceInvocationId};
use tokio::sync::mpsc;

// -- Input messages

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub(crate) struct InvokeCommand {
pub(super) partition: PartitionLeaderEpoch,
pub(super) service_invocation_id: ServiceInvocationId,
#[serde(skip)]
pub(super) journal: InvokeInputJournal,
}

#[derive(Debug)]
pub(crate) enum InputCommand {
Invoke(InvokeCommand),
Completion {
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
completion: Completion,
},
StoredEntryAck {
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
entry_index: EntryIndex,
},

/// Abort specific invocation id
Abort {
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
},

/// Command used to clean up internal state when a partition leader is going away
AbortAllPartition {
partition: PartitionLeaderEpoch,
},

// needed for dynamic registration at Invoker
RegisterPartition {
partition: PartitionLeaderEpoch,
sender: mpsc::Sender<Effect>,
},

// Read status
ReadStatus(restate_futures_util::command::Command<(), Vec<InvocationStatusReport>>),
}

// -- Handles implementations. This is just glue code between the Input<Command> and the interfaces

#[derive(Debug, Clone)]
pub struct ChannelServiceHandle {
pub(super) input: mpsc::UnboundedSender<InputCommand>,
}

impl ServiceHandle for ChannelServiceHandle {
type Future = futures::future::Ready<Result<(), ServiceNotRunning>>;

fn invoke(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
journal: InvokeInputJournal,
) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::Invoke(InvokeCommand {
partition,
service_invocation_id,
journal,
}))
.map_err(|_| ServiceNotRunning),
)
}

fn resume(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
journal: InvokeInputJournal,
) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::Invoke(InvokeCommand {
partition,
service_invocation_id,
journal,
}))
.map_err(|_| ServiceNotRunning),
)
}

fn notify_completion(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
completion: Completion,
) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::Completion {
partition,
service_invocation_id,
completion,
})
.map_err(|_| ServiceNotRunning),
)
}

fn notify_stored_entry_ack(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
entry_index: EntryIndex,
) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::StoredEntryAck {
partition,
service_invocation_id,
entry_index,
})
.map_err(|_| ServiceNotRunning),
)
}

fn abort_all_partition(&mut self, partition: PartitionLeaderEpoch) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::AbortAllPartition { partition })
.map_err(|_| ServiceNotRunning),
)
}

fn abort_invocation(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::Abort {
partition,
service_invocation_id,
})
.map_err(|_| ServiceNotRunning),
)
}

fn register_partition(
&mut self,
partition: PartitionLeaderEpoch,
sender: mpsc::Sender<Effect>,
) -> Self::Future {
futures::future::ready(
self.input
.send(InputCommand::RegisterPartition { partition, sender })
.map_err(|_| ServiceNotRunning),
)
}
}

pub struct ChannelStatusReader(pub(super) mpsc::UnboundedSender<InputCommand>);

impl StatusHandle for ChannelStatusReader {
type Iterator = itertools::Either<
std::iter::Empty<InvocationStatusReport>,
std::vec::IntoIter<InvocationStatusReport>,
>;
type Future = BoxFuture<'static, Self::Iterator>;

fn read_status(&self) -> Self::Future {
let (cmd, rx) = restate_futures_util::command::Command::prepare(());
if self.0.send(InputCommand::ReadStatus(cmd)).is_err() {
return std::future::ready(itertools::Either::Left(std::iter::empty::<
InvocationStatusReport,
>()))
.boxed();
}
async move {
if let Ok(status_vec) = rx.await {
itertools::Either::Right(status_vec.into_iter())
} else {
itertools::Either::Left(std::iter::empty::<InvocationStatusReport>())
}
}
.boxed()
}
}
4 changes: 2 additions & 2 deletions src/invoker/src/service/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub(super) struct InvocationStateMachine {
///
/// Every time the invocation task generates a new entry, the index is notified to this struct with
/// [`JournalTracker::notify_entry_sent_to_partition_processor`], and every time the invoker receives
/// [`Command::StoredEntryAck`], the index is notified to this struct with [`JournalTracker::notify_acked_entry_from_partition_processor`].
/// [`InputCommand::StoredEntryAck`], the index is notified to this struct with [`JournalTracker::notify_acked_entry_from_partition_processor`].
///
/// After the retry timer is fired, we can check whether we can retry immediately or not with [`JournalTracker::can_retry`].
#[derive(Default, Debug, Copy, Clone)]
Expand Down Expand Up @@ -241,7 +241,7 @@ impl InvocationStateMachine {

#[cfg(test)]
mod tests {
use crate::service::invocation_state_machine::{InvocationState, InvocationStateMachine};
use super::*;
use restate_common::retry_policy::RetryPolicy;
use restate_test_util::{check, test};
use std::time::Duration;
Expand Down
Loading

0 comments on commit fc35ad5

Please sign in to comment.