Skip to content

Commit

Permalink
Track caller for each ServiceInvocation
Browse files Browse the repository at this point in the history
This commit introduces the Caller enum which each ServiceInvocation and
InvocationMetadata carries along. The Caller captures who was the caller
of the given service invocation. Currently Restate distinguishes between
three values:

* Caller::Ingress: Invocation originating from the ingress
* Caller::Service: Invocation originating from a Restate service
* Caller::Internal: Invocation originating from the Restate server to drive non-deterministic built-in services forward

This fixes restatedev#997.
  • Loading branch information
tillrohrmann committed Dec 19, 2023
1 parent a19a7c9 commit b5e0bd9
Show file tree
Hide file tree
Showing 21 changed files with 161 additions and 41 deletions.
4 changes: 3 additions & 1 deletion crates/ingress-dispatcher/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_pb::restate::internal::{
};
use restate_types::identifiers::FullInvocationId;
use restate_types::identifiers::IngressDispatcherId;
use restate_types::invocation::ServiceInvocationResponseSink;
use restate_types::invocation::{Caller, ServiceInvocationResponseSink};
use std::collections::HashMap;
use std::future::poll_fn;
use tokio::select;
Expand Down Expand Up @@ -302,6 +302,7 @@ impl DispatcherLoopHandler {
}
.encode_to_vec()
.into(),
caller: Caller::Ingress,
response_sink,
span_context,
},
Expand All @@ -313,6 +314,7 @@ impl DispatcherLoopHandler {
fid,
method_name,
argument,
caller: Caller::Ingress,
response_sink,
span_context,
},
Expand Down
7 changes: 6 additions & 1 deletion crates/storage-api/src/status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use bytestring::ByteString;
use restate_types::identifiers::{
DeploymentId, EntryIndex, FullInvocationId, InvocationUuid, PartitionKey, ServiceId,
};
use restate_types::invocation::{ServiceInvocationResponseSink, ServiceInvocationSpanContext};
use restate_types::invocation::{
Caller, ServiceInvocationResponseSink, ServiceInvocationSpanContext,
};
use restate_types::time::MillisSinceEpoch;
use std::collections::HashSet;
use std::ops::RangeInclusive;
Expand Down Expand Up @@ -182,6 +184,7 @@ pub struct InvocationMetadata {
pub method: ByteString,
pub response_sink: Option<ServiceInvocationResponseSink>,
pub timestamps: StatusTimestamps,
pub caller: Caller,
}

