-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(transport): Support timeouts with "grpc-timeout" header (#606)
* transport: Support timeouts with "grpc-timeout" header * Apply suggestions from code review Co-authored-by: Lucio Franco <[email protected]> * Timeout -> GrpcTimeout and export TimeoutExpired * Clean up imports * Give header name a more proper home * Add fuzz tests for parsing header value into `grpc-timeout` * Map `TimeoutExpired` to `cancelled` status * Recover from timeout errors in the service * Refactor tests * Fix CI * Fix CI, again Co-authored-by: Lucio Franco <[email protected]>
- Loading branch information
1 parent
4926c60
commit 9ff4f7b
Showing
13 changed files
with
494 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
use integration_tests::pb::{test_client, test_server, Input, Output}; | ||
use std::{net::SocketAddr, time::Duration}; | ||
use tokio::net::TcpListener; | ||
use tonic::{transport::Server, Code, Request, Response, Status}; | ||
|
||
#[tokio::test] | ||
async fn cancelation_on_timeout() { | ||
let addr = run_service_in_background(Duration::from_secs(1), Duration::from_secs(100)).await; | ||
|
||
let mut client = test_client::TestClient::connect(format!("http://{}", addr)) | ||
.await | ||
.unwrap(); | ||
|
||
let mut req = Request::new(Input {}); | ||
req.metadata_mut() | ||
// 500 ms | ||
.insert("grpc-timeout", "500m".parse().unwrap()); | ||
|
||
let res = client.unary_call(req).await; | ||
|
||
let err = res.unwrap_err(); | ||
assert!(err.message().contains("Timeout expired")); | ||
assert_eq!(err.code(), Code::Cancelled); | ||
} | ||
|
||
#[tokio::test] | ||
async fn picks_server_timeout_if_thats_sorter() { | ||
let addr = run_service_in_background(Duration::from_secs(1), Duration::from_millis(100)).await; | ||
|
||
let mut client = test_client::TestClient::connect(format!("http://{}", addr)) | ||
.await | ||
.unwrap(); | ||
|
||
let mut req = Request::new(Input {}); | ||
req.metadata_mut() | ||
// 10 hours | ||
.insert("grpc-timeout", "10H".parse().unwrap()); | ||
|
||
let res = client.unary_call(req).await; | ||
let err = res.unwrap_err(); | ||
assert!(err.message().contains("Timeout expired")); | ||
assert_eq!(err.code(), Code::Cancelled); | ||
} | ||
|
||
#[tokio::test] | ||
async fn picks_client_timeout_if_thats_sorter() { | ||
let addr = run_service_in_background(Duration::from_secs(1), Duration::from_secs(100)).await; | ||
|
||
let mut client = test_client::TestClient::connect(format!("http://{}", addr)) | ||
.await | ||
.unwrap(); | ||
|
||
let mut req = Request::new(Input {}); | ||
req.metadata_mut() | ||
// 100 ms | ||
.insert("grpc-timeout", "100m".parse().unwrap()); | ||
|
||
let res = client.unary_call(req).await; | ||
let err = res.unwrap_err(); | ||
assert!(err.message().contains("Timeout expired")); | ||
assert_eq!(err.code(), Code::Cancelled); | ||
} | ||
|
||
async fn run_service_in_background(latency: Duration, server_timeout: Duration) -> SocketAddr { | ||
struct Svc { | ||
latency: Duration, | ||
} | ||
|
||
#[tonic::async_trait] | ||
impl test_server::Test for Svc { | ||
async fn unary_call(&self, _req: Request<Input>) -> Result<Response<Output>, Status> { | ||
tokio::time::sleep(self.latency).await; | ||
Ok(Response::new(Output {})) | ||
} | ||
} | ||
|
||
let svc = test_server::TestServer::new(Svc { latency }); | ||
|
||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||
let addr = listener.local_addr().unwrap(); | ||
|
||
tokio::spawn(async move { | ||
Server::builder() | ||
.timeout(server_timeout) | ||
.add_service(svc) | ||
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) | ||
.await | ||
.unwrap(); | ||
}); | ||
|
||
addr | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
use crate::{body::BoxBody, Status}; | ||
use futures_util::ready; | ||
use http::Response; | ||
use pin_project::pin_project; | ||
use std::{ | ||
future::Future, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
use tower::Service; | ||
|
||
/// Middleware that attempts to recover from service errors by turning them into a response built | ||
/// from the `Status`. | ||
#[derive(Debug, Clone)] | ||
pub(crate) struct RecoverError<S> { | ||
inner: S, | ||
} | ||
|
||
impl<S> RecoverError<S> { | ||
pub(crate) fn new(inner: S) -> Self { | ||
Self { inner } | ||
} | ||
} | ||
|
||
impl<S, R> Service<R> for RecoverError<S> | ||
where | ||
S: Service<R, Response = Response<BoxBody>>, | ||
S::Error: Into<crate::Error>, | ||
{ | ||
type Response = Response<BoxBody>; | ||
type Error = crate::Error; | ||
type Future = ResponseFuture<S::Future>; | ||
|
||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
self.inner.poll_ready(cx).map_err(Into::into) | ||
} | ||
|
||
fn call(&mut self, req: R) -> Self::Future { | ||
ResponseFuture { | ||
inner: self.inner.call(req), | ||
} | ||
} | ||
} | ||
|
||
#[pin_project] | ||
pub(crate) struct ResponseFuture<F> { | ||
#[pin] | ||
inner: F, | ||
} | ||
|
||
impl<F, E> Future for ResponseFuture<F> | ||
where | ||
F: Future<Output = Result<Response<BoxBody>, E>>, | ||
E: Into<crate::Error>, | ||
{ | ||
type Output = Result<Response<BoxBody>, crate::Error>; | ||
|
||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let result: Result<Response<BoxBody>, crate::Error> = | ||
ready!(self.project().inner.poll(cx)).map_err(Into::into); | ||
|
||
match result { | ||
Ok(res) => Poll::Ready(Ok(res)), | ||
Err(err) => { | ||
if let Some(status) = Status::try_from_error(&*err) { | ||
let mut res = Response::new(BoxBody::empty()); | ||
status.add_header(res.headers_mut()).unwrap(); | ||
Poll::Ready(Ok(res)) | ||
} else { | ||
Poll::Ready(Err(err)) | ||
} | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.