Skip to content

Commit

Permalink
Add ServiceEndpointRegistry to Meta
Browse files Browse the repository at this point in the history
This commit lets the meta populate the InMemoryServiceEndpointRegistry which is
shared with the other components of the Worker component.

This fixes #43.
  • Loading branch information
tillrohrmann committed Mar 14, 2023
1 parent 5270c14 commit f0dfc86
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde_json = "1.0"
common = { workspace = true }
service_key_extractor = { workspace = true }
ingress_grpc = { workspace = true }
service_metadata = { workspace = true }
service_protocol = { workspace = true, features = ["discovery"] }
prost-reflect = { workspace = true }

Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ingress_grpc::InMemoryMethodDescriptorRegistry;
use rest_api::MetaRestEndpoint;
use service::MetaService;
use service_key_extractor::KeyExtractorsRegistry;
use service_metadata::InMemoryServiceEndpointRegistry;
use std::net::SocketAddr;
use tokio::join;
use tracing::debug;
Expand All @@ -27,16 +28,19 @@ impl Options {
pub fn build(self) -> Meta {
let key_extractors_registry = KeyExtractorsRegistry::default();
let method_descriptors_registry = InMemoryMethodDescriptorRegistry::default();
let service_endpoint_registry = InMemoryServiceEndpointRegistry::default();
let service = MetaService::new(
key_extractors_registry.clone(),
method_descriptors_registry.clone(),
service_endpoint_registry.clone(),
InMemoryMetaStorage::default(),
Default::default(),
);

Meta {
key_extractors_registry,
method_descriptors_registry,
service_endpoint_registry,
rest_endpoint: MetaRestEndpoint::new(self.rest_addr),
service,
}
Expand All @@ -46,6 +50,7 @@ impl Options {
pub struct Meta {
key_extractors_registry: KeyExtractorsRegistry,
method_descriptors_registry: InMemoryMethodDescriptorRegistry,
service_endpoint_registry: InMemoryServiceEndpointRegistry,

rest_endpoint: MetaRestEndpoint,
service: MetaService<InMemoryMetaStorage>,
Expand All @@ -60,6 +65,10 @@ impl Meta {
self.method_descriptors_registry.clone()
}

pub fn service_endpoint_registry(&self) -> InMemoryServiceEndpointRegistry {
self.service_endpoint_registry.clone()
}

pub async fn run(self, drain: drain::Watch) {
let (shutdown_signal, shutdown_watch) = drain::channel();

Expand Down
18 changes: 16 additions & 2 deletions src/meta/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use hyper::http::{HeaderName, HeaderValue};
use hyper::Uri;
use ingress_grpc::InMemoryMethodDescriptorRegistry;
use service_key_extractor::KeyExtractorsRegistry;
use service_metadata::{
DeliveryOptions, EndpointMetadata, InMemoryServiceEndpointRegistry, ProtocolType,
};
use service_protocol::discovery::{ServiceDiscovery, ServiceDiscoveryError};
use tokio::sync::mpsc;
use tracing::debug;
Expand Down Expand Up @@ -66,6 +69,7 @@ impl MetaHandle {
pub struct MetaService<Storage> {
key_extractors_registry: KeyExtractorsRegistry,
method_descriptors_registry: InMemoryMethodDescriptorRegistry,
service_endpoint_registry: InMemoryServiceEndpointRegistry,
service_discovery: ServiceDiscovery,

storage: Storage,
Expand All @@ -81,6 +85,7 @@ where
pub fn new(
key_extractors_registry: KeyExtractorsRegistry,
method_descriptors_registry: InMemoryMethodDescriptorRegistry,
service_endpoint_registry: InMemoryServiceEndpointRegistry,
storage: Storage,
service_discovery_retry_policy: RetryPolicy,
) -> Self {
Expand All @@ -89,6 +94,7 @@ where
Self {
key_extractors_registry,
method_descriptors_registry,
service_endpoint_registry,
service_discovery: ServiceDiscovery::new(service_discovery_retry_policy),
storage,
handle: MetaHandle(api_cmd_tx),
Expand Down Expand Up @@ -133,7 +139,7 @@ where
) -> Result<Vec<String>, MetaError> {
let discovered_metadata = self
.service_discovery
.discover(uri, additional_headers)
.discover(&uri, &additional_headers)
.await?;

let mut registered_services = Vec::with_capacity(discovered_metadata.services.len());
Expand All @@ -153,7 +159,15 @@ where
self.method_descriptors_registry
.register(service_descriptor);
self.key_extractors_registry
.register(service, service_instance_type);
.register(service.clone(), service_instance_type);
self.service_endpoint_registry.register_service_endpoint(
service,
EndpointMetadata::new(
uri.clone(),
ProtocolType::BidiStream, // TODO needs to support RequestResponse as well: https://github.com/restatedev/restate/issues/183
DeliveryOptions::new(additional_headers.clone(), None), // TODO needs to support retry policies as well: https://github.com/restatedev/restate/issues/184
),
);
}

Ok(registered_services)
Expand Down
1 change: 1 addition & 0 deletions src/restate/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl Options {
let worker = self.worker_options.build(
meta.method_descriptor_registry(),
meta.key_extractors_registry(),
meta.service_endpoint_registry(),
);

Application { meta, worker }
Expand Down
2 changes: 1 addition & 1 deletion src/service_metadata/src/endpoint_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub trait ServiceEndpointRegistry {
fn resolve_endpoint(&self, service_name: impl AsRef<str>) -> Option<EndpointMetadata>;
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct InMemoryServiceEndpointRegistry {
registry: Arc<ArcSwap<HashMap<String, EndpointMetadata>>>,
}
Expand Down
14 changes: 14 additions & 0 deletions src/service_metadata/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ pub struct DeliveryOptions {
retry_policy: Option<RetryPolicy>,
}

impl DeliveryOptions {
// false positive because of inner Bytes field of HeaderName
#[allow(clippy::mutable_key_type)]
pub fn new(
additional_headers: HashMap<HeaderName, HeaderValue>,
retry_policy: Option<RetryPolicy>,
) -> Self {
Self {
additional_headers,
retry_policy,
}
}
}

#[derive(Debug, Clone)]
pub struct EndpointMetadata {
address: Uri,
Expand Down
6 changes: 3 additions & 3 deletions src/service_protocol/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ impl ServiceDiscovery {
#[allow(clippy::mutable_key_type)]
pub async fn discover(
&self,
uri: Uri,
additional_headers: HashMap<HeaderName, HeaderValue>,
uri: &Uri,
additional_headers: &HashMap<HeaderName, HeaderValue>,
) -> Result<DiscoveredMetadata, ServiceDiscoveryError> {
let client = Client::builder().build::<_, Body>(
HttpsConnectorBuilder::new()
Expand All @@ -170,7 +170,7 @@ impl ServiceDiscovery {
.enable_http2()
.build(),
);
let uri = append_discover(&uri);
let uri = append_discover(uri);

let (mut parts, body) = self
.retry_policy
Expand Down
4 changes: 2 additions & 2 deletions src/service_protocol/tests/counter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ async fn counter_discovery() {

let discovered_metadata = discovery
.discover(
Uri::from_static("http://localhost:8080"),
Default::default(),
&Uri::from_static("http://localhost:8080"),
&Default::default(),
)
.await
.unwrap();
Expand Down
15 changes: 13 additions & 2 deletions src/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,14 @@ impl Options {
self,
method_descriptor_registry: InMemoryMethodDescriptorRegistry,
key_extractor_registry: KeyExtractorsRegistry,
service_endpoint_registry: InMemoryServiceEndpointRegistry,
) -> Worker {
Worker::new(self, method_descriptor_registry, key_extractor_registry)
Worker::new(
self,
method_descriptor_registry,
key_extractor_registry,
service_endpoint_registry,
)
}
}

Expand All @@ -79,6 +85,7 @@ impl Worker {
opts: Options,
method_descriptor_registry: InMemoryMethodDescriptorRegistry,
key_extractor_registry: KeyExtractorsRegistry,
service_endpoint_registry: InMemoryServiceEndpointRegistry,
) -> Self {
let Options {
channel_size,
Expand Down Expand Up @@ -118,7 +125,11 @@ impl Worker {

let network_handle = network.create_network_handle();

let invoker = Invoker::new(RetryPolicy::None, RocksDBJournalReader, Default::default());
let invoker = Invoker::new(
RetryPolicy::None,
RocksDBJournalReader,
service_endpoint_registry,
);

let (command_senders, processors): (Vec<_>, Vec<_>) = (0..num_partition_processors)
.map(|idx| {
Expand Down

0 comments on commit f0dfc86

Please sign in to comment.