Skip to content

Commit

Permalink
Record service protocol version and send to service endpoint
Browse files Browse the repository at this point in the history
This commit introduces the selection of a service protocol version when starting
the InvocationTask. The service protocol version is sent to the service endpoint
via the content type and accept headers in the form of vnd.restate.invocation.vX.
Additionally, we record the chosen service protocol version in the journal metadata
for sanity checks. Note, currently we don't support resuming an invocation that
was started on an older protocol version with a newer protocol version.

This fixes #1510.
  • Loading branch information
tillrohrmann committed May 16, 2024
1 parent 461a2be commit 259c61c
Show file tree
Hide file tree
Showing 31 changed files with 907 additions and 655 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/admin/src/schema_registry/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,10 @@ impl DiscoveredHandlerMetadata {
(ServiceType::Service, None | Some(endpoint_manifest::HandlerType::Shared)) => {
InvocationTargetType::Service
}
(ServiceType::VirtualObject, None | Some(endpoint_manifest::HandlerType::Exclusive)) => {
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
}
(
ServiceType::VirtualObject,
None | Some(endpoint_manifest::HandlerType::Exclusive),
) => InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive),
(ServiceType::VirtualObject, Some(endpoint_manifest::HandlerType::Shared)) => {
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Shared)
}
Expand Down
8 changes: 8 additions & 0 deletions crates/errors/src/error_codes/RT0013.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## RT0013

The service endpoint does not support any of the supported service protocol versions of the server. Therefore, the server cannot talk to this endpoint. Please make sure that the service endpoint's SDK and the Restate server are compatible.

Suggestions:

* Register a service endpoint which uses an SDK which is compatible with the used server
* Upgrade the server to a version which is compatible with the used SDK
8 changes: 8 additions & 0 deletions crates/errors/src/error_codes/RT0014.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## RT0014

The server cannot resume an in-flight invocation which has been started with a now incompatible service protocol version. Restate does not support upgrading service protocols yet.

Suggestions:

* Downgrade the server to a version which is compatible with the used service protocol version
* Kill the affected invocation via the CLI.
5 changes: 3 additions & 2 deletions crates/errors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ mod helper;
// META are meta related errors.

declare_restate_error_codes!(
RT0001, RT0002, RT0003, RT0004, RT0005, RT0006, RT0007, RT0009, RT0010, RT0011, RT0012,
META0003, META0004, META0005, META0006, META0009, META0010, META0011, META0012, META0013
RT0001, RT0002, RT0003, RT0004, RT0005, RT0006, RT0007, RT0009, RT0010, RT0011, RT0012, RT0013,
RT0014, META0003, META0004, META0005, META0006, META0009, META0010, META0011, META0012,
META0013
);

// -- Some commonly used errors
Expand Down
3 changes: 2 additions & 1 deletion crates/invoker-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ mocks = []
serde = ["dep:serde"]

[dependencies]
restate-types = { workspace = true }
restate-errors = { workspace = true }
restate-types = { workspace = true }


anyhow = { workspace = true }
bytes = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions crates/invoker-api/src/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::deployment::PinnedDeployment;
use restate_types::errors::InvocationError;
use restate_types::identifiers::EntryIndex;
use restate_types::identifiers::InvocationId;
use restate_types::identifiers::{DeploymentId, EntryIndex};
use restate_types::journal::enriched::EnrichedRawEntry;
use std::collections::HashSet;

Expand All @@ -25,7 +26,7 @@ pub struct Effect {
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EffectKind {
/// This is sent before any new entry is created by the invoker. This won't be sent if the deployment_id is already set.
SelectedDeployment(DeploymentId),
PinnedDeployment(PinnedDeployment),
JournalEntry {
entry_index: EntryIndex,
entry: EnrichedRawEntry,
Expand Down
9 changes: 5 additions & 4 deletions crates/invoker-api/src/journal_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
// by the Apache License, Version 2.0.

use futures::Stream;
use restate_types::identifiers::{DeploymentId, InvocationId};
use restate_types::deployment::PinnedDeployment;
use restate_types::identifiers::InvocationId;
use restate_types::invocation::ServiceInvocationSpanContext;
use restate_types::journal::raw::PlainRawEntry;
use restate_types::journal::EntryIndex;
Expand All @@ -20,17 +21,17 @@ use std::future::Future;
pub struct JournalMetadata {
pub length: EntryIndex,
pub span_context: ServiceInvocationSpanContext,
pub deployment_id: Option<DeploymentId>,
pub pinned_deployment: Option<PinnedDeployment>,
}

impl JournalMetadata {
pub fn new(
length: EntryIndex,
span_context: ServiceInvocationSpanContext,
deployment_id: Option<DeploymentId>,
pinned_deployment: Option<PinnedDeployment>,
) -> Self {
Self {
deployment_id,
pinned_deployment,
span_context,
length,
}
Expand Down
19 changes: 9 additions & 10 deletions crates/invoker-impl/src/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use super::*;

use restate_types::identifiers::DeploymentId;
use restate_types::journal::Completion;
use restate_types::retries;
use std::fmt;
Expand Down Expand Up @@ -85,7 +84,7 @@ enum InvocationState {
entries_to_ack: HashSet<EntryIndex>,

// If Some, we need to notify the deployment id to the partition processor
chosen_deployment: Option<DeploymentId>,
pinned_deployment: Option<PinnedDeployment>,
},

WaitingRetry {
Expand Down Expand Up @@ -154,7 +153,7 @@ impl InvocationStateMachine {
journal_tracker: Default::default(),
abort_handle,
entries_to_ack: Default::default(),
chosen_deployment: None,
pinned_deployment: None,
};
}

Expand All @@ -164,34 +163,34 @@ impl InvocationStateMachine {
}
}

pub(super) fn notify_chosen_deployment(&mut self, endpoint_id: DeploymentId) {
pub(super) fn notify_pinned_deployment(&mut self, deployment: PinnedDeployment) {
debug_assert!(matches!(
&self.invocation_state,
InvocationState::InFlight {
chosen_deployment: None,
pinned_deployment: None,
..
}
));

if let InvocationState::InFlight {
chosen_deployment, ..
pinned_deployment, ..
} = &mut self.invocation_state
{
*chosen_deployment = Some(endpoint_id);
*pinned_deployment = Some(deployment);
}
}

pub(super) fn chosen_deployment_to_notify(&mut self) -> Option<DeploymentId> {
pub(super) fn pinned_deployment_to_notify(&mut self) -> Option<PinnedDeployment> {
debug_assert!(matches!(
&self.invocation_state,
InvocationState::InFlight { .. }
));

if let InvocationState::InFlight {
chosen_deployment, ..
pinned_deployment, ..
} = &mut self.invocation_state
{
chosen_deployment.take()
pinned_deployment.take()
} else {
None
}
Expand Down
Loading

0 comments on commit 259c61c

Please sign in to comment.