diff --git a/Cargo.lock b/Cargo.lock index 3b0970f048..a8b0f5399a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1313,6 +1313,7 @@ dependencies = [ "serde_json", "serde_with", "service_key_extractor", + "service_metadata", "service_protocol", "thiserror", "tokio", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index f8ac2a989e..b701ae7095 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -32,6 +32,7 @@ serde_json = "1.0" common = { workspace = true } service_key_extractor = { workspace = true } ingress_grpc = { workspace = true } +service_metadata = { workspace = true } service_protocol = { workspace = true, features = ["discovery"] } prost-reflect = { workspace = true } diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 09c8b2fe04..4c5eb6fa62 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -7,6 +7,7 @@ use ingress_grpc::InMemoryMethodDescriptorRegistry; use rest_api::MetaRestEndpoint; use service::MetaService; use service_key_extractor::KeyExtractorsRegistry; +use service_metadata::InMemoryServiceEndpointRegistry; use std::net::SocketAddr; use tokio::join; use tracing::debug; @@ -27,9 +28,11 @@ impl Options { pub fn build(self) -> Meta { let key_extractors_registry = KeyExtractorsRegistry::default(); let method_descriptors_registry = InMemoryMethodDescriptorRegistry::default(); + let service_endpoint_registry = InMemoryServiceEndpointRegistry::default(); let service = MetaService::new( key_extractors_registry.clone(), method_descriptors_registry.clone(), + service_endpoint_registry.clone(), InMemoryMetaStorage::default(), Default::default(), ); @@ -37,6 +40,7 @@ impl Options { Meta { key_extractors_registry, method_descriptors_registry, + service_endpoint_registry, rest_endpoint: MetaRestEndpoint::new(self.rest_addr), service, } @@ -46,6 +50,7 @@ impl Options { pub struct Meta { key_extractors_registry: KeyExtractorsRegistry, method_descriptors_registry: InMemoryMethodDescriptorRegistry, + service_endpoint_registry: InMemoryServiceEndpointRegistry, rest_endpoint: MetaRestEndpoint, service: MetaService, @@ -60,6 +65,10 @@ impl Meta { self.method_descriptors_registry.clone() } + pub fn service_endpoint_registry(&self) -> InMemoryServiceEndpointRegistry { + self.service_endpoint_registry.clone() + } + pub async fn run(self, drain: drain::Watch) { let (shutdown_signal, shutdown_watch) = drain::channel(); diff --git a/src/meta/src/service.rs b/src/meta/src/service.rs index c486a96a56..59569747dd 100644 --- a/src/meta/src/service.rs +++ b/src/meta/src/service.rs @@ -9,6 +9,9 @@ use hyper::http::{HeaderName, HeaderValue}; use hyper::Uri; use ingress_grpc::InMemoryMethodDescriptorRegistry; use service_key_extractor::KeyExtractorsRegistry; +use service_metadata::{ + DeliveryOptions, EndpointMetadata, InMemoryServiceEndpointRegistry, ProtocolType, +}; use service_protocol::discovery::{ServiceDiscovery, ServiceDiscoveryError}; use tokio::sync::mpsc; use tracing::debug; @@ -66,6 +69,7 @@ impl MetaHandle { pub struct MetaService { key_extractors_registry: KeyExtractorsRegistry, method_descriptors_registry: InMemoryMethodDescriptorRegistry, + service_endpoint_registry: InMemoryServiceEndpointRegistry, service_discovery: ServiceDiscovery, storage: Storage, @@ -81,6 +85,7 @@ where pub fn new( key_extractors_registry: KeyExtractorsRegistry, method_descriptors_registry: InMemoryMethodDescriptorRegistry, + service_endpoint_registry: InMemoryServiceEndpointRegistry, storage: Storage, service_discovery_retry_policy: RetryPolicy, ) -> Self { @@ -89,6 +94,7 @@ where Self { key_extractors_registry, method_descriptors_registry, + service_endpoint_registry, service_discovery: ServiceDiscovery::new(service_discovery_retry_policy), storage, handle: MetaHandle(api_cmd_tx), @@ -133,7 +139,7 @@ where ) -> Result, MetaError> { let discovered_metadata = self .service_discovery - .discover(uri, additional_headers) + .discover(&uri, &additional_headers) .await?; let mut registered_services = Vec::with_capacity(discovered_metadata.services.len()); @@ -153,7 +159,15 @@ where self.method_descriptors_registry .register(service_descriptor); self.key_extractors_registry - .register(service, service_instance_type); + .register(service.clone(), service_instance_type); + self.service_endpoint_registry.register_service_endpoint( + service, + EndpointMetadata::new( + uri.clone(), + ProtocolType::BidiStream, // TODO needs to support RequestResponse as well: https://github.com/restatedev/restate/issues/183 + DeliveryOptions::new(additional_headers.clone(), None), // TODO needs to support retry policies as well: https://github.com/restatedev/restate/issues/184 + ), + ); } Ok(registered_services) diff --git a/src/restate/src/app.rs b/src/restate/src/app.rs index ce95ff832d..a9a16adb30 100644 --- a/src/restate/src/app.rs +++ b/src/restate/src/app.rs @@ -23,6 +23,7 @@ impl Options { let worker = self.worker_options.build( meta.method_descriptor_registry(), meta.key_extractors_registry(), + meta.service_endpoint_registry(), ); Application { meta, worker } diff --git a/src/service_metadata/src/endpoint_registry.rs b/src/service_metadata/src/endpoint_registry.rs index 1902244495..ec6054921a 100644 --- a/src/service_metadata/src/endpoint_registry.rs +++ b/src/service_metadata/src/endpoint_registry.rs @@ -9,7 +9,7 @@ pub trait ServiceEndpointRegistry { fn resolve_endpoint(&self, service_name: impl AsRef) -> Option; } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct InMemoryServiceEndpointRegistry { registry: Arc>>, } diff --git a/src/service_metadata/src/lib.rs b/src/service_metadata/src/lib.rs index 63b0247468..e76bc7cbb8 100644 --- a/src/service_metadata/src/lib.rs +++ b/src/service_metadata/src/lib.rs @@ -19,6 +19,20 @@ pub struct DeliveryOptions { retry_policy: Option, } +impl DeliveryOptions { + // false positive because of inner Bytes field of HeaderName + #[allow(clippy::mutable_key_type)] + pub fn new( + additional_headers: HashMap, + retry_policy: Option, + ) -> Self { + Self { + additional_headers, + retry_policy, + } + } +} + #[derive(Debug, Clone)] pub struct EndpointMetadata { address: Uri, diff --git a/src/service_protocol/src/discovery.rs b/src/service_protocol/src/discovery.rs index a04c52ae2a..37bed01462 100644 --- a/src/service_protocol/src/discovery.rs +++ b/src/service_protocol/src/discovery.rs @@ -159,8 +159,8 @@ impl ServiceDiscovery { #[allow(clippy::mutable_key_type)] pub async fn discover( &self, - uri: Uri, - additional_headers: HashMap, + uri: &Uri, + additional_headers: &HashMap, ) -> Result { let client = Client::builder().build::<_, Body>( HttpsConnectorBuilder::new() @@ -170,7 +170,7 @@ impl ServiceDiscovery { .enable_http2() .build(), ); - let uri = append_discover(&uri); + let uri = append_discover(uri); let (mut parts, body) = self .retry_policy diff --git a/src/service_protocol/tests/counter_test.rs b/src/service_protocol/tests/counter_test.rs index 0b822fb55c..d0388e1557 100644 --- a/src/service_protocol/tests/counter_test.rs +++ b/src/service_protocol/tests/counter_test.rs @@ -15,8 +15,8 @@ async fn counter_discovery() { let discovered_metadata = discovery .discover( - Uri::from_static("http://localhost:8080"), - Default::default(), + &Uri::from_static("http://localhost:8080"), + &Default::default(), ) .await .unwrap(); diff --git a/src/worker/src/lib.rs b/src/worker/src/lib.rs index d84b45279f..461b063eb0 100644 --- a/src/worker/src/lib.rs +++ b/src/worker/src/lib.rs @@ -69,8 +69,14 @@ impl Options { self, method_descriptor_registry: InMemoryMethodDescriptorRegistry, key_extractor_registry: KeyExtractorsRegistry, + service_endpoint_registry: InMemoryServiceEndpointRegistry, ) -> Worker { - Worker::new(self, method_descriptor_registry, key_extractor_registry) + Worker::new( + self, + method_descriptor_registry, + key_extractor_registry, + service_endpoint_registry, + ) } } @@ -79,6 +85,7 @@ impl Worker { opts: Options, method_descriptor_registry: InMemoryMethodDescriptorRegistry, key_extractor_registry: KeyExtractorsRegistry, + service_endpoint_registry: InMemoryServiceEndpointRegistry, ) -> Self { let Options { channel_size, @@ -118,7 +125,11 @@ impl Worker { let network_handle = network.create_network_handle(); - let invoker = Invoker::new(RetryPolicy::None, RocksDBJournalReader, Default::default()); + let invoker = Invoker::new( + RetryPolicy::None, + RocksDBJournalReader, + service_endpoint_registry, + ); let (command_senders, processors): (Vec<_>, Vec<_>) = (0..num_partition_processors) .map(|idx| {