From cac5c3ab1b6ee1ca6995d805f6e78574ad396558 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH] refactor: move away from legacy `hyper::body::HttpBody` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit this is an incremental step away from hyper 0.14's request and response body interfaces, and towards the 1.0 body types. see for more information about upgrading to hyper 1.0. hyper 0.14 provides a `hyper::body::Body` that is removed in the 1.0 interface. `hyper-util` now provides a workable default body type. hyper 0.14 reëxports `http_body::Body` as `HttpBody`. hyper 1.0 reëxports this trait as `hyper::body::Body` without any renaming. this commit moves application code away from hyper's legacy `Body` type and the `HttpBody` trait alias. this commit moves assorted interfaces towards the boxed `BoxBody` type instead. when possible, code is tweaked such that it refers to the reëxport in `linkerd-proxy-http`, rather than directly through `hyper`. NB: this commit is based upon #3466. Signed-off-by: katelyn martin --- Cargo.lock | 6 +++ hyper-balance/Cargo.toml | 1 + hyper-balance/src/lib.rs | 22 ++++---- linkerd/app/admin/Cargo.toml | 1 + linkerd/app/admin/src/server.rs | 54 +++++++++---------- linkerd/app/admin/src/server/json.rs | 21 +++++--- linkerd/app/admin/src/server/log/level.rs | 26 +++++---- linkerd/app/admin/src/server/log/stream.rs | 21 ++++---- linkerd/app/core/src/errors/respond.rs | 8 +-- linkerd/app/gateway/src/http/gateway.rs | 2 +- linkerd/app/inbound/src/http/tests.rs | 36 ++++++------- linkerd/app/inbound/src/policy/api.rs | 4 +- linkerd/app/inbound/src/policy/config.rs | 2 +- linkerd/app/inbound/src/policy/store.rs | 4 +- linkerd/app/inbound/src/server.rs | 2 +- linkerd/app/integration/src/lib.rs | 2 +- linkerd/app/outbound/src/http/endpoint.rs | 4 +- .../logical/policy/route/metrics/test_util.rs | 2 +- .../app/outbound/src/http/logical/tests.rs | 2 +- .../src/http/logical/tests/retries.rs | 3 +- .../src/http/logical/tests/timeouts.rs | 2 +- linkerd/app/outbound/src/http/retry.rs | 10 ++-- linkerd/app/outbound/src/lib.rs | 2 +- linkerd/app/outbound/src/policy/api.rs | 4 +- linkerd/app/outbound/src/test_util.rs | 4 +- .../app/src/trace_collector/oc_collector.rs | 6 +-- .../app/src/trace_collector/otel_collector.rs | 6 +-- linkerd/http/upgrade/src/glue.rs | 4 +- linkerd/metrics/Cargo.toml | 2 + linkerd/metrics/src/serve.rs | 8 +-- linkerd/opencensus/src/lib.rs | 10 ++-- linkerd/opentelemetry/src/lib.rs | 10 ++-- linkerd/proxy/http/src/client.rs | 4 +- linkerd/proxy/http/src/h1.rs | 2 +- linkerd/proxy/http/src/h2.rs | 7 ++- linkerd/proxy/http/src/lib.rs | 2 +- linkerd/proxy/http/src/orig_proto.rs | 8 +-- linkerd/proxy/spire-client/src/api.rs | 8 +-- linkerd/proxy/tap/Cargo.toml | 2 + linkerd/proxy/tap/src/grpc/server.rs | 7 +-- linkerd/proxy/tap/src/lib.rs | 6 +-- linkerd/proxy/tap/src/service.rs | 18 +++---- 42 files changed, 188 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc2a19d7b3..edc7b6290e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -969,6 +969,7 @@ version = "0.1.0" dependencies = [ "futures", "http", + "http-body", "hyper", "pin-project", "tokio", @@ -1312,6 +1313,7 @@ dependencies = [ name = "linkerd-app-admin" version = "0.1.0" dependencies = [ + "bytes", "deflate", "futures", "http", @@ -1999,7 +2001,9 @@ version = "0.1.0" dependencies = [ "deflate", "http", + "http-body", "hyper", + "linkerd-http-box", "linkerd-stack", "linkerd-system", "parking_lot", @@ -2335,8 +2339,10 @@ dependencies = [ name = "linkerd-proxy-tap" version = "0.1.0" dependencies = [ + "bytes", "futures", "http", + "http-body", "hyper", "ipnet", "linkerd-conditional", diff --git a/hyper-balance/Cargo.toml b/hyper-balance/Cargo.toml index 069df755ef..110d9707b1 100644 --- a/hyper-balance/Cargo.toml +++ b/hyper-balance/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] futures = { version = "0.3", default-features = false } http = { workspace = true } +http-body = { workspace = true } hyper = { workspace = true, features = ["deprecated"] } pin-project = "1" tower = { version = "0.4", default-features = false, features = ["load"] } diff --git a/hyper-balance/src/lib.rs b/hyper-balance/src/lib.rs index 4531212e32..0fc23ca524 100644 --- a/hyper-balance/src/lib.rs +++ b/hyper-balance/src/lib.rs @@ -1,7 +1,7 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] -use hyper::body::HttpBody; +use http_body::Body; use pin_project::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; @@ -38,7 +38,7 @@ pub struct PendingUntilEosBody { impl TrackCompletion> for PendingUntilFirstData where - B: HttpBody, + B: Body, { type Output = http::Response>; @@ -59,7 +59,7 @@ where impl TrackCompletion> for PendingUntilEos where - B: HttpBody, + B: Body, { type Output = http::Response>; @@ -80,7 +80,7 @@ where impl Default for PendingUntilFirstDataBody where - B: HttpBody + Default, + B: Body + Default, { fn default() -> Self { Self { @@ -90,9 +90,9 @@ where } } -impl HttpBody for PendingUntilFirstDataBody +impl Body for PendingUntilFirstDataBody where - B: HttpBody, + B: Body, T: Send + 'static, { type Data = B::Data; @@ -138,7 +138,7 @@ where impl Default for PendingUntilEosBody where - B: HttpBody + Default, + B: Body + Default, { fn default() -> Self { Self { @@ -148,7 +148,7 @@ where } } -impl HttpBody for PendingUntilEosBody { +impl Body for PendingUntilEosBody { type Data = B::Data; type Error = B::Error; @@ -198,7 +198,7 @@ impl HttpBody for PendingUntilEosBody { mod tests { use super::{PendingUntilEos, PendingUntilFirstData}; use futures::future::poll_fn; - use hyper::body::HttpBody; + use http_body::Body; use std::collections::VecDeque; use std::io::Cursor; use std::pin::Pin; @@ -429,7 +429,7 @@ mod tests { #[derive(Default)] struct TestBody(VecDeque<&'static str>, Option); - impl HttpBody for TestBody { + impl Body for TestBody { type Data = Cursor<&'static str>; type Error = &'static str; @@ -456,7 +456,7 @@ mod tests { #[derive(Default)] struct ErrBody(Option<&'static str>); - impl HttpBody for ErrBody { + impl Body for ErrBody { type Data = Cursor<&'static str>; type Error = &'static str; diff --git a/linkerd/app/admin/Cargo.toml b/linkerd/app/admin/Cargo.toml index cf714b65af..a347c015b1 100644 --- a/linkerd/app/admin/Cargo.toml +++ b/linkerd/app/admin/Cargo.toml @@ -15,6 +15,7 @@ pprof = ["deflate", "dep:pprof"] log-streaming = ["linkerd-tracing/stream"] [dependencies] +bytes = "1" deflate = { version = "1", optional = true, features = ["gzip"] } http = { workspace = true } http-body = { workspace = true } diff --git a/linkerd/app/admin/src/server.rs b/linkerd/app/admin/src/server.rs index 2f606d6ec4..9dd6de9790 100644 --- a/linkerd/app/admin/src/server.rs +++ b/linkerd/app/admin/src/server.rs @@ -12,13 +12,9 @@ use futures::future::{self, TryFutureExt}; use http::StatusCode; -use hyper::{ - body::{Body, HttpBody}, - Request, Response, -}; use linkerd_app_core::{ metrics::{self as metrics, FmtMetrics}, - proxy::http::ClientHandle, + proxy::http::{Body, BoxBody, ClientHandle, Request, Response}, trace, Error, Result, }; use std::{ @@ -45,7 +41,7 @@ pub struct Admin { pprof: Option, } -pub type ResponseFuture = Pin>> + Send + 'static>>; +pub type ResponseFuture = Pin>> + Send + 'static>>; impl Admin { pub fn new( @@ -73,30 +69,30 @@ impl Admin { self } - fn ready_rsp(&self) -> Response { + fn ready_rsp(&self) -> Response { if self.ready.is_ready() { Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/plain") - .body("ready\n".into()) + .body(BoxBody::new::("ready\n".into())) .expect("builder with known status code must not fail") } else { Response::builder() .status(StatusCode::SERVICE_UNAVAILABLE) - .body("not ready\n".into()) + .body(BoxBody::new::("not ready\n".into())) .expect("builder with known status code must not fail") } } - fn live_rsp() -> Response { + fn live_rsp() -> Response { Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/plain") - .body("live\n".into()) + .body(BoxBody::new::("live\n".into())) .expect("builder with known status code must not fail") } - fn env_rsp(req: Request) -> Response { + fn env_rsp(req: Request) -> Response { use std::{collections::HashMap, env, ffi::OsString}; if req.method() != http::Method::GET { @@ -142,56 +138,60 @@ impl Admin { json::json_rsp(&env) } - fn shutdown(&self) -> Response { + fn shutdown(&self) -> Response { if !self.enable_shutdown { return Response::builder() .status(StatusCode::NOT_FOUND) .header(http::header::CONTENT_TYPE, "text/plain") - .body("shutdown endpoint is not enabled\n".into()) + .body(BoxBody::new::( + "shutdown endpoint is not enabled\n".into(), + )) .expect("builder with known status code must not fail"); } if self.shutdown_tx.send(()).is_ok() { Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/plain") - .body("shutdown\n".into()) + .body(BoxBody::new::("shutdown\n".into())) .expect("builder with known status code must not fail") } else { Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header(http::header::CONTENT_TYPE, "text/plain") - .body("shutdown listener dropped\n".into()) + .body(BoxBody::new::("shutdown listener dropped\n".into())) .expect("builder with known status code must not fail") } } - fn internal_error_rsp(error: impl ToString) -> http::Response { + fn internal_error_rsp(error: impl ToString) -> http::Response { http::Response::builder() .status(http::StatusCode::INTERNAL_SERVER_ERROR) .header(http::header::CONTENT_TYPE, "text/plain") - .body(error.to_string().into()) + .body(BoxBody::new(error.to_string())) .expect("builder with known status code should not fail") } - fn not_found() -> Response { + fn not_found() -> Response { Response::builder() .status(http::StatusCode::NOT_FOUND) - .body(Body::empty()) + .body(BoxBody::new(hyper::Body::empty())) .expect("builder with known status code must not fail") } - fn method_not_allowed() -> Response { + fn method_not_allowed() -> Response { Response::builder() .status(http::StatusCode::METHOD_NOT_ALLOWED) - .body(Body::empty()) + .body(BoxBody::new(hyper::Body::empty())) .expect("builder with known status code must not fail") } - fn forbidden_not_localhost() -> Response { + fn forbidden_not_localhost() -> Response { Response::builder() .status(http::StatusCode::FORBIDDEN) .header(http::header::CONTENT_TYPE, "text/plain") - .body("Requests are only permitted from localhost.".into()) + .body(BoxBody::new::( + "Requests are only permitted from localhost.".into(), + )) .expect("builder with known status code must not fail") } @@ -215,11 +215,11 @@ impl Admin { impl tower::Service> for Admin where M: FmtMetrics, - B: HttpBody + Send + 'static, + B: Body + Send + 'static, B::Error: Into, B::Data: Send, { - type Response = http::Response; + type Response = http::Response; type Error = Error; type Future = ResponseFuture; @@ -331,7 +331,7 @@ mod tests { let r = Request::builder() .method(Method::GET) .uri("http://0.0.0.0/ready") - .body(Body::empty()) + .body(hyper::Body::empty()) .unwrap(); let f = admin.clone().oneshot(r); timeout(TIMEOUT, f).await.expect("timeout").expect("call") diff --git a/linkerd/app/admin/src/server/json.rs b/linkerd/app/admin/src/server/json.rs index 8f3c3ed6e6..b6be24ee67 100644 --- a/linkerd/app/admin/src/server/json.rs +++ b/linkerd/app/admin/src/server/json.rs @@ -3,12 +3,14 @@ pub(in crate::server) static JSON_HEADER_VAL: HeaderValue = HeaderValue::from_st use hyper::{ header::{self, HeaderValue}, - Body, StatusCode, + StatusCode, }; +use linkerd_app_core::proxy::http::BoxBody; + pub(crate) fn json_error_rsp( error: impl ToString, status: http::StatusCode, -) -> http::Response { +) -> http::Response { mk_rsp( status, &serde_json::json!({ @@ -18,11 +20,12 @@ pub(crate) fn json_error_rsp( ) } -pub(crate) fn json_rsp(val: &impl serde::Serialize) -> http::Response { +pub(crate) fn json_rsp(val: &impl serde::Serialize) -> http::Response { mk_rsp(StatusCode::OK, val) } -pub(crate) fn accepts_json(req: &http::Request) -> Result<(), http::Response> { +#[allow(clippy::result_large_err)] +pub(crate) fn accepts_json(req: &http::Request) -> Result<(), http::Response> { if let Some(accept) = req.headers().get(header::ACCEPT) { let accept = match std::str::from_utf8(accept.as_bytes()) { Ok(accept) => accept, @@ -41,7 +44,7 @@ pub(crate) fn accepts_json(req: &http::Request) -> Result<(), http::Respon tracing::warn!(?accept, "Accept header will not accept 'application/json'"); return Err(http::Response::builder() .status(StatusCode::NOT_ACCEPTABLE) - .body(JSON_MIME.into()) + .body(BoxBody::new::(JSON_MIME.into())) .expect("builder with known status code must not fail")); } } @@ -49,18 +52,20 @@ pub(crate) fn accepts_json(req: &http::Request) -> Result<(), http::Respon Ok(()) } -fn mk_rsp(status: StatusCode, val: &impl serde::Serialize) -> http::Response { +fn mk_rsp(status: StatusCode, val: &impl serde::Serialize) -> http::Response { match serde_json::to_vec(val) { Ok(json) => http::Response::builder() .status(status) .header(header::CONTENT_TYPE, JSON_HEADER_VAL.clone()) - .body(json.into()) + .body(BoxBody::new(http_body::Full::new(bytes::Bytes::from(json)))) .expect("builder with known status code must not fail"), Err(error) => { tracing::warn!(?error, "failed to serialize JSON value"); http::Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(format!("failed to serialize JSON value: {error}").into()) + .body(BoxBody::new::(format!( + "failed to serialize JSON value: {error}" + ))) .expect("builder with known status code must not fail") } } diff --git a/linkerd/app/admin/src/server/log/level.rs b/linkerd/app/admin/src/server/log/level.rs index 2884ccca63..f5c30dc588 100644 --- a/linkerd/app/admin/src/server/log/level.rs +++ b/linkerd/app/admin/src/server/log/level.rs @@ -1,17 +1,18 @@ +use bytes::Buf; use http::{header, StatusCode}; -use hyper::{ - body::{Buf, HttpBody}, - Body, +use linkerd_app_core::{ + proxy::http::{Body, BoxBody}, + trace::level, + Error, }; -use linkerd_app_core::{trace::level, Error}; use std::io; pub async fn serve( level: level::Handle, req: http::Request, -) -> Result, Error> +) -> Result, Error> where - B: HttpBody, + B: Body, B::Error: Into, { Ok(match *req.method() { @@ -28,7 +29,7 @@ where .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .aggregate(); match level.set_from(body.chunk()) { - Ok(_) => mk_rsp(StatusCode::NO_CONTENT, Body::empty()), + Ok(_) => mk_rsp(StatusCode::NO_CONTENT, hyper::Body::empty()), Err(error) => { tracing::warn!(%error, "Setting log level failed"); mk_rsp(StatusCode::BAD_REQUEST, error) @@ -40,14 +41,19 @@ where .status(StatusCode::METHOD_NOT_ALLOWED) .header(header::ALLOW, "GET") .header(header::ALLOW, "PUT") - .body(Body::empty()) + .body(BoxBody::new(hyper::Body::empty())) .expect("builder with known status code must not fail"), }) } -fn mk_rsp(status: StatusCode, body: impl Into) -> http::Response { +fn mk_rsp(status: StatusCode, body: B) -> http::Response +where + B: Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, +{ http::Response::builder() .status(status) - .body(body.into()) + .body(BoxBody::new(body)) .expect("builder with known status code must not fail") } diff --git a/linkerd/app/admin/src/server/log/stream.rs b/linkerd/app/admin/src/server/log/stream.rs index 9f521f1dbe..0739d1a1f2 100644 --- a/linkerd/app/admin/src/server/log/stream.rs +++ b/linkerd/app/admin/src/server/log/stream.rs @@ -1,10 +1,9 @@ use crate::server::json; +use bytes::{Buf, Bytes}; use futures::FutureExt; -use hyper::{ - body::{Buf, Bytes}, - header, Body, StatusCode, -}; +use hyper::{header, StatusCode}; use linkerd_app_core::{ + proxy::http::{Body, BoxBody}, trace::{self}, Error, }; @@ -27,9 +26,9 @@ macro_rules! recover { pub async fn serve( handle: trace::Handle, req: http::Request, -) -> Result, Error> +) -> Result, Error> where - B: hyper::body::HttpBody, + B: Body, B::Error: Into, { let handle = handle.into_stream(); @@ -75,7 +74,7 @@ where .status(StatusCode::METHOD_NOT_ALLOWED) .header(header::ALLOW, "GET") .header(header::ALLOW, "QUERY") - .body(Body::empty()) + .body(BoxBody::new(hyper::Body::empty())) .expect("builder with known status code must not fail")); } }; @@ -100,7 +99,7 @@ where // https://github.com/hawkw/thingbuf/issues/62 would allow us to avoid the // copy by passing the channel's pooled buffer directly to hyper, and // returning it to the channel to be reused when hyper is done with it. - let (mut tx, body) = Body::channel(); + let (mut tx, body) = hyper::Body::channel(); tokio::spawn( async move { // TODO(eliza): we could definitely implement some batching here. @@ -125,7 +124,7 @@ where }), ); - Ok(mk_rsp(StatusCode::OK, body)) + Ok(mk_rsp(StatusCode::OK, BoxBody::new(body))) } fn parse_filter(filter_str: &str) -> Result { @@ -134,10 +133,10 @@ fn parse_filter(filter_str: &str) -> Result { filter } -fn mk_rsp(status: StatusCode, body: impl Into) -> http::Response { +fn mk_rsp(status: StatusCode, body: B) -> http::Response { http::Response::builder() .status(status) .header(header::CONTENT_TYPE, json::JSON_HEADER_VAL.clone()) - .body(body.into()) + .body(body) .expect("builder with known status code must not fail") } diff --git a/linkerd/app/core/src/errors/respond.rs b/linkerd/app/core/src/errors/respond.rs index 655b330fd0..817f4d41ca 100644 --- a/linkerd/app/core/src/errors/respond.rs +++ b/linkerd/app/core/src/errors/respond.rs @@ -388,7 +388,7 @@ impl Respond { impl respond::Respond, Error> for Respond where - B: Default + hyper::body::HttpBody, + B: Default + linkerd_proxy_http::Body, R: HttpRescue + Clone, { type Response = http::Response>; @@ -444,15 +444,15 @@ where // === impl ResponseBody === -impl Default for ResponseBody { +impl Default for ResponseBody { fn default() -> Self { ResponseBody::Passthru(B::default()) } } -impl hyper::body::HttpBody for ResponseBody +impl linkerd_proxy_http::Body for ResponseBody where - B: hyper::body::HttpBody, + B: linkerd_proxy_http::Body, R: HttpRescue, { type Data = B::Data; diff --git a/linkerd/app/gateway/src/http/gateway.rs b/linkerd/app/gateway/src/http/gateway.rs index 266a440966..2168df6bfd 100644 --- a/linkerd/app/gateway/src/http/gateway.rs +++ b/linkerd/app/gateway/src/http/gateway.rs @@ -66,7 +66,7 @@ where impl tower::Service> for HttpGateway where - B: http::HttpBody + 'static, + B: http::Body + 'static, S: tower::Service, Response = http::Response>, S::Error: Into + 'static, S::Future: Send + 'static, diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 3a86df8fde..47e6971571 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -6,12 +6,12 @@ use crate::{ }, Config, Inbound, }; -use hyper::{body::HttpBody, Body, Request, Response}; +use hyper::{Request, Response}; use linkerd_app_core::{ classify, errors::respond::L5D_PROXY_ERROR, identity, io, metrics, - proxy::http, + proxy::http::{self, Body as _, BoxBody}, svc::{self, http::TracingExecutor, NewService, Param}, tls, transport::{ClientAddr, OrigDstAddr, Remote, ServerAddr}, @@ -68,7 +68,7 @@ async fn unmeshed_http1_hello_world() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -142,7 +142,7 @@ async fn downgrade_origin_form() { .uri("/") .header(http::header::HOST, "foo.svc.cluster.local") .header("l5d-orig-proto", "HTTP/1.1") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -216,7 +216,7 @@ async fn downgrade_absolute_form() { .uri("http://foo.svc.cluster.local:5550/") .header(http::header::HOST, "foo.svc.cluster.local") .header("l5d-orig-proto", "HTTP/1.1; absolute-form") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -260,7 +260,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -307,7 +307,7 @@ async fn http1_bad_gateway_unmeshed_response() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -355,7 +355,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -405,7 +405,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -450,7 +450,7 @@ async fn h2_response_meshed_error_header() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -490,7 +490,7 @@ async fn h2_response_unmeshed_error_header() { let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -533,7 +533,7 @@ async fn grpc_meshed_response_error_header() { .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") .header(http::header::CONTENT_TYPE, "application/grpc") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -574,7 +574,7 @@ async fn grpc_unmeshed_response_error_header() { .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") .header(http::header::CONTENT_TYPE, "application/grpc") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let rsp = client .send_request(req) @@ -628,7 +628,7 @@ async fn grpc_response_class() { .method(http::Method::POST) .uri("http://foo.svc.cluster.local:5550") .header(http::header::CONTENT_TYPE, "application/grpc") - .body(Body::default()) + .body(hyper::Body::default()) .unwrap(); let mut rsp = client @@ -682,9 +682,9 @@ fn hello_server( let _e = span.enter(); tracing::info!("mock connecting"); let (client_io, server_io) = support::io::duplex(4096); - let hello_svc = hyper::service::service_fn(|request: Request| async move { + let hello_svc = hyper::service::service_fn(|request: Request| async move { tracing::info!(?request); - Ok::<_, io::Error>(Response::new(Body::from("Hello world!"))) + Ok::<_, io::Error>(Response::new(BoxBody::new("Hello world!".to_string()))) }); tokio::spawn( server @@ -709,9 +709,9 @@ fn grpc_status_server( server .serve_connection( server_io, - hyper::service::service_fn(move |request: Request| async move { + hyper::service::service_fn(move |request: Request| async move { tracing::info!(?request); - let (mut tx, rx) = Body::channel(); + let (mut tx, rx) = hyper::Body::channel(); tokio::spawn(async move { let mut trls = ::http::HeaderMap::new(); trls.insert( diff --git a/linkerd/app/inbound/src/policy/api.rs b/linkerd/app/inbound/src/policy/api.rs index a15a68a19c..c62c0b540c 100644 --- a/linkerd/app/inbound/src/policy/api.rs +++ b/linkerd/app/inbound/src/policy/api.rs @@ -35,7 +35,7 @@ impl Api where S: tonic::client::GrpcService + Clone, S::ResponseBody: - http::HttpBody + Default + Send + 'static, + http::Body + Default + Send + 'static, { pub(super) fn new( workload: Arc, @@ -61,7 +61,7 @@ where S: tonic::client::GrpcService, S: Clone + Send + Sync + 'static, S::ResponseBody: - http::HttpBody + Default + Send + 'static, + http::Body + Default + Send + 'static, S::Future: Send + 'static, { type Response = diff --git a/linkerd/app/inbound/src/policy/config.rs b/linkerd/app/inbound/src/policy/config.rs index 4249132386..505eaf2127 100644 --- a/linkerd/app/inbound/src/policy/config.rs +++ b/linkerd/app/inbound/src/policy/config.rs @@ -42,7 +42,7 @@ impl Config { where C: tonic::client::GrpcService, C: Clone + Unpin + Send + Sync + 'static, - C::ResponseBody: http::HttpBody, + C::ResponseBody: http::Body, C::ResponseBody: Default + Send + 'static, C::Future: Send, { diff --git a/linkerd/app/inbound/src/policy/store.rs b/linkerd/app/inbound/src/policy/store.rs index 50bc155133..e8482518d2 100644 --- a/linkerd/app/inbound/src/policy/store.rs +++ b/linkerd/app/inbound/src/policy/store.rs @@ -78,7 +78,7 @@ impl Store { S: Clone + Send + Sync + 'static, S::Future: Send, S::ResponseBody: - http::HttpBody + Default + Send + 'static, + http::Body + Default + Send + 'static, { let opaque_default = Self::make_opaque(default.clone()); // The initial set of policies never expire from the cache. @@ -143,7 +143,7 @@ where S: Clone + Send + Sync + 'static, S::Future: Send, S::ResponseBody: - http::HttpBody + Default + Send + 'static, + http::Body + Default + Send + 'static, { fn get_policy(&self, dst: OrigDstAddr) -> AllowPolicy { // Lookup the policy for the target port in the cache. If it doesn't diff --git a/linkerd/app/inbound/src/server.rs b/linkerd/app/inbound/src/server.rs index 5dd58ad1d3..a10b2eb235 100644 --- a/linkerd/app/inbound/src/server.rs +++ b/linkerd/app/inbound/src/server.rs @@ -29,7 +29,7 @@ impl Inbound<()> { where C: tonic::client::GrpcService, C: Clone + Unpin + Send + Sync + 'static, - C::ResponseBody: http::HttpBody, + C::ResponseBody: http::Body, C::ResponseBody: Default + Send + 'static, C::Future: Send, { diff --git a/linkerd/app/integration/src/lib.rs b/linkerd/app/integration/src/lib.rs index 75b590b1d7..38e89bf9fe 100644 --- a/linkerd/app/integration/src/lib.rs +++ b/linkerd/app/integration/src/lib.rs @@ -26,7 +26,7 @@ pub use bytes::{Buf, BufMut, Bytes}; pub use futures::stream::{Stream, StreamExt}; pub use futures::{future, FutureExt, TryFuture, TryFutureExt}; pub use http::{HeaderMap, Request, Response, StatusCode}; -pub use http_body::Body as HttpBody; +pub use http_body::Body; pub use linkerd_app as app; pub use linkerd_app_core::{drain, Addr}; pub use linkerd_app_test::*; diff --git a/linkerd/app/outbound/src/http/endpoint.rs b/linkerd/app/outbound/src/http/endpoint.rs index 62a9f9a944..8dde2e5aab 100644 --- a/linkerd/app/outbound/src/http/endpoint.rs +++ b/linkerd/app/outbound/src/http/endpoint.rs @@ -44,7 +44,7 @@ impl Outbound { T: svc::Param, T: Clone + Send + Sync + 'static, // Http endpoint body. - B: http::HttpBody + std::fmt::Debug + Default + Send + 'static, + B: http::Body + std::fmt::Debug + Default + Send + 'static, B::Data: Send + 'static, // TCP endpoint stack. C: svc::MakeConnection> + Clone + Send + Sync + Unpin + 'static, @@ -81,7 +81,7 @@ impl Outbound> { T: tap::Inspect, T: Clone + Send + Sync + 'static, // Http endpoint body. - B: http::HttpBody + std::fmt::Debug + Default + Send + 'static, + B: http::Body + std::fmt::Debug + Default + Send + 'static, B::Data: Send + 'static, { self.map_stack(|config, rt, inner| { diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs index 580f08a356..edff16700f 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/test_util.rs @@ -1,6 +1,6 @@ -use hyper::body::HttpBody; use linkerd_app_core::{ metrics::prom::Counter, + proxy::http::Body, svc::{self, http::BoxBody, Service, ServiceExt}, }; diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index 58101b9b5d..210144f594 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -1,7 +1,7 @@ use super::{policy, Outbound, ParentRef, Routes}; use crate::test_util::*; use linkerd_app_core::{ - proxy::http::{self, BoxBody, HttpBody, StatusCode}, + proxy::http::{self, Body, BoxBody, StatusCode}, svc::{self, NewService, ServiceExt}, transport::addrs::*, Error, NameAddr, Result, diff --git a/linkerd/app/outbound/src/http/logical/tests/retries.rs b/linkerd/app/outbound/src/http/logical/tests/retries.rs index 854ca9c836..961e3de961 100644 --- a/linkerd/app/outbound/src/http/logical/tests/retries.rs +++ b/linkerd/app/outbound/src/http/logical/tests/retries.rs @@ -1,8 +1,7 @@ use super::*; -use hyper::body::HttpBody; use linkerd_app_core::{ errors, - proxy::http::{self, StatusCode}, + proxy::http::{self, Body, StatusCode}, svc::http::stream_timeouts::StreamDeadlineError, trace, }; diff --git a/linkerd/app/outbound/src/http/logical/tests/timeouts.rs b/linkerd/app/outbound/src/http/logical/tests/timeouts.rs index 88edd9e5f7..dd711c8928 100644 --- a/linkerd/app/outbound/src/http/logical/tests/timeouts.rs +++ b/linkerd/app/outbound/src/http/logical/tests/timeouts.rs @@ -4,7 +4,7 @@ use linkerd_app_core::{ proxy::http::{ self, stream_timeouts::{BodyTimeoutError, ResponseTimeoutError}, - BoxBody, HttpBody, + Body, BoxBody, }, trace, }; diff --git a/linkerd/app/outbound/src/http/retry.rs b/linkerd/app/outbound/src/http/retry.rs index 427587d5f0..0b5eebbc92 100644 --- a/linkerd/app/outbound/src/http/retry.rs +++ b/linkerd/app/outbound/src/http/retry.rs @@ -4,7 +4,7 @@ use linkerd_app_core::{ http_metrics::retries::Handle, metrics::{self, ProfileRouteLabels}, profiles::{self, http::Route}, - proxy::http::{ClientHandle, EraseResponse, HttpBody}, + proxy::http::{Body, ClientHandle, EraseResponse}, svc::{layer, Either, Param}, Error, Result, }; @@ -71,9 +71,9 @@ impl retry::Policy>, http::Response>, Error> for RetryPolicy where - ReqB: HttpBody + Unpin, + ReqB: Body + Unpin, ReqB::Error: Into, - RspB: HttpBody + Unpin, + RspB: Body + Unpin, { type Future = future::Ready; @@ -150,9 +150,9 @@ where impl retry::PrepareRetry, http::Response> for RetryPolicy where - ReqB: HttpBody + Unpin, + ReqB: Body + Unpin, ReqB::Error: Into, - RspB: HttpBody + Unpin + Send + 'static, + RspB: Body + Unpin + Send + 'static, RspB::Data: Unpin + Send, RspB::Error: Unpin + Send, { diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index 2bebc5b509..e446a85c3a 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -147,7 +147,7 @@ impl Outbound<()> { where C: tonic::client::GrpcService, C: Clone + Unpin + Send + Sync + 'static, - C::ResponseBody: proxy::http::HttpBody, + C::ResponseBody: proxy::http::Body, C::ResponseBody: Default + Send + 'static, C::Future: Send, { diff --git a/linkerd/app/outbound/src/policy/api.rs b/linkerd/app/outbound/src/policy/api.rs index bcbad3c30e..a0b158c9de 100644 --- a/linkerd/app/outbound/src/policy/api.rs +++ b/linkerd/app/outbound/src/policy/api.rs @@ -34,7 +34,7 @@ impl Api where S: tonic::client::GrpcService + Clone, S::ResponseBody: - http::HttpBody + Default + Send + 'static, + http::Body + Default + Send + 'static, { pub(crate) fn new( workload: Arc, @@ -60,7 +60,7 @@ where S: tonic::client::GrpcService, S: Clone + Send + Sync + 'static, S::ResponseBody: - http::HttpBody + Default + Send + 'static, + http::Body + Default + Send + 'static, S::Future: Send + 'static, { type Response = diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index 46db3b4c5a..b2c458de14 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -73,7 +73,7 @@ pub use self::mock_body::MockBody; mod mock_body { use bytes::Bytes; - use linkerd_app_core::proxy::http::HttpBody; + use linkerd_app_core::proxy::http::Body; use linkerd_app_core::{Error, Result}; use std::{ future::Future, @@ -128,7 +128,7 @@ mod mock_body { } } - impl HttpBody for MockBody { + impl Body for MockBody { type Data = Bytes; type Error = Error; diff --git a/linkerd/app/src/trace_collector/oc_collector.rs b/linkerd/app/src/trace_collector/oc_collector.rs index e344599cf7..25493a6482 100644 --- a/linkerd/app/src/trace_collector/oc_collector.rs +++ b/linkerd/app/src/trace_collector/oc_collector.rs @@ -1,6 +1,6 @@ use crate::trace_collector::EnabledCollector; use linkerd_app_core::{ - control::ControlAddr, http_tracing::CollectorProtocol, proxy::http::HttpBody, Error, + control::ControlAddr, http_tracing::CollectorProtocol, proxy::http::Body, Error, }; use linkerd_opencensus::{self as opencensus, metrics, proto}; use std::{collections::HashMap, time::SystemTime}; @@ -21,8 +21,8 @@ where S: GrpcService + Clone + Send + 'static, S::Error: Into, S::Future: Send, - S::ResponseBody: Default + HttpBody + Send + 'static, - ::Error: Into + Send, + S::ResponseBody: Default + Body + Send + 'static, + ::Error: Into + Send, { let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); let spans_rx = ReceiverStream::new(spans_rx); diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index a5115f1dbd..f503fdcf80 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -1,6 +1,6 @@ use super::EnabledCollector; use linkerd_app_core::{ - control::ControlAddr, http_tracing::CollectorProtocol, proxy::http::HttpBody, Error, + control::ControlAddr, http_tracing::CollectorProtocol, proxy::http::Body, Error, }; use linkerd_opentelemetry::{ self as opentelemetry, metrics, @@ -25,8 +25,8 @@ where S: GrpcService + Clone + Send + 'static, S::Error: Into, S::Future: Send, - S::ResponseBody: Default + HttpBody + Send + 'static, - ::Error: Into + Send, + S::ResponseBody: Default + Body + Send + 'static, + ::Error: Into + Send, { let (span_sink, spans_rx) = mpsc::channel(crate::trace_collector::SPAN_BUFFER_CAPACITY); let spans_rx = ReceiverStream::new(spans_rx); diff --git a/linkerd/http/upgrade/src/glue.rs b/linkerd/http/upgrade/src/glue.rs index 2c262f3e61..2e7ac2d7da 100644 --- a/linkerd/http/upgrade/src/glue.rs +++ b/linkerd/http/upgrade/src/glue.rs @@ -1,7 +1,7 @@ use crate::upgrade::Http11Upgrade; use bytes::Bytes; use futures::TryFuture; -use hyper::body::HttpBody; +use http_body::Body; use hyper::client::connect as hyper_connect; use linkerd_error::{Error, Result}; use linkerd_io::{self as io, AsyncRead, AsyncWrite}; @@ -50,7 +50,7 @@ pub struct HyperConnectFuture { // === impl UpgradeBody === -impl HttpBody for UpgradeBody { +impl Body for UpgradeBody { type Data = Bytes; type Error = hyper::Error; diff --git a/linkerd/metrics/Cargo.toml b/linkerd/metrics/Cargo.toml index d550de8332..e4d863fca4 100644 --- a/linkerd/metrics/Cargo.toml +++ b/linkerd/metrics/Cargo.toml @@ -15,7 +15,9 @@ test_util = [] [dependencies] deflate = { version = "1", features = ["gzip"] } http = { workspace = true } +http-body = { workspace = true } hyper = { workspace = true, features = ["deprecated", "http1", "http2"] } +linkerd-http-box = { path = "../http/box" } linkerd-stack = { path = "../stack", optional = true } linkerd-system = { path = "../system", optional = true } parking_lot = "0.12" diff --git a/linkerd/metrics/src/serve.rs b/linkerd/metrics/src/serve.rs index 96edfb6a03..045e5d5015 100644 --- a/linkerd/metrics/src/serve.rs +++ b/linkerd/metrics/src/serve.rs @@ -1,5 +1,5 @@ use deflate::{write::GzEncoder, CompressionOptions}; -use hyper::Body; +use linkerd_http_box::BoxBody; use std::io::Write; use tracing::trace; @@ -33,7 +33,7 @@ impl Serve { } impl Serve { - pub fn serve(&self, req: http::Request) -> std::io::Result> { + pub fn serve(&self, req: http::Request) -> std::io::Result> { if Self::is_gzip(&req) { trace!("gzipping metrics"); let mut writer = GzEncoder::new(Vec::::new(), CompressionOptions::fast()); @@ -41,14 +41,14 @@ impl Serve { Ok(http::Response::builder() .header(http::header::CONTENT_ENCODING, "gzip") .header(http::header::CONTENT_TYPE, "text/plain") - .body(writer.finish()?.into()) + .body(BoxBody::new(hyper::Body::from(writer.finish()?))) .expect("Response must be valid")) } else { let mut writer = Vec::::new(); write!(&mut writer, "{}", self.metrics.as_display())?; Ok(http::Response::builder() .header(http::header::CONTENT_TYPE, "text/plain") - .body(Body::from(writer)) + .body(BoxBody::new(hyper::Body::from(writer))) .expect("Response must be valid")) } } diff --git a/linkerd/opencensus/src/lib.rs b/linkerd/opencensus/src/lib.rs index 65fbf2c4b0..798f48ad1e 100644 --- a/linkerd/opencensus/src/lib.rs +++ b/linkerd/opencensus/src/lib.rs @@ -4,7 +4,7 @@ pub mod metrics; use futures::stream::{Stream, StreamExt}; -use http_body::Body as HttpBody; +use http_body::Body; use linkerd_error::Error; use linkerd_trace_context::export::{ExportSpan, SpanKind}; use metrics::Registry; @@ -24,8 +24,8 @@ pub async fn export_spans(client: T, node: Node, spans: S, metrics: Regist where T: GrpcService + Clone, T::Error: Into, - T::ResponseBody: Default + HttpBody + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Default + Body + Send + 'static, + ::Error: Into + Send, S: Stream + Unpin, { debug!("Span exporter running"); @@ -49,8 +49,8 @@ impl SpanExporter where T: GrpcService, T::Error: Into, - T::ResponseBody: Default + HttpBody + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Default + Body + Send + 'static, + ::Error: Into + Send, S: Stream + Unpin, { const MAX_BATCH_SIZE: usize = 1000; diff --git a/linkerd/opentelemetry/src/lib.rs b/linkerd/opentelemetry/src/lib.rs index 8f6fe2934e..010f1f81f1 100644 --- a/linkerd/opentelemetry/src/lib.rs +++ b/linkerd/opentelemetry/src/lib.rs @@ -4,7 +4,7 @@ pub mod metrics; use futures::stream::{Stream, StreamExt}; -use http_body::Body as HttpBody; +use http_body::Body; use linkerd_error::Error; use linkerd_trace_context as trace_context; use metrics::Registry; @@ -35,8 +35,8 @@ pub async fn export_spans( ) where T: GrpcService + Clone, T::Error: Into, - T::ResponseBody: Default + HttpBody + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Default + Body + Send + 'static, + ::Error: Into + Send, S: Stream + Unpin, { debug!("Span exporter running"); @@ -62,8 +62,8 @@ impl SpanExporter where T: GrpcService + Clone, T::Error: Into, - T::ResponseBody: Default + HttpBody + Send + 'static, - ::Error: Into + Send, + T::ResponseBody: Default + Body + Send + 'static, + ::Error: Into + Send, S: Stream + Unpin, { const MAX_BATCH_SIZE: usize = 1000; diff --git a/linkerd/proxy/http/src/client.rs b/linkerd/proxy/http/src/client.rs index 1770f56920..66ab680f05 100644 --- a/linkerd/proxy/http/src/client.rs +++ b/linkerd/proxy/http/src/client.rs @@ -63,7 +63,7 @@ where C::Connection: Unpin + Send, C::Metadata: Send, C::Future: Unpin + Send + 'static, - B: hyper::body::HttpBody + Send + 'static, + B: http_body::Body + Send + 'static, B::Data: Send, B::Error: Into + Send + Sync, { @@ -123,7 +123,7 @@ where C::Connection: Unpin + Send, C::Future: Unpin + Send + 'static, C::Error: Into, - B: hyper::body::HttpBody + Send + 'static, + B: http_body::Body + Send + 'static, B::Data: Send, B::Error: Into + Send + Sync, { diff --git a/linkerd/proxy/http/src/h1.rs b/linkerd/proxy/http/src/h1.rs index 6b99139c96..0169691209 100644 --- a/linkerd/proxy/http/src/h1.rs +++ b/linkerd/proxy/http/src/h1.rs @@ -70,7 +70,7 @@ where C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static, C::Connection: Unpin + Send, C::Future: Unpin + Send + 'static, - B: hyper::body::HttpBody + Send + 'static, + B: crate::Body + Send + 'static, B::Data: Send, B::Error: Into + Send + Sync, { diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index 27477aa223..c408573283 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -1,6 +1,5 @@ -use crate::TracingExecutor; +use crate::{Body, TracingExecutor}; use futures::prelude::*; -use hyper::body::HttpBody; use linkerd_error::{Error, Result}; use linkerd_stack::{MakeConnection, Service}; use std::{ @@ -56,7 +55,7 @@ where C::Connection: Send + Unpin + 'static, C::Metadata: Send, C::Future: Send + 'static, - B: HttpBody + Send + 'static, + B: Body + Send + 'static, B::Data: Send, B::Error: Into + Send + Sync, { @@ -144,7 +143,7 @@ where impl tower::Service> for Connection where - B: HttpBody + Send + 'static, + B: Body + Send + 'static, B::Data: Send, B::Error: Into + Send + Sync, { diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 009d2e210c..eae2fbae71 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -35,7 +35,7 @@ pub use http::{ header::{self, HeaderMap, HeaderName, HeaderValue}, uri, Method, Request, Response, StatusCode, }; -pub use hyper::body::HttpBody; +pub use http_body::Body; pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseResponse}; pub use linkerd_http_classify as classify; pub use linkerd_http_executor::TracingExecutor; diff --git a/linkerd/proxy/http/src/orig_proto.rs b/linkerd/proxy/http/src/orig_proto.rs index d8d9f1f2fd..62aefd4f4f 100644 --- a/linkerd/proxy/http/src/orig_proto.rs +++ b/linkerd/proxy/http/src/orig_proto.rs @@ -1,7 +1,7 @@ use super::{h1, h2}; use futures::prelude::*; use http::header::{HeaderValue, TRANSFER_ENCODING}; -use hyper::body::HttpBody; +use http_body::Body; use linkerd_error::{Error, Result}; use linkerd_http_box::BoxBody; use linkerd_stack::{layer, MakeConnection, Service}; @@ -56,7 +56,7 @@ where C: MakeConnection<(crate::Version, T)> + Clone + Send + Sync + 'static, C::Connection: Unpin + Send, C::Future: Unpin + Send + 'static, - B: hyper::body::HttpBody + Send + 'static, + B: http_body::Body + Send + 'static, B::Data: Send, B::Error: Into + Send + Sync, { @@ -198,7 +198,7 @@ fn test_downgrade_h2_error() { // === impl UpgradeResponseBody === -impl HttpBody for UpgradeResponseBody { +impl Body for UpgradeResponseBody { type Data = bytes::Bytes; type Error = Error; @@ -227,7 +227,7 @@ impl HttpBody for UpgradeResponseBody { #[inline] fn size_hint(&self) -> http_body::SizeHint { - HttpBody::size_hint(&self.inner) + Body::size_hint(&self.inner) } } diff --git a/linkerd/proxy/spire-client/src/api.rs b/linkerd/proxy/spire-client/src/api.rs index 3c3ec16d4f..f4745e7fa0 100644 --- a/linkerd/proxy/spire-client/src/api.rs +++ b/linkerd/proxy/spire-client/src/api.rs @@ -116,8 +116,8 @@ impl Api where S: tonic::client::GrpcService + Clone, S::Error: Into, - S::ResponseBody: Default + http::HttpBody + Send + 'static, - ::Error: Into + Send, + S::ResponseBody: Default + http::Body + Send + 'static, + ::Error: Into + Send, { pub fn watch(client: S, backoff: ExponentialBackoff) -> Watch { let client = Client::new(client); @@ -129,8 +129,8 @@ impl Service<()> for Api where S: tonic::client::GrpcService + Clone, S: Clone + Send + Sync + 'static, - S::ResponseBody: Default + http::HttpBody + Send + 'static, - ::Error: Into + Send, + S::ResponseBody: Default + http::Body + Send + 'static, + ::Error: Into + Send, S::Future: Send + 'static, { type Response = diff --git a/linkerd/proxy/tap/Cargo.toml b/linkerd/proxy/tap/Cargo.toml index 2d226502d7..4f2f7823ab 100644 --- a/linkerd/proxy/tap/Cargo.toml +++ b/linkerd/proxy/tap/Cargo.toml @@ -7,7 +7,9 @@ edition = "2021" publish = false [dependencies] +bytes = "1" http = { workspace = true } +http-body = { workspace = true } hyper = { workspace = true, features = ["backports", "deprecated", "http1", "http2"] } futures = { version = "0.3", default-features = false } ipnet = "2.10" diff --git a/linkerd/proxy/tap/src/grpc/server.rs b/linkerd/proxy/tap/src/grpc/server.rs index 77e6a64c49..4cfe1cde12 100644 --- a/linkerd/proxy/tap/src/grpc/server.rs +++ b/linkerd/proxy/tap/src/grpc/server.rs @@ -1,8 +1,9 @@ use super::match_::Match; use crate::{iface, Inspect, Registry}; +use bytes::Buf; use futures::ready; use futures::stream::Stream; -use hyper::body::{Buf, HttpBody}; +use http_body::Body; use linkerd2_proxy_api::{http_types, tap as api}; use linkerd_conditional::Conditional; use linkerd_proxy_http::HasH2Reason; @@ -236,7 +237,7 @@ impl iface::Tap for Tap { inspect: &I, ) -> Option<(TapRequestPayload, TapResponse)> where - B: HttpBody, + B: Body, I: Inspect, { let shared = self.shared.upgrade()?; @@ -346,7 +347,7 @@ impl iface::Tap for Tap { impl iface::TapResponse for TapResponse { type TapPayload = TapResponsePayload; - fn tap(self, rsp: &http::Response) -> TapResponsePayload { + fn tap(self, rsp: &http::Response) -> TapResponsePayload { let response_init_at = Instant::now(); let headers = if self.extract_headers { diff --git a/linkerd/proxy/tap/src/lib.rs b/linkerd/proxy/tap/src/lib.rs index 3f1f286bff..56b33c7436 100644 --- a/linkerd/proxy/tap/src/lib.rs +++ b/linkerd/proxy/tap/src/lib.rs @@ -72,7 +72,7 @@ pub trait Inspect { /// for Registry/Layer/grpc, but need not be implemented outside of the `tap` /// module. mod iface { - use hyper::body::{Buf, HttpBody}; + use bytes::Buf; use linkerd_proxy_http::HasH2Reason; pub trait Tap: Clone { @@ -87,7 +87,7 @@ mod iface { /// /// If the tap cannot be initialized, for instance because the tap has /// completed or been canceled, then `None` is returned. - fn tap( + fn tap( &mut self, req: &http::Request, inspect: &I, @@ -106,7 +106,7 @@ mod iface { type TapPayload: TapPayload; /// Record a response and obtain a handle to tap its body. - fn tap(self, rsp: &http::Response) -> Self::TapPayload; + fn tap(self, rsp: &http::Response) -> Self::TapPayload; /// Record a service failure. fn fail(self, error: &E); diff --git a/linkerd/proxy/tap/src/service.rs b/linkerd/proxy/tap/src/service.rs index fc70d5eee1..2490b51339 100644 --- a/linkerd/proxy/tap/src/service.rs +++ b/linkerd/proxy/tap/src/service.rs @@ -2,7 +2,6 @@ use super::iface::{Tap, TapPayload, TapResponse}; use super::registry::Registry; use super::Inspect; use futures::ready; -use hyper::body::HttpBody; use linkerd_proxy_http::HasH2Reason; use linkerd_stack::{layer, NewService}; use pin_project::{pin_project, pinned_drop}; @@ -30,7 +29,7 @@ pub struct TapHttp { #[derive(Debug)] pub struct Body where - B: HttpBody, + B: linkerd_proxy_http::Body, B::Error: HasH2Reason, T: TapPayload, { @@ -79,9 +78,9 @@ where T::TapResponse: Send + 'static, T::TapRequestPayload: Send + 'static, T::TapResponsePayload: Send + 'static, - A: HttpBody, + A: linkerd_proxy_http::Body, A::Error: HasH2Reason, - B: HttpBody, + B: linkerd_proxy_http::Body, B::Error: HasH2Reason, { type Response = http::Response>; @@ -119,6 +118,7 @@ where // body taps to decorate the response body. let taps = rsp_taps.drain(..).map(|t| t.tap(&rsp)).collect(); let rsp = rsp.map(move |inner| { + use linkerd_proxy_http::Body as _; let mut body = Body { inner, taps }; if body.is_end_stream() { eos(&mut body.taps, None); @@ -143,7 +143,7 @@ where // `T` need not implement Default. impl Default for Body where - B: HttpBody + Default, + B: linkerd_proxy_http::Body + Default, B::Error: HasH2Reason, T: TapPayload, { @@ -155,9 +155,9 @@ where } } -impl HttpBody for Body +impl linkerd_proxy_http::Body for Body where - B: HttpBody, + B: linkerd_proxy_http::Body, B::Error: HasH2Reason, T: TapPayload + Send + 'static, { @@ -208,7 +208,7 @@ where impl BodyProj<'_, B, T> where - B: HttpBody, + B: linkerd_proxy_http::Body, B::Error: HasH2Reason, T: TapPayload, { @@ -240,7 +240,7 @@ where #[pinned_drop] impl PinnedDrop for Body where - B: HttpBody, + B: linkerd_proxy_http::Body, B::Error: HasH2Reason, T: TapPayload, {