Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove EndpointMetadata.delivery_options.retry_policy #570

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions src/invoker/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<JR, SR, EE, EMR> Service<JR, SR, EE, EMR> {
state_reader,
entry_enricher,
},
default_retry_policy: retry_policy,
retry_policy,
invocation_tasks: Default::default(),
retry_timers: Default::default(),
quota: quota::InvokerConcurrencyQuota::new(concurrency_limit),
Expand Down Expand Up @@ -296,7 +296,7 @@ struct ServiceInner<EndpointMetadataRegistry, InvocationTaskRunner> {
invocation_task_runner: InvocationTaskRunner,

// Invoker service arguments
default_retry_policy: RetryPolicy,
retry_policy: RetryPolicy,

// Invoker state machine
invocation_tasks: JoinSet<()>,
Expand Down Expand Up @@ -449,7 +449,7 @@ where
partition,
service_invocation_id,
journal,
InvocationStateMachine::create,
InvocationStateMachine::create(self.retry_policy.clone()),
)
.await
}
Expand Down Expand Up @@ -791,7 +791,7 @@ where
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
journal: InvokeInputJournal,
state_machine_factory: impl FnOnce(RetryPolicy) -> InvocationStateMachine,
mut ism: InvocationStateMachine,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am really bad with abbreviations. I'd say either you establish this abbreviation or we'll name it invocation_state_machine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This abbreviation is already used all around the codebase of the invoker, and the reason i named it this way here is to be consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. In the future I would suggest to spell things out since we should optimize for reading and not for writing.

) {
// Resolve metadata
let endpoint_metadata = match self
Expand All @@ -806,24 +806,12 @@ where
);

// This method needs a state machine
self.handle_error_event(
partition,
service_invocation_id,
err,
state_machine_factory(self.default_retry_policy.clone()),
)
.await;
self.handle_error_event(partition, service_invocation_id, err, ism)
.await;
return;
}
};

let retry_policy = endpoint_metadata
.retry_policy()
.unwrap_or(&self.default_retry_policy)
.clone();

let mut ism = state_machine_factory(retry_policy);

// Start the InvocationTask
let (completions_tx, completions_rx) = match endpoint_metadata.protocol_type() {
ProtocolType::RequestResponse => (None, None),
Expand Down Expand Up @@ -876,8 +864,7 @@ where
partition,
service_invocation_id,
InvokeInputJournal::NoCachedJournal,
// In case we're retrying, we don't modify the retry policy
|_| ism,
ism,
)
.await;
} else {
Expand Down Expand Up @@ -939,7 +926,7 @@ mod tests {
invocation_tasks_tx,
invocation_tasks_rx,
invocation_task_runner,
default_retry_policy,
retry_policy: default_retry_policy,
slinkydeveloper marked this conversation as resolved.
Show resolved Hide resolved
invocation_tasks: Default::default(),
retry_timers: Default::default(),
quota: InvokerConcurrencyQuota::new(concurrency_limit),
Expand Down
11 changes: 1 addition & 10 deletions src/meta/src/rest_api/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use axum::Json;
use hyper::http::{HeaderName, HeaderValue};
use okapi_operation::*;
use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver};
use restate_types::retries::RetryPolicy;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand All @@ -26,11 +25,6 @@ pub struct RegisterServiceEndpointRequest {
///
/// Additional headers added to the discover/invoke requests to the service endpoint.
pub additional_headers: Option<HashMap<String, String>>,
/// # Retry policy
///
/// Custom retry policy to use when executing invoke requests to the service endpoint.
/// If not set, the one configured in the worker will be used as default.
pub retry_policy: Option<RetryPolicy>,
}

#[derive(Debug, Serialize, JsonSchema)]
Expand Down Expand Up @@ -62,10 +56,7 @@ pub async fn discover_service_endpoint<S, W>(
})
.collect::<Result<HashMap<_, _>, MetaApiError>>()?;

let registration_result = state
.meta_handle()
.register(payload.uri, headers, payload.retry_policy)
.await;
let registration_result = state.meta_handle().register(payload.uri, headers).await;
Ok(registration_result
.map(|services| RegisterServiceEndpointResponse { services })?
.into())
Expand Down
10 changes: 3 additions & 7 deletions src/meta/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ enum MetaHandleRequest {
DiscoverEndpoint {
uri: Uri,
additional_headers: HashMap<HeaderName, HeaderValue>,
retry_policy: Option<RetryPolicy>,
},
}

Expand All @@ -58,12 +57,10 @@ impl MetaHandle {
&self,
uri: Uri,
additional_headers: HashMap<HeaderName, HeaderValue>,
retry_policy: Option<RetryPolicy>,
) -> Result<Vec<String>, MetaError> {
let (cmd, response_tx) = Command::prepare(MetaHandleRequest::DiscoverEndpoint {
uri,
additional_headers,
retry_policy,
});
self.0.send(cmd).map_err(|_e| MetaError::MetaClosed)?;
response_tx
Expand Down Expand Up @@ -140,8 +137,8 @@ where
let (req, mut replier) = cmd.expect("This channel should never be closed").into_inner();

let res = match req {
MetaHandleRequest::DiscoverEndpoint { uri, additional_headers, retry_policy } => MetaHandleResponse::DiscoverEndpoint(
self.discover_endpoint(uri, additional_headers, retry_policy, replier.aborted()).await
MetaHandleRequest::DiscoverEndpoint { uri, additional_headers } => MetaHandleResponse::DiscoverEndpoint(
self.discover_endpoint(uri, additional_headers, replier.aborted()).await
.map_err(|e| {
warn_it!(e); e
})
Expand Down Expand Up @@ -176,7 +173,6 @@ where
&mut self,
uri: Uri,
additional_headers: HashMap<HeaderName, HeaderValue>,
retry_policy: Option<RetryPolicy>,
abort_signal: impl Future<Output = ()>,
) -> Result<Vec<String>, MetaError> {
debug!(http.url = %uri, "Discovering Service endpoint");
Expand All @@ -190,7 +186,7 @@ where
let endpoint_metadata = EndpointMetadata::new(
uri.clone(),
discovered_metadata.protocol_type,
DeliveryOptions::new(additional_headers, retry_policy),
DeliveryOptions::new(additional_headers),
);
let services = discovered_metadata
.services
Expand Down
16 changes: 2 additions & 14 deletions src/schema_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub mod endpoint {
use http::header::{HeaderName, HeaderValue};
use http::Uri;
use restate_types::identifiers::EndpointId;
use restate_types::retries::RetryPolicy;
use std::collections::HashMap;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand All @@ -28,18 +27,11 @@ pub mod endpoint {
)]
#[cfg_attr(feature = "serde_schema", schemars(with = "HashMap<String, String>"))]
additional_headers: HashMap<HeaderName, HeaderValue>,
retry_policy: Option<RetryPolicy>,
}

impl DeliveryOptions {
pub fn new(
additional_headers: HashMap<HeaderName, HeaderValue>,
retry_policy: Option<RetryPolicy>,
) -> Self {
Self {
additional_headers,
retry_policy,
}
pub fn new(additional_headers: HashMap<HeaderName, HeaderValue>) -> Self {
Self { additional_headers }
}
}

Expand Down Expand Up @@ -116,10 +108,6 @@ pub mod endpoint {
self.protocol_type
}

pub fn retry_policy(&self) -> Option<&RetryPolicy> {
self.delivery_options.retry_policy.as_ref()
}

pub fn additional_headers(&self) -> &HashMap<HeaderName, HeaderValue> {
&self.delivery_options.additional_headers
}
Expand Down