impl InvocationMetadata {
Expand All @@ -192,6 +195,7 @@ impl InvocationMetadata {
method: ByteString,
response_sink: Option<ServiceInvocationResponseSink>,
timestamps: StatusTimestamps,
caller: Caller,
) -> Self {
Self {
invocation_uuid,
Expand All @@ -200,6 +204,7 @@ impl InvocationMetadata {
method,
response_sink,
timestamps,
caller,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions crates/storage-proto/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ message JournalMeta {
SpanContext span_context = 2;
}

message Caller {
oneof caller {
google.protobuf.Empty ingress = 9;
FullInvocationId service = 10;
google.protobuf.Empty internal = 11;
}
}

message InvocationStatus {

message Invoked {
Expand All @@ -43,6 +51,7 @@ message InvocationStatus {
google.protobuf.Empty none = 7;
string value = 8;
}
Caller caller = 9;
}

message Suspended {
Expand All @@ -57,6 +66,7 @@ message InvocationStatus {
google.protobuf.Empty none = 8;
string value = 9;
}
Caller caller = 10;
}

message Free {
Expand Down Expand Up @@ -142,6 +152,7 @@ message ServiceInvocation {
bytes argument = 3;
ServiceInvocationResponseSink response_sink = 4;
SpanContext span_context = 5;
Caller caller = 6;
}

message InboxEntry {
Expand Down
74 changes: 67 additions & 7 deletions crates/storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ pub mod storage {
Ingress, NewInvocation, PartitionProcessor, ResponseSink,
};
use crate::storage::v1::{
enriched_entry_header, invocation_resolution_result, invocation_status,
caller, enriched_entry_header, invocation_resolution_result, invocation_status,
maybe_full_invocation_id, outbox_message, response_result, span_relation, timer,
BackgroundCallResolutionResult, EnrichedEntryHeader, FullInvocationId, InboxEntry,
InvocationResolutionResult, InvocationStatus, JournalEntry, JournalMeta,
MaybeFullInvocationId, OutboxMessage, ResponseResult, SequencedTimer,
BackgroundCallResolutionResult, Caller, EnrichedEntryHeader, FullInvocationId,
InboxEntry, InvocationResolutionResult, InvocationStatus, JournalEntry,
JournalMeta, MaybeFullInvocationId, OutboxMessage, ResponseResult, SequencedTimer,
ServiceInvocation, ServiceInvocationResponseSink, SpanContext, SpanRelation, Timer,
};
use anyhow::anyhow;
Expand Down Expand Up @@ -93,12 +93,12 @@ pub mod storage {
.ok_or(ConversionError::missing_field("status"))?
{
invocation_status::Status::Invoked(invoked) => {
let invoked_status =
let invocation_metadata =
restate_storage_api::status_table::InvocationMetadata::try_from(
invoked,
)?;
restate_storage_api::status_table::InvocationStatus::Invoked(
invoked_status,
invocation_metadata,
)
}
invocation_status::Status::Suspended(suspended) => {
Expand Down Expand Up @@ -202,6 +202,12 @@ pub mod storage {
.ok_or(ConversionError::missing_field("response_sink"))?,
)?;

let caller = restate_types::invocation::Caller::try_from(
value
.caller
.ok_or(ConversionError::missing_field("caller"))?,
)?;

Ok(restate_storage_api::status_table::InvocationMetadata::new(
invocation_uuid,
journal_metadata,
Expand All @@ -212,6 +218,7 @@ pub mod storage {
MillisSinceEpoch::new(value.creation_time),
MillisSinceEpoch::new(value.modification_time),
),
caller,
))
}
}
Expand All @@ -220,11 +227,12 @@ pub mod storage {
fn from(value: restate_storage_api::status_table::InvocationMetadata) -> Self {
let restate_storage_api::status_table::InvocationMetadata {
invocation_uuid,
deployment_id: deployment_id,
deployment_id,
method,
response_sink,
journal_metadata,
timestamps,
caller,
} = value;

Invoked {
Expand All @@ -240,6 +248,7 @@ pub mod storage {
journal_meta: Some(JournalMeta::from(journal_metadata)),
creation_time: timestamps.creation_time().as_u64(),
modification_time: timestamps.modification_time().as_u64(),
caller: Some(Caller::from(caller)),
}
}
}
Expand Down Expand Up @@ -284,6 +293,12 @@ pub mod storage {
let waiting_for_completed_entries =
value.waiting_for_completed_entries.into_iter().collect();

let caller = restate_types::invocation::Caller::try_from(
value
.caller
.ok_or(ConversionError::missing_field("caller"))?,
)?;

Ok((
restate_storage_api::status_table::InvocationMetadata::new(
invocation_uuid,
Expand All @@ -295,6 +310,7 @@ pub mod storage {
MillisSinceEpoch::new(value.creation_time),
MillisSinceEpoch::new(value.modification_time),
),
caller,
),
waiting_for_completed_entries,
))
Expand Down Expand Up @@ -333,6 +349,7 @@ pub mod storage {
creation_time: metadata.timestamps.creation_time().as_u64(),
modification_time: metadata.timestamps.modification_time().as_u64(),
waiting_for_completed_entries,
caller: Some(Caller::from(metadata.caller)),
}
}
}
Expand Down Expand Up @@ -470,6 +487,41 @@ pub mod storage {
}
}

impl TryFrom<Caller> for restate_types::invocation::Caller {
type Error = ConversionError;

fn try_from(value: Caller) -> Result<Self, Self::Error> {
let caller = match value
.caller
.ok_or(ConversionError::missing_field("caller"))?
{
caller::Caller::Ingress(_) => restate_types::invocation::Caller::Ingress,
caller::Caller::Service(fid) => restate_types::invocation::Caller::Service(
restate_types::identifiers::FullInvocationId::try_from(fid)?,
),
caller::Caller::Internal(_) => restate_types::invocation::Caller::Internal,
};

Ok(caller)
}
}

impl From<restate_types::invocation::Caller> for Caller {
fn from(value: restate_types::invocation::Caller) -> Self {
let caller = match value {
restate_types::invocation::Caller::Ingress => caller::Caller::Ingress(()),
restate_types::invocation::Caller::Service(fid) => {
caller::Caller::Service(FullInvocationId::from(fid))
}
restate_types::invocation::Caller::Internal => caller::Caller::Internal(()),
};

Caller {
caller: Some(caller),
}
}
}

impl TryFrom<InboxEntry> for restate_types::invocation::ServiceInvocation {
type Error = ConversionError;

Expand Down Expand Up @@ -505,6 +557,7 @@ pub mod storage {
response_sink,
span_context,
argument,
caller,
} = value;

let id = restate_types::identifiers::FullInvocationId::try_from(
Expand All @@ -525,10 +578,15 @@ pub mod storage {
let method_name =
ByteString::try_from(method_name).map_err(ConversionError::invalid_data)?;

let caller = restate_types::invocation::Caller::try_from(
caller.ok_or(ConversionError::missing_field("caller"))?,
)?;

Ok(restate_types::invocation::ServiceInvocation {
fid: id,
method_name,
argument,
caller,
response_sink,
span_context,
})
Expand All @@ -541,13 +599,15 @@ pub mod storage {
let span_context = SpanContext::from(value.span_context);
let response_sink = ServiceInvocationResponseSink::from(value.response_sink);
let method_name = value.method_name.into_bytes();
let caller = Caller::from(value.caller);

ServiceInvocation {
id: Some(id),
span_context: Some(span_context),
response_sink: Some(response_sink),
method_name,
argument: value.argument,
caller: Some(caller),
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions crates/storage-query-datafusion/src/inbox/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::schema::InboxBuilder;
use crate::table_util::format_using;
use restate_storage_api::inbox_table::InboxEntry;
use restate_types::identifiers::{InvocationId, WithPartitionKey};
use restate_types::invocation::{ServiceInvocation, ServiceInvocationResponseSink};
use restate_types::invocation::{Caller, ServiceInvocation};
use std::time::Duration;
use uuid::Uuid;

Expand All @@ -28,7 +28,7 @@ pub(crate) fn append_inbox_row(
ServiceInvocation {
fid,
method_name,
response_sink,
caller,
span_context,
..
},
Expand All @@ -48,19 +48,19 @@ pub(crate) fn append_inbox_row(

row.sequence_number(inbox_sequence_number);

match response_sink {
Some(ServiceInvocationResponseSink::PartitionProcessor { caller, .. }) => {
match caller {
Caller::Service(caller) => {
row.invoked_by("service");
row.invoked_by_service(&caller.service_id.service_name);
if row.is_invoked_by_id_defined() {
row.invoked_by_id(format_using(output, &caller));
}
}
Some(ServiceInvocationResponseSink::Ingress(..)) => {
Caller::Ingress => {
row.invoked_by("ingress");
}
_ => {
row.invoked_by("unknown");
Caller::Internal => {
row.invoked_by("Restate");
}
}
if row.is_trace_id_defined() {
Expand Down
12 changes: 6 additions & 6 deletions crates/storage-query-datafusion/src/status/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use restate_storage_api::status_table::{
};
use restate_storage_rocksdb::status_table::OwnedStatusRow;
use restate_types::identifiers::InvocationId;
use restate_types::invocation::ServiceInvocationResponseSink;
use restate_types::invocation::Caller;

#[inline]
pub(crate) fn append_status_row(
Expand Down Expand Up @@ -86,19 +86,19 @@ fn fill_invocation_metadata(
if let Some(deployment_id) = meta.deployment_id {
row.pinned_deployment_id(deployment_id);
}
match meta.response_sink {
Some(ServiceInvocationResponseSink::PartitionProcessor { caller, .. }) => {
match meta.caller {
Caller::Service(caller) => {
row.invoked_by("service");
row.invoked_by_service(&caller.service_id.service_name);
if row.is_invoked_by_id_defined() {
row.invoked_by_id(format_using(output, &caller));
}
}
Some(ServiceInvocationResponseSink::Ingress(..)) => {
Caller::Ingress => {
row.invoked_by("ingress");
}
_ => {
row.invoked_by("unknown");
Caller::Internal => {
row.invoked_by("Restate");
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/storage-rocksdb/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bytes::Bytes;
use bytestring::ByteString;
use restate_storage_api::GetStream;
use restate_types::identifiers::{FullInvocationId, InvocationUuid, ServiceId};
use restate_types::invocation::{ServiceInvocation, SpanRelation};
use restate_types::invocation::{Caller, ServiceInvocation, SpanRelation};
use std::fmt::Debug;
use std::str::FromStr;
use tempfile::tempdir;
Expand Down Expand Up @@ -66,6 +66,7 @@ pub(crate) fn mock_service_invocation(service_id: ServiceId) -> ServiceInvocatio
FullInvocationId::with_service_id(service_id, InvocationUuid::now_v7()),
ByteString::from_static("service"),
Bytes::new(),
Caller::Ingress,
None,
SpanRelation::None,
)
Expand All @@ -76,6 +77,7 @@ pub(crate) fn mock_random_service_invocation() -> ServiceInvocation {
FullInvocationId::mock_random(),
ByteString::from_static("service"),
Bytes::new(),
Caller::Ingress,
None,
SpanRelation::None,
)
Expand Down
Loading

0 comments on commit b5e0bd9

Please sign in to comment.