Skip to content

Commit

Permalink
Add Source::Subscription to identify the invocation comes from a su…
Browse files Browse the repository at this point in the history
…bscription (restatedev#2305)

* Macro for all the ids backed by uuids. Those are all the same code.

* Add `Source::Subscription`.

This adds a new feature `experimental_feature_kafka_ingress_next` for a cluster of issues to tackle in subsequent PRs (notably restatedev#1423 and restatedev#964)

* Macro feedback + remove resource id on PartitionProcessorRpcRequest

* Don't compile doctests of the macro

* Fix comment
  • Loading branch information
slinkydeveloper authored Nov 18, 2024
1 parent a4eeb59 commit 69cf968
Show file tree
Hide file tree
Showing 16 changed files with 231 additions and 363 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ impl KafkaDeduplicationId {
pub struct MessageSender {
subscription: Subscription,
dispatcher: KafkaIngressDispatcher,
experimental_feature_kafka_ingress_next: bool,

subscription_id: String,
ingress_request_counter: metrics::Counter,
}

impl MessageSender {
pub fn new(subscription: Subscription, dispatcher: KafkaIngressDispatcher) -> Self {
pub fn new(
subscription: Subscription,
dispatcher: KafkaIngressDispatcher,
experimental_feature_kafka_ingress_next: bool,
) -> Self {
Self {
subscription_id: subscription.id().to_string(),
ingress_request_counter: counter!(
Expand All @@ -104,6 +109,7 @@ impl MessageSender {
),
subscription,
dispatcher,
experimental_feature_kafka_ingress_next,
}
}

Expand Down Expand Up @@ -144,6 +150,7 @@ impl MessageSender {
deduplication_id,
deduplication_index,
headers,
self.experimental_feature_kafka_ingress_next,
)
.map_err(|cause| Error::Event {
topic: msg.topic().to_string(),
Expand Down
8 changes: 7 additions & 1 deletion crates/ingress-kafka/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct KafkaIngressEvent {
}

impl KafkaIngressEvent {
#[allow(clippy::too_many_arguments)]
pub fn new(
subscription: &Subscription,
key: Bytes,
Expand All @@ -47,6 +48,7 @@ impl KafkaIngressEvent {
deduplication_id: KafkaDeduplicationId,
deduplication_index: MessageIndex,
headers: Vec<restate_types::invocation::Header>,
experimental_feature_kafka_ingress_next: bool,
) -> Result<Self, anyhow::Error> {
// Check if we need to proxy or not
let proxying_partition_key = if KafkaDeduplicationId::requires_proxying(subscription) {
Expand Down Expand Up @@ -94,7 +96,11 @@ impl KafkaIngressEvent {
let mut service_invocation = ServiceInvocation::initialize(
invocation_id,
invocation_target,
restate_types::invocation::Source::Ingress(PartitionProcessorRpcRequestId::new()),
if experimental_feature_kafka_ingress_next {
restate_types::invocation::Source::Subscription(subscription.id())
} else {
restate_types::invocation::Source::Ingress(PartitionProcessorRpcRequestId::new())
},
);
service_invocation.with_related_span(related_span);
service_invocation.argument = argument;
Expand Down
6 changes: 5 additions & 1 deletion crates/ingress-kafka/src/subscription_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ impl Service {
task_center(),
client_config,
vec![topic.to_string()],
MessageSender::new(subscription, self.dispatcher.clone()),
MessageSender::new(
subscription,
self.dispatcher.clone(),
options.experimental_feature_kafka_ingress_next(),
),
);

task_orchestrator.start(subscription_id, consumer_task);
Expand Down
5 changes: 5 additions & 0 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,15 @@ message Source {
InvocationTarget invocation_target = 2;
}

message Subscription {
bytes subscription_id = 1;
}

oneof source {
Ingress ingress = 9;
Service service = 10;
google.protobuf.Empty internal = 11;
Subscription subscription = 12;
}
}

Expand Down
13 changes: 13 additions & 0 deletions crates/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,14 @@ pub mod v1 {
// TODO this should become an hard error in Restate 1.3
.unwrap_or_default(),
),
source::Source::Subscription(subscription) => {
restate_types::invocation::Source::Subscription(
restate_types::identifiers::SubscriptionId::from_slice(
&subscription.subscription_id,
)
.map_err(|e| ConversionError::invalid_data(e))?,
)
}
source::Source::Service(service) => restate_types::invocation::Source::Service(
restate_types::identifiers::InvocationId::try_from(
service
Expand Down Expand Up @@ -1370,6 +1378,11 @@ pub mod v1 {
rpc_id: rpc_id.to_bytes().to_vec().into(),
})
}
restate_types::invocation::Source::Subscription(sub_id) => {
source::Source::Subscription(source::Subscription {
subscription_id: sub_id.to_bytes().to_vec().into(),
})
}
restate_types::invocation::Source::Service(
invocation_id,
invocation_target,
Expand Down
4 changes: 4 additions & 0 deletions crates/storage-query-datafusion/src/invocation_status/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ fn fill_invoked_by(row: &mut SysInvocationStatusRowBuilder, output: &mut String,
Source::Internal => {
row.invoked_by("restate");
}
Source::Subscription(sub_id) => {
row.invoked_by("subscription");
row.invoked_by_subscription_id(format_using(output, &sub_id))
}
}
}

Expand Down
21 changes: 12 additions & 9 deletions crates/storage-query-datafusion/src/invocation_status/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,22 @@ define_table!(sys_invocation_status(
/// Idempotency key, if any.
idempotency_key: DataType::LargeUtf8,

/// Either `ingress` if the service was invoked externally or `service` if the service was
/// invoked by another Restate service.
/// Either:
/// * `ingress` if the invocation was created externally.
/// * `service` if the invocation was created by another Restate service.
/// * `subscription` if the invocation was created by a subscription (e.g. Kafka).
invoked_by: DataType::LargeUtf8,

/// The name of the invoking service. Or `null` if invoked externally.
invoked_by_service_name: DataType::LargeUtf8,

/// The caller [Invocation ID](/operate/invocation#invocation-identifier) if the service was
/// invoked by another Restate service. Or `null` if invoked externally.
/// The caller [Invocation ID](/operate/invocation#invocation-identifier) if `invoked_by = 'service'`.
invoked_by_id: DataType::LargeUtf8,

/// The caller invocation target if the service was invoked by another Restate service. Or
/// `null` if invoked externally.
/// The subscription id if `invoked_by = 'subscription'`.
invoked_by_subscription_id: DataType::LargeUtf8,

/// The name of caller service if `invoked_by = 'service'`.
invoked_by_service_name: DataType::LargeUtf8,

/// The caller invocation target if `invoked_by = 'service'`.
invoked_by_target: DataType::LargeUtf8,

/// The ID of the service deployment that started processing this invocation, and will continue
Expand Down
1 change: 1 addition & 0 deletions crates/storage-query-datafusion/src/table_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub fn sys_invocation_table_docs() -> OwnedTableDocs {
sys_invocation_status.remove("invoked_by").expect("invoked_by should exist"),
sys_invocation_status.remove("invoked_by_service_name").expect("invoked_by_service_name should exist"),
sys_invocation_status.remove("invoked_by_id").expect("invoked_by_id should exist"),
sys_invocation_status.remove("invoked_by_subscription_id").expect("invoked_by_subscription_id should exist"),
sys_invocation_status.remove("invoked_by_target").expect("invoked_by_target should exist"),
sys_invocation_status.remove("pinned_deployment_id").expect("pinned_deployment_id should exist"),
sys_invocation_status.remove("pinned_service_protocol_version").expect("pinned_service_protocol_version should exist"),
Expand Down
1 change: 1 addition & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ notify-debouncer-mini = { version = "0.4.1" }
num-traits = { version = "0.2.17" }
once_cell = { workspace = true }
opentelemetry = { workspace = true }
paste = { workspace = true }
prost = { workspace = true }
prost-dto = { workspace = true }
prost-types = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions crates/types/src/config/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub struct IngressOptions {
/// back to a previous version.
#[cfg_attr(feature = "schemars", schemars(skip))]
pub experimental_feature_enable_separate_ingress_role: bool,

/// Cluster of new features for the kafka ingress.
#[cfg_attr(feature = "schemars", schemars(skip))]
experimental_feature_kafka_ingress_next: bool,
}

impl IngressOptions {
Expand All @@ -65,6 +69,10 @@ impl IngressOptions {
Semaphore::MAX_PERMITS - 1,
)
}

pub fn experimental_feature_kafka_ingress_next(&self) -> bool {
self.experimental_feature_kafka_ingress_next
}
}

impl Default for IngressOptions {
Expand All @@ -75,6 +83,7 @@ impl Default for IngressOptions {
concurrent_api_requests_limit: None,
kafka_clusters: Default::default(),
experimental_feature_enable_separate_ingress_role: false,
experimental_feature_kafka_ingress_next: false,
}
}
}
73 changes: 4 additions & 69 deletions crates/types/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,76 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt;
use std::mem::size_of;
use std::str::FromStr;

use ulid::Ulid;

use crate::base62_util::base62_max_length_for_type;
use crate::errors::IdDecodeError;
use crate::id_util::{IdDecoder, IdEncoder, IdResourceType};
use crate::identifiers::{DeploymentId, ResourceId, TimestampAwareId};
use crate::identifiers::DeploymentId;
use crate::service_protocol::ServiceProtocolVersion;
use crate::time::MillisSinceEpoch;

impl ResourceId for DeploymentId {
const SIZE_IN_BYTES: usize = size_of::<u128>();
const RESOURCE_TYPE: IdResourceType = IdResourceType::Deployment;
const STRING_CAPACITY_HINT: usize = base62_max_length_for_type::<u128>();
fn push_contents_to_encoder(&self, encoder: &mut IdEncoder<Self>) {
let ulid_raw: u128 = self.0.into();
encoder.encode_fixed_width(ulid_raw);
}
}

impl TimestampAwareId for DeploymentId {
fn timestamp(&self) -> MillisSinceEpoch {
self.0.timestamp_ms().into()
}
}

impl FromStr for DeploymentId {
type Err = IdDecodeError;

fn from_str(input: &str) -> Result<Self, Self::Err> {
let mut decoder = IdDecoder::new(input)?;
// Ensure we are decoding the correct resource type
if decoder.resource_type != Self::RESOURCE_TYPE {
return Err(IdDecodeError::TypeMismatch);
}

// ulid (u128)
let raw_ulid: u128 = decoder.cursor.decode_next()?;
Ok(Self::from(raw_ulid))
}
}

impl fmt::Display for DeploymentId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut encoder = IdEncoder::<Self>::new();
self.push_contents_to_encoder(&mut encoder);
fmt::Display::fmt(&encoder.finalize(), f)
}
}

impl From<u128> for DeploymentId {
fn from(value: u128) -> Self {
Self(Ulid::from(value))
}
}

// Passthrough json schema to the string
#[cfg(feature = "schemars")]
impl schemars::JsonSchema for DeploymentId {
fn schema_name() -> String {
<String as schemars::JsonSchema>::schema_name()
}

fn json_schema(g: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
<String as schemars::JsonSchema>::json_schema(g)
}
}

/// Deployment which was chosen to run an invocation on.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand All @@ -102,6 +34,9 @@ impl PinnedDeployment {
mod tests {
use super::*;

use crate::identifiers::{ResourceId, TimestampAwareId};
use crate::IdEncoder;

#[test]
fn test_deployment_id_format() {
let a = DeploymentId::new();
Expand Down
Loading

0 comments on commit 69cf968

Please sign in to comment.