Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invoker concurrency quota #548

Merged
merged 8 commits into from
Jul 3, 2023
233 changes: 211 additions & 22 deletions src/invoker/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,10 @@ where
fn handle_register_partition(
&mut self,
partition: PartitionLeaderEpoch,
sender: mpsc::Sender<Effect>
sender: mpsc::Sender<Effect>,
) {
self.invocation_state_machines_tree.register_partition(partition, sender);
self.invocation_state_machines_tree
.register_partition(partition, sender);
}

#[instrument(
Expand Down Expand Up @@ -897,23 +898,26 @@ where
mod tests {
use super::*;

use std::future::pending;
use crate::service::invocation_task::InvocationTaskError;
use bytes::Bytes;
use quota::InvokerConcurrencyQuota;
use restate_common::types::{EnrichedEntryHeader, InvocationId, RawEntry};
use restate_service_metadata::{DeliveryOptions, InMemoryServiceEndpointRegistry};
use restate_test_util::{check, let_assert, test};
use crate::service::invocation_task::InvocationTaskError;
use quota::InvokerConcurrencyQuota;
use std::future::{pending, ready};
use tempfile::tempdir;

// -- Mocks

const MOCK_PARTITION: PartitionLeaderEpoch = (0, 0);

impl<SER, ITR> ServiceInner<SER, ITR> {
fn mock(service_endpoint_registry: SER,
invocation_task_runner: ITR,
default_retry_policy: RetryPolicy,
concurrency_limit: Option<usize>) -> (mpsc::UnboundedSender<InputCommand>, Self) {
fn mock(
service_endpoint_registry: SER,
invocation_task_runner: ITR,
default_retry_policy: RetryPolicy,
concurrency_limit: Option<usize>,
) -> (mpsc::UnboundedSender<InputCommand>, Self) {
let (input_tx, input_rx) = mpsc::unbounded_channel();
let (invocation_tasks_tx, invocation_tasks_rx) = mpsc::unbounded_channel();

Expand All @@ -933,25 +937,47 @@ mod tests {
(input_tx, service_inner)
}

fn register_mock_partition(&mut self) -> mpsc::Receiver<Effect> where
fn register_mock_partition(&mut self) -> mpsc::Receiver<Effect>
where
SER: ServiceEndpointRegistry,
ITR: InvocationTaskRunner {
ITR: InvocationTaskRunner,
{
let (partition_tx, partition_rx) = mpsc::channel(1024);
self.handle_register_partition(MOCK_PARTITION, partition_tx);
partition_rx
}
}

impl<F, Fut> InvocationTaskRunner for F where F: Fn(
PartitionLeaderEpoch,
ServiceInvocationId,
EndpointMetadata,
mpsc::UnboundedSender<InvocationTaskOutput>,
Option<mpsc::UnboundedReceiver<Completion>>,
InvokeInputJournal,
) -> Fut, Fut: Future<Output=()> + Send + 'static {
fn start_invocation_task(&self, partition: PartitionLeaderEpoch, sid: ServiceInvocationId, endpoint_metadata: EndpointMetadata, invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>, invoker_rx: Option<mpsc::UnboundedReceiver<Completion>>, input_journal: InvokeInputJournal) -> BoxFuture<'static, ()> {
(*self)(partition, sid, endpoint_metadata, invoker_tx, invoker_rx, input_journal).boxed()
impl<F, Fut> InvocationTaskRunner for F
where
F: Fn(
PartitionLeaderEpoch,
ServiceInvocationId,
EndpointMetadata,
mpsc::UnboundedSender<InvocationTaskOutput>,
Option<mpsc::UnboundedReceiver<Completion>>,
InvokeInputJournal,
) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
fn start_invocation_task(
&self,
partition: PartitionLeaderEpoch,
sid: ServiceInvocationId,
endpoint_metadata: EndpointMetadata,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
invoker_rx: Option<mpsc::UnboundedReceiver<Completion>>,
input_journal: InvokeInputJournal,
) -> BoxFuture<'static, ()> {
(*self)(
partition,
sid,
endpoint_metadata,
invoker_tx,
invoker_rx,
input_journal,
)
.boxed()
}
}

Expand All @@ -961,7 +987,14 @@ mod tests {

fn mock_endpoint_registry() -> InMemoryServiceEndpointRegistry {
let mut in_memory_registry = InMemoryServiceEndpointRegistry::default();
in_memory_registry.register_service_endpoint("MyService", EndpointMetadata::new("http://localhost:8080".parse().unwrap(), ProtocolType::BidiStream, DeliveryOptions::default()));
in_memory_registry.register_service_endpoint(
"MyService",
EndpointMetadata::new(
"http://localhost:8080".parse().unwrap(),
ProtocolType::BidiStream,
DeliveryOptions::default(),
),
);
in_memory_registry
}

Expand Down Expand Up @@ -1019,4 +1052,160 @@ mod tests {
signal.drain().await;
invoker_join_handle.await.unwrap();
}

#[test(tokio::test)]
async fn quota_allows_one_concurrent_invocation() {
slinkydeveloper marked this conversation as resolved.
Show resolved Hide resolved
let mut segment_queue = SegmentQueue::new(tempdir().unwrap().into_path(), 1024);
let (_signal, watch) = drain::channel();
let shutdown = watch.signaled();
tokio::pin!(shutdown);

let sid_1 = mock_sid();
let sid_2 = mock_sid();

let (_invoker_tx, mut service_inner) = ServiceInner::mock(
mock_endpoint_registry(),
|_, _, _, _, _, _| ready(()),
Default::default(),
Some(1),
);
let _ = service_inner.register_mock_partition();

// Enqueue sid_1 and sid_2
segment_queue
.enqueue(InvokeCommand {
partition: MOCK_PARTITION,
service_invocation_id: sid_1.clone(),
journal: InvokeInputJournal::NoCachedJournal,
})
.await;
segment_queue
.enqueue(InvokeCommand {
partition: MOCK_PARTITION,
service_invocation_id: sid_2.clone(),
journal: InvokeInputJournal::NoCachedJournal,
})
.await;

// Now step the state machine to start the invocation
assert!(
service_inner
.step(&mut segment_queue, shutdown.as_mut())
.await
);

// Check status and quota
assert!(service_inner
.status_store
.resolve_invocation(MOCK_PARTITION, &sid_1)
.unwrap()
.in_flight());
assert!(!service_inner.quota.is_slot_available());

// Step again to remove sid_1 from task queue. This should not invoke sid_2!
assert!(
service_inner
.step(&mut segment_queue, shutdown.as_mut())
.await
);
assert!(service_inner
.status_store
.resolve_invocation(MOCK_PARTITION, &sid_2)
.is_none());
assert!(!service_inner.quota.is_slot_available());

// Send the close signal
service_inner
.handle_invocation_task_closed(MOCK_PARTITION, sid_1.clone())
.await;

// Slot should be available again
assert!(service_inner.quota.is_slot_available());

// Step now should invoke sid_2
assert!(
service_inner
.step(&mut segment_queue, shutdown.as_mut())
.await
);
assert!(service_inner
.status_store
.resolve_invocation(MOCK_PARTITION, &sid_1)
.is_none());
assert!(service_inner
.status_store
.resolve_invocation(MOCK_PARTITION, &sid_2)
.unwrap()
.in_flight());
assert!(!service_inner.quota.is_slot_available());
}

#[test(tokio::test)]
async fn reclaim_quota_after_abort() {
let sid = mock_sid();

let (_, mut service_inner) = ServiceInner::mock(
mock_endpoint_registry(),
|partition,
service_invocation_id,
_,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
_,
_| {
let _ = invoker_tx.send(InvocationTaskOutput {
partition,
service_invocation_id,
inner: InvocationTaskOutputInner::NewEntry {
entry_index: 1,
entry: RawEntry {
header: EnrichedEntryHeader::SetState,
entry: Default::default(),
},
},
});
pending() // Never ends
},
Default::default(),
Some(2),
);
let _ = service_inner.register_mock_partition();

// Invoke the service
service_inner
.handle_invoke(
MOCK_PARTITION,
sid.clone(),
InvokeInputJournal::NoCachedJournal,
)
.await;

// We should receive the new entry here
let invoker_effect = service_inner.invocation_tasks_rx.recv().await.unwrap();
assert_eq!(invoker_effect.service_invocation_id, sid);
check!(let InvocationTaskOutputInner::NewEntry { .. } = invoker_effect.inner);

// Check the quota
let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota);
assert_eq!(*available_slots, 1);

// Abort the invocation
service_inner.handle_abort_invocation(MOCK_PARTITION, sid.clone());

// Check the quota
let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota);
assert_eq!(*available_slots, 2);

// Handle error coming after the abort (this should be noop)
service_inner
.handle_invocation_task_failed(
MOCK_PARTITION,
sid.clone(),
InvocationTaskError::TooManyTerminalMessages, /* any error is fine */
)
.await;

// Check the quota, should not be changed
let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota);
assert_eq!(*available_slots, 2);
}
}
19 changes: 19 additions & 0 deletions src/invoker/src/service/status_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,22 @@ impl InvocationStatusStore {
report.last_retry_attempt_failure = Some(reason.into());
}
}

#[cfg(test)]
mod tests {
use super::*;

impl InvocationStatusStore {
pub(in crate::service) fn resolve_invocation(
&self,
partition: PartitionLeaderEpoch,
sid: &ServiceInvocationId,
) -> Option<InvocationStatusReport> {
self.0.get(&partition).and_then(|inner| {
inner
.get(sid)
.map(|report| InvocationStatusReport(sid.clone(), partition, report.clone()))
})
}
}
}