Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce META REST endpoint to get a service endpoint by id #565

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 124 additions & 0 deletions src/meta/src/rest_api/endpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use super::error::*;
use super::state::*;

use axum::extract::{Path, State};
use axum::http::Uri;
use axum::Json;
use okapi_operation::*;
use restate_schema_api::endpoint::{EndpointMetadataResolver, ProtocolType};
use restate_serde_util::SerdeableHeaderHashMap;
use restate_types::identifiers::{EndpointId, ServiceRevision};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::sync::Arc;

#[serde_as]
#[derive(Debug, Deserialize, JsonSchema)]
pub struct RegisterServiceEndpointRequest {
/// # Uri
///
/// Uri to use to discover/invoke the service endpoint.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[schemars(with = "String")]
pub uri: Uri,
/// # Additional headers
///
/// Additional headers added to the discover/invoke requests to the service endpoint.
pub additional_headers: Option<SerdeableHeaderHashMap>,
/// # Force
///
/// If `true`, it will override, if existing, any endpoint using the same `uri`.
/// Beware that this can lead in-flight invocations to an unrecoverable error state.
///
/// See the [versioning documentation](http://restate.dev/docs/deployment-operations/versioning) for more information.
#[serde(default)]
pub force: bool,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct RegisterServiceResponse {
name: String,
revision: ServiceRevision,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct RegisterServiceEndpointResponse {
id: EndpointId,
services: Vec<RegisterServiceResponse>,
}

/// Discover endpoint and return discovered endpoints.
#[openapi(
summary = "Discover service endpoint",
description = "Discover service endpoint and register it in the meta information storage. If the service endpoint is already registered, it will be re-discovered and will override the previous stored metadata.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description seems outdated since we are no longer overwriting metadata by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated this in the next PR

operation_id = "discover_service_endpoint",
tags = "service_endpoint"
)]
pub async fn discover_service_endpoint<S, W>(
State(state): State<Arc<RestEndpointState<S, W>>>,
#[request_body(required = true)] Json(payload): Json<RegisterServiceEndpointRequest>,
) -> Result<Json<RegisterServiceEndpointResponse>, MetaApiError> {
let registration_result = state
.meta_handle()
.register(
payload.uri,
payload.additional_headers.unwrap_or_default().into(),
payload.force,
)
.await?;

Ok(RegisterServiceEndpointResponse {
id: registration_result.endpoint,
services: registration_result
.services
.into_iter()
.map(|(name, revision)| RegisterServiceResponse { name, revision })
.collect(),
}
.into())
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct ServiceEndpointResponse {
endpoint_id: EndpointId,
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
#[schemars(with = "String")]
address: Uri,
protocol_type: ProtocolType,
additional_headers: SerdeableHeaderHashMap,
services: Vec<RegisterServiceResponse>,
}

/// Discover endpoint and return discovered endpoints.
#[openapi(
summary = "Get service endpoint",
description = "Get service endpoint metadata",
operation_id = "get_service_endpoint",
tags = "service_endpoint",
parameters(path(
name = "endpoint",
description = "Endpoint identifier",
schema = "std::string::String"
))
)]
pub async fn get_service_endpoint<S: EndpointMetadataResolver, W>(
State(state): State<Arc<RestEndpointState<S, W>>>,
Path(endpoint_id): Path<String>,
) -> Result<Json<ServiceEndpointResponse>, MetaApiError> {
let (endpoint_meta, services) = state
.schemas()
.get_endpoint_and_services(&endpoint_id)
.ok_or_else(|| MetaApiError::ServiceEndpointNotFound(endpoint_id.clone()))?;

Ok(ServiceEndpointResponse {
endpoint_id,
address: endpoint_meta.address().clone(),
protocol_type: endpoint_meta.protocol_type(),
additional_headers: endpoint_meta.additional_headers().clone().into(),
services: services
.into_iter()
.map(|(name, revision)| RegisterServiceResponse { name, revision })
.collect(),
}
.into())
}
2 changes: 2 additions & 0 deletions src/meta/src/rest_api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::service::MetaError;
pub enum MetaApiError {
#[error("The request field '{0}' is invalid. Reason: {1}")]
InvalidField(&'static str, String),
#[error("The requested service endpoint '{0}' does not exist")]
ServiceEndpointNotFound(String),
#[error("The requested service '{0}' does not exist")]
ServiceNotFound(String),
#[error("The requested method '{method_name}' on service '{service_name}' does not exist")]
Expand Down
17 changes: 14 additions & 3 deletions src/meta/src/rest_api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This module implements the Meta API endpoint.

mod endpoints;
mod error;
mod invocations;
mod methods;
Expand All @@ -13,6 +14,7 @@ use futures::FutureExt;
use hyper::Server;
use okapi_operation::axum_integration::{delete, get, post};
use okapi_operation::*;
use restate_schema_api::endpoint::EndpointMetadataResolver;
use restate_schema_api::key::RestateKeyConverter;
use restate_schema_api::service::ServiceMetadataResolver;
use std::net::SocketAddr;
Expand Down Expand Up @@ -50,7 +52,12 @@ impl MetaRestEndpoint {
}

pub async fn run<
S: ServiceMetadataResolver + RestateKeyConverter + Send + Sync + 'static,
S: ServiceMetadataResolver
+ EndpointMetadataResolver
+ RestateKeyConverter
+ Send
+ Sync
+ 'static,
W: restate_worker_api::Handle + Send + Sync + 'static,
>(
self,
Expand All @@ -67,14 +74,18 @@ impl MetaRestEndpoint {

// Setup the router
let meta_api = axum_integration::Router::new()
.route(
"/endpoints/:endpoint",
get(openapi_handler!(endpoints::get_service_endpoint)),
)
// deprecated url
.route(
"/endpoint/discover",
post(openapi_handler!(services::discover_service_endpoint)),
post(openapi_handler!(endpoints::discover_service_endpoint)),
)
.route(
"/services/discover",
post(openapi_handler!(services::discover_service_endpoint)),
post(openapi_handler!(endpoints::discover_service_endpoint)),
)
.route("/services/", get(openapi_handler!(services::list_services)))
.route(
Expand Down
83 changes: 2 additions & 81 deletions src/meta/src/rest_api/services.rs
Original file line number Diff line number Diff line change
@@ -1,93 +1,14 @@
use super::error::*;
use super::state::*;

use axum::extract::{Path, State};
use axum::http::Uri;
use axum::Json;
use hyper::http::{HeaderName, HeaderValue};
use okapi_operation::*;
use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver};
use restate_types::identifiers::{EndpointId, ServiceRevision};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::HashMap;
use serde::Serialize;
use std::sync::Arc;

#[serde_as]
#[derive(Debug, Deserialize, JsonSchema)]
pub struct RegisterServiceEndpointRequest {
/// # Uri
///
/// Uri to use to discover/invoke the service endpoint.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[schemars(with = "String")]
pub uri: Uri,
/// # Additional headers
///
/// Additional headers added to the discover/invoke requests to the service endpoint.
pub additional_headers: Option<HashMap<String, String>>,
/// # Force
///
/// If `true`, it will overwrite, if existing, any endpoint using the same `uri`.
/// Beware that this can lead for in-flight invocations to an unrecoverable error state.
///
/// See the [versioning documentation](http://restate.dev/docs/deployment-operations/versioning) for more information.
#[serde(default)]
pub force: bool,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct RegisterServiceResponse {
name: String,
revision: ServiceRevision,
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct RegisterServiceEndpointResponse {
id: EndpointId,
services: Vec<RegisterServiceResponse>,
}

/// Discover endpoint and return discovered endpoints.
#[openapi(
summary = "Discover service endpoint",
description = "Discover service endpoint and register it in the meta information storage. If the service endpoint is already registered, it will be re-discovered and will override the previous stored metadata.",
operation_id = "discover_service_endpoint",
tags = "service_endpoint"
)]
pub async fn discover_service_endpoint<S, W>(
State(state): State<Arc<RestEndpointState<S, W>>>,
#[request_body(required = true)] Json(payload): Json<RegisterServiceEndpointRequest>,
) -> Result<Json<RegisterServiceEndpointResponse>, MetaApiError> {
let headers = payload
.additional_headers
.unwrap_or_default()
.into_iter()
.map(|(k, v)| {
let header_name = HeaderName::try_from(k)
.map_err(|e| MetaApiError::InvalidField("additional_headers", e.to_string()))?;
let header_value = HeaderValue::try_from(v)
.map_err(|e| MetaApiError::InvalidField("additional_headers", e.to_string()))?;
Ok((header_name, header_value))
})
.collect::<Result<HashMap<_, _>, MetaApiError>>()?;

let registration_result = state
.meta_handle()
.register(payload.uri, headers, payload.force)
.await?;

Ok(RegisterServiceEndpointResponse {
id: registration_result.endpoint,
services: registration_result
.services
.into_iter()
.map(|(name, revision)| RegisterServiceResponse { name, revision })
.collect(),
}
.into())
}

#[derive(Debug, Serialize, JsonSchema)]
pub struct ListServicesResponse {
services: Vec<ServiceMetadata>,
Expand Down
5 changes: 3 additions & 2 deletions src/schema_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ publish = false

[features]
default = []
serde = ["dep:serde", "dep:serde_with", "restate_types?/serde"]
serde_schema = ["serde", "dep:schemars", "restate_types?/serde_schema"]
serde = ["dep:serde", "dep:serde_with", "restate_types?/serde", "dep:restate_serde_util"]
serde_schema = ["serde", "dep:schemars", "restate_types?/serde_schema", "restate_serde_util?/schema"]
key_extraction = ["dep:bytes", "dep:thiserror", "dep:anyhow", "dep:prost"]
key_expansion = ["dep:bytes", "dep:thiserror", "dep:prost", "dep:prost-reflect", "dep:anyhow"]
json_key_conversion = ["key_extraction", "key_expansion", "dep:serde_json", "dep:thiserror"]
Expand All @@ -28,6 +28,7 @@ http = { workspace = true, optional = true }
prost = { workspace = true, optional = true }
prost-reflect = { workspace = true, optional = true }
restate_types = { workspace = true, optional = true }
restate_serde_util = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
serde_with = { workspace = true, optional = true }
Expand Down
Loading