From 95587f91640273f36ce03f0247eeea834bd99190 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 25 Aug 2021 18:57:32 +0000 Subject: [PATCH 1/5] wip: skip the route stack when no route matches --- Cargo.lock | 1 + linkerd/service-profiles/Cargo.toml | 3 +- .../src/http/route_request.rs | 51 +++++++++++-------- 3 files changed, 32 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b4df3ecd0..b78d9f3bb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1297,6 +1297,7 @@ dependencies = [ "linkerd-addr", "linkerd-dns-name", "linkerd-error", + "linkerd-http-box", "linkerd-proxy-api-resolve", "linkerd-stack", "linkerd-tonic-watch", diff --git a/linkerd/service-profiles/Cargo.toml b/linkerd/service-profiles/Cargo.toml index 82a92fbfe7..1f45c2825a 100644 --- a/linkerd/service-profiles/Cargo.toml +++ b/linkerd/service-profiles/Cargo.toml @@ -21,10 +21,11 @@ indexmap = "1.7" linkerd-addr = { path = "../addr" } linkerd-dns-name = { path = "../dns/name" } linkerd-error = { path = "../error" } -linkerd2-proxy-api = { version = "0.2", features = ["destination", "client"] } +linkerd-http-box = { path = "../http-box" } linkerd-proxy-api-resolve = { path = "../proxy/api-resolve" } linkerd-stack = { path = "../stack" } linkerd-tonic-watch = { path = "../tonic-watch" } +linkerd2-proxy-api = { version = "0.2", features = ["destination", "client"] } rand = { version = "0.8", features = ["small_rng"] } regex = "1.5.4" tokio = { version = "1", features = ["macros", "rt", "sync", "time"] } diff --git a/linkerd/service-profiles/src/http/route_request.rs b/linkerd/service-profiles/src/http/route_request.rs index 5225c3a009..7f06c5ef31 100644 --- a/linkerd/service-profiles/src/http/route_request.rs +++ b/linkerd/service-profiles/src/http/route_request.rs @@ -1,11 +1,13 @@ use super::{RequestMatch, Route}; use crate::{Profile, Receiver, ReceiverStream}; -use futures::{future::ErrInto, prelude::*, ready}; +use futures::{prelude::*, ready}; use linkerd_error::Error; +use linkerd_http_box::BoxBody; use linkerd_stack::{layer, NewService, Param, Proxy}; use std::{ collections::HashMap, marker::PhantomData, + pin::Pin, task::{Context, Poll}, }; use tracing::{debug, trace}; @@ -15,11 +17,9 @@ pub fn layer( ) -> impl layer::Layer> { // This is saved so that the same `Arc`s are used and cloned instead of // calling `Route::default()` every time. - let default = Route::default(); layer::mk(move |inner| NewRouteRequest { inner, new_route: new_route.clone(), - default: default.clone(), _route: PhantomData, }) } @@ -27,7 +27,6 @@ pub fn layer( pub struct NewRouteRequest { inner: M, new_route: N, - default: Route, _route: PhantomData, } @@ -38,7 +37,6 @@ pub struct RouteRequest { new_route: N, http_routes: Vec<(RequestMatch, Route)>, proxies: HashMap, - default: R, } impl Clone for NewRouteRequest { @@ -46,7 +44,6 @@ impl Clone for NewRouteRequest { Self { inner: self.inner.clone(), new_route: self.new_route.clone(), - default: self.default.clone(), _route: self._route, } } @@ -63,14 +60,10 @@ where fn new_service(&mut self, target: T) -> Self::Service { let rx = target.param(); let inner = self.inner.new_service(target.clone()); - let default = self - .new_route - .new_service((self.default.clone(), target.clone())); RouteRequest { rx: rx.into(), target, inner, - default, new_route: self.new_route.clone(), http_routes: Vec::new(), proxies: HashMap::new(), @@ -78,18 +71,30 @@ where } } -impl tower::Service> for RouteRequest +impl tower::Service> for RouteRequest where - B: Send + 'static, T: Clone, N: NewService<(Route, T), Service = R> + Clone, - R: Proxy, S>, - S: tower::Service, + R: Proxy< + http::Request, + S, + Request = http::Request, + Response = http::Response, + >, + R::Future: Send + 'static, + S: tower::Service, Response = http::Response>, + S::Future: Send + 'static, S::Error: Into, { - type Response = R::Response; + type Response = http::Response; type Error = Error; - type Future = ErrInto; + type Future = Pin< + Box< + dyn std::future::Future, Error>> + + Send + + 'static, + >, + >; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut update = None; @@ -119,17 +124,19 @@ where Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into)) } - fn call(&mut self, req: http::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { for (ref condition, ref route) in &self.http_routes { if condition.is_match(&req) { trace!(?condition, "Using configured route"); - return self.proxies[route] - .proxy(&mut self.inner, req) - .err_into::(); + return Box::pin( + self.proxies[route] + .proxy(&mut self.inner, req) + .err_into::(), + ); } } - trace!("Using default route"); - self.default.proxy(&mut self.inner, req).err_into::() + trace!("No routes matched"); + Box::pin(self.inner.call(req).err_into::()) } } From 90fdd82275735b1f5c570922326c07700b434650 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 25 Aug 2021 19:05:33 +0000 Subject: [PATCH 2/5] Add proxy impls for BoxRequest and BoxResponse; drop tower dep --- Cargo.lock | 1 - linkerd/http-box/Cargo.toml | 1 - linkerd/http-box/src/erase_request.rs | 10 ++++----- linkerd/http-box/src/request.rs | 31 +++++++++++++++++++++------ linkerd/http-box/src/response.rs | 25 ++++++++++++++++++--- 5 files changed, 52 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b78d9f3bb8..189b135567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -954,7 +954,6 @@ dependencies = [ "linkerd-error", "linkerd-stack", "pin-project", - "tower", ] [[package]] diff --git a/linkerd/http-box/Cargo.toml b/linkerd/http-box/Cargo.toml index 0189808e3e..079bbf2bbc 100644 --- a/linkerd/http-box/Cargo.toml +++ b/linkerd/http-box/Cargo.toml @@ -13,5 +13,4 @@ http = "0.2" http-body = "0.4" linkerd-error = { path = "../error" } linkerd-stack = { path = "../stack" } -tower = {version = "0.4", default-features = false } pin-project = "1" diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs index 4f122baf07..5f373167a7 100644 --- a/linkerd/http-box/src/erase_request.rs +++ b/linkerd/http-box/src/erase_request.rs @@ -2,14 +2,14 @@ use crate::BoxBody; use linkerd_error::Error; -use linkerd_stack::{layer, Proxy}; +use linkerd_stack::{layer, Proxy, Service}; use std::task::{Context, Poll}; /// Boxes request bodies, erasing the original type. /// /// This is *very* similar to the [`BoxRequest`](crate::request::BoxRequest) /// middleware. However, that middleware is generic over a specific body type -/// that is erased. A given instance of `EraseRequest` can only erase the type +/// that is erased. A given instance of `BoxRequest` can only erase the type /// of one particular `Body` type, while this middleware will erase bodies of /// *any* type. /// @@ -43,12 +43,12 @@ impl Clone for EraseRequest { } } -impl tower::Service> for EraseRequest +impl Service> for EraseRequest where B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into, - S: tower::Service>, + S: Service>, { type Response = S::Response; type Error = S::Error; @@ -70,7 +70,7 @@ where B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into, - S: tower::Service, + S: Service, P: Proxy, S>, { type Request = P::Request; diff --git a/linkerd/http-box/src/request.rs b/linkerd/http-box/src/request.rs index 714f2a206f..8ae598086e 100644 --- a/linkerd/http-box/src/request.rs +++ b/linkerd/http-box/src/request.rs @@ -2,16 +2,16 @@ use crate::{erase_request::EraseRequest, BoxBody}; use linkerd_error::Error; -use linkerd_stack::layer; +use linkerd_stack::{layer, Proxy, Service}; use std::{ marker::PhantomData, task::{Context, Poll}, }; #[derive(Debug)] -pub struct BoxRequest(S, PhantomData); +pub struct BoxRequest(S, PhantomData); -impl BoxRequest { +impl BoxRequest { pub fn layer() -> impl layer::Layer + Clone + Copy { layer::mk(|inner| BoxRequest(inner, PhantomData)) } @@ -24,18 +24,18 @@ impl BoxRequest { } } -impl Clone for BoxRequest { +impl Clone for BoxRequest { fn clone(&self) -> Self { BoxRequest(self.0.clone(), self.1) } } -impl tower::Service> for BoxRequest +impl Service> for BoxRequest where B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into, - S: tower::Service>, + S: Service>, { type Response = S::Response; type Error = S::Error; @@ -51,3 +51,22 @@ where self.0.call(req.map(BoxBody::new)) } } + +impl Proxy, S> for BoxRequest +where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: Service, + P: Proxy, S>, +{ + type Request = P::Request; + type Response = P::Response; + type Error = P::Error; + type Future = P::Future; + + #[inline] + fn proxy(&self, inner: &mut S, req: http::Request) -> Self::Future { + self.0.proxy(inner, req.map(BoxBody::new)) + } +} diff --git a/linkerd/http-box/src/response.rs b/linkerd/http-box/src/response.rs index 73e88e608a..90e86cf84c 100644 --- a/linkerd/http-box/src/response.rs +++ b/linkerd/http-box/src/response.rs @@ -3,7 +3,7 @@ use crate::BoxBody; use futures::{future, TryFutureExt}; use linkerd_error::Error; -use linkerd_stack::layer; +use linkerd_stack::{layer, Proxy, Service}; use std::task::{Context, Poll}; #[derive(Clone, Debug)] @@ -15,9 +15,9 @@ impl BoxResponse { } } -impl tower::Service for BoxResponse +impl Service for BoxResponse where - S: tower::Service>, + S: Service>, B: http_body::Body + Send + 'static, B::Data: Send + 'static, B::Error: Into + 'static, @@ -36,3 +36,22 @@ where self.0.call(req).map_ok(|rsp| rsp.map(BoxBody::new)) } } + +impl Proxy for BoxResponse

+where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: Service, + P: Proxy>, +{ + type Request = P::Request; + type Response = http::Response; + type Error = P::Error; + type Future = future::MapOk Self::Response>; + + #[inline] + fn proxy(&self, inner: &mut S, req: Req) -> Self::Future { + self.0.proxy(inner, req).map_ok(|rsp| rsp.map(BoxBody::new)) + } +} From 3e5c98385b9c4855c53d806f0a8cc57c26801a96 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 25 Aug 2021 19:19:24 +0000 Subject: [PATCH 3/5] profiles: Avoid creating a default route stack When the proxy resolves a service profile with routes that do not match any requests, we handle these requests on a 'default' route. This route does not have any labels and is generally superflous, inflating our metrics export unnecessarily. This change removes this behavior by instead skipping the route stack entirely when no routes are matched. In order to do this, we add some extra boxes so that we can handle the request & response types transparently. --- linkerd/app/inbound/src/http/router.rs | 14 +++++++------- linkerd/app/outbound/src/http/logical.rs | 4 ++++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index a9306fbf32..9f61d8b0fe 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -111,14 +111,12 @@ impl Inbound { // request has not been received for `cache_max_idle_age`. http.clone() .check_new_service::>() - .push_on_response(http::BoxRequest::layer()) // The HTTP stack doesn't use the profile resolution, so drop it. .push_map_target(Logical::from) + .push_on_response(http::BoxResponse::layer()) .push(profiles::http::route_request::layer( svc::proxies() - // Sets the route as a request extension so that it can be used - // by tap. - .push_http_insert_target::() + .push_on_response(http::BoxRequest::layer()) // Records per-route metrics. .push( rt.metrics @@ -128,7 +126,9 @@ impl Inbound { // Sets the per-route response classifier as a request // extension. .push(classify::NewClassify::layer()) - .check_new_clone::() + // Sets the route as a request extension so that it can be used + // by tap. + .push_http_insert_target::() .push_map_target(|(route, logical): (profiles::http::Route, Profile)| { dst::Route { route, @@ -136,9 +136,9 @@ impl Inbound { direction: metrics::Direction::In, } }) + .push_on_response(http::BoxResponse::layer()) .into_inner(), )) - .push_on_response(http::BoxResponse::layer()) .push_switch( // If the profile was resolved to a logical (service) address, build a profile // stack to include route-level metrics, etc. Otherwise, skip this stack and use @@ -150,7 +150,7 @@ impl Inbound { addr, logical, profiles: rx, - })); + })); } } Ok(svc::Either::B(logical)) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 38ac278fc1..2cae46716e 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -109,9 +109,11 @@ impl Outbound { .push_spawn_buffer(buffer_capacity), ) .push_cache(cache_max_idle_age) + .push_on_response(http::BoxResponse::layer()) // Note: routes can't exert backpressure. .push(profiles::http::route_request::layer( svc::proxies() + .push_on_response(http::BoxRequest::layer()) .push( rt.metrics .http_route_actual @@ -133,8 +135,10 @@ impl Outbound { // extension. .push(classify::NewClassify::layer()) .push_map_target(Logical::mk_route) + .push_on_response(http::BoxResponse::layer()) .into_inner(), )) + .push_on_response(http::BoxRequest::layer()) // Strips headers that may be set by this proxy and add an outbound // canonical-dst-header. The response body is boxed unify the profile // stack's response type with that of to endpoint stack. From 038c4a248e64c08e8a21d8bd39a57b206d1f08ff Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 26 Aug 2021 16:43:54 +0000 Subject: [PATCH 4/5] fix weird indentation --- linkerd/app/inbound/src/http/router.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 066d05443b..59c366aadc 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -154,7 +154,7 @@ impl Inbound { addr, logical, profiles: rx, - })); + })); } } Ok(svc::Either::B(logical)) From 7ecd9c79f092606aaabd4223d191966a6fe6a6c4 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 26 Aug 2021 19:41:39 +0000 Subject: [PATCH 5/5] avoid boxed future --- .../service-profiles/src/http/route_request.rs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/linkerd/service-profiles/src/http/route_request.rs b/linkerd/service-profiles/src/http/route_request.rs index 7f06c5ef31..b6735f9960 100644 --- a/linkerd/service-profiles/src/http/route_request.rs +++ b/linkerd/service-profiles/src/http/route_request.rs @@ -1,13 +1,12 @@ use super::{RequestMatch, Route}; use crate::{Profile, Receiver, ReceiverStream}; -use futures::{prelude::*, ready}; +use futures::{future, prelude::*, ready}; use linkerd_error::Error; use linkerd_http_box::BoxBody; use linkerd_stack::{layer, NewService, Param, Proxy}; use std::{ collections::HashMap, marker::PhantomData, - pin::Pin, task::{Context, Poll}, }; use tracing::{debug, trace}; @@ -81,20 +80,13 @@ where Request = http::Request, Response = http::Response, >, - R::Future: Send + 'static, S: tower::Service, Response = http::Response>, - S::Future: Send + 'static, S::Error: Into, { type Response = http::Response; type Error = Error; - type Future = Pin< - Box< - dyn std::future::Future, Error>> - + Send - + 'static, - >, - >; + type Future = + future::Either, future::ErrInto>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut update = None; @@ -128,7 +120,7 @@ where for (ref condition, ref route) in &self.http_routes { if condition.is_match(&req) { trace!(?condition, "Using configured route"); - return Box::pin( + return future::Either::Left( self.proxies[route] .proxy(&mut self.inner, req) .err_into::(), @@ -137,6 +129,6 @@ where } trace!("No routes matched"); - Box::pin(self.inner.call(req).err_into::()) + future::Either::Right(self.inner.call(req).err_into::()) } }