Skip to content

Commit

Permalink
Introduce graceful cancellation of invocations
Browse files Browse the repository at this point in the history
This commit introduces a new TerminationFlavor::Cancel, which
gracefully terminates an invocation. It works by terminating only
leaf operations with a CANCELED_INVOCATION_ERROR.
Leaf operations are all completable journal entries except for calls
and inboxed invocations. For pending calls in a journal, the
cancellation is propagated to these calls.

This commit adds support for canceling inboxed, invoked, suspended and
virtual invocations.

This fixes restatedev#986 and restatedev#943.
  • Loading branch information
tillrohrmann committed Dec 18, 2023
1 parent c3974e6 commit 5988356
Show file tree
Hide file tree
Showing 13 changed files with 891 additions and 199 deletions.
8 changes: 5 additions & 3 deletions crates/storage-api/src/outbox_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

use crate::{GetFuture, PutFuture};
use restate_types::identifiers::{FullInvocationId, IngressDispatcherId, PartitionId};
use restate_types::invocation::{InvocationResponse, ResponseResult, ServiceInvocation};
use restate_types::invocation::{
InvocationResponse, InvocationTermination, ResponseResult, ServiceInvocation,
};
use std::ops::Range;

/// Types of outbox messages.
Expand All @@ -29,8 +31,8 @@ pub enum OutboxMessage {
response: ResponseResult,
},

/// Kill command to send to another partition processor
Kill(FullInvocationId),
/// Terminate invocation to send to another partition processor
InvocationTermination(InvocationTermination),
}

pub trait OutboxTable {
Expand Down
19 changes: 14 additions & 5 deletions crates/storage-proto/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ message FullInvocationId {
bytes invocation_uuid = 3;
}

// Identifies an invocation either by its full invocation id or only by the
// raw bytes of its invocation id. Exactly one of the `kind` cases is expected
// to be set.
message MaybeFullInvocationId {
oneof kind {
// The complete invocation id.
FullInvocationId full_invocation_id = 1;
// Only the serialized invocation id.
bytes invocation_id = 2;
}
}

// ---------------------------------------------------------------------
// Service Invocation
// ---------------------------------------------------------------------
Expand Down Expand Up @@ -283,10 +290,7 @@ message OutboxMessage {
}

message OutboxServiceInvocationResponse {
oneof id {
FullInvocationId full_invocation_id = 1;
bytes invocation_id = 4;
}
MaybeFullInvocationId maybe_fid = 1;
uint32 entry_index = 2;
ResponseResult response_result = 3;
}
Expand All @@ -298,14 +302,19 @@ message OutboxMessage {
}

message OutboxKill {
FullInvocationId full_invocation_id = 1;
MaybeFullInvocationId maybe_full_invocation_id = 1;
}

// Outbox message carrying the id of the invocation that should be canceled.
message OutboxCancel {
MaybeFullInvocationId maybe_full_invocation_id = 1;
}

oneof outbox_message {
OutboxServiceInvocation service_invocation_case = 1;
OutboxServiceInvocationResponse service_invocation_response = 2;
OutboxIngressResponse ingress_response = 3;
OutboxKill kill = 4;
OutboxCancel cancel = 5;
}

}
Expand Down
146 changes: 96 additions & 50 deletions crates/storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,27 @@ pub mod storage {
completion_result, CompletionResult, Entry, Kind,
};
use crate::storage::v1::outbox_message::{
OutboxIngressResponse, OutboxKill, OutboxServiceInvocation,
OutboxCancel, OutboxIngressResponse, OutboxKill, OutboxServiceInvocation,
OutboxServiceInvocationResponse,
};
use crate::storage::v1::service_invocation_response_sink::{
Ingress, NewInvocation, PartitionProcessor, ResponseSink,
};
use crate::storage::v1::{
enriched_entry_header, invocation_resolution_result, invocation_status,
outbox_message, outbox_message::outbox_service_invocation_response,
response_result, span_relation, timer, BackgroundCallResolutionResult,
EnrichedEntryHeader, FullInvocationId, InboxEntry, InvocationResolutionResult,
InvocationStatus, JournalEntry, JournalMeta, OutboxMessage, ResponseResult,
SequencedTimer, ServiceInvocation, ServiceInvocationResponseSink, SpanContext,
SpanRelation, Timer,
maybe_full_invocation_id, outbox_message, response_result, span_relation, timer,
BackgroundCallResolutionResult, EnrichedEntryHeader, FullInvocationId, InboxEntry,
InvocationResolutionResult, InvocationStatus, JournalEntry, JournalMeta,
MaybeFullInvocationId, OutboxMessage, ResponseResult, SequencedTimer,
ServiceInvocation, ServiceInvocationResponseSink, SpanContext, SpanRelation, Timer,
};
use anyhow::anyhow;
use bytes::{Buf, Bytes};
use bytestring::ByteString;
use opentelemetry_api::trace::TraceState;
use restate_storage_api::StorageError;
use restate_types::identifiers::ServiceId;
use restate_types::invocation::MaybeFullInvocationId;
use restate_types::invocation::{InvocationTermination, TerminationFlavor};
use restate_types::journal::enriched::AwakeableEnrichmentResult;
use restate_types::time::MillisSinceEpoch;
use std::collections::{HashSet, VecDeque};
Expand Down Expand Up @@ -589,6 +588,49 @@ pub mod storage {
}
}

/// Converts the protobuf `MaybeFullInvocationId` into its `restate_types` counterpart.
///
/// # Errors
///
/// Returns a [`ConversionError`] if the `kind` oneof is not set, if the embedded full
/// invocation id cannot be converted, or if the raw bytes are rejected by
/// `InvocationId::from_slice`.
impl TryFrom<MaybeFullInvocationId> for restate_types::invocation::MaybeFullInvocationId {
    type Error = ConversionError;

    fn try_from(value: MaybeFullInvocationId) -> Result<Self, Self::Error> {
        match value.kind.ok_or(ConversionError::missing_field("kind"))? {
            maybe_full_invocation_id::Kind::FullInvocationId(fid) => {
                Ok(restate_types::invocation::MaybeFullInvocationId::Full(
                    restate_types::identifiers::FullInvocationId::try_from(fid)?,
                ))
            }
            maybe_full_invocation_id::Kind::InvocationId(invocation_id) => {
                Ok(restate_types::invocation::MaybeFullInvocationId::Partial(
                    restate_types::identifiers::InvocationId::from_slice(&invocation_id)
                        // Pass the constructor by path instead of wrapping it in a closure.
                        .map_err(ConversionError::invalid_data)?,
                ))
            }
        }
    }
}

/// Converts the `restate_types` `MaybeFullInvocationId` into its protobuf representation.
impl From<restate_types::invocation::MaybeFullInvocationId> for MaybeFullInvocationId {
    fn from(value: restate_types::invocation::MaybeFullInvocationId) -> Self {
        // Pick the oneof case first, then wrap it once.
        let kind = match value {
            restate_types::invocation::MaybeFullInvocationId::Full(fid) => {
                maybe_full_invocation_id::Kind::FullInvocationId(FullInvocationId::from(fid))
            }
            restate_types::invocation::MaybeFullInvocationId::Partial(invocation_id) => {
                maybe_full_invocation_id::Kind::InvocationId(Bytes::copy_from_slice(
                    &invocation_id.as_bytes(),
                ))
            }
        };

        MaybeFullInvocationId { kind: Some(kind) }
    }
}

fn try_bytes_into_invocation_uuid(
bytes: Bytes,
) -> Result<restate_types::identifiers::InvocationUuid, ConversionError> {
Expand Down Expand Up @@ -1248,26 +1290,10 @@ pub mod storage {
) => restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(
restate_types::invocation::InvocationResponse {
entry_index: invocation_response.entry_index,
id: match invocation_response
.id
.ok_or(ConversionError::missing_field("id"))?
{
outbox_service_invocation_response::Id::FullInvocationId(
fid,
) => MaybeFullInvocationId::Full(
restate_types::identifiers::FullInvocationId::try_from(
fid,
)?,
),
outbox_service_invocation_response::Id::InvocationId(
invocation_id_bytes,
) => MaybeFullInvocationId::Partial(
restate_types::identifiers::InvocationId::from_slice(
&invocation_id_bytes,
)
.map_err(ConversionError::invalid_data)?,
),
},
id: invocation_response
.maybe_fid
.ok_or(ConversionError::missing_field("maybe_fid"))?
.try_into()?,
result: restate_types::invocation::ResponseResult::try_from(
invocation_response
.response_result
Expand All @@ -1294,11 +1320,27 @@ pub mod storage {
}
}
outbox_message::OutboxMessage::Kill(outbox_kill) => {
let fid = outbox_kill
.full_invocation_id
.ok_or(ConversionError::missing_field("full_invocation_id"))?;
restate_storage_api::outbox_table::OutboxMessage::Kill(
restate_types::identifiers::FullInvocationId::try_from(fid)?,
let maybe_fid = outbox_kill.maybe_full_invocation_id.ok_or(
ConversionError::missing_field("maybe_full_invocation_id"),
)?;
restate_storage_api::outbox_table::OutboxMessage::InvocationTermination(
InvocationTermination::kill(
restate_types::invocation::MaybeFullInvocationId::try_from(
maybe_fid,
)?,
),
)
}
outbox_message::OutboxMessage::Cancel(outbox_cancel) => {
let maybe_fid = outbox_cancel.maybe_full_invocation_id.ok_or(
ConversionError::missing_field("maybe_full_invocation_id"),
)?;
restate_storage_api::outbox_table::OutboxMessage::InvocationTermination(
InvocationTermination::cancel(
restate_types::invocation::MaybeFullInvocationId::try_from(
maybe_fid,
)?,
),
)
}
};
Expand All @@ -1324,18 +1366,9 @@ pub mod storage {
) => outbox_message::OutboxMessage::ServiceInvocationResponse(
OutboxServiceInvocationResponse {
entry_index: invocation_response.entry_index,
id: Some(match invocation_response.id {
MaybeFullInvocationId::Partial(iid) => {
outbox_service_invocation_response::Id::InvocationId(
Bytes::copy_from_slice(&iid.as_bytes()),
)
}
MaybeFullInvocationId::Full(fid) => {
outbox_service_invocation_response::Id::FullInvocationId(
FullInvocationId::from(fid),
)
}
}),
maybe_fid: Some(MaybeFullInvocationId::from(
invocation_response.id,
)),
response_result: Some(ResponseResult::from(
invocation_response.result,
)),
Expand All @@ -1354,11 +1387,24 @@ pub mod storage {
response_result: Some(ResponseResult::from(response)),
})
}
restate_storage_api::outbox_table::OutboxMessage::Kill(fid) => {
outbox_message::OutboxMessage::Kill(OutboxKill {
full_invocation_id: Some(FullInvocationId::from(fid)),
})
}
restate_storage_api::outbox_table::OutboxMessage::InvocationTermination(
invocation_termination,
) => match invocation_termination.flavor {
TerminationFlavor::Kill => {
outbox_message::OutboxMessage::Kill(OutboxKill {
maybe_full_invocation_id: Some(MaybeFullInvocationId::from(
invocation_termination.maybe_fid,
)),
})
}
TerminationFlavor::Cancel => {
outbox_message::OutboxMessage::Cancel(OutboxCancel {
maybe_full_invocation_id: Some(MaybeFullInvocationId::from(
invocation_termination.maybe_fid,
)),
})
}
},
};

OutboxMessage {
Expand Down
7 changes: 7 additions & 0 deletions crates/types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,13 @@ pub const KILLED_INVOCATION_ERROR: InvocationError = InvocationError::new_static
"killed",
);

/// Error reported for operations that are terminated because their invocation was canceled.
///
/// It currently reuses the user error code `UserErrorCode::Cancelled` with the message
/// "canceled".
// TODO: Once we want to distinguish server side cancellations from user code returning the
// UserErrorCode::Cancelled, we need to add a new RestateErrorCode.
pub const CANCELED_INVOCATION_ERROR: InvocationError = InvocationError::new_static(
InvocationErrorCode::User(UserErrorCode::Cancelled),
"canceled",
);

#[cfg(feature = "tonic_conversions")]
mod tonic_conversions_impl {
use super::{InvocationError, InvocationErrorCode};
Expand Down
32 changes: 32 additions & 0 deletions crates/types/src/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,38 @@ impl SpanRelation {
}
}

/// Message to terminate an invocation.
///
/// Use [`InvocationTermination::kill`] or [`InvocationTermination::cancel`] to construct it
/// with the corresponding [`TerminationFlavor`].
#[derive(Debug, Clone, PartialEq)]
pub struct InvocationTermination {
/// Identifier (full or partial) of the invocation to terminate.
pub maybe_fid: MaybeFullInvocationId,
/// How the invocation should be terminated.
pub flavor: TerminationFlavor,
}

impl InvocationTermination {
    /// Creates a termination message with [`TerminationFlavor::Kill`] for the given invocation.
    pub fn kill(maybe_fid: impl Into<MaybeFullInvocationId>) -> Self {
        let maybe_fid = maybe_fid.into();
        let flavor = TerminationFlavor::Kill;
        Self { maybe_fid, flavor }
    }

    /// Creates a termination message with [`TerminationFlavor::Cancel`] for the given
    /// invocation.
    pub fn cancel(maybe_fid: impl Into<MaybeFullInvocationId>) -> Self {
        let maybe_fid = maybe_fid.into();
        let flavor = TerminationFlavor::Cancel;
        Self { maybe_fid, flavor }
    }
}

/// Flavor of the termination. Can be kill (hard stop) or graceful cancel.
///
/// The enum is fieldless, so it derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TerminationFlavor {
    /// Hard termination, no clean up.
    Kill,
    /// Graceful termination allowing the invocation to clean up.
    Cancel,
}

#[cfg(any(test, feature = "mocks"))]
mod mocks {
use super::*;
Expand Down
18 changes: 10 additions & 8 deletions crates/worker/src/network_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ mod shuffle_integration {
use restate_types::errors::InvocationError;
use restate_types::identifiers::WithPartitionKey;
use restate_types::identifiers::{PartitionId, PartitionKey, PeerId};
use restate_types::invocation::{MaybeFullInvocationId, ResponseResult};
use restate_types::invocation::ResponseResult;
use restate_types::message::MessageIndex;

#[derive(Debug)]
Expand All @@ -159,7 +159,9 @@ mod shuffle_integration {
shuffle::PartitionProcessorMessage::Response(response) => {
response.id.partition_key()
}
PartitionProcessorMessage::Kill(fid) => fid.partition_key(),
PartitionProcessorMessage::InvocationTermination(invocation_termination) => {
invocation_termination.maybe_fid.partition_key()
}
}
}
}
Expand Down Expand Up @@ -192,12 +194,12 @@ mod shuffle_integration {
deduplication_source,
)
}
shuffle::PartitionProcessorMessage::Kill(fid) => {
partition::StateMachineAckCommand::dedup(
partition::StateMachineCommand::Kill(MaybeFullInvocationId::from(fid)),
deduplication_source,
)
}
shuffle::PartitionProcessorMessage::InvocationTermination(
invocation_termination,
) => partition::StateMachineAckCommand::dedup(
partition::StateMachineCommand::TerminateInvocation(invocation_termination),
deduplication_source,
),
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions crates/worker/src/partition/shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::partition::shuffle::state_machine::StateMachine;
use futures::future::BoxFuture;
use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::identifiers::{FullInvocationId, IngressDispatcherId, PartitionId, PeerId};
use restate_types::invocation::{InvocationResponse, ResponseResult, ServiceInvocation};
use restate_types::invocation::{
InvocationResponse, InvocationTermination, ResponseResult, ServiceInvocation,
};
use restate_types::message::{AckKind, MessageIndex};
use std::time::Duration;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -53,7 +55,7 @@ pub(crate) struct ShuffleInput(pub(crate) AckKind);
pub(crate) enum PartitionProcessorMessage {
Invocation(ServiceInvocation),
Response(InvocationResponse),
Kill(FullInvocationId),
InvocationTermination(InvocationTermination),
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -126,8 +128,10 @@ impl From<OutboxMessage> for ShuffleMessageDestination {
PartitionProcessorMessage::Invocation(invocation),
)
}
OutboxMessage::Kill(fid) => {
ShuffleMessageDestination::PartitionProcessor(PartitionProcessorMessage::Kill(fid))
OutboxMessage::InvocationTermination(invocation_termination) => {
ShuffleMessageDestination::PartitionProcessor(
PartitionProcessorMessage::InvocationTermination(invocation_termination),
)
}
}
}
Expand Down
Loading

0 comments on commit 5988356

Please sign in to comment